Java – Disruptor – event handlers not called
I was playing with the Disruptor framework and found that my event handlers were not being called.
This is my setup code:
private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = new Disruptor<TwitterStatusReceivedEvent>(
            TwitterStatusReceivedEvent.EVENT_FACTORY,
            EXECUTOR,
            new SingleThreadedClaimStrategy(BUFFER_SIZE),
            new SleepingWaitStrategy());

    disruptor.handleEventsWith(searchTermMatchingHandler)
            .then(appendStatusHandler, updatePriceHandler)
            .then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}
Elsewhere, I publish events. I have tried the following two approaches:
Event publishing method A:
private void handleStatus(final Status status) {
    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}
In this case, I found that the first EventHandler was called, but nothing else happened.
Event publishing method B:
private void handleStatus(final Status status) {
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {
        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}
In this case, I found that no event handler was called at all.
What on earth did I do wrong?
Update
This is my EventHandler. How should I indicate that processing is complete?
public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence, boolean endOfBatch)
            throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments()) {
            if (statusText.contains(instrument.getSearchTerm())) {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }
}
Solution
Each event handler needs to run in its own thread, and that thread will not exit until the disruptor is shut down. Because you are using a single-threaded executor, only the one event handler that happens to be started first will ever run; the others stay queued behind it. (The Disruptor class stores the handlers in a HashMap, so which handler ends up running can vary.)
If you switch to a cached thread pool (Executors.newCachedThreadPool()), you will find that everything starts running. You do not need to manage sequence numbers yourself, because that is handled by the EventProcessors that the Disruptor class sets up and manages for you. Simply processing each event you receive is exactly right.
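The starvation effect described above can be reproduced without the Disruptor at all. The following is a minimal sketch using only java.util.concurrent; the class name ExecutorStarvationDemo and the method countStarted are hypothetical, and the blocking tasks are only stand-ins for the long-running event processor loops, not the Disruptor's actual implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: not part of the Disruptor library.
class ExecutorStarvationDemo {

    // Submits `handlers` long-running tasks (stand-ins for event processors)
    // and reports how many of them actually got a thread and started.
    static int countStarted(ExecutorService executor, int handlers)
            throws InterruptedException {
        AtomicInteger started = new AtomicInteger();
        CountDownLatch allStarted = new CountDownLatch(handlers);
        CountDownLatch shutdown = new CountDownLatch(1);

        for (int i = 0; i < handlers; i++) {
            executor.execute(() -> {
                started.incrementAndGet();
                allStarted.countDown();
                try {
                    // Block until released, like a processor loop that
                    // only exits when the disruptor is shut down.
                    shutdown.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Wait until every task has started, or give up after 500 ms.
        allStarted.await(500, TimeUnit.MILLISECONDS);
        int result = started.get();

        // Release the blocked tasks and let the executor wind down.
        shutdown.countDown();
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("single thread: "
                + countStarted(Executors.newSingleThreadExecutor(), 4));
        System.out.println("cached pool:   "
                + countStarted(Executors.newCachedThreadPool(), 4));
    }
}
```

With four tasks, the single-threaded executor should report that only one started, while the cached thread pool should report all four. That is the same reason only one of the four handlers in the question ever gets to run.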