Java – how to design a system to send records and retry sending if no confirmation is received?
I'm working on a project where I need to consume a lot of records and then send them to other systems using ZeroMQ.
This is the process:
> Store all incoming records in a CHM (ConcurrentHashMap) from multiple threads. Records arrive at a very high rate.
> Send these records from the CHM to the ZeroMQ servers from a background thread that runs every 1 minute.
> After sending each record to a ZeroMQ server, add it to a retry bucket so that it can be retried if no acknowledgment has been received after a certain time.
> A poller runnable thread receives acknowledgments from the ZeroMQ servers saying which records were received. Once an acknowledgment arrives, the record is deleted from the retry bucket so that it is not retried.
> Even if some records are sent twice, that is acceptable, but it is better to minimize this.
I am not sure what the best way to minimize this is in my scenario.
Here is my Processor class. Its add() method is called by multiple threads to populate the dataHolderByPartitionReference CHM in a thread-safe manner. The constructor of Processor starts a background thread that runs every 30 seconds and pushes the records from that same CHM to a set of ZeroMQ servers by calling the SendToZeroMQ class, as shown below.
Processor
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class Processor {
    private final ScheduledExecutorService executorService =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>>
            dataHolderByPartitionReference = new AtomicReference<>(
                    new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

    private static class Holder {
        private static final Processor INSTANCE = new Processor();
    }

    public static Processor getInstance() {
        return Holder.INSTANCE;
    }

    private Processor() {
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // swap in a fresh map and send everything collected so far
                validateAndSendAllPartitions(dataHolderByPartitionReference
                        .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
            }
        }, 0, 30, TimeUnit.SECONDS); // initial delay (assumed to be 0), then every 30 seconds
    }

    private void validateAndSendAllPartitions(
            ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> dataHolderByPartition) {
        // calling validateAndSend in parallel for each partition (which is map key)
        // generally there will be only 5-6 unique partitions max
    }

    private void validateAndSend(final int partition,
            final ConcurrentLinkedQueue<DataHolder> dataHolders) {
        Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        int totalSize = 0;
        while (!dataHolders.isEmpty()) {
            .........
            .........
            SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
        }
        // calling again with remaining values
        SendToZeroMQ.getInstance().executeAsync(partition, clientKeyBytesAndProcessBytesHolder);
    }

    // called by multiple threads to populate dataHolderByPartitionReference CHM
    public void add(final int partition, final DataHolder holder) {
        // store records in dataHolderByPartitionReference in a thread safe way
    }
}
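The body of add() is only described in a comment above. As a minimal sketch of one way to fill such a map from many threads, the following may help; PartitionBuffer and drain() are hypothetical names that do not appear in the original code, and the record type is generic purely for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the buffering part of Processor.
final class PartitionBuffer<T> {
    private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<T>>> ref =
            new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<T>>());

    // Safe to call from many threads: computeIfAbsent creates each
    // partition's queue at most once, and the queue itself is concurrent.
    void add(int partition, T record) {
        ref.get()
           .computeIfAbsent(partition, p -> new ConcurrentLinkedQueue<T>())
           .add(record);
    }

    // Swap in a fresh map and hand the old one to the sender thread.
    ConcurrentHashMap<Integer, ConcurrentLinkedQueue<T>> drain() {
        return ref.getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<T>>());
    }
}
```

One caveat with this swap-the-map approach: a thread that read the old map just before drain() ran can still append to it afterwards, so a record may land in a map the sender has already finished with. If that matters, the swap and the add need stronger coordination, for example a read-write lock.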
Below is my SendToZeroMQ class, which sends a record to a set of ZeroMQ servers and retries it depending on whether an acknowledgment is delivered.
> First, it sends a record to the ZeroMQ servers.
> Then it adds the same record to the retryBucket, from which it will be retried later depending on whether an acknowledgment is received.
> In the same class, I start a background thread that runs every 1 minute and re-sends the records that are still in the retry bucket.
> The same class also starts the ResponsePoller thread, which keeps running forever to see which of the records we sent earlier have been acknowledged. Once a record is acknowledged, the ResponsePoller thread deletes it from the retryBucket so that it is not retried.
SendToZeroMQ
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListeners;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class SendToZeroMQ {
    // Do I need both of these ScheduledExecutorService instances,
    // or is one sufficient to start both threads?
    private final ScheduledExecutorService executorServicePoller =
            Executors.newSingleThreadScheduledExecutor();
    private final ScheduledExecutorService executorService =
            Executors.newSingleThreadScheduledExecutor();
    private final Cache<Long, byte[]> retryBucket = CacheBuilder.newBuilder()
            .maximumSize(10000000)
            .removalListener(RemovalListeners.asynchronous(new CustomListener(), executorService))
            .build();

    private static class Holder {
        private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
    }

    public static SendToZeroMQ getInstance() {
        return Holder.INSTANCE;
    }

    private SendToZeroMQ() {
        executorServicePoller.submit(new ResponsePoller());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                // re-send everything that is still waiting for an acknowledgment
                for (Entry<Long, byte[]> entry : retryBucket.asMap().entrySet()) {
                    executeAsync(entry.getKey(), entry.getValue());
                }
            }
        }, 0, 1, TimeUnit.MINUTES); // initial delay (assumed to be 0), then every minute
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    public boolean executeAsync(final long address, final byte[] encodedByteArray,
            final Socket socket) {
        ZMsg msg = new ZMsg();
        msg.add(encodedByteArray);
        boolean sent = msg.send(socket);
        msg.destroy();
        // add to retry bucket
        retryBucket.put(address, encodedByteArray);
        return sent;
    }

    public boolean executeAsync(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        Optional<ZMQObj> liveSockets = PoolManager.getInstance().getNextSocket();
        if (!liveSockets.isPresent()) {
            return false;
        }
        Map<Long, byte[]> addressToencodedByteArray =
                encode(partition, clientKeyBytesAndProcessBytesHolder);
        long address = addressToencodedByteArray.entrySet().iterator().next().getKey();
        byte[] encodedByteArray =
                addressToencodedByteArray.entrySet().iterator().next().getValue();
        return executeAsync(address, encodedByteArray, liveSockets.get().getSocket());
    }

    private Map<Long, byte[]> encode(final int partition,
            final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) {
        // this address will be unique always
        long address = TestUtils.getAddress();
        Frame frame = new Frame(............);
        byte[] packedByteArray = frame.serialize();
        // this map will always have one entry in it.
        return ImmutableMap.of(address, packedByteArray);
    }

    public void removeFromRetryBucket(final long address) {
        retryBucket.invalidate(address);
    }
}
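Note that the retry task above re-sends every entry in the bucket each minute, including records that were sent only a few seconds earlier and whose acknowledgment may still be on its way. One possible way to reduce such duplicates is to remember when each record was last sent and retry only those older than a timeout. The sketch below illustrates that idea under stated assumptions: RetryBucket, Pending, and the ackTimeoutMillis parameter are hypothetical names, and a plain ConcurrentHashMap replaces the Guava Cache used in the question.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical retry bucket that tracks the time of the last send attempt.
final class RetryBucket {
    // One unacknowledged record plus the time it was last sent.
    private static final class Pending {
        final byte[] payload;
        final long lastSentMillis;

        Pending(byte[] payload, long lastSentMillis) {
            this.payload = payload;
            this.lastSentMillis = lastSentMillis;
        }
    }

    private final ConcurrentHashMap<Long, Pending> pending = new ConcurrentHashMap<>();
    private final long ackTimeoutMillis;

    RetryBucket(long ackTimeoutMillis) {
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    // Record a send attempt (first send or retry); this resets the timer.
    void recordSent(long address, byte[] payload, long nowMillis) {
        pending.put(address, new Pending(payload, nowMillis));
    }

    // Called by the response poller when an acknowledgment arrives.
    void acknowledge(long address) {
        pending.remove(address);
    }

    // Records whose last send is at least ackTimeoutMillis old.
    // Younger records are skipped so that they are not sent twice needlessly.
    Map<Long, byte[]> dueForRetry(long nowMillis) {
        Map<Long, byte[]> due = new HashMap<>();
        for (Map.Entry<Long, Pending> e : pending.entrySet()) {
            if (nowMillis - e.getValue().lastSentMillis >= ackTimeoutMillis) {
                due.put(e.getKey(), e.getValue().payload);
            }
        }
        return due;
    }
}
```

In this sketch the caller passes the current time explicitly (for example System.currentTimeMillis()), which keeps the class easy to test. Calling recordSent before the actual send, rather than after it as in the question's code, would also avoid the case where an acknowledgment arrives before the record has been placed in the bucket.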
Below is my ResponsePoller class, which waits for the acknowledgments of all the records that have already been sent by the other background thread. When an acknowledgment is received, the corresponding record is deleted from the retry bucket so that it is not retried.
import java.util.Iterator;
import java.util.Random;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;

public class ResponsePoller implements Runnable {
    private static final Random random = new Random();
    private static final int listenerPort = 8076;

    @Override
    public void run() {
        ZContext ctx = new ZContext();
        Socket client = ctx.createSocket(ZMQ.PULL);
        // Set random identity to make tracing easier
        String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
        client.setIdentity(identity.getBytes(ZMQ.CHARSET));
        client.bind("tcp://" + TestUtils.getIPAddress() + ":" + listenerPort);
        PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};
        while (!Thread.currentThread().isInterrupted()) {
            // Tick once per second, pulling in arriving messages
            for (int centitick = 0; centitick < 100; centitick++) {
                ZMQ.poll(items, 10);
                if (items[0].isReadable()) {
                    ZMsg msg = ZMsg.recvMsg(client);
                    Iterator<ZFrame> it = msg.iterator();
                    while (it.hasNext()) {
                        ZFrame frame = it.next();
                        try {
                            long address = TestUtils.getAddress(frame.getData());
                            // remove from retry bucket since we got the acknowledgment for this record
                            SendToZeroMQ.getInstance().removeFromRetryBucket(address);
                        } catch (Exception ex) {
                            // log error
                        } finally {
                            frame.destroy();
                        }
                    }
                    msg.destroy();
                }
            }
        }
        ctx.destroy();
    }
}
Question:
> From a design point of view, what is the best way to design this so that all my logic works seamlessly?
> I'm pretty sure there is a better way to design this than what I have. What would a better approach be?
Solution
In my opinion, as long as you use TCP for the underlying communication, you do not need to worry about receipt acknowledgment in the application layer.
In this case, since ZeroMQ is itself built on top of TCP with further optimizations, you do not need to worry about successful data transmission as long as no exception occurs in the transport layer (any such exception will obviously be passed back to you to handle).
The way I see your problem: you are running Kafka consumer threads that receive messages and forward them to another message queue (ZMQ in this case, which uses TCP and either delivers the message successfully or throws an exception at the lower communication layer).
The simplest solution I can think of is to use a thread pool inside each consumer and try to send each message using ZMQ. If a network error occurs, you can easily queue that message for later consumption or logging, as long as your application daemon is running.
In the proposed solution I am assuming that message ordering is not part of the problem space, and that you are not looking to complicate things.
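The thread-pool approach described above could look roughly like the following sketch. It is only an illustration under assumptions: Transport is a hypothetical interface standing in for whatever wraps the ZMQ socket, and PooledSender, failedMessages, and shutdownAndWait are made-up names rather than part of any library.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical transport abstraction; a real one would wrap a ZeroMQ socket.
interface Transport {
    void send(byte[] message) throws Exception;
}

// Sends messages on a thread pool and keeps the ones that failed.
final class PooledSender {
    private final ExecutorService pool;
    private final Transport transport;
    private final Queue<byte[]> failed = new ConcurrentLinkedQueue<>();

    PooledSender(Transport transport, int threads) {
        this.transport = transport;
        this.pool = Executors.newFixedThreadPool(threads);
    }

    // Try to send on a pool thread; on any error, park the message
    // so it can be re-sent or logged later.
    void submit(byte[] message) {
        pool.execute(() -> {
            try {
                transport.send(message);
            } catch (Exception ex) {
                failed.add(message);
            }
        });
    }

    // Messages whose send attempt threw an exception.
    Queue<byte[]> failedMessages() {
        return failed;
    }

    // Stop accepting work and wait for in-flight sends to finish.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the tasks run on several threads, this sketch does not preserve message order, which matches the assumption stated above.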