Java & RabbitMQ – queuing and multithreading – or Couchbase as a job queue

I have a job distributor that publishes messages on different channels.

Further, I want to have two (and more in the future) consumers that work on different tasks and run on different machines. (Currently I have only one and need to scale it.)

Let's name these tasks (just examples):

> Fibonacci
> RandomBooks (generates random sentences to write a book)

These tasks run for up to 2-3 hours and should be distributed evenly across the consumers.

Each consumer can work on x parallel threads for these tasks. So let's say (the numbers are just examples and will be replaced by variables):

> Machine 1 can consume 3 parallel jobs for Fibonacci and 5 parallel jobs for RandomBooks
> Machine 2 can consume 7 parallel jobs for Fibonacci and 3 parallel jobs for RandomBooks
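To make the per-machine limits concrete, here is a minimal sketch using only the JDK: one fixed-size thread pool per job type, where the pool size is the number of parallel jobs allowed. `MachineLimits` and its methods are hypothetical names, not part of RabbitMQ or of the code in this question. How jobs reach `submit` (for example from a consumer callback) is left open.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: one fixed-size thread pool per job type on a single machine. */
final class MachineLimits {
    private final Map<String, Integer> limits;
    private final Map<String, ExecutorService> pools = new HashMap<>();

    MachineLimits(Map<String, Integer> limits) {
        this.limits = new HashMap<>(limits);
        // The pool size caps how many jobs of that type run in parallel here.
        this.limits.forEach((jobType, n) -> pools.put(jobType, Executors.newFixedThreadPool(n)));
    }

    int limitFor(String jobType) {
        return limits.getOrDefault(jobType, 0);
    }

    /** Queues a job; it starts as soon as one of the pool's threads is free. */
    void submit(String jobType, Runnable job) {
        ExecutorService pool = pools.get(jobType);
        if (pool == null) {
            throw new IllegalArgumentException("Unknown job type: " + jobType);
        }
        pool.submit(job);
    }

    /** Stops accepting jobs and waits for the queued ones to finish. */
    void shutdownAndWait(long timeoutSeconds) {
        pools.values().forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService pool : pools.values()) {
                pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Under these assumptions, Machine 1 would be `new MachineLimits(Map.of("FIBONACCI", 3, "RANDOMBOOKS", 5))` and Machine 2 would use 7 and 3.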

How can I achieve this?

Do I have to start x threads per channel on every consumer to listen on it?

When do I have to ack a message?

My current approach, for only one consumer, is to start x threads for each task. Each thread is a DefaultConsumer that implements Runnable. In the handleDelivery method I call basicAck(deliveryTag, false) and then do the work.

Further: I want to send some tasks to a special consumer. How do I achieve that together with the fair distribution described above?

This is my publishing code

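String QUEUE_NAME = "FIBONACCI";
Channel channel = this.clientManager.getRabbitMQConnection().createChannel();
// queueDeclare(queue, durable, exclusive, autoDelete, arguments);
// autoDelete = false is assumed here, the original call was one argument short
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// Publishing to the default exchange "" routes by queue name
channel.basicPublish("", QUEUE_NAME, MessageProperties.BASIC, Control.getBytes(this.getArgument()));
channel.close();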

This is my consumer code

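import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;

public final class Worker extends DefaultConsumer implements Runnable {
    // The jobType field and the constructor (which must call super(channel)) are not shown.

    @Override
    public void run() {

        try {
            // Must use the same arguments as the publisher's queueDeclare
            this.getChannel().queueDeclare(this.jobType.toString(), true, false, false, null);
            // Set the prefetch limit before consuming so it applies to this consumer
            this.getChannel().basicQos(1);
            // autoAck = false: messages are acknowledged manually in handleDelivery
            this.getChannel().basicConsume(this.jobType.toString(), false, this);
        } catch (IOException e) {
            // catch something
        }
        // Keep the thread alive; deliveries arrive through handleDelivery
        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Control.getLogger().error("Exception!", e);
            }

        }
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        // The delivery tag comes from the envelope
        this.getChannel().basicAck(envelope.getDeliveryTag(), false); // Is this right?
        // Start new Thread for this task with my own ExecutorService

    }
}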

In this case the worker is started twice: once for Fibonacci and once for RandomBooks.

UPDATE

As the answers point out, RabbitMQ may not be the best solution for this; a pull approach with Couchbase or MongoDB would fit better. I'm new to these systems. Can anyone explain how this would be achieved?

Solution

Here is a conceptual view of how I would build this on Couchbase.

> You have some machines that process jobs and some machines (possibly the same ones) that create jobs.
> You create a document for each job in a Couchbase bucket (and set its type to "job", or similar, if you mix it with other data in that bucket).
> Each job document, besides the specific command to be carried out, could include the time it was created, a due time (if there is a specific time by which it must be done) and some kind of generated work value. This work value is in arbitrary units. Each consumer of jobs knows how many work units it can handle at once and how many are currently available (because other workers may be running). So a machine with a capacity of 10 work units that has 6 in use can query for jobs of 4 work units or less.
> Couchbase has views, which are incrementally updated map/reduce jobs; I think you only need the map phase here. You can write a view that lets you query on the time a job entered the system and on its number of work units. That way you can ask for "the oldest job of 4 work units or less". As capacity frees up, this kind of query picks up the oldest job first, although you could instead take the most overdue job first and, if there is none, fall back to the oldest (overdue being the difference between the current time and the job's due date).
> Couchbase views allow quite sophisticated queries like this. Because they are updated incrementally, they are not perfectly real time. So you are not looking for a single job but for a list of job candidates.
> The next step is to take the candidate list and check a second location for a lock file, possibly a memcached-style bucket (RAM cached, non-persistent). The lock file goes through several phases (here you can do some partition-resolution logic, using CRDTs or whatever method best suits your needs).
> Since this bucket is RAM based, it is faster than the views and lags less behind the overall state. If there is no lock file, the worker creates one with the status flag "temporary".
> If another worker gets the same job and sees the lock file, it skips that candidate and moves on to the next one in the list.
> If, for some reason, two workers try to create a lock file for the same job, there is a conflict. In that case a worker can simply back off. Or you can have logic in which each worker updates the lock file, possibly putting in a random number or some priority figure (make these updates idempotent, CRDT style, so that siblings can be merged).
> After a specified time (probably a few seconds), the worker checks the lock file. If it has not had to make any race-resolution changes, it changes the lock file's status from "temporary" to "taken".
> It then updates the job document itself with a status of "taken" or similar, so that the job no longer shows up in the views when other workers look for available jobs.
> Finally, you need one more step: before running the candidate query described above, a worker runs a special query for jobs that were taken but whose worker has died (for example, jobs that are overdue). One way to know that a worker has died is to give the lock file in the memcached bucket an expiry time, so that it eventually disappears. This time can be short; the worker just touches the lock file to refresh the expiry (the Couchbase API supports this).
> If a worker dies, its lock file eventually disappears, leaving an orphaned job that is marked "taken" but has no lock file. That is a condition workers looking for jobs can search for.
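The lock-file life cycle in the steps above can be sketched with an in-memory map standing in for the RAM-only bucket. This is only an illustration under assumptions: `LockBucket`, `tryCreate`, `confirm` and `touch` are hypothetical names, not Couchbase API calls, and the map's atomic per-key updates mean the "two workers create the same lock" conflict is not modelled here. Time is passed in explicitly so that the expiry behaviour is easy to follow.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the RAM-only lock bucket; all names are hypothetical. */
final class LockBucket {
    enum Phase { TEMPORARY, TAKEN }

    static final class LockFile {
        final String workerId;
        final Phase phase;
        final long expiresAtMillis;

        LockFile(String workerId, Phase phase, long expiresAtMillis) {
            this.workerId = workerId;
            this.phase = phase;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, LockFile> locks = new ConcurrentHashMap<>();
    private final long ttlMillis;

    LockBucket(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns the live lock for a job, or null if there is none or it has expired. */
    LockFile peek(String jobId, long nowMillis) {
        LockFile lock = locks.get(jobId);
        return (lock == null || lock.expiresAtMillis <= nowMillis) ? null : lock;
    }

    /** Creates a "temporary" lock unless a live one exists; true means the caller holds it. */
    boolean tryCreate(String jobId, String workerId, long nowMillis) {
        LockFile fresh = new LockFile(workerId, Phase.TEMPORARY, nowMillis + ttlMillis);
        LockFile stored = locks.compute(jobId, (id, old) ->
                (old == null || old.expiresAtMillis <= nowMillis) ? fresh : old);
        return stored == fresh;
    }

    /** After the settling period: promotes "temporary" to "taken" if the caller still owns the lock. */
    boolean confirm(String jobId, String workerId, long nowMillis) {
        return rewriteIfOwner(jobId, workerId, nowMillis, Phase.TAKEN);
    }

    /** Heartbeat: refreshes the expiry so the lock survives while the worker is alive. */
    boolean touch(String jobId, String workerId, long nowMillis) {
        return rewriteIfOwner(jobId, workerId, nowMillis, null);
    }

    private boolean rewriteIfOwner(String jobId, String workerId, long nowMillis, Phase newPhase) {
        boolean[] rewritten = {false};
        locks.computeIfPresent(jobId, (id, old) -> {
            if (!old.workerId.equals(workerId) || old.expiresAtMillis <= nowMillis) {
                return old; // someone else's lock, or already expired
            }
            rewritten[0] = true;
            Phase phase = (newPhase == null) ? old.phase : newPhase;
            return new LockFile(workerId, phase, nowMillis + ttlMillis);
        });
        return rewritten[0];
    }
}
```

A worker that stops calling `touch` lets its lock expire, which is what turns a "taken" job into an orphaned one that other workers can detect.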

So, in summary, each worker first queries for orphaned jobs. If there are any, it checks each in turn for a lock file; where there is none, it creates one and follows the normal locking protocol described above. If there are no orphaned jobs, it looks for overdue jobs and follows the locking protocol. If there are no overdue jobs, it simply takes the oldest job and follows the locking protocol.
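The selection order in this summary, combined with the work-unit filter from earlier, might look like the following sketch. It works on a plain in-memory list in place of a Couchbase view result; `JobPicker`, `Job` and the `hasLiveLock` predicate are hypothetical names introduced for illustration only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch: order job candidates as orphaned, then overdue, then oldest. */
final class JobPicker {
    enum Status { OPEN, TAKEN }

    static final class Job {
        final String id;
        final Status status;
        final long createdAtMillis;
        final long dueAtMillis;
        final int workUnits;

        Job(String id, Status status, long createdAtMillis, long dueAtMillis, int workUnits) {
            this.id = id;
            this.status = status;
            this.createdAtMillis = createdAtMillis;
            this.dueAtMillis = dueAtMillis;
            this.workUnits = workUnits;
        }
    }

    static List<Job> candidates(List<Job> jobs, int freeUnits, long nowMillis,
                                Predicate<String> hasLiveLock) {
        List<Job> orphaned = new ArrayList<>();
        List<Job> overdue = new ArrayList<>();
        List<Job> open = new ArrayList<>();
        for (Job job : jobs) {
            if (job.workUnits > freeUnits) {
                continue; // does not fit into this machine's free capacity
            }
            if (job.status == Status.TAKEN) {
                if (!hasLiveLock.test(job.id)) {
                    orphaned.add(job); // marked "taken" but its lock file is gone
                }
            } else if (job.dueAtMillis < nowMillis) {
                overdue.add(job);
            } else {
                open.add(job);
            }
        }
        // Earliest due time first, which is the most overdue job.
        overdue.sort(Comparator.comparingLong((Job j) -> j.dueAtMillis));
        // Oldest creation time first.
        open.sort(Comparator.comparingLong((Job j) -> j.createdAtMillis));

        List<Job> ordered = new ArrayList<>(orphaned);
        ordered.addAll(overdue);
        ordered.addAll(open);
        return ordered;
    }
}
```

The worker would then walk this list from the front and run the locking protocol on each candidate until one succeeds.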

Of course, this also works if your system has no concept of "overdue", and if timeliness is not important you can use some other method instead of taking the oldest job.

Another approach might be to create a random value between 1 and N, where N is a reasonably large number, say 4 × the number of workers, and tag each job with that value. Each time a worker looks for a job, it rolls the dice and checks whether there are any jobs with that number. If not, it rolls again until it finds a job with that number. That way, instead of several workers contending for the few "oldest" or highest-priority jobs, with a higher chance of lock contention, the workers are spread out, at the cost of time in the queue being more random than in a FIFO.
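A minimal sketch of this random tagging, assuming jobs are already grouped by their bucket number in a map (in practice that would be a query against the job store). `RandomBuckets` and its methods are hypothetical names. The check for "no jobs at all" is an addition of this sketch so that the re-roll loop always terminates.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;

/** Hypothetical sketch: spread workers over jobs by tagging jobs with a random bucket. */
final class RandomBuckets {
    /** N = 4 x the number of workers, as suggested in the text. */
    static int bucketCount(int workers) {
        return 4 * workers;
    }

    /** A dice roll between 1 and buckets, inclusive; also used to tag a new job. */
    static int roll(Random rng, int buckets) {
        return 1 + rng.nextInt(buckets);
    }

    /** Rolls until a bucket with at least one job comes up; empty if there are no jobs at all. */
    static Optional<String> pick(Map<Integer, List<String>> jobsByBucket, int buckets, Random rng) {
        boolean anyJob = false;
        for (int b = 1; b <= buckets; b++) {
            List<String> jobs = jobsByBucket.get(b);
            if (jobs != null && !jobs.isEmpty()) {
                anyJob = true;
                break;
            }
        }
        if (!anyJob) {
            return Optional.empty();
        }
        while (true) {
            List<String> jobs = jobsByBucket.get(roll(rng, buckets));
            if (jobs != null && !jobs.isEmpty()) {
                return Optional.of(jobs.get(0));
            }
        }
    }
}
```

The job returned by `pick` is still only a candidate; the worker would run the locking protocol on it before starting work.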

The random method can also be applied when load values have to be accommodated (so that a single machine does not take on too much load): instead of taking the oldest candidate, just pick a random job from the list of feasible candidates and try to take it.

Edit to add:

Where I said (step 12) that a worker "possibly puts in a random number": I mean that if the workers know the priority (for example, which of them needs the job most), they can put a figure representing this into the lock file. If there is no concept of "needing" the job, they can both roll the dice: each updates the lock file with its dice roll. Then both can look at the file and see what the other rolled. The one that loses backs off, and the other knows it has the job. This way you can resolve which worker takes a job without complex protocols or negotiation. I am assuming here that both workers are hitting the same lock file; it could also be implemented with two lock files and a query that finds them all. If, after some amount of time, no worker has rolled a higher number (and new workers considering the job will see that others are already rolling for it, so they skip it), you can safely take the job, knowing you are the only worker working on it.
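The dice-roll resolution can be sketched as a pure function over the rolls that the workers have written into the lock file. `DiceTieBreak` and `Roll` are hypothetical names, and breaking equal rolls by worker id is an assumption of this sketch (the text does not say what happens on a tie); it is there so that every worker computes the same winner from the same data.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;

/** Hypothetical sketch: decide which worker keeps a contested lock file. */
final class DiceTieBreak {
    static final class Roll {
        final String workerId;
        final int value;

        Roll(String workerId, int value) {
            this.workerId = workerId;
            this.value = value;
        }
    }

    // Highest roll wins; equal rolls fall back to the worker id (assumption of this sketch).
    private static final Comparator<Roll> ORDER =
            Comparator.comparingInt((Roll r) -> r.value).thenComparing(r -> r.workerId);

    /** Every worker reading the same rolls computes the same winner. */
    static String winner(Collection<Roll> rolls) {
        if (rolls.isEmpty()) {
            throw new IllegalArgumentException("no rolls in the lock file");
        }
        return Collections.max(rolls, ORDER).workerId;
    }

    /** True if this worker lost and should move on to its next candidate. */
    static boolean shouldBackOff(String myWorkerId, Collection<Roll> rolls) {
        return !winner(rolls).equals(myWorkerId);
    }
}
```

Because the decision depends only on the contents of the lock file, no extra messages between workers are needed.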
