Next step: «we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe"». … emit_log.pl
#!/usr/bin/perl
use strict ;
use warnings ;
use Net::RabbitMQ ;
use Data::Dumper ;
{
    # Channel IDs are handed out sequentially, starting at 1.  The counter
    # lives in this bare block so that only nextchan() can read or change it.
    my $next_id = 1 ;

    # Return the next unused channel ID and advance the counter.
    sub nextchan {
        my $id = $next_id ;
        $next_id = $id + 1 ;
        return $id ;
    }
}
### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;                     # broker host passed to connect()
my %qparms  = () ;                             # extra connection options (none set here)
my %eparms  = ( exchange_type => "fanout" ) ;  # options passed to exchange_declare()
my $ename   = q{gravity.xchecks} ;             # exchange the messages are published to
my $count   = $ARGV[0] ;                       # how many messages to send
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

# Without a numeric count the loop below would only emit an "uninitialized"
# warning and send nothing, so fail early with a usage hint instead.
die qq{Usage: $0 <number-of-messages>\n}
    unless defined $count and $count =~ m{\A\d+\z} ;

my $mq     = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;

# Net::RabbitMQ expects its option sets as hash references, hence \%...
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID, $ename, \%eparms) ;

# Publish $count messages, each announcing a random duration of 1..10 seconds.
for (1 .. $count)
{
    my $sec     = 1 + int(rand(10)) ;
    my $message = qq{This task will last for $sec seconds} ;

    # The routing key is left empty; the target exchange is named in the
    # options hash.
    $mq->publish($chanID, "", $message, { exchange => $ename }) ;
    print STDERR qq{Message "$message" sent to exchange $ename\n} ;
}

$mq->disconnect ;
receive_logs.pl
#!/usr/bin/perl
use strict ;
use warnings ;
use Net::RabbitMQ ;
{
    # Channel IDs are handed out sequentially, starting at 1.  The counter
    # lives in this bare block so that only nextchan() can read or change it.
    my $next_id = 1 ;

    # Return the next unused channel ID and advance the counter.
    sub nextchan {
        my $id = $next_id ;
        $next_id = $id + 1 ;
        return $id ;
    }
}
### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;                     # broker host passed to connect()
my %qparms  = () ;                             # extra connection options (none set here)
my %eparms  = ( exchange_type => "fanout" ) ;  # options passed to exchange_declare()

# Options passed to consume().  The consumer tag embeds this process's PID;
# no_ack is off, so every message is acknowledged explicitly in the loop below.
my %consume_opts = (
    consumer_tag => "worker_$$",
    no_ack       => 0,
    exclusive    => 0,
) ;

# Options passed to queue_declare().
my %declare_opts = ( durable => 1, auto_delete => 1, exclusive => 1 ) ;

my $qname ;                                    # set from queue_declare()'s return value
my $ename = q{gravity.xchecks} ;               # exchange the queue is bound to
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq     = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;

# Net::RabbitMQ expects its option sets as hash references, hence \%...
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID, $ename, \%eparms) ;

# Declare the queue with an empty name and keep whatever name comes back,
# then bind that queue to the exchange with an empty routing key.
$qname = $mq->queue_declare($chanID, "", \%declare_opts) ;
$mq->queue_bind($chanID, $qname, $ename, "") ;
print STDERR qq{Bound to queue $qname\n} ;

$mq->consume($chanID, $qname, \%consume_opts) ;

# NOTE: recv() blocks until a message arrives (get() did not).  The loop ends
# as soon as recv() returns a false value, so no extra definedness check is
# needed inside it.
while ( my $payload = $mq->recv() )
{
    my $body = $payload->{body} ;
    my $dtag = $payload->{delivery_tag} ;

    # The first run of digits in the body is the number of seconds to "work".
    # A body without any number is treated as zero seconds of work.
    my ($sec) = ( $body =~ m{(\d+)} ) ;
    $sec = 0 unless defined $sec ;

    print STDERR qq{Worker $$: Received from queue $qname: $body\n} ;
    sleep $sec ;

    # Acknowledge only after the simulated work has finished.
    $mq->ack($chanID, $dtag) ;
    print STDERR qq{Worker $$: Work done in $sec seconds\n} ;
}
output
bronto@cooper:~/Lab/gravity/tutorial-3$ ./receive_logs.pl & ./receive_logs.pl & [1] 8886 [2] 8887 bronto@cooper:~/Lab/gravity/tutorial-3$ Bound to queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ Bound to queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ ./emit_log.pl 3 Message "This task will last for 4 seconds" sent to exchange gravity.xchecks Message "This task will last for 5 seconds" sent to exchange gravity.xchecks Message "This task will last for 7 seconds" sent to exchange gravity.xchecks Worker 8887: Received from queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ: This task will last for 4 seconds Worker 8886: Received from queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ: This task will last for 4 seconds bronto@cooper:~/Lab/gravity/tutorial-3$ Worker 8887: Work done in 4 seconds Worker 8887: Received from queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ: This task will last for 5 seconds Worker 8886: Work done in 4 seconds Worker 8886: Received from queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ: This task will last for 5 seconds Worker 8887: Work done in 5 seconds Worker 8887: Received from queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ: This task will last for 7 seconds Worker 8886: Work done in 5 seconds Worker 8886: Received from queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ: This task will last for 7 seconds Worker 8887: Work done in 7 seconds Worker 8886: Work done in 7 seconds bronto@cooper:~/Lab/gravity/tutorial-3$ kill %1 %2 bronto@cooper:~/Lab/gravity/tutorial-3$ [1]- Terminated ./receive_logs.pl [2]+ Terminated ./receive_logs.pl
Notes
- I have learnt at my own expense that using an empty string for the queue name in queue_declare is not equivalent to leaving it undefined; in the latter case, the method does not do what you want;
- have you noticed that we are publishing the messages to the exchange, and we don't use queues in emit_log.pl?