Java – ForkJoinPool – why does the program throw OutOfMemoryError?
I want to try out ForkJoinPool in Java 8, so I wrote a small program that searches a given directory for all files whose names contain a specific keyword.
Program:
    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;

    public class DirectoryService {
        public static void main(String[] args) {
            FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
            ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
            List<String> files = pool.invoke(task);
            pool.shutdown();
            System.out.println("Total no of files with hello" + files.size());
        }
    }

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;

        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
                if(mainDirectory.canRead()) {
                    File[] fileList = mainDirectory.listFiles();
                    for(File file : fileList) {
                        System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                        if(file.isDirectory()) {
                            // one task is forked for every subdirectory, without any limit
                            FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
                            recursiveTasks.add(task);
                            task.fork();
                        } else {
                            if (file.getName().contains("hello")) {
                                System.out.println(file.getName());
                                filetedFileList.add(file.getName());
                            }
                        }
                    }
                }
                // the worker blocks here until every forked subtask has finished
                for(FileSearchRecursiveTask task : recursiveTasks) {
                    filetedFileList.addAll(task.join());
                }
            }
            return filetedFileList;
        }
    }
This program works fine when the directory does not contain too many subdirectories and files, but when the tree is really large it throws an OutOfMemoryError.
My understanding is that the maximum number of threads (including compensation threads) is bounded, so why does this error occur? Did I miss anything in the program?
    Caused by: java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:714)
        at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
        at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
        at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
        at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
        at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
        at FileSearchRecursiveTask.compute(DirectoryService.java:51)
        at FileSearchRecursiveTask.compute(DirectoryService.java:20)
        at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
        at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
        at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
        at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
        at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
        at FileSearchRecursiveTask.compute(DirectoryService.java:51)
        at FileSearchRecursiveTask.compute(DirectoryService.java:20)
        at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
Solution
You should not fork new tasks without limit. Basically, you should fork only as long as there is a chance that another worker thread can pick up the forked job, and evaluate the work locally otherwise. Then, once you have forked a task, do not call join() right afterwards. The underlying framework will start compensation threads so that your jobs keep making progress instead of leaving every thread blocked waiting for a subtask, but this is exactly what creates the large number of threads, which may exceed the system's capabilities.
This is a revision of your code:
    import java.io.File;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.RecursiveTask;

    public class DirectoryService {
        public static void main(String[] args) {
            FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
            // invoke() runs the task in the calling thread, helped by the default (common) pool
            List<String> files = task.invoke();
            System.out.println("Total no of files with hello " + files.size());
        }
    }

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private static final int TARGET_SURPLUS = 3;
        private File path;

        public FileSearchRecursiveTask(File file) {
            this.path = file;
        }

        @Override
        protected List<String> compute() {
            File directory = path;
            if(directory.isDirectory() && directory.canRead()) {
                System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
                return scan(directory);
            }
            return Collections.emptyList();
        }

        private List<String> scan(File directory) {
            File[] fileList = directory.listFiles();
            if(fileList == null || fileList.length == 0) return Collections.emptyList();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            List<String> filteredFileList = new ArrayList<>();
            for(File file: fileList) {
                System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                if(file.isDirectory()) {
                    // fork only while few of our queued tasks are waiting to be stolen
                    if(getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
                        FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
                        recursiveTasks.add(task);
                        task.fork();
                    }
                    else filteredFileList.addAll(scan(file)); // otherwise process locally
                }
                else if(file.getName().contains("hello")) {
                    filteredFileList.add(file.getAbsolutePath());
                }
            }
            // youngest first: reclaim tasks nobody has stolen and run them here
            for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
                FileSearchRecursiveTask task = recursiveTasks.get(ix);
                if(task.tryUnfork()) task.complete(scan(task.path));
            }
            // the remaining tasks are either completed or being processed elsewhere
            for(FileSearchRecursiveTask task: recursiveTasks) {
                filteredFileList.addAll(task.join());
            }
            return filteredFileList;
        }
    }
The processing has been factored out into a method that receives the directory as a parameter, so we can use it locally for arbitrary directories that are not necessarily associated with a FileSearchRecursiveTask instance.
The method then uses getSurplusQueuedTaskCount() to determine the number of locally enqueued tasks that have not been picked up by other worker threads. Making sure there are always a few of them helps balance the work. However, once this number reaches the threshold, the processing is done locally without forking more jobs.
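The same throttling idea can be tried on a problem smaller than a directory scan. The following is only a sketch under assumptions: the class SurplusThresholdSketch, the RangeSum task, the cutoff of 1,000 elements, and the use of the common pool are hypothetical choices for illustration and are not part of the answer's code.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class: sums the integers in [from, to) with throttled forking.
class SurplusThresholdSketch {
    static final int TARGET_SURPLUS = 3;   // same threshold value as in the answer
    static final int SEQUENTIAL_CUTOFF = 1_000; // assumed cutoff for plain looping

    static class RangeSum extends RecursiveTask<Long> {
        final int from, to; // half-open range [from, to)

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            return sum(from, to);
        }

        // Plain method, so it can also be called without creating a task.
        static long sum(int from, int to) {
            if (to - from <= SEQUENTIAL_CUTOFF) {
                long total = 0;
                for (int i = from; i < to; i++) total += i;
                return total;
            }
            int mid = (from + to) >>> 1;
            // Fork only while few of this worker's queued tasks are still unclaimed.
            if (getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
                RangeSum right = new RangeSum(mid, to);
                right.fork();
                long left = sum(from, mid); // do the other half locally first
                return left + right.join();
            }
            // Enough work is already queued: process both halves locally.
            return sum(from, mid) + sum(mid, to);
        }
    }

    static long sumRange(int from, int to) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(from, to));
    }
}
```

Calling SurplusThresholdSketch.sumRange(0, 1_000_000) sums the integers below one million. How many tasks actually get forked depends on how quickly other workers steal them, so it varies from run to run.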
After the local processing, it iterates over the tasks and uses tryUnfork() to identify jobs that have not been stolen by other worker threads, and processes them locally. Iterating backwards, so that the youngest jobs are tried first, raises the chances of finding some.
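The tryUnfork() and complete() combination can be seen in isolation in a sketch like the one below. The names UnforkSketch, Square, and SumOfSquares are made up for this illustration, and the trivial squaring work merely stands in for scanning a directory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class: fork several subtasks, reclaim the unstolen ones, then join.
class UnforkSketch {
    static class Square extends RecursiveTask<Integer> {
        final int n;

        Square(int n) {
            this.n = n;
        }

        static int work(int n) {
            return n * n;
        }

        @Override
        protected Integer compute() {
            return work(n);
        }
    }

    static class SumOfSquares extends RecursiveTask<Integer> {
        final int count;

        SumOfSquares(int count) {
            this.count = count;
        }

        @Override
        protected Integer compute() {
            List<Square> forked = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                Square task = new Square(i);
                forked.add(task);
                task.fork();
            }
            // Youngest first: tryUnfork() can only succeed for a task that is the
            // most recently forked one still sitting in this worker's queue.
            for (int ix = forked.size() - 1; ix >= 0; ix--) {
                Square task = forked.get(ix);
                if (task.tryUnfork()) {
                    task.complete(Square.work(task.n)); // run locally, store the result
                }
            }
            // Every task is now completed or being run by another worker.
            int total = 0;
            for (Square task : forked) total += task.join();
            return total;
        }
    }

    static int sumOfSquares(int count) {
        return ForkJoinPool.commonPool().invoke(new SumOfSquares(count));
    }
}
```

The result does not depend on which tasks were stolen: UnforkSketch.sumOfSquares(10) adds up 1² through 10² either way. Only the thread that executes each piece differs.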
Only after that does it join() all the sub-jobs, which by now are either completed or currently being processed by another worker thread.
Note that I changed the initiating code to use the default pool. This uses "number of CPU cores" minus one worker threads, plus the initiating thread, which is the main thread in this example.
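To check these numbers on a particular machine, something along the following lines can be used. It is a sketch only: the class DefaultPoolSketch and its methods are hypothetical names, and the reported parallelism can differ from "cores minus one" if the java.util.concurrent.ForkJoinPool.common.parallelism system property is set.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class: inspects the default (common) pool and who runs an invoked task.
class DefaultPoolSketch {
    // Task that reports whether it is being run by the thread that created it.
    static class WhoRunsMe extends RecursiveTask<Boolean> {
        final Thread caller;

        WhoRunsMe(Thread caller) {
            this.caller = caller;
        }

        @Override
        protected Boolean compute() {
            return Thread.currentThread() == caller;
        }
    }

    // task.invoke() executes the task in the calling thread, so the caller
    // takes part in the work in addition to the pool's worker threads.
    static boolean directInvokeRunsOnCaller() {
        return new WhoRunsMe(Thread.currentThread()).invoke();
    }

    static String describe() {
        return "available processors = " + Runtime.getRuntime().availableProcessors()
                + ", common pool parallelism = " + ForkJoinPool.commonPool().getParallelism();
    }
}
```

Printing DefaultPoolSketch.describe() shows both values side by side. By contrast, Executors.newWorkStealingPool(), as used in the question, targets all available processors as its parallelism level.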