Multithreading – Perl asynchronous task for “any” code, no matter what it is?
I have been writing a "Checker" system to perform various "checks" on various services, systems, databases, files, etc Inspection is universal in nature and can be anything All inspections are reported in a common format for their pass or fail, whatever may be
It is written in modular OO, so developers can simply follow the framework and check independently of one of them Each object contains a shared report object. After they run the check, they only need $self - > {reporting '} – > report (params) The parameters are defined and it is assumed that the developer reports appropriately The report object then indexes the reports My main loader script contains the following entries:
my $reportingObject = new Checks::Reporting(params); my @checks; push @checks,new Checks::Check_One($reportingObject,params)); push @checks,params)); . . push @checks,new Checks::Check_N($reportingObject,params));
After completing the inspection and report, I have been doing:
foreach my $check (@checks) { $check->run_stuff(); } $reportingObject->finalize_report();
Now, because these checks are completely independent (don't worry about reporting objects), they can run in parallel As an improvement, I made:
my @threads; foreach my $check (@checks) { push @threads,async { $check->run_stuff(); } } foreach my $thread (@threads) { $thread->join; } #All threads are complete,and thus all checks are done $reportingObject->finalize_report();
As I said before, developers will write checks independently of each other Some tests are simple, others are not Simple checks may not have asynchronous code, but others may need to run asynchronously internally, for example
sub do_check { my @threads; my @list = @{$self->{'list'}}; foreach my $item (@list) { push @threads,async { #do_work_on_$item #return 1 or 0 for success or fail }; foreach my $thread (@threads) { my $res = $thread->join; if($res == 1) { $self->{'reporting'}->report(params_here); } } } }
As you can see, the threading model allows me to perform operations in a very vague way Each "check" runs independently in its own thread If a developer has an asynchronous operation, whatever it is, he just completes it independently in his own thread I want a model like this
Unfortunately, threads are slow and inefficient All asynchronous libraries have specific observers, such as io I don't want anything specific I want an event based model that allows me to simply start asynchronous tasks, no matter what they are, just tell them when to finish, so I can move on
I hope this will explain it and you can point me in the right direction
Solution
This seems well suited to the boss worker model:
>Some workers were created at the beginning of the project Make sure they all have access to the queue. > Queue up for multiple inspections as needed The staff will check the queues, dequeue them, execute them, and queue the results into the output queue. > Your main thread looks at the results of the output thread and does whatever it wants. > Join workers in the end block
If you want to put coderefs on the queue, you may want to see thread:: queue:: any
This is a fully operational example:
use strict; use feature 'say'; use threads; use threads::shared; use Thread::Queue::Any; use constant NUM_THREADS => 5; local $Storable::Deparse = 1; local $Storable::Eval = 1; # needed to serialize code my $check_q = Thread::Queue::Any->new; my $result_q = Thread::Queue::Any->new; # start the workers { my $running :shared = NUM_THREADS; my @threads = map threads->new(\&worker,$check_q,$result_q,\$running),1..NUM_THREADS; END { $_->join for @threads } } # enqueue the checks $check_q->enqueue($_) for sub {1},sub{2},sub{"hi"},sub{ die }; $check_q->enqueue(undef) for 1..NUM_THREADS; # end the queue while(defined( my $result = $result_q->dequeue )) { report($$result); } sub report { say shift // "Failed"; } sub worker { my ($in,$out,$running_ref) = @_; while (defined( my $check = $in->dequeue )) { my $result = eval { $check->() }; $out->enqueue(\$result); } # last thread closes the door lock $$running_ref; --$$running_ref || $out->enqueue(undef); }
This print
1 2 hi Failed
In a slightly random order