Intro to Parallel Processing with MapReduce

When I first tried to learn about MapReduce, I found it difficult to grasp the basic concepts, so I decided to write a simple example that demonstrates its benefits in practice, the most important one being distributed computing, or parallel processing. In this article I describe a simple problem and solve it both with and without MapReduce. To finish, I show how MapReduce makes it straightforward to distribute the work across a cluster. (Note: if you find anything in this article that is unclear or could be improved, please leave a comment.)

The Problem

I have a set of text files whose contents I want to index by word, so that for any given word I can return all the files containing it, together with the number of occurrences in each file, without any sorting. Showing the given INPUT and the expected OUTPUT will probably make this easier to understand:

Below are my text files and their contents, which, to keep things simple, are just a bunch of words:

file1.txt => "foo foo bar cat dog dog"
file2.txt => "foo house cat cat dog"
file3.txt => "foo foo foo bird"

The final result is a hash map indexing each word to the files in which it is present, with an occurrence counter for each file:

"foo" => { "file1.txt" => 2, "file3.txt" => 3, "file2.txt" => 1 }
"bar" => { "file1.txt" => 1 }
"cat" => { "file2.txt" => 2, "file1.txt" => 1 }
"dog" => { "file2.txt" => 1, "file1.txt" => 2 }
"house" => { "file2.txt" => 1 }
"bird" => { "file3.txt" => 1 }

With the hash map above, searching the files by word becomes trivial.
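For example, once the index has been built and stored in a variable named output (as in the code below), a lookup is a single map access:

// one map access returns every file containing the word,
// together with the per-file occurrence counts
Map<String, Integer> files = output.get("foo");
// files => { "file1.txt" => 2, "file3.txt" => 3, "file2.txt" => 1 }

// a word that appears in no file simply returns null
Map<String, Integer> none = output.get("xyz");
// none => null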

Approach #1 – Without MapReduce

Map<String, String> input = new HashMap<String, String>();
input.put("file1.txt", "foo foo bar cat dog dog");
input.put("file2.txt", "foo house cat cat dog");
input.put("file3.txt", "foo foo foo bird");

Map<String, Map<String, Integer>> output = new HashMap<String, Map<String, Integer>>();

Iterator<Map.Entry<String, String>> inputIter = input.entrySet().iterator();
while(inputIter.hasNext()) {
	Map.Entry<String, String> entry = inputIter.next();
	String file = entry.getKey();
	String contents = entry.getValue();

	String[] words = contents.trim().split("\\s+");

	for(String word : words) {

		Map<String, Integer> files = output.get(word);
		if (files == null) {
			files = new HashMap<String, Integer>();
			output.put(word, files);
		}

		Integer occurrences = files.get(file);
		if (occurrences == null) {
			files.put(file, 1);
		} else {
			files.put(file, occurrences.intValue() + 1);
		}
	}
}

As you can see, we are just counting the words one by one and placing the results in a hash map, which is actually a map of maps.

Approach #2 – With MapReduce

MapReduce takes the problem above and breaks it down into two independent phases: the map phase and the reduce phase. In practice there is a third, intermediate phase called grouping that sits between them, but the only phases that get distributed across the cluster are map and reduce.
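Here is a minimal sketch of how the three phases chain together; group is a hypothetical helper standing in for the grouping code implemented inline later, and input and output are the same maps used in Approach #1:

// map phase: turn each (file, contents) pair into a list of (word, file) pairs
List<MappedItem> mappedItems = new LinkedList<MappedItem>();
for (Map.Entry<String, String> entry : input.entrySet()) {
	map(entry.getKey(), entry.getValue(), mappedItems);
}

// grouping phase: collect all the files mapped to the same word under one key
// (group is a hypothetical helper; the real code below does this inline)
Map<String, List<String>> groupedItems = group(mappedItems);

// reduce phase: collapse each word's file list into per-file counts
for (Map.Entry<String, List<String>> entry : groupedItems.entrySet()) {
	reduce(entry.getKey(), entry.getValue(), output);
}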

Map

Map<String, Map<String, Integer>> output = new HashMap<String, Map<String, Integer>>();

List<MappedItem> mappedItems = new LinkedList<MappedItem>();

Iterator<Map.Entry<String, String>> inputIter = input.entrySet().iterator();
while(inputIter.hasNext()) {
	Map.Entry<String, String> entry = inputIter.next();
	String file = entry.getKey();
	String contents = entry.getValue();

	map(file, contents, mappedItems);
}

public static void map(String file, String contents, List<MappedItem> mappedItems) {
	String[] words = contents.trim().split("\\s+");
	for(String word: words) {
		mappedItems.add(new MappedItem(word, file));
	}
}

private static class MappedItem { 
	
	private final String word;
	private final String file;
	
	public MappedItem(String word, String file) {
		this.word = word;
		this.file = file;
	}

	public String getWord() {
		return word;
	}

	public String getFile() {
		return file;
	}
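
	// a toString is not strictly required, but it makes printing the list of
	// mapped items produce the ["word","file"] listing shown further below
	@Override
	public String toString() {
		return "[\"" + word + "\",\"" + file + "\"]";
	}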
}

It is important to understand that the map phase produces a list of key/value pairs: in our example the key is a word and the value is the file where that word was found. It is also important to notice that the list can contain duplicates. For example, the item ["foo", "file3.txt"] appears three times in the list because the word “foo” appears three times in that file. Below is what you should expect as the output of the mapping phase:

[["foo","file2.txt"], ["house","file2.txt"], ["cat","file2.txt"], ["cat","file2.txt"],
["dog","file2.txt"], ["foo","file1.txt"], ["foo","file1.txt"], ["bar","file1.txt"],
["cat","file1.txt"], ["dog","file1.txt"], ["dog","file1.txt"], ["foo","file3.txt"], 
["foo","file3.txt"], ["foo","file3.txt"], ["bird","file3.txt"]]

Group

The intermediate phase of MapReduce is the grouping phase, where the map results are grouped by key and prepared for the reduce phase.

Map<String, List<String>> groupedItems = new HashMap<String, List<String>>();

Iterator<MappedItem> mappedIter = mappedItems.iterator();
while(mappedIter.hasNext()) {
	MappedItem item = mappedIter.next();
	String word = item.getWord();
	String file = item.getFile();
	List<String> list = groupedItems.get(word);
	if (list == null) {
		list = new LinkedList<String>();
		groupedItems.put(word, list);
	}
	list.add(file);
}

The output of the grouping phase contains the same pairs as the output of the mapping phase, just reorganized: the flat list of key/value pairs becomes a map from each word to the list of files it was found in, with one list entry per occurrence, as you can see below:

{bird=[file3.txt], cat=[file2.txt, file2.txt, file1.txt], 
foo=[file2.txt, file1.txt, file1.txt, file3.txt, file3.txt, file3.txt], 
house=[file2.txt], bar=[file1.txt], dog=[file2.txt, file1.txt, file1.txt]}

Reduce

In the final reduce phase, the code below reduces the map entries produced by the grouping phase to the output we are looking for:

Iterator<Map.Entry<String, List<String>>> groupedIter 
					= groupedItems.entrySet().iterator();
while(groupedIter.hasNext()) {
	Map.Entry<String, List<String>> entry = groupedIter.next();
	String word = entry.getKey();
	List<String> list = entry.getValue();
	
	reduce(word, list, output);
}

public static void reduce(String word, List<String> list, 
					Map<String, Map<String, Integer>> output) {
	Map<String, Integer> reducedList = new HashMap<String, Integer>();
	for(String file: list) {
		Integer occurrences = reducedList.get(file);
		if (occurrences == null) {
			reducedList.put(file, 1);
		} else {
			reducedList.put(file, occurrences.intValue() + 1);
		}
	}
	output.put(word, reducedList);
}

The final output (note that a HashMap does not guarantee iteration order, so the entries may appear in a different order than in the expected output shown earlier):

{bird={file3.txt=1}, cat={file2.txt=2, file1.txt=1}, 
foo={file2.txt=1, file1.txt=2, file3.txt=3}, house={file2.txt=1}, 
dog={file2.txt=1, file1.txt=2}, bar={file1.txt=1}}

Approach #3 – MapReduce Cluster

If you take a look at our previous MapReduce solution, you will notice that the map and reduce work can easily be broken down into independent jobs and distributed across a cluster of machines that perform the work in parallel. Let’s change our code to make this point clear, introducing two callback interfaces that allow the cluster to notify us when the work is done:

public static interface MapCallback<E, V> {
	
	public void mapDone(E key, List<V> values);
}

public static interface ReduceCallback<E, K, V> {
	
	public void reduceDone(E e, Map<K,V> results);
}

Modifying our map and reduce functions to use these callbacks, we have:

public static void map(String file, String contents, 
				MapCallback<String, MappedItem> callback) {
	String[] words = contents.trim().split("\\s+");
	List<MappedItem> results = new ArrayList<MappedItem>(words.length);
	for(String word: words) {
		results.add(new MappedItem(word, file));
	}
	callback.mapDone(file, results);
}

public static void reduce(String word, List<String> list, 
			ReduceCallback<String, String, Integer> callback) {
	
	Map<String, Integer> reducedList = new HashMap<String, Integer>();
	for(String file: list) {
		Integer occurrences = reducedList.get(file);
		if (occurrences == null) {
			reducedList.put(file, 1);
		} else {
			reducedList.put(file, occurrences.intValue() + 1);
		}
	}
	callback.reduceDone(word, reducedList);
}

To simulate a cluster of machines, we can use threads to process the work independently and in parallel. The threads then use the callback interfaces to notify us when their work is done, as below:

// MAP:

final Map<String, Map<String, Integer>> output 
			= new HashMap<String, Map<String, Integer>>();

final List<MappedItem> mappedItems = new LinkedList<MappedItem>();

final MapCallback<String, MappedItem> mapCallback 
			= new MapCallback<String, MappedItem>() {
	@Override
	public synchronized void mapDone(String file, List<MappedItem> results) {
		mappedItems.addAll(results);
	}
};

List<Thread> mapCluster = new ArrayList<Thread>(input.size());

Iterator<Map.Entry<String, String>> inputIter = input.entrySet().iterator();
while(inputIter.hasNext()) {
	Map.Entry<String, String> entry = inputIter.next();
	final String file = entry.getKey();
	final String contents = entry.getValue();
	
	Thread t = new Thread(new Runnable() {
		@Override
		public void run() {
			map(file, contents, mapCallback);
		}
	});
	mapCluster.add(t);
	t.start();
}

// wait for mapping phase to be over:
for(Thread t : mapCluster) {
	try {
		t.join();
	} catch(InterruptedException e) {
		throw new RuntimeException(e);
	}
}

// REDUCE:

final ReduceCallback<String, String, Integer> reduceCallback
			 = new ReduceCallback<String, String, Integer>() {
	@Override
	public synchronized void reduceDone(String k, Map<String, Integer> v) {
		output.put(k, v);
	}
};

List<Thread> reduceCluster = new ArrayList<Thread>(groupedItems.size());

Iterator<Map.Entry<String, List<String>>> groupedIter
						= groupedItems.entrySet().iterator();
while(groupedIter.hasNext()) {
	Map.Entry<String, List<String>> entry = groupedIter.next();
	final String word = entry.getKey();
	final List<String> list = entry.getValue();
	
	Thread t = new Thread(new Runnable() {
		@Override
		public void run() {
			reduce(word, list, reduceCallback);
		}
	});
	reduceCluster.add(t);
	t.start();
}

// wait for reducing phase to be over:
for(Thread t : reduceCluster) {
	try {
		t.join();
	} catch(InterruptedException e) {
		throw new RuntimeException(e);
	}
}

Notice that each job sent to a thread has a unique identifier: for the map phase it is the file, and for the reduce phase it is the word. It would not be hard to simulate a cluster node failure by timing out a thread that is taking too long and re-sending the job to another thread; that is essentially what frameworks like Hadoop and MongoDB do.
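Below is a rough sketch of that idea, using an ExecutorService in place of the raw threads above; the 5-second timeout and the single-retry policy are arbitrary choices for illustration:

// submit a map job to the "cluster" and re-send it once if it times out,
// simulating the failure of a cluster node
ExecutorService cluster = Executors.newCachedThreadPool();

Runnable mapJob = new Runnable() {
	@Override
	public void run() {
		map(file, contents, mapCallback); // same map function as above
	}
};

Future<?> result = cluster.submit(mapJob);
try {
	result.get(5, TimeUnit.SECONDS); // wait at most 5 seconds for this "node"
} catch(TimeoutException e) {
	result.cancel(true);    // assume the node failed...
	cluster.submit(mapJob); // ...and re-send the job to another one
} catch(InterruptedException e) {
	throw new RuntimeException(e);
} catch(ExecutionException e) {
	throw new RuntimeException(e);
}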

Note: You can see the source code for the three approaches described in this article here.

Conclusion

MapReduce breaks the process of crunching data into two steps: map and reduce. The map step needs to complete before the reduce step can start, but each step can be broken down into small pieces that execute in parallel. When you have a large data set, the ability to use a cluster and scale horizontally becomes crucial. Frameworks like Hadoop and MongoDB can manage the execution of a MapReduce operation across a cluster of computers, with support for fault tolerance. The complexity is hidden from developers, who only have to worry about implementing the map and reduce functions to transform the data set in any way they want.
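To give a taste of what that looks like, here is a minimal sketch of our word-to-file index written against Hadoop's Mapper and Reducer classes (the job setup boilerplate is omitted, and Hadoop's shuffle performs the grouping phase for us):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// map: emit a (word, file) pair for every word, like our MappedItem list
public static class IndexMapper extends Mapper<Object, Text, Text, Text> {
	@Override
	protected void map(Object key, Text value, Context context)
					throws IOException, InterruptedException {
		String file = ((FileSplit) context.getInputSplit()).getPath().getName();
		for(String word : value.toString().trim().split("\\s+")) {
			context.write(new Text(word), new Text(file));
		}
	}
}

// reduce: Hadoop hands us (word, [file, file, ...]) already grouped,
// so we only count the occurrences per file, like our reduce function
public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
	@Override
	protected void reduce(Text word, Iterable<Text> files, Context context)
					throws IOException, InterruptedException {
		Map<String, Integer> counts = new HashMap<String, Integer>();
		for(Text file : files) {
			Integer occurrences = counts.get(file.toString());
			counts.put(file.toString(), occurrences == null ? 1 : occurrences + 1);
		}
		context.write(word, new Text(counts.toString()));
	}
}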
