Java: design idea of stopping callable threads

I'm writing a program to do some batch processing Batch elements can be processed independently of each other, and we want to minimize the overall processing time Therefore, instead of looping through each element in the batch at once, I use executorservice and submit a callable object to it:

public void process(Batch batch)
    {
        ExecutorService execService = Executors.newCachedThreadPool();
        CopyOnWriteArrayList<Future<BatchElementStatus>> futures = new CopyOnWriteArrayList<Future<BatchElementStatus>>();

        for (BatchElement element : batch.getElement())
        {
            Future<MtaMigrationStatus> future = execService.submit(new ElementProcessor(batch.getID(),element));
            futures.add(future);
        }

        boolean done = false;

        while (!done)
        {
            for (Future<BatchElementStatus> future : futures)
            {
                try
                {
                    if (future.isDone())
                    {
                        futures.remove(future);
                    }
                }
                catch (Exception e)
                {
                    System.out.println(e.getMessage());
                }

                if (futures.size() == 0)
                {
                    done = true;
                }
            }
        }
    }

We hope to be able to cancel batch processing Because I don't use loops, if the cancel flag is set, I can't just check the top of each loop

We are using a JMS topic. Batchprocessor and elementprocessor will listen to it and tell them that the batch has been cancelled

There are many steps in elementprocess call(), some of which can then be safely stopped, but there is a non - returnable point The course has this basic design:

public class ElementProcessor implements Callable,MessageListener
{
    private cancelled = false;

    public void onMessage(Message msg)
    {
        // get message object
        cancelled = true;
    }

    public BatchElementStatus call()
    {
        String status = SUCCESS;

        if (!cancelled)
        {
            doSomehingOne();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }            

        if (!cancelled)
        {
            doSomehingTwo();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }            

        if (!cancelled)
        {
            doSomehingThree();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }            

        if (!cancelled)
        {
            doSomehingFour();
        }
        else
        {
            doRollback();
            status = CANCELLED;
        }

        // After this point,we cannot cancel or pause the processing

        doSomehingFive();
        doSomehingSix();

        return new BatchElementStatus("SUCCESS");
    }

}

I wonder if there is a better way to check whether the batch / element has been cancelled, rather than wrapping the method call / code block in the calling method of the if (! Cancel) statement

Any suggestions?

Solution

I don't think you can do better than you do now, but here's an alternative:

public BatchElementStatus call() {
    return callMethod(1);
}

private callMethod(int methodCounter) {
    if (cancelled) {
       doRollback();
       return new BatchElementStatus("FAIL");
    }
    switch (methodCounter) {
       case 1 : doSomethingOne(); break;
       case 2 : doSomethingTwo(); break;
       ...
       case 5 : doSomethingFive();
                doSomethingSix();
                return new BatchElementStatus("SUCCESS");
    }
    return callMethod(methodCounter + 1);
}

In addition, you want to cancel volatile because onmessage will be called from another thread But you may not want to use onmessage and cancel (see below)

Other points: 1) copyonwritearraylist < future < batchelementstatus > > futures should be just an ArrayList Using concurrent collections misleads us into thinking that futures are on many threads 2) While (! Done) should be replaced by while (! Futures. Isempty()) and deleted 3) You should probably just call future Cancel (true) instead of "messaging" cancel Then you must check (thread. Interrupted ()) instead of if (cancel) If you want to kill all the future, just call execservice shutdownNow(); Your task must handle interrupts for it to work properly

Edit:

You should use executorcompletionservice instead of your while (! Done) {for (... Futures) {...}} It will do what you want to do, and it may do better There is a complete example in the API

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>