Next step: «we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe"». … emit_log.pl
#!/usr/bin/perl
#
# emit_log.pl - publish a number of demo messages to a fanout exchange.
#
# Usage: emit_log.pl COUNT
#
# Every message is published to the exchange named in $ename with an
# empty routing key; no queue is declared or named on the publisher
# side. Each message announces a random duration between 1 and 10
# seconds.

use strict ;
use warnings ;

use Net::RabbitMQ ;
use Data::Dumper ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ }
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms  = () ;
my %eparms  = ( exchange_type => "fanout" ) ;
my $ename   = q{gravity.xchecks} ;

# Number of messages to send; with no argument nothing is sent (and no
# "uninitialized value" warning is raised).
my $count   = defined $ARGV[0] ? $ARGV[0] : 0 ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq     = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;

# Net::RabbitMQ takes its option sets as hash REFERENCES, hence \%...
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID, $ename, \%eparms) ;

for ( 1 .. $count ) {
    my $sec     = 1 + int(rand(10)) ;
    my $message = qq{This task will last for $sec seconds} ;

    # Empty routing key: the message is addressed to the exchange only.
    $mq->publish($chanID, "", $message, { exchange => $ename }) ;
    print STDERR qq{Message "$message" sent to exchange $ename\n} ;
}

$mq->disconnect ;
receive_logs.pl
#!/usr/bin/perl
#
# receive_logs.pl - subscribe to a fanout exchange and "work" on every
# message received.
#
# Each running copy declares its own server-named queue, binds it to
# the exchange named in $ename and then consumes from it forever. For
# each message it sleeps for the number of seconds found in the message
# body, then acknowledges the message.

use strict ;
use warnings ;

use Net::RabbitMQ ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ }
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms  = () ;
my %eparms  = ( exchange_type => "fanout" ) ;
my %consume_opts = (
    consumer_tag => "worker_$$",
    no_ack       => 0,            # we acknowledge explicitly below
    exclusive    => 0,
) ;
my %declare_opts = (
    durable     => 1,
    auto_delete => 1,
    exclusive   => 1,
) ;
my $qname ;
my $ename = q{gravity.xchecks} ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq     = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;

# Net::RabbitMQ takes its option sets as hash REFERENCES, hence \%...
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID, $ename, \%eparms) ;

# The queue name is an empty string (not undef) on purpose: the value
# returned by queue_declare is then used as the actual queue name.
$qname = $mq->queue_declare($chanID, "", \%declare_opts) ;
$mq->queue_bind($chanID, $qname, $ename, "") ;
print STDERR qq{Bound to queue $qname\n} ;

$mq->consume($chanID, $qname, \%consume_opts) ;

# NOTE THAT recv() is BLOCKING!!! get wasn't!
while ( my $payload = $mq->recv() ) {
    my $body = $payload->{body} ;
    my $dtag = $payload->{delivery_tag} ;

    # The first run of digits in the body is the task duration; fall
    # back to 0 seconds if the body carries no number at all.
    my ($sec) = ( $body =~ m{(\d+)} ) ;
    $sec = 0 if not defined $sec ;

    print STDERR qq{Worker $$: Received from queue $qname: $body\n} ;
    sleep $sec ;

    # Acknowledge only after the simulated work has completed.
    $mq->ack($chanID, $dtag) ;
    print STDERR qq{Worker $$: Work done in $sec seconds\n} ;
}
output
bronto@cooper:~/Lab/gravity/tutorial-3$ ./receive_logs.pl & ./receive_logs.pl & [1] 8886 [2] 8887 bronto@cooper:~/Lab/gravity/tutorial-3$ Bound to queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ Bound to queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ ./emit_log.pl 3 Message "This task will last for 4 seconds" sent to exchange gravity.xchecks Message "This task will last for 5 seconds" sent to exchange gravity.xchecks Message "This task will last for 7 seconds" sent to exchange gravity.xchecks Worker 8887: Received from queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ: This task will last for 4 seconds Worker 8886: Received from queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ: This task will last for 4 seconds bronto@cooper:~/Lab/gravity/tutorial-3$ Worker 8887: Work done in 4 seconds Worker 8887: Received from queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ: This task will last for 5 seconds Worker 8886: Work done in 4 seconds Worker 8886: Received from queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ: This task will last for 5 seconds Worker 8887: Work done in 5 seconds Worker 8887: Received from queue amq.gen-j_2BRhSJOy7KoR-qFcEOeQ: This task will last for 7 seconds Worker 8886: Work done in 5 seconds Worker 8886: Received from queue amq.gen-qH7OYl6EBEW3yqw0grf1DQ: This task will last for 7 seconds Worker 8887: Work done in 7 seconds Worker 8886: Work done in 7 seconds bronto@cooper:~/Lab/gravity/tutorial-3$ kill %1 %2 bronto@cooper:~/Lab/gravity/tutorial-3$ [1]- Terminated ./receive_logs.pl [2]+ Terminated ./receive_logs.pl
Notes
- I have learnt at my own expense that using an empty string for the queue name in queue_declare is not equivalent to leaving it undefined; in the latter case, the method does not do what you want;
- have you noticed that we are publishing the messages to the exchange, and we don't use queues in emit_log.pl?