Fork and Join: Java Can Excel at Painless Parallel Programming Too! (repost)
Original address: http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
Concurrent Programming in Java
Plain Old Threads
Thread thread = new Thread() {
    @Override
    public void run() {
        System.out.println(">>> I am running in a separate thread!");
    }
};
thread.start();
thread.join();
Rich Primitives with the java.util.concurrent Packages
Note: Thanks to the new integer literals introduced in Java SE 7, underscores can be inserted between digits to improve readability (for example, 1_000_000).
import java.util.*;
import java.util.concurrent.*;
import static java.util.Arrays.asList;

public class Sums {

    static class Sum implements Callable<Long> {
        private final long from;
        private final long to;

        Sum(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public Long call() {
            long acc = 0;
            for (long i = from; i <= to; i++) {
                acc = acc + i;
            }
            return acc;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<Long>> results = executor.invokeAll(asList(
            new Sum(0, 10), new Sum(100, 1_000), new Sum(10_000, 1_000_000)
        ));
        executor.shutdown();

        for (Future<Long> result : results) {
            System.out.println(result.get());
        }
    }
}
Fork/Join Tasks
Figure 1: Partial Sums over an Array of Integers
Additions for Supporting Parallelism
Figure 2: Cooperation Among Fork and Join Tasks
Example: Counting Occurrences of a Word in Documents
class Document {
    private final List<String> lines;

    Document(List<String> lines) {
        this.lines = lines;
    }

    List<String> getLines() {
        return this.lines;
    }

    static Document fromFile(File file) throws IOException {
        List<String> lines = new LinkedList<>();
        try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
            String line = reader.readLine();
            while (line != null) {
                lines.add(line);
                line = reader.readLine();
            }
        }
        return new Document(lines);
    }
}
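Note: If you are new to Java SE 7, two things in the fromFile() method may surprise you: the diamond syntax in new LinkedList<>(), which lets the compiler infer the generic type arguments, and the try block that takes the reader as a resource and closes it automatically when the block exits.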
class Folder {
    private final List<Folder> subFolders;
    private final List<Document> documents;

    Folder(List<Folder> subFolders, List<Document> documents) {
        this.subFolders = subFolders;
        this.documents = documents;
    }

    List<Folder> getSubFolders() {
        return this.subFolders;
    }

    List<Document> getDocuments() {
        return this.documents;
    }

    static Folder fromDirectory(File dir) throws IOException {
        List<Document> documents = new LinkedList<>();
        List<Folder> subFolders = new LinkedList<>();
        for (File entry : dir.listFiles()) {
            if (entry.isDirectory()) {
                subFolders.add(Folder.fromDirectory(entry));
            } else {
                documents.add(Document.fromFile(entry));
            }
        }
        return new Folder(subFolders, documents);
    }
}
import java.io.*;
import java.util.*;
import java.util.concurrent.*;

public class WordCounter {

    String[] wordsIn(String line) {
        return line.trim().split("(\\s|\\p{Punct})+");
    }

    Long occurrencesCount(Document document, String searchedWord) {
        long count = 0;
        for (String line : document.getLines()) {
            for (String word : wordsIn(line)) {
                if (searchedWord.equals(word)) {
                    count = count + 1;
                }
            }
        }
        return count;
    }
}
Figure 3: Fork/Join Word Counting Tasks
Let us begin with DocumentSearchTask, which counts the occurrences of a word in a document:
class DocumentSearchTask extends RecursiveTask<Long> {
    private final Document document;
    private final String searchedWord;

    DocumentSearchTask(Document document, String searchedWord) {
        super();
        this.document = document;
        this.searchedWord = searchedWord;
    }

    @Override
    protected Long compute() {
        return occurrencesCount(document, searchedWord);
    }
}
class FolderSearchTask extends RecursiveTask<Long> {
    private final Folder folder;
    private final String searchedWord;

    FolderSearchTask(Folder folder, String searchedWord) {
        super();
        this.folder = folder;
        this.searchedWord = searchedWord;
    }

    @Override
    protected Long compute() {
        long count = 0L;
        List<RecursiveTask<Long>> forks = new LinkedList<>();
        for (Folder subFolder : folder.getSubFolders()) {
            FolderSearchTask task = new FolderSearchTask(subFolder, searchedWord);
            forks.add(task);
            task.fork();
        }
        for (Document document : folder.getDocuments()) {
            DocumentSearchTask task = new DocumentSearchTask(document, searchedWord);
            forks.add(task);
            task.fork();
        }
        for (RecursiveTask<Long> task : forks) {
            count = count + task.join();
        }
        return count;
    }
}

private final ForkJoinPool forkJoinPool = new ForkJoinPool();

Long countOccurrencesInParallel(Folder folder, String searchedWord) {
    return forkJoinPool.invoke(new FolderSearchTask(folder, searchedWord));
}

public static void main(String[] args) throws IOException {
    WordCounter wordCounter = new WordCounter();
    Folder folder = Folder.fromDirectory(new File(args[0]));
    System.out.println(wordCounter.countOccurrencesOnSingleThread(folder, args[1]));
}

Long countOccurrencesOnSingleThread(Folder folder, String searchedWord) {
    long count = 0;
    for (Folder subFolder : folder.getSubFolders()) {
        count = count + countOccurrencesOnSingleThread(subFolder, searchedWord);
    }
    for (Document document : folder.getDocuments()) {
        count = count + occurrencesCount(document, searchedWord);
    }
    return count;
}

Discussion
Table 1: Informal Test Execution Times and Speedup

Number of Cores   Single-Thread Execution Time (ms)   Fork/Join Execution Time (ms)   Speedup
2                 18798                               11026                           1.704879376
4                 19473                               8329                            2.337975747
8                 18911                               4208                            4.494058935
12                19410                               2876                            6.748956885
Figure 4: Speedup (Vertical Axis) with Respect to the Number of Cores (Horizontal Axis)
We could have refined the computation to fork tasks not only at the document level but also at the line level, so that concurrent tasks could operate on different lines of the same document. This would, however, be far-fetched. A fork/join task should perform a “sufficient” amount of computation to outweigh the overhead of the fork/join thread pool and task management. Work at the line level would be too trivial and would hamper the efficiency of the approach.
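The granularity concern can be illustrated with a minimal sketch that sums an array of numbers. It is not taken from the article's source code: the class name ThresholdSum, the SumTask task, and the THRESHOLD value of 10_000 are all assumptions chosen for illustration. A task splits its range in two only while the range is larger than the threshold; below it, the task does the work sequentially so that each task carries enough computation to justify its scheduling cost.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ThresholdSum {

    // Hypothetical cutoff: ranges of this size or smaller are summed sequentially.
    // A suitable value depends on the workload and should be measured.
    static final int THRESHOLD = 10_000;

    static class SumTask extends RecursiveTask<Long> {
        private final long[] values;
        private final int from; // inclusive
        private final int to;   // exclusive

        SumTask(long[] values, int from, int to) {
            this.values = values;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // Small enough: do the work directly instead of forking.
            if (to - from <= THRESHOLD) {
                long acc = 0;
                for (int i = from; i < to; i++) {
                    acc += values[i];
                }
                return acc;
            }
            // Otherwise split the range in two halves.
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(values, from, mid);
            SumTask right = new SumTask(values, mid, to);
            left.fork();                        // schedule the left half asynchronously
            long rightResult = right.compute(); // compute the right half in this thread
            return left.join() + rightResult;   // wait for the left half and combine
        }
    }

    static long parallelSum(long[] values) {
        return new ForkJoinPool().invoke(new SumTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        long[] values = new long[1_000_000];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(parallelSum(values)); // prints 500000500000
    }
}
```

Setting THRESHOLD to 1 would create roughly one task per element, which is the array equivalent of forking one task per line of a document.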
The included source code also features another fork/join example, based on the merge-sort algorithm over arrays of integers. It is interesting because it is implemented using RecursiveAction, the fork/join task type that does not yield a value on join() invocations. Instead, tasks share mutable state: the array to be sorted. Again, experiments show a near-linear speedup in the number of cores.
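The following is a minimal sketch of what a merge sort built on RecursiveAction might look like. It is not the article's included source code: the names ParallelMergeSort and SortTask, the shared scratch buffer, and the THRESHOLD value are assumptions made for illustration. Each task sorts a range of the shared array in place, and sibling tasks always work on disjoint ranges, so no locking is needed.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ParallelMergeSort {

    // Hypothetical cutoff below which a range is sorted sequentially.
    static final int THRESHOLD = 1_000;

    static class SortTask extends RecursiveAction {
        private final int[] array;  // shared mutable state: the array being sorted
        private final int[] buffer; // shared scratch space used while merging
        private final int from;     // inclusive
        private final int to;       // exclusive

        SortTask(int[] array, int[] buffer, int from, int to) {
            this.array = array;
            this.buffer = buffer;
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from <= THRESHOLD) {
                Arrays.sort(array, from, to);
                return;
            }
            int mid = from + (to - from) / 2;
            // Sort both halves in parallel; invokeAll returns once both are done.
            invokeAll(new SortTask(array, buffer, from, mid),
                      new SortTask(array, buffer, mid, to));
            merge(mid);
        }

        // Merges the sorted halves [from, mid) and [mid, to) back into the array.
        private void merge(int mid) {
            System.arraycopy(array, from, buffer, from, to - from);
            int left = from;
            int right = mid;
            int out = from;
            while (left < mid && right < to) {
                array[out++] = buffer[left] <= buffer[right] ? buffer[left++] : buffer[right++];
            }
            while (left < mid) {
                array[out++] = buffer[left++];
            }
            while (right < to) {
                array[out++] = buffer[right++];
            }
        }
    }

    static void sort(int[] array) {
        int[] buffer = new int[array.length];
        new ForkJoinPool().invoke(new SortTask(array, buffer, 0, array.length));
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        int[] data = new int[100_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = random.nextInt();
        }
        int[] expected = data.clone();
        Arrays.sort(expected);
        sort(data);
        System.out.println("sorted: " + Arrays.equals(data, expected)); // prints sorted: true
    }
}
```

Because compute() returns nothing, the result is visible only through the array itself, which is why the ranges handed to sibling tasks must never overlap.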
Conclusion
This article discussed concurrent programming in Java, with a strong focus on the new fork/join tasks that Java SE 7 provides to make parallel programs easier to write. It showed that rich primitives can be used and assembled to write high-performance programs that take advantage of multicore processors, all without low-level manipulation of threads or synchronization of shared state. The article illustrated these new APIs with a word-occurrence counting example, which is both compelling and easy to grasp. In an informal test, the speedup was near-linear in the number of cores. These results show how useful the fork/join framework can be, because we had to change neither the code nor the Java Virtual Machine settings to maximize hardware core utilization.
You can apply this technique to your own problems and data models, too. You should see sensible speedups as long as you can rewrite your algorithms in a “divide and conquer” fashion that is free of I/O work and locking.