The goal of RabbitMQ's tutorial 4 is to subscribe only to a subset of the messages.
The following is a rough implementation of that tutorial in Perl with Net::RabbitMQ. As before, the code is only sketched out and is definitely not an example of good style: it just aims to show how things work. Once I get all the tutorials sorted out in Perl, I'll build on these sketches to create something "real".
Enjoy!

emit_log_direct.pl
#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;
use Data::Dumper ;
use List::Util qw{shuffle} ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ } ;
}

{
    # select a random severity
    # OK, OK, this is a bit overkill, I know... blame on my laziness
    my @severities = qw{info warning error} ;
    sub random_severity {
        my ($severity) = shuffle @severities ;
        return $severity ;
    }
}

### BEGIN CONFIGURABLE PARAMETERS ######################################

my $qserver = q{gravity} ;
my %qparms  = () ;
my %eparms  = ( exchange_type => "direct", auto_delete => 1 ) ;
my $ename   = q{gravity.xchecks} ;
my $count   = $ARGV[0] ;    # number of messages to send

### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq     = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;

# options are passed as hash references
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,\%eparms) ;

for (my $i = 1 ; $i <= $count ; $i++) {
    my $sec      = 1+int(rand(10)) ;
    my $severity = random_severity() ;
    my $message  = qq{$severity: This task will last for $sec seconds} ;

    # the severity doubles as the routing key
    $mq->publish($chanID,$severity,$message,{ exchange => $ename },) ;
    print STDERR qq{Message "$message" sent to exchange $ename\n} ;
}

$mq->disconnect ;
receive_logs_direct.pl
#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;

# at least one severity must be given on the command line
die "Usage: $0 severity [severity...]\n" if not @ARGV > 0 ;
my @severities = @ARGV ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ } ;
}

### BEGIN CONFIGURABLE PARAMETERS ######################################

my $qserver = q{gravity} ;
my %qparms  = () ;
my %eparms  = ( exchange_type => "direct", auto_delete => 1 ) ;
my %consume_opts = (
    consumer_tag => "worker_$$",
    no_ack       => 0,
    exclusive    => 0,
) ;
my %declare_opts = ( durable => 1, auto_delete => 1, exclusive => 1 ) ;
my $qname ;
my $ename = q{gravity.xchecks} ;

### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq     = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;

# options are passed as hash references
$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,\%eparms) ;

# an empty queue name asks the server to generate one for us
$qname = $mq->queue_declare($chanID,"",\%declare_opts,) ;

# one binding per severity, with the severity as the routing key
foreach my $severity (@severities) {
    $mq->queue_bind($chanID, $qname, $ename, $severity,) ;
    print STDERR qq{Bound to queue $qname for severity $severity\n} ;
}

$mq->consume($chanID,$qname,\%consume_opts) ;

# NOTE THAT recv() is BLOCKING!!! get wasn't!
while ( my $payload = $mq->recv() ) {
    last if not defined $payload ;

    my $body  = $payload->{body} ;
    my $dtag  = $payload->{delivery_tag} ;
    my ($sec) = ( $body =~ m{(\d+)} ) ;    # duration embedded in the message

    print STDERR qq{Worker $$: Received from queue $qname: $body\n} ;
    sleep $sec ;
    $mq->ack($chanID,$dtag,) ;
    print STDERR qq{Worker $$: Work done in $sec seconds\n} ;
}
output
bronto@cooper:~/Lab/gravity/tutorial-4$ ./receive_logs_direct.pl error & ./receive_logs_direct.pl info error warning &
[3] 21437
[4] 21438
bronto@cooper:~/Lab/gravity/tutorial-4$ Bound to queue amq.gen-6ILylE6ZHkEpWWWuG3tbqg for severity error
Bound to queue amq.gen-G0ieutjZgEyV6YaRkIgMog for severity info
Bound to queue amq.gen-G0ieutjZgEyV6YaRkIgMog for severity error
Bound to queue amq.gen-G0ieutjZgEyV6YaRkIgMog for severity warning

bronto@cooper:~/Lab/gravity/tutorial-4$ ./emit_log_direct.pl 5
Message "warning: This task will last for 9 seconds" sent to exchange gravity.xchecks
Message "error: This task will last for 3 seconds" sent to exchange gravity.xchecks
Message "warning: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "warning: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "warning: This task will last for 9 seconds" sent to exchange gravity.xchecks
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: warning: This task will last for 9 seconds
Worker 21437: Received from queue amq.gen-6ILylE6ZHkEpWWWuG3tbqg: error: This task will last for 3 seconds
bronto@cooper:~/Lab/gravity/tutorial-4$ Worker 21437: Work done in 3 seconds
Worker 21438: Work done in 9 seconds
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: error: This task will last for 3 seconds
Worker 21438: Work done in 3 seconds
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: warning: This task will last for 8 seconds
Worker 21438: Work done in 8 seconds
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: warning: This task will last for 7 seconds
Worker 21438: Work done in 7 seconds
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: warning: This task will last for 9 seconds
Worker 21438: Work done in 9 seconds
As you can see, the receiver bound only to "error" (worker 21437) processed the single "error" message immediately and then stayed quiet for the rest of the time, while the other receiver (worker 21438), bound to all three severities, processed everything, including its own copy of the error message already handled by its mate.
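If you want to play with the routing logic without a broker at hand, here is a minimal sketch of how I picture a direct exchange. It is a pure-Perl, in-memory toy and not Net::RabbitMQ: the names bind_queue, publish, %bindings and %queues, the queue names and the messages are all made up for illustration, and I assume Perl 5.10 or later for the // operator.

```perl
#!/usr/bin/perl

use strict ;
use warnings ;

# Hypothetical in-memory model of a direct exchange (NOT Net::RabbitMQ).
# %bindings maps a routing key to the names of the queues bound to it,
# %queues maps a queue name to the messages delivered to it.
my %bindings ;
my %queues ;

# bind one queue to one or more routing keys
sub bind_queue {
    my ($qname, @keys) = @_ ;
    $queues{$qname} //= [] ;
    push @{ $bindings{$_} }, $qname foreach @keys ;
    return ;
}

# copy the message to every queue bound with exactly this routing key;
# returns the number of queues reached (0 means the message is dropped)
sub publish {
    my ($key, $message) = @_ ;
    my @targets = @{ $bindings{$key} // [] } ;
    push @{ $queues{$_} }, $message foreach @targets ;
    return scalar @targets ;
}

# same layout as the two receivers in the run shown earlier
bind_queue('errors_only', qw{error}) ;
bind_queue('everything',  qw{info error warning}) ;

publish('warning', 'warning: disk almost full') ;
publish('error',   'error: disk full') ;
publish('debug',   'debug: nobody listens') ;    # no binding, dropped

foreach my $qname (sort keys %queues) {
    printf "%s got %d message(s): %s\n",
        $qname, scalar @{ $queues{$qname} },
        join(' | ', @{ $queues{$qname} }) ;
}
```

Running it should report one message for errors_only and two for everything: the error message lands in both queues because each queue bound with a matching key gets its own copy, which is the same behaviour the two workers showed.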