java – Hadoop MapReduce for the Google web graph

We have been given the task of creating a MapReduce function that, for each node n in the Google web graph, outputs the nodes you can reach from node n in 3 hops (the actual data can be found here: http://snap.stanford.edu/data/web-Google.html )

1 2 
1 3 
2 4 
3 4 
3 5 
4 1 
4 5 
4 6 
5 6

From the example input above, the graph looks like this (figure not reproduced here):

In the simplified example above, the paths from node 1 are [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 -> 1], [1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] and [1 -> 3 -> 5 -> 6]; therefore, the MapReduce job should output the vertices 1, 5, 6 for node 1 (note that (a) each vertex is counted only once, and (b) we include the starting vertex itself only when there is a circular path of length 3, as in the examples [1 -> 2 -> 4 -> 1] and [1 -> 3 -> 4 -> 1]).

I'm frustrated because I believe it requires knowledge of graph theory and algorithms, and we haven't been taught anything related to that.

I would appreciate it if someone could point me in the right direction to start. I've studied shortest-path theory, but I'm not sure whether it will be useful for this particular exercise.

Thanks in advance, and have a nice holiday.

Edit

I am trying to create the adjacency list, and I want to output it as "vertex ID" "node1 node2 node3 node4 ...", but I see that in the output file my reducer splits each vertex ID's list into groups of three.

For example, if I have a vertex A that is connected to Z, X, C, V, B, N, M, G, H, J, K, L, I output it as

A Z,X,C

A V,B,N

A M,G,H

A J,K,L

Here are my driver, mapper, and reducer:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AdjacentsListDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String[] arg0 = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (arg0.length != 2) {
            System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>");
            System.exit(1);
        }

        Job job = Job.getInstance(conf);
        job.setJobName("Test driver");
        job.setJarByClass(AdjacentsListDriver.class);

        FileInputFormat.setInputPaths(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

        job.setMapperClass(ListMapper.class);
        job.setReducerClass(ListReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AdjacentsListDriver(), args);
        System.exit(res);
    }
}





import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @author George
 * Theoretically this takes a key (vertexID) and maps all nodes that are connected to it in one hop....
 *
 */
public class ListMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text vertexID = new Text();
    private Text vertice = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // TextInputFormat hands us one line per call; the dataset's
        // comment lines do not consist of exactly two node IDs, so skip them
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        if (itr.countTokens() != 2) {
            return;
        }

        vertexID.set(itr.nextToken());
        vertice.set(itr.nextToken());
        context.write(vertexID, vertice); // emit (fromNode, toNode)
    }
}

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ListReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        String vertices = "";

        for (Text value : values) {
            if (vertices.isEmpty())
                vertices = value.toString();
            else
                vertices = vertices + "," + value.toString();
        }

        context.write(key, new Text(vertices));
    }
}

Solution

Since this is your (homework) task, I will not include a Java/Hadoop solution, but I will try to make the concept of graph computation with MapReduce a bit clearer so that you can implement it yourself.

For each vertex, you want exactly the vertices that are N hops away. You were on the right track looking at shortest-path algorithms, but this can be solved more easily with a simple breadth-first search.

However, when processing graphs with MapReduce, you need a deeper understanding of message passing between vertices. Graph algorithms are commonly expressed as multiple jobs, where the map and reduce stages have the following assignments:

> Mapper: send messages to other vertices (usually one for every neighbor of a vertex)
>
> Reducer: group the incoming messages, join them with the core graph and reduce them, and sometimes send messages again

Each job always operates on the output of the previous job, until either you reach the result or you give up.

Data preparation

Before you actually run the graph algorithm, make sure your data is in the form of an adjacency list. That will make the iterative algorithms that follow much easier to implement.

So instead of adjacency tuples, you need to group the edges by vertex ID. Here is some pseudocode:

map input tuple (X,Y): 
   emit (X,Y)

reduce input (X,Y[]) :
  emit (X,Y[])

Basically you just group by vertex ID, so your input data becomes a key (vertex ID) mapped to its neighbors (the list of vertex IDs that can be reached from that specific key vertex ID). If you want to save resources, you can use the reducer as a combiner.
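
To make this concrete, here is a minimal Java sketch of that preparation job (each class in its own file; the class names are mine, and I assume whitespace-separated "from to" edges with '#' comment lines, as in the SNAP file):

public class AdjacencyMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.startsWith("#")) {
            return; // skip the dataset's comment/header lines
        }
        String[] edge = line.split("\\s+");
        // emit (X, Y): one record per edge
        context.write(new Text(edge[0]), new Text(edge[1]));
    }
}

public class AdjacencyReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder neighbours = new StringBuilder();
        for (Text v : values) { // collect all neighbours of this vertex
            if (neighbours.length() > 0) {
                neighbours.append(' ');
            }
            neighbours.append(v.toString());
        }
        // emit (X, Y[]): the adjacency list of vertex X
        context.write(key, new Text(neighbours.toString()));
    }
}

Because concatenating partial neighbour lists is associative, job.setCombinerClass(AdjacencyReducer.class) would also work for the combiner mentioned above.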

Algorithm

As I mentioned, all you need is a breadth-first search. You run a BFS for every vertex in the graph, and when you hit a neighbor you just increment a hop counter that tells us how far we are from the start vertex (this is the simplest case of a shortest-path algorithm, namely when the edge weight is 1).

Let me describe it with a simple picture of our graph: orange means visited, blue means unvisited, and green is our result; the hop counter is in parentheses. (The figure is not reproduced here.)

As you can see, in each iteration we set a hop counter on every vertex. If we hit a new vertex, we just increment its counter by one. When we reach the n-th vertex, we mark it somehow for later lookup.

Distributing the computation with MapReduce

Although a breadth-first search per vertex seems wasteful, we can do better by parallelizing. Here message passing comes into play. Like in the picture above, every vertex we get in the mapping step initially sends a message to its neighbors containing the following payload:

HopMessage: Origin (VertexID) | HopCounter(Integer)
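
In plain Hadoop, that payload could be modeled as a custom Writable; a minimal sketch (the field names follow the payload above, everything else is an assumption):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class HopMessage implements Writable {

    public String origin;  // vertex ID the BFS started from
    public int hopCounter; // hops travelled so far

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(origin);
        out.writeInt(hopCounter);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        origin = in.readUTF();
        hopCounter = in.readInt();
    }
}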

In the first iteration we send the messages to the neighbors to kick off the computation; otherwise, we just pass through the graph records or the incoming messages.

So in the first job after the data preparation, map and reduce look like this:

map input (VertexID key,either HopMessage or List<VertexID> adjacents):
  if(iterationNumber == 1): // only in the first iteration to kick off
     for neighbour in adjacents:
        emit (neighbour,new HopMessage(key,0))
  emit (key,adjacents or HopMessage) // for joining in the reducer
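
A rough Java transcription of this kick-off mapper could look like the sketch below. All of it rests on assumptions of mine: the previous job's output is read back with KeyValueTextInputFormat, both record kinds are encoded as tagged Text values ("G|n1 n2 n3" for an adjacency list, "M|origin|counter" for a hop message) instead of a union Writable type, and the iteration number is stored in the Configuration:

public class HopMapper extends Mapper<Text, Text, Text, Text> {

    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        int iteration = context.getConfiguration().getInt("iteration.number", 1);
        String record = value.toString();

        if (iteration == 1 && record.startsWith("G|")) {
            // kick off: send a hop message with counter 0 to every neighbour
            for (String neighbour : record.substring(2).split(" ")) {
                context.write(new Text(neighbour),
                        new Text("M|" + key.toString() + "|0"));
            }
        }
        context.write(key, value); // forward the record for joining in the reducer
    }
}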

The reducer now does a simple join between the graph and the messages, mainly to obtain the neighbors of each vertex, which results in this input (using my simple graph):

1 2 // graph 
2 1 // hop message
2 3 // graph
3 1 // hop message
3 4 // graph
4 1 // hop message
4 - // graph

In the reducer step, we forward the messages to the neighbors again, checking whether the hop counter has already reached 3 after incrementing.

reducer input (VertexID key, List<either HopMessage or List<VertexID> neighbours> values):
  for hopMessage in values:
     hopMessage.counter += 1
     if (hopMessage.counter == 3)
        emit to some external resource (hopMessage.origin, key)
     else
        for neighbour in neighbours of key:
           emit (neighbour, hopMessage)
  emit (key, neighbours)

As you can see, it can get pretty messy: you need to manage two different kinds of records, and you need to write to some external resource that keeps track of the vertices that are exactly 3 hops away.
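
One way to handle both problems in plain Hadoop is to use the same tagged-Text encoding as in the mapper sketch above and route the finished 3-hop pairs to a named side output via MultipleOutputs. A rough, illustrative transcription of the reduce pseudocode (tags and names are my own choice, not a drop-in solution):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class HopReducer extends Reducer<Text, Text, Text, Text> {

    private MultipleOutputs<Text, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String adjacents = "";
        List<String[]> messages = new ArrayList<>();

        // separate the graph record ("G|n1 n2 n3") from hop messages ("M|origin|counter")
        for (Text v : values) {
            String s = v.toString();
            if (s.startsWith("G|")) {
                adjacents = s.substring(2);
            } else {
                messages.add(s.substring(2).split("\\|"));
            }
        }

        for (String[] msg : messages) {
            int counter = Integer.parseInt(msg[1]) + 1;
            if (counter == 3) {
                // the "external resource": a side output of (origin, 3-hop vertex)
                mos.write("hops", new Text(msg[0]), key);
            } else if (!adjacents.isEmpty()) {
                for (String neighbour : adjacents.split(" ")) {
                    context.write(new Text(neighbour),
                            new Text("M|" + msg[0] + "|" + counter));
                }
            }
        }

        context.write(key, new Text("G|" + adjacents)); // forward the graph itself
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}

The driver would have to register the side output once via MultipleOutputs.addNamedOutput(job, "hops", TextOutputFormat.class, Text.class, Text.class).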

You schedule iterative jobs for as long as there are HopMessages left to send. This can easily run into trouble with cycles in the graph, because you would increment the hop counter infinitely. So I suggest either sending the complete traversal path so far along with each message (quite wasteful) or simply capping the number of iterations to perform. In the n = 3 case, no more than 3 job iterations are needed.
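
The scheduling could then look roughly like this in the driver (a sketch under my earlier assumptions; the paths and the fixed stopping rule for n = 3 are mine):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HopDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("prepared"); // output of the preparation job

        for (int i = 1; i <= 3; i++) { // n = 3 needs at most 3 iterations
            conf.setInt("iteration.number", i); // tells the mappers whether to kick off
            Job job = Job.getInstance(conf, "hop-iteration-" + i);
            Path output = new Path("iteration-" + i);
            FileInputFormat.setInputPaths(job, input);
            FileOutputFormat.setOutputPath(job, output);
            // set jar, mapper, reducer, input format and key/value types here...
            if (!job.waitForCompletion(true)) {
                System.exit(1); // abort the chain if an iteration fails
            }
            input = output; // the next iteration consumes this iteration's output
        }
    }
}

Inside the mapper, context.getConfiguration().getInt("iteration.number", 1) then plays the role of iterationNumber from the pseudocode.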

There are many blogs and projects that provide examples of how to deal with each of these problems in Hadoop. At the very least, I have written about graph processing with MapReduce on my blog, and you can find some examples on my GitHub.

Clean up output data

In the end, you will have a pile of files containing a vertex -> vertex mapping. You can reduce them the same way as in the preparation step.
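
That last job could be a tiny reducer of the same shape as the preparation one; a sketch (names are mine), deduplicating with a set since the same vertex may be reached over several paths:

import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class CleanupReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Set<String> unique = new TreeSet<>(); // count each 3-hop vertex only once
        for (Text v : values) {
            unique.add(v.toString());
        }
        context.write(key, new Text(String.join(",", unique)));
    }
}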

Using the niftier Pregel approach

A less cumbersome way of dealing with graphs is the Pregel way of expressing a graph computation. Pregel uses a vertex-centric model, which makes it easier to express this kind of breadth-first computation.

Here is a simple example of the above algorithm using Apache HAMA:

public static class NthHopVertex extends Vertex<Text, NullWritable, HopMessage> {

    @Override
    public void compute(Iterable<HopMessage> messages) throws IOException {
      if (getSuperstepCount() == 0L) {
        HopMessage msg = new HopMessage();
        msg.origin = getVertexID().toString();
        msg.hopCounter = 0;
        sendMessageToNeighbors(msg);
      } else {
        for (HopMessage message : messages) {
          message.hopCounter += 1;
          if (message.hopCounter == 3) {
            getPeer().write(new Text(message.origin), getVertexID());
            voteToHalt();
          } else {
            sendMessageToNeighbors(message);
          }
        }
      }
    }
  }

BTW, the new graph created for your example looks like this:

1=[1,6]
2=[2,3,6]
3=[2,6]
4=[4,5]

The following is the complete HAMA graph implementation:

https://github.com/thomasjungblut/tjungblut-graph/blob/master/src/de/jungblut/graph/bsp/NthHop.java
