And here we go with RabbitMQ's tutorial 2: work queues:
In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.
As previously, I'll just show the finished code. Please refer to the tutorial and to Net::RabbitMQ's documentation for the details. …new_task.pl
#!/usr/bin/perl use strict ; use warnings ; use Net::RabbitMQ ; { # closure to return a new channel ID every time we call nextchan my $nextchan = 1 ; sub nextchan { return $nextchan++ } ; } ### BEGIN CONFIGURABLE PARAMETERS ###################################### my $qserver = q{gravity} ; my %qparms = () ; my %declare_opts = ( durable => 1, auto_delete => 0 ) ; my $qname = q{gravity.checks} ; my $count = $ARGV[0] ; ### NO CONFIGURABLE PARAMETERS BELOW THIS LINE ######################### my $mq = Net::RabbitMQ->new() ; my $chanID = nextchan() ; $mq->connect($qserver, %qparms) ; $mq->channel_open($chanID) ; $mq->queue_declare($chanID,$qname,%declare_opts,) ; for (my $i = 1 ; $i <= $count ; $i++) { my $sec = 1+int(rand(10)) ; my $message = qq{This task will last for $sec seconds} ; $mq->publish($chanID,$qname,$message,{ exchange => "" },) ; print STDERR qq{Message "$message" sent to queue $qnamen} ; } $mq->disconnect ;
worker.pl
#!/usr/bin/perl use strict ; use warnings ; use Net::RabbitMQ ; { # closure to return a new channel ID every time we call nextchan my $nextchan = 1 ; sub nextchan { return $nextchan++ } ; } ### BEGIN CONFIGURABLE PARAMETERS ###################################### my $qserver = q{gravity} ; my %qparms = () ; my %consume_opts = ( consumer_tag => "worker_$$", no_ack => 0, exclusive => 0 ) ; my %declare_opts = ( durable => 1, auto_delete => 0 ) ; my $qname = q{gravity.checks} ; ### NO CONFIGURABLE PARAMETERS BELOW THIS LINE ######################### my $mq = Net::RabbitMQ->new() ; my $chanID = nextchan() ; $mq->connect($qserver, %qparms) ; $mq->channel_open($chanID) ; $mq->basic_qos($chanID,{ prefetch_count => 1 }) ; $mq->queue_declare($chanID,$qname,%declare_opts,) ; $mq->consume($chanID,$qname,%consume_opts) ; # NOTE THAT recv() is BLOCKING!!! get wasn't! while ( my $payload = $mq->recv() ) { last if not defined $payload ; my $body = $payload->{body} ; my $dtag = $payload->{delivery_tag} ; my ($sec) = ( $body =~ m{(d+)} ) ; print STDERR qq{Worker $$: Received from queue $qname: $bodyn} ; sleep $sec ; $mq->ack($chanID,$dtag,) ; print STDERR qq{Worker $$: Work done in $sec secondsn} ; }
output
bronto@cooper:~/Lab/gravity/tutorial-2$ ./worker.pl & ./worker.pl & [1] 8079 [2] 8080 bronto@cooper:~/Lab/gravity/tutorial-2$ ./new_task.pl 5 Message "This task will last for 8 seconds" sent to queue gravity.checks Message "This task will last for 2 seconds" sent to queue gravity.checks Message "This task will last for 6 seconds" sent to queue gravity.checks Message "This task will last for 8 seconds" sent to queue gravity.checks Message "This task will last for 6 seconds" sent to queue gravity.checks Worker 8080: Received from queue gravity.checks: This task will last for 8 seconds Worker 8079: Received from queue gravity.checks: This task will last for 2 seconds bronto@cooper:~/Lab/gravity/tutorial-2$ Worker 8079: Work done in 2 seconds Worker 8079: Received from queue gravity.checks: This task will last for 6 seconds Worker 8080: Work done in 8 seconds Worker 8080: Received from queue gravity.checks: This task will last for 8 seconds Worker 8079: Work done in 6 seconds Worker 8079: Received from queue gravity.checks: This task will last for 6 seconds Worker 8079: Work done in 6 seconds Worker 8080: Work done in 8 seconds bronto@cooper:~/Lab/gravity/tutorial-2$ kill %1 %2 bronto@cooper:~/Lab/gravity/tutorial-2$ [1]- Terminated ./worker.pl [2]+ Terminated ./worker.pl bronto@cooper:~/Lab/gravity/tutorial-2$
Note that:
- only one call of consume() is needed, you don't have to repeat it before each recv();
- recv() is blocking, the workers don't stop running when the queue was empty: I had to kill the workers to stop them.
not working, params should be hashref
Hi Demiurg. Apologies for not approving your comment earlier.
The code in the post worked at the time I wrote it. Have you checked if the library has changed the signature of the methods in the meanwhile? That would easily explain the problem.