Redoing RabbitMQ’s tutorial – part 3

Next step: «we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe"». … emit_log.pl

#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;
use Data::Dumper ;

{
    # Channel ID generator. The counter lives in this bare block so that
    # nextchan is the only code able to read or change it; successive calls
    # hand out 1, 2, 3, ...
    my $next_id = 1 ;
    sub nextchan {
        my $issued = $next_id ;
        $next_id   = $next_id + 1 ;
        return $issued ;
    }
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;                      # broker host to connect to
my %qparms  = () ;                              # connection options (none set)
my %eparms  = ( exchange_type => "fanout" ) ;   # exchange declaration options
my $ename   = q{gravity.xchecks} ;              # exchange the messages go to
# Number of messages to emit, taken from the first command-line argument.
# With no argument nothing is sent, and no "uninitialized value" warning
# is raised by the loop below.
my $count   = defined $ARGV[0] ? $ARGV[0] : 0 ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq      = Net::RabbitMQ->new() ;
my $chanID  = nextchan() ;

# Net::RabbitMQ expects its option sets as hash references, so the hashes
# are passed with a leading backslash rather than flattened into the
# argument list.
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,\%eparms) ;

# Publish $count messages, each announcing a random duration of 1..10
# seconds. The routing key is left empty and the target exchange is named
# in the options; no queue is referenced on the publishing side.
for my $i (1 .. $count)
{
    my $sec     = 1+int(rand(10)) ;
    my $message = qq{This task will last for $sec seconds} ;

    $mq->publish($chanID,"",$message,{ exchange => $ename },) ;

    print STDERR qq{Message "$message" sent to exchange $ename\n} ;
}

$mq->disconnect ;

receive_logs.pl

#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;

{
    # Hands out a fresh channel ID on every call, starting from 1. The
    # counter is a lexical scoped to this block, so it keeps its value
    # between calls and nothing outside nextchan can touch it.
    my $counter = 1 ;
    sub nextchan {
        my $current = $counter ;
        $counter += 1 ;
        return $current ;
    }
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;                      # broker host to connect to
my %qparms = () ;                               # connection options (none set)
my %eparms  = ( exchange_type => "fanout" ) ;   # exchange declaration options
# Options for consume(). no_ack is off, so every delivery is acknowledged
# explicitly with ack() in the loop below.
my %consume_opts = (
    consumer_tag => "worker_$$",
    no_ack       => 0,
    exclusive    => 0,
    ) ;
# Options for queue_declare().
my %declare_opts = ( durable => 1, auto_delete => 1, exclusive => 1 ) ;
my $qname ;                                     # filled in by queue_declare
my $ename = q{gravity.xchecks} ;                # exchange to bind the queue to
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################


my $mq      = Net::RabbitMQ->new() ;
my $chanID  = nextchan() ;

# Net::RabbitMQ expects its option sets as hash references, so the hashes
# are passed with a leading backslash rather than flattened into the
# argument list.
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,\%eparms) ;

# Declare a queue with an empty name and keep the name that comes back,
# then bind that queue to the exchange with an empty routing key.
$qname = $mq->queue_declare($chanID,"",\%declare_opts,) ;
$mq->queue_bind($chanID, $qname, $ename, "",) ;

print STDERR qq{Bound to queue $qname\n} ;

$mq->consume($chanID,$qname,\%consume_opts) ;

# NOTE THAT recv() is BLOCKING!!! get wasn't!
# The loop ends as soon as recv() returns a false value.
while ( my $payload = $mq->recv() )
{
    my $body  = $payload->{body} ;
    my $dtag  = $payload->{delivery_tag} ;

    # The first run of digits in the body is the number of seconds to
    # "work"; a body without digits means no simulated work at all.
    my ($sec) = ( $body =~ m{(\d+)} ) ;
    $sec = 0 if not defined $sec ;

    print STDERR qq{Worker $$: Received from queue $qname: $body\n} ;
    sleep $sec ;
    $mq->ack($chanID,$dtag,) ;
    print STDERR qq{Worker $$: Work done in $sec seconds\n} ;
}

output

bronto@cooper:~/Lab/gravity/tutorial-3$ ./receive_logs.pl & ./receive_logs.pl &
[1] 8886
[2] 8887
bronto@cooper:~/Lab/gravity/tutorial-3$ Bound to queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ
Bound to queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ
./emit_log.pl 3
Message "This task will last for 4 seconds" sent to exchange gravity.xchecks
Message "This task will last for 5 seconds" sent to exchange gravity.xchecks
Message "This task will last for 7 seconds" sent to exchange gravity.xchecks
Worker 8887: Received from queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ: This task will last for 4 seconds
Worker 8886: Received from queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ: This task will last for 4 seconds
bronto@cooper:~/Lab/gravity/tutorial-3$ Worker 8887: Work done in 4 seconds
Worker 8887: Received from queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ: This task will last for 5 seconds
Worker 8886: Work done in 4 seconds
Worker 8886: Received from queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ: This task will last for 5 seconds
Worker 8887: Work done in 5 seconds
Worker 8887: Received from queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ: This task will last for 7 seconds
Worker 8886: Work done in 5 seconds
Worker 8886: Received from queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ: This task will last for 7 seconds
Worker 8887: Work done in 7 seconds
Worker 8886: Work done in 7 seconds

bronto@cooper:~/Lab/gravity/tutorial-3$ kill %1 %2
bronto@cooper:~/Lab/gravity/tutorial-3$ 
[1]-  Terminated              ./receive_logs.pl
[2]+  Terminated              ./receive_logs.pl

Notes

  • I have learnt at my own expense that using an empty string for the queue name in queue_declare is not equivalent to leaving it undefined; in the latter case, the method does not do what you want;
  • have you noticed that we are publishing the messages to the exchange, and we don't use queues in emit_log.pl?
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s