Redoing RabbitMQ’s tutorial – part 2

And here we go with RabbitMQ's tutorial 2, on work queues.

In this one we'll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.

As previously, I'll just show the finished code. Please refer to the tutorial and to Net::RabbitMQ's documentation for the details.

new_task.pl

#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ } ;
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms = () ;
my %declare_opts = ( durable => 1, auto_delete => 0 ) ;
my $qname   = q{gravity.checks} ;
my $count   = $ARGV[0] ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq      = Net::RabbitMQ->new() ;
my $chanID  = nextchan() ;

$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->queue_declare($chanID,$qname,\%declare_opts,) ;

for (my $i = 1 ; $i <= $count ; $i++)
{
    my $sec     = 1+int(rand(10)) ;
    my $message = qq{This task will last for $sec seconds} ;

    $mq->publish($chanID,$qname,$message,{ exchange => "" },) ;

    print STDERR qq{Message "$message" sent to queue $qname\n} ;
}

$mq->disconnect ;

worker.pl

#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ } ;
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms = () ;
my %consume_opts = (
    consumer_tag => "worker_$$",
    no_ack       => 0,
    exclusive    => 0
    ) ;
my %declare_opts = ( durable => 1, auto_delete => 0 ) ;
my $qname   = q{gravity.checks} ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################
my $mq      = Net::RabbitMQ->new() ;
my $chanID  = nextchan() ;

$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->basic_qos($chanID,{ prefetch_count => 1 }) ;
$mq->queue_declare($chanID,$qname,\%declare_opts,) ;
$mq->consume($chanID,$qname,\%consume_opts) ;

# NOTE THAT recv() is BLOCKING!!! get wasn't!
while ( my $payload = $mq->recv() )
{
    last if not defined $payload ;
    my $body  = $payload->{body} ;
    my $dtag  = $payload->{delivery_tag} ;
    my ($sec) = ( $body =~ m{(\d+)} ) ;
    print STDERR qq{Worker $$: Received from queue $qname: $body\n} ;
    sleep $sec ;
    $mq->ack($chanID,$dtag,) ;
    print STDERR qq{Worker $$: Work done in $sec seconds\n} ;
}
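
The worker pulls the task duration out of the message body with a regular expression. As a minimal sketch, and assuming a hypothetical helper called seconds_from_body (it is not part of the original script, nor of Net::RabbitMQ), the extraction could be isolated so that a body without any digits falls back to zero instead of handing an undefined value to sleep:

```perl
#!/usr/bin/perl

use strict ;
use warnings ;

# Hypothetical helper: returns the first run of digits found in the
# message body, or 0 when the body contains no digits at all.
sub seconds_from_body
{
    my ($body) = @_ ;
    my ($sec)  = ( $body =~ m{(\d+)} ) ;
    return defined $sec ? $sec : 0 ;
}

print seconds_from_body(q{This task will last for 8 seconds}), "\n" ;
print seconds_from_body(q{no duration here}), "\n" ;
```

In the worker, this would replace the direct match on $body, and the rest of the loop would stay the same.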

output

bronto@cooper:~/Lab/gravity/tutorial-2$ ./worker.pl & ./worker.pl &
[1] 8079
[2] 8080
bronto@cooper:~/Lab/gravity/tutorial-2$ ./new_task.pl 5
Message "This task will last for 8 seconds" sent to queue gravity.checks
Message "This task will last for 2 seconds" sent to queue gravity.checks
Message "This task will last for 6 seconds" sent to queue gravity.checks
Message "This task will last for 8 seconds" sent to queue gravity.checks
Message "This task will last for 6 seconds" sent to queue gravity.checks
Worker 8080: Received from queue gravity.checks: This task will last for 8 seconds
Worker 8079: Received from queue gravity.checks: This task will last for 2 seconds
bronto@cooper:~/Lab/gravity/tutorial-2$ Worker 8079: Work done in 2 seconds
Worker 8079: Received from queue gravity.checks: This task will last for 6 seconds
Worker 8080: Work done in 8 seconds
Worker 8080: Received from queue gravity.checks: This task will last for 8 seconds
Worker 8079: Work done in 6 seconds
Worker 8079: Received from queue gravity.checks: This task will last for 6 seconds
Worker 8079: Work done in 6 seconds
Worker 8080: Work done in 8 seconds

bronto@cooper:~/Lab/gravity/tutorial-2$ kill %1 %2
bronto@cooper:~/Lab/gravity/tutorial-2$ 
[1]-  Terminated              ./worker.pl
[2]+  Terminated              ./worker.pl
bronto@cooper:~/Lab/gravity/tutorial-2$ 

Note that:

  • only one call to consume() is needed; you don't have to repeat it before each recv();
  • recv() is blocking, so the workers don't stop running when the queue is empty: I had to kill the workers to stop them.
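
To see why the tasks in the run above ended up split the way they did, here is a small simulation of what prefetch_count => 1 implies: each worker holds at most one unacknowledged task, so the next task goes to whichever worker becomes free first. This is only a sketch that needs no broker. The simulate function is a made-up name, and the tie-breaking rule (lowest worker index wins) is an assumption of the sketch, not documented broker behaviour.

```perl
#!/usr/bin/perl

use strict ;
use warnings ;

# Simulate prefetch_count => 1: a worker takes a new task only after
# finishing (acking) the previous one. Ties go to the lowest worker
# index, which is an assumption of this sketch.
sub simulate
{
    my ($workers, @tasks) = @_ ;
    my @free_at = (0) x $workers ;   # time at which each worker is idle
    my @log ;

    for my $sec (@tasks)
    {
        # pick the worker that becomes idle first
        my $w = 0 ;
        for my $i (1 .. $#free_at)
        {
            $w = $i if $free_at[$i] < $free_at[$w] ;
        }
        push @log, [ $w, $free_at[$w], $sec ] ;
        $free_at[$w] += $sec ;
    }

    # the whole batch is done when the last worker finishes
    my $makespan = 0 ;
    for my $t (@free_at) { $makespan = $t if $t > $makespan }
    return ($makespan, @log) ;
}

# the five task durations from the run shown in the output section
my ($total, @log) = simulate(2, 8, 2, 6, 8, 6) ;
printf "worker %d starts a %d-second task at t=%d\n", $_->[0], $_->[2], $_->[1] for @log ;
print "all work done after $total seconds\n" ;
```

With the durations from the run above (8, 2, 6, 8, 6), the sketch gives one worker the two 8-second tasks and the other worker the remaining three, which is the same split seen in the real output.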

2 thoughts on “Redoing RabbitMQ’s tutorial – part 2”

    • Hi Demiurg. Apologies for not approving your comment earlier.

      The code in the post worked at the time I wrote it. Have you checked if the library has changed the signature of the methods in the meanwhile? That would easily explain the problem.
