Fork and Join: Java Can Excel at Painless Parallel Programming Too!— turn

Original address: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html

Concurrent Programming in Java

Plain Old Threads

  Thread thread = new Thread() { 
        @Override public void run() {
           System.out.println(">>> I am running in a separate thread!");
        }
   };
   thread.start();
   thread.join();

Rich Primitives with the java. util. concurrent Packages

Note: Due to the new integer literals introduced by Java SE 7,underscores can be inserted anywhere to improve readability (for example,1_000_000).

import java.util.*;
import java.util.concurrent.*;
import static java.util.Arrays.asList;

public class Sums {

static class Sum implements Callable<Long> {
    private final long from;
    private final long to;
        Sum(long from,long to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public Long call() {
        long acc = 0;
        for (long i = from; i <= to; i++) {
            acc = acc + i;
        }
        return acc;
    }                
}

public static void main(String[] args) throws Exception {

    ExecutorService executor = Executors.newFixedThreadPool(2);
    List <Future<Long>> results = executor.invokeAll(asList(
        new Sum(0,10),new Sum(100,1_000),new Sum(10_000,1_000_000)
    ));
    executor.shutdown();

    for (Future<Long> result : results) {
        Sy<a href="https://www.jb51.cc/tag/stem/" target="_blank" class="keywords">stem</a>.out.println(result.get());
    }                
}    

}

Fork/Join Tasks

figure 1: Partial Sums over an Array of Integers

Additions for Supporting Parallelism

figure 2: Cooperation Among Fork and Join Tasks

Example: Counting Occurrences of a Word in Documents

class Document {
    private final List
  
  
   lines;

  
  
  
Document(List<String> lines) {
    this.lines = lines;
}

List<String> getLines() {
    return this.lines;
}

static Document fromFile(File file) throws IOException {
    List<String> lines = new LinkedList<>();
    try(BufferedReader reader = new BufferedReader(new FileReader(file))) {
        String line = reader.readLine();
        while (line != null) {
            lines.add(line);
            line = reader.readLine();
        }
    }
    return new Document(lines);
}

}

Note: If you are new to Java SE7,you should be surprised by the fromFile() method on two accounts:

class Folder {
    private final List
  
  
   subFolders;
    private final List
  
  
  
   
   
    documents;

   
   
   
Folder(List<Folder> subFolders,List<Document> documents) {
    this.subFolders = subFolders;
    this.documents = documents;
}

List<Folder> getSubFolders() {
    return this.subFolders;
}

List<Document> getDocuments() {
    return this.documents;
}

static Folder fromDirectory(File dir) throws IOException {
    List<Document> documents = new LinkedList<>();
    List<Folder> subFolders = new LinkedList<>();
    for (File entry : dir.listFiles()) {
        if (entry.isDirectory()) {
            subFolders.add(Folder.fromDirectory(entry));
        } else {
            documents.add(Document.fromFile(entry));
        }
    }
    return new Folder(subFolders,documents);
}

}

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class WordCounter {

String[] wordsIn(String line) {
    return line.trim().split("(\\s|\\p{Punct})+");
}

Long occurrencesCount(Document document,String searchedWord) {
    long count = 0;
    for (String line : document.getLines()) {
        for (String word : wordsIn(line)) {
            if (searchedWord.equals(word)) {
                count = count + 1;
            }
        }
    }
    return count;
}

}

figure 3: Fork/Join Word Counting Tasks

Let us begin with DocumentSearchTask,which counts the occurrences of a word in a document:

class DocumentSearchTask extends RecursiveTask
  
  
   {
    private final Document document;
    private final String searchedWord;

  
  
  
DocumentSearchTask(Document document,String searchedWord) {
    <a href="https://www.jb51.cc/tag/super/" target="_blank" class="keywords">super()</a>;
    this.document = document;
    this.searchedWord = searchedWord;
}

@Override
protected Long compute() {
    return occurrencesCount(document,searchedWord);
}

}

class FolderSearchTask extends RecursiveTask
  
  
   {
    private final Folder folder;
    private final String searchedWord;
    
    FolderSearchTask(Folder folder,String searchedWord) {
        
  super();
        this.folder = folder;
        this.searchedWord = searchedWord;
    }
    
    @Override
    protected Long compute() {
        long count = 0L;
        List
  
  
  
   
   
   
    
    
    > forks = new LinkedList<>();
        for (Folder subFolder : folder.getSubFolders()) {
            FolderSearchTask task = new FolderSearchTask(subFolder,searchedWord);
            forks.add(task);
            task.
    fork();
        }
        for (Document document : folder.getDocuments()) {
            DocumentSearchTask task = new DocumentSearchTask(document,searchedWord);
            forks.add(task);
            task.
    fork();
        }
        for (RecursiveTask
    
    
    
     
     
      task : forks) {
            count = count + task.join();
        }
        return count;
    }
}

    
    
    
   
   
   
  
  
  
 
 
 
private final ForkJoinPool forkJoinPool = new ForkJoinPool();

Long countOccurrencesInParallel(Folder folder,String searchedWord) {
return forkJoinPool.invoke(new FolderSearchTask(folder,searchedWord));
}

public static void main(String[] args) throws IOException {
    WordCounter wordCounter = new WordCounter();
    Folder folder = Folder.fromDirectory(new File(args[0]));
    System.out.println(wordCounter.countOccurrencesOnSingleThread(folder,args[1]));
}
Long countOccurrencesOnSingleThread(Folder folder,String searchedWord) {
    long count = 0;
    for (Folder subFolder : folder.getSubFolders()) {
        count = count + countOccurrencesOnSingleThread(subFolder,searchedWord);
    }
    for (Document document : folder.getDocuments()) {
        count = count + occurrencesCount(document,searchedWord);
    }
    return count;
}

Discussion

Table 1: Informal Test Execution Times and Speedup

Single-Thread Execution Time (ms)

Fork/Join Execution Time (ms)

Speedup

two

eighteen thousand seven hundred and ninety-eight

eleven thousand and twenty-six

one point seven zero four eight seven nine three seven six

four

nineteen thousand four hundred and seventy-three

eight thousand three hundred and twenty-nine

two point three three seven nine seven five seven four seven

eight

eighteen thousand nine hundred and eleven

four thousand two hundred and eight

four point four nine four zero five eight nine three five

twelve

nineteen thousand four hundred and ten

two thousand eight hundred and seventy-six

six point seven four eight nine five six eight eight five

figure 4: Speedup (Vertical Axis) with Respect to the Number of Cores (Horizontal Axis)

We Could have refined the computation to also fork tasks to operate not at the document level,but at the line level. This would have made it possible for concurrent tasks to operate on different lines of the same document. This would,be far-fetched. Indeed,a fork/join task should perform a “sufficient” amount of computation to overcome the fork/join thread pool and task management overhead. Working at the line level would be too trivial and hamper the efficiency of the approach.

The included source code also features another fork/join example based on the merge-sort algorithm over arrays of integers. This is interesting because it is implemented using RecursiveAction,the fork/join task that does not yield values on join()method invocations. Instead,tasks share mutable state: the array to be sorted. Again,experiments show a near-linear speedup in the number of cores.

Conclusion

This article discussed concurrent programming in Java with a strong focus on the new fork/join tasks provided by Java SE 7 for making it easier to write parallel programs. The article showed that rich primitives can be used and assembled to write high-performance programs that take advantage of multicore processors,all without having to deal with low-level manipulation of threads and shared state synchronization. The article illustrated the use of those new APIs on a word-occurrence counting example,which is both compelling and easy to grasp. A near-linear speedup was obtained in the number of cores in an informal test. These results show how useful the fork/join framework can be; because we neither had to change the code nor tweak it or the Java Virtual Machine to maximize hardware core utilization.

You can apply this technique to your own problems and data models,too. You should see sensible speedups as long as you can rewrite your algorithms in a “divide and conquer” fashion that is free of I/O work and locking.

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>