The second last tutorial, tutorial 5, focuses on topic routing: «we need to learn about a more complex topic exchange». … emit_log_topic.pl
#!/usr/bin/perl
use strict ;
use warnings ;
use Net::RabbitMQ ;
use Data::Dumper ;
use List::Util qw{shuffle} ;
{
# closure to return a new channel ID every time we call nextchan
my $nextchan = 1 ;
sub nextchan { return $nextchan++ } ;
}
{
# select a random pair facility.level
# facility/priority lists is taken from man syslog(3)
# OK, OK, this is a bit overkill, I know... blame on my laziness
my @facilities = qw{
auth authpriv cron daemon ftp kern local0 local1
local2 local3 local4 local5 local6 local7
lpr mail news syslog user uucp
} ;
my @levels = qw{emerg alert crit err warning notice info debug} ;
sub random_loglevel {
my ($facility) = shuffle @facilities ;
my ($level) = shuffle @levels ;
return "$facility.$level" ;
}
}
### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms = () ;
my %eparms = ( exchange_type => "topic", auto_delete => 1 ) ;
my $ename = q{gravity.xchecks} ;
my $count = $ARGV[0] ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################
my $mq = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;
$mq->connect($qserver, %qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,%eparms) ;
for (my $i = 1 ; $i <= $count ; $i++)
{
my $sec = 1+int(rand(10)) ;
my $loglevel = random_loglevel ;
my $message = qq{$loglevel: This task will last for $sec seconds} ;
$mq->publish($chanID,$loglevel,$message,{ exchange => $ename },) ;
print STDERR qq{Message "$message" sent to exchange $enamen} ;
}
$mq->disconnect ;
receive_logs_topic.pl
#!/usr/bin/perl
use strict ;
use warnings ;
use Net::RabbitMQ ;
die if not @ARGV > 0 ;
my ($topic) = @ARGV ;
{
# closure to return a new channel ID every time we call nextchan
my $nextchan = 1 ;
sub nextchan { return $nextchan++ } ;
}
### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms = () ;
my %eparms = ( exchange_type => "topic", auto_delete => 1 ) ;
my %consume_opts = (
consumer_tag => "worker_$$",
no_ack => 0,
exclusive => 0,
) ;
my %declare_opts = ( durable => 1, auto_delete => 1, exclusive => 1 ) ;
my $qname ;
my $ename = q{gravity.xchecks} ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################
my $mq = Net::RabbitMQ->new() ;
my $chanID = nextchan() ;
$mq->connect($qserver, %qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,%eparms) ;
$qname = $mq->queue_declare($chanID,"",%declare_opts,) ;
$mq->queue_bind($chanID, $qname, $ename, $topic,) ;
print STDERR qq{Bound to queue $qname for topic $topicn} ;
$mq->consume($chanID,$qname,%consume_opts) ;
# NOTE THAT recv() is BLOCKING!!! get wasn't!
while ( my $payload = $mq->recv() )
{
last if not defined $payload ;
my $body = $payload->{body} ;
my $dtag = $payload->{delivery_tag} ;
my ($sec) = ( $body =~ m{(d+)} ) ;
print STDERR qq{Worker $$: Received from queue $qname: $bodyn} ;
sleep $sec ;
$mq->ack($chanID,$dtag,) ;
print STDERR qq{Worker $$: Work done in $sec secondsn} ;
}
output
bronto@cooper:~/Lab/gravity/tutorial-5$ Bound to queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw for topic *.emerg Bound to queue amq.gen-kL9Zb4nRaVdPklK3t9awWw for topic auth.* bronto@cooper:~/Lab/gravity/tutorial-5$ ./emit_log_topic.pl 50 Message "local4.crit: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "news.err: This task will last for 1 seconds" sent to exchange gravity.xchecks Message "news.warning: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local4.notice: This task will last for 9 seconds" sent to exchange gravity.xchecks Message "kern.alert: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "user.err: This task will last for 5 seconds" sent to exchange gravity.xchecks Message "local3.notice: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "uucp.notice: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "syslog.err: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "mail.crit: This task will last for 1 seconds" sent to exchange gravity.xchecks Message "local7.notice: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local2.alert: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "local0.notice: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "kern.notice: This task will last for 6 seconds" sent to exchange gravity.xchecks Message "cron.emerg: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "daemon.emerg: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "auth.warning: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "lpr.alert: This task will last for 2 seconds" sent to exchange gravity.xchecks Message "mail.warning: This task will last for 9 seconds" sent to exchange gravity.xchecks Message "user.alert: This task will last for 2 seconds" sent to exchange gravity.xchecks Message "local7.err: This task will last for 4 seconds" sent to exchange gravity.xchecks Message "kern.info: This task will last for 9 seconds" sent to exchange gravity.xchecks Message "kern.alert: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "authpriv.err: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "ftp.emerg: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local7.notice: This task will last for 4 seconds" sent to exchange gravity.xchecks Message "ftp.emerg: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "syslog.notice: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "daemon.debug: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "ftp.notice: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "uucp.warning: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local5.debug: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "lpr.notice: This task will last for 5 seconds" sent to exchange gravity.xchecks Message "local0.debug: This task will last for 2 seconds" sent to exchange gravity.xchecks Message "local4.notice: This task will last for 6 seconds" sent to exchange gravity.xchecks Message "syslog.alert: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local3.err: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "user.alert: This task will last for 5 seconds" sent to exchange gravity.xchecks Message "syslog.warning: This task will last for 4 seconds" sent to exchange gravity.xchecks Message "kern.debug: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "local2.err: This task will last for 9 seconds" sent to exchange gravity.xchecks Message "syslog.debug: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "local7.info: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "mail.notice: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "uucp.emerg: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "local0.err: This task will last for 1 seconds" sent to exchange gravity.xchecks Message "kern.emerg: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "daemon.alert: This task will last for 2 seconds" sent to exchange gravity.xchecks Message "user.warning: This task will last for 5 seconds" sent to exchange gravity.xchecks Message "local0.debug: This task will last for 1 seconds" sent to exchange gravity.xchecks Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: cron.emerg: This task will last for 10 seconds Worker 27549: Received from queue amq.gen-kL9Zb4nRaVdPklK3t9awWw: auth.warning: This task will last for 7 seconds bronto@cooper:~/Lab/gravity/tutorial-5$ Worker 27549: Work done in 7 seconds Worker 27548: Work done in 10 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: daemon.emerg: This task will last for 8 seconds Worker 27548: Work done in 8 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: ftp.emerg: This task will last for 8 seconds Worker 27548: Work done in 8 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: ftp.emerg: This task will last for 10 seconds Worker 27548: Work done in 10 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: uucp.emerg: This task will last for 3 seconds Worker 27548: Work done in 3 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: kern.emerg: This task will last for 7 seconds Worker 27548: Work done in 7 seconds
As you can see, we emitted 50 messages with a random loglevel; the first receiver managed all the messages with level emerg, the other the only one addressed to the auth facility. Of course, a message with loglevel auth.emerg would be delivered to both receivers.
All other messages are lost.