java – Hadoop Map Reduce For Google web graph
We have been given the task of creating a MapReduce function as an assignment. For each node n in the Google web graph list, this function should output the nodes you can reach from node n in 3 hops (the actual data can be found here: http://snap.stanford.edu/data/web-Google.html ):
1 2
1 3
2 4
3 4
3 5
4 1
4 5
4 6
5 6
From the edge list above, the graph looks like this: (figure not reproduced here)
In the simplified example above, the 3-hop paths from node 1 are [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 -> 1], [1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] and [1 -> 3 -> 5 -> 6]. Therefore the MapReduce job should output the vertices 1, 5, 6 for node 1, where (a) each vertex is counted only once, and (b) the current vertex is included only when there is a circular path of length 3, as in the examples [1 -> 2 -> 4 -> 1] and [1 -> 3 -> 4 -> 1].
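To make the expected output concrete, here is a plain-Java brute force over the toy graph above (not MapReduce; the class name is just for illustration) that enumerates all 3-hop paths and ends up with exactly 1, 5, 6 for node 1:

import java.util.*;

// Brute-force illustration of the required output: enumerate every 3-hop
// path from a start vertex and collect the distinct end points.
public class ThreeHopsBruteForce {

    public static void main(String[] args) {
        // the toy edge list from above
        int[][] edges = { {1,2},{1,3},{2,4},{3,4},{3,5},{4,1},{4,5},{4,6},{5,6} };
        Map<Integer, List<Integer>> adj = new HashMap<>();
        for (int[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
        }

        // every vertex at the end of some path of exactly 3 hops from vertex 1
        Set<Integer> result = new TreeSet<>();
        for (int a : adj.getOrDefault(1, List.of()))
            for (int b : adj.getOrDefault(a, List.of()))
                for (int c : adj.getOrDefault(b, List.of()))
                    result.add(c);

        System.out.println("1\t" + result); // prints: 1  [1, 5, 6]
    }
}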
I'm frustrated because I believe this requires knowledge of graph theory and algorithms, and we haven't been taught anything related to it.
I would appreciate it if someone could point me in the right direction to get started. I've studied shortest-path theory, but I'm not sure it is useful for this particular exercise.
Thanks in advance, and have a nice holiday.
Edit:
I'm trying to create the adjacency list, and I want to output it as "vertexID" "node1 node2 node3 node4 ...", but in the output file I see that my reducer splits the list of each vertex ID into groups of three.
For example, if I have a vertex A connected to Z, X, C, V, B, N, M, G, H, J, K, L, it is output as

A Z,X,C
A V,B,N
A M,G,H
A J,K,L

Here are my mapper and reducer:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AdjacentsListDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJobName("Test driver");
        job.setJarByClass(AdjacentsListDriver.class);

        String[] arg0 = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (arg0.length != 2) {
            System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>");
            System.exit(1);
        }

        Path in = new Path(arg0[0]);
        Path out = new Path(arg0[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(ListMapper.class);
        job.setReducerClass(ListReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AdjacentsListDriver(), args);
        System.exit(res);
    }
}

/**
 * @author George
 * Theoretically this takes a key (vertexID) and maps all nodes that are connected to it in one hop...
 */
public class ListMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text vertexID = new Text();
    private Text vertice = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line, "\n");
        StringTokenizer itrInside;

        while (itr.hasMoreTokens()) {
            if (itr.countTokens() > 2) {
                // ignore first line ??
            } else {
                itrInside = new StringTokenizer(itr.toString());
                vertexID.set(itr.nextToken());
                while (itrInside.hasMoreTokens()) {
                    vertice.set(itrInside.nextToken());
                    context.write(vertexID, vertice);
                }
            }
        }
    }
}

public class ListReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String vertices = "";
        for (Text value : values) {
            if (vertices.isEmpty()) {
                vertices += value.toString();
            } else {
                vertices = vertices + "," + value.toString();
            }
        }
        context.write(key, new Text(vertices));
    }
}
Solution
Since this is your (homework) assignment, I will not include a complete Java/Hadoop solution, but I will try to make the concept of graph computation with MapReduce a bit clearer so that you can implement it yourself.
For each vertex, you want exactly the vertices that are N hops away. Looking at shortest-path algorithms put you on the right track, but this can be solved more easily with a simple breadth-first search.
However, when processing graphs with MapReduce, you need a deeper understanding of message passing between vertices. Graph algorithms are commonly expressed as multiple jobs, in which the map and reduce stages have the following assignments:
> Mapper: emit messages to other vertices (usually once for each neighbour of a vertex)
> Reducer: group the incoming messages, join them with the core graph, and reduce them, sometimes sending messages again
Each job always consumes the output of the previous job, until you either reach the result or give up.
Data preparation
Before you actually run the graph algorithm, make sure your data is in the form of an adjacency list; that will make the iterative algorithms that follow much easier to implement.
So you need to group by vertex ID, not by adjacency tuples. Here is some pseudocode:
map input tuple (X, Y):
   emit (X, Y)

reduce input (X, Y[]):
   emit (X, Y[])
Basically you are just grouping by vertex ID, so each key of your input data is a vertex ID and its value is the list of vertex IDs that can be reached directly from it. If you want to save resources, you can also use the reducer as a combiner.
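As a rough sketch (deliberately not a complete solution), that preparation job could look like this in Java; the class names are only illustrative, and the comment-line skip assumes the SNAP file format:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Groups raw "X Y" edge tuples by source vertex ID, producing one
// adjacency-list line per vertex: "X <tab> Y1,Y2,..."
public class AdjacencyPrep {

    public static class EdgeMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty() || line.startsWith("#")) {
                return; // skip the comment header lines of the SNAP file
            }
            String[] parts = line.split("\\s+");
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }

    public static class GroupReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            StringBuilder neighbours = new StringBuilder();
            for (Text v : values) {
                if (neighbours.length() > 0) {
                    neighbours.append(',');
                }
                neighbours.append(v);
            }
            context.write(key, new Text(neighbours.toString()));
        }
    }
}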
Algorithm
As I mentioned, you only need a breadth-first search. You perform a breadth-first search for each vertex in the graph; whenever you hit a neighbour, you increment a hop counter that tells us how far we are from the start vertex (this is the simplest case of a shortest-path algorithm, namely when the edge weight is 1).
Let me describe it with a simple graph: orange means visited, blue means not visited, and green is our result. The hop counter is in parentheses. (figure not reproduced here)
You see that in each iteration we set a hop counter on each vertex. When we hit a new vertex, we just increment it by one. When we reach the n-th vertex, we mark it in some way for later lookup.
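Outside of MapReduce, that per-vertex search is just a few lines; here is a minimal in-memory sketch (class and method names assumed). Note that it deliberately keeps no visited set, because the assignment counts paths of exactly three hops, including cycles back to the start:

import java.util.*;

// Level-by-level expansion from one start vertex; returns all vertices
// that sit at the end of some path of exactly `hops` hops.
public class NHopBfs {

    public static Set<Integer> nHop(Map<Integer, List<Integer>> adj, int start, int hops) {
        Set<Integer> frontier = new HashSet<>(List.of(start));
        for (int depth = 0; depth < hops; depth++) {
            Set<Integer> next = new HashSet<>();
            for (int v : frontier) {
                next.addAll(adj.getOrDefault(v, List.of()));
            }
            frontier = next; // now holds the vertices reachable in depth + 1 hops
        }
        return frontier;
    }
}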
Distribution using MapReduce
Although a breadth-first search for each vertex sounds wasteful, we can do better by parallelizing. Here message passing comes into play. Like in the picture above, every vertex we get in the mapping step initially sends a message to its neighbours containing the following payload:
HopMessage: Origin (VertexID) | HopCounter(Integer)
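In Hadoop terms this payload would be a small Writable; a minimal sketch (the field names are assumptions, nothing more):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Message payload: where the traversal started and how many hops it has taken.
public class HopMessage implements Writable {

    public String origin;  // vertex ID the traversal started from
    public int hopCounter; // hops travelled so far

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(origin);
        out.writeInt(hopCounter);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        origin = in.readUTF();
        hopCounter = in.readInt();
    }
}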
In the first iteration we send the messages to the neighbours to kick off the computation; in every other iteration we just pass through the graph or the incoming messages.
So in the first job after the data preparation, map and reduce look like this:
map input (VertexID key, either HopMessage or List<VertexID> adjacents):
   if (iterationNumber == 1): // only in the first iteration, to kick off
      for neighbour in adjacents:
         emit (neighbour, new HopMessage(key, 0))
   emit (key, adjacents or HopMessage) // for joining in the reducer
The reducer now does a simple join between the graph and the messages, mainly to obtain the neighbours of each vertex, which leads to this input (using my simple graph):
1 2 // graph
2 1 // hop message
2 3 // graph
3 1 // hop message
3 4 // graph
4 1 // hop message
4 - // graph
In the reduce step, we forward the messages to the neighbours again, checking whether the hop counter has already reached 3 after incrementing it:
reduce input (VertexID key, List<either HopMessage or List<VertexID>> values):
   for hopMessage in values:
      hopMessage.counter += 1
      if (hopMessage.counter == 3):
         emit to some external resource (hopMessage.origin, key)
      else:
         for neighbour in neighbours of key:
            emit (neighbour, hopMessage)
   emit (key, neighbours)
As you can see, this can get quite confusing: you have to manage two different kinds of messages, and you also have to write to some external resource that keeps track of the vertices that are exactly 3 hops away.
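One common workaround (my assumption, not the only way) is to tag every value with a type prefix so a single Text value type can carry both record kinds: the mapper would emit adjacency lists as "G:..." and hop messages as "H:origin:counter". A sketch of the matching reducer:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch: distinguish the two record kinds by prefix.
// "G:n1,n2,..." carries the adjacency list, "H:origin:counter" a hop message.
public class HopReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String neighbours = "";
        StringBuilder messages = new StringBuilder();
        // single pass over the joined values: remember the graph, buffer the messages
        for (Text v : values) {
            String s = v.toString();
            if (s.startsWith("G:")) {
                neighbours = s.substring(2);
            } else {
                messages.append(s).append(';');
            }
        }
        for (String msg : messages.toString().split(";")) {
            if (msg.isEmpty()) continue;
            String[] parts = msg.split(":"); // ["H", origin, counter]
            int counter = Integer.parseInt(parts[2]) + 1;
            if (counter == 3) {
                // a real job would route this to a separate output, e.g. via MultipleOutputs
                context.write(new Text(parts[1]), key);
            } else {
                for (String n : neighbours.split(",")) {
                    if (!n.isEmpty()) {
                        context.write(new Text(n), new Text("H:" + parts[1] + ":" + counter));
                    }
                }
            }
        }
        // forward the graph itself for the next iteration
        context.write(key, new Text("G:" + neighbours));
    }
}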
You schedule iteration jobs for as long as there are hop messages left to send. This is prone to problems with cycles in the graph, because in that case you would increment the hop counter infinitely. So I suggest either sending the complete traversal path so far along with each message (quite wasteful) or simply capping the number of iterations: in the case of n = 3, no more than 3 job iterations are needed.
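The driver for such a capped iteration can be a plain loop; a hedged sketch, assuming each iteration reads the previous iteration's output directory and that your own mapper/reducer classes are plugged in:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Chains a fixed number of iterations; each job consumes the previous output.
public class HopDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String base = args[0]; // base working directory, e.g. "/user/me/hops"
        for (int i = 1; i <= 3; i++) { // n = 3, so 3 iterations suffice
            conf.setInt("hop.iteration", i); // lets the mapper know the iteration number
            Job job = Job.getInstance(conf, "hop iteration " + i);
            job.setJarByClass(HopDriver.class);
            // iteration i reads the output of iteration i-1 (iteration 0 = adjacency prep)
            FileInputFormat.setInputPaths(job, new Path(base + "/iter-" + (i - 1)));
            FileOutputFormat.setOutputPath(job, new Path(base + "/iter-" + i));
            // job.setMapperClass(...); job.setReducerClass(...); key/value classes, etc.
            if (!job.waitForCompletion(true)) {
                System.exit(1);
            }
        }
    }
}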
There are many blogs and projects that provide examples of how to deal with each of these problems in Hadoop. At the very least, I have written about graph crunching with MapReduce on my blog, and you can find some examples on my GitHub.
Clean up output data
Finally, you will have a pile of files containing vertex -> vertex mappings. You can reduce them the same way as in the data preparation step.
The niftier way using Pregel
A less cumbersome way of dealing with graphs is to express the computation in the Pregel style. Pregel uses a vertex-centric computing model, which makes it much easier to express this kind of breadth-first computation.
The following is a simple example of the above algorithm using Apache HAMA:
public static class NthHopVertex extends Vertex<Text, NullWritable, HopMessage> {

    @Override
    public void compute(Iterable<HopMessage> messages) throws IOException {
        if (getSuperstepCount() == 0L) {
            // superstep 0: tell all neighbours where we started
            HopMessage msg = new HopMessage();
            msg.origin = getVertexID().toString();
            msg.hopCounter = 0;
            sendMessageToNeighbors(msg);
        } else {
            for (HopMessage message : messages) {
                message.hopCounter += 1;
                if (message.hopCounter == 3) {
                    getPeer().write(new Text(message.origin), getVertexID());
                    voteToHalt();
                } else {
                    sendMessageToNeighbors(message);
                }
            }
        }
    }
}
By the way, the new graph created from your example looks like this:
1=[1, 5, 6]
2=[2, 3, 6]
3=[2, 3, 6]
4=[4, 5]
The following is the complete HAMA graph implementation:
https://github.com/thomasjungblut/tjungblut-graph/blob/master/src/de/jungblut/graph/bsp/NthHop.java