The second-to-last tutorial, tutorial 5, focuses on topic routing: «we need to learn about a more complex topic exchange». … emit_log_topic.pl
#!/usr/bin/perl
#
# emit_log_topic.pl - publish COUNT messages to a topic exchange, each one
# routed with a randomly chosen "facility.level" key.
#
# Usage: emit_log_topic.pl COUNT

use strict;
use warnings;

use Net::RabbitMQ;
use Data::Dumper;
use List::Util qw{shuffle};

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1;
    sub nextchan { return $nextchan++ }
}

{
    # select a random pair facility.level
    # facility/priority lists are taken from man syslog(3)
    my @facilities = qw{
        auth authpriv cron daemon ftp kern
        local0 local1 local2 local3 local4 local5 local6 local7
        lpr mail news syslog user uucp
    };
    my @levels = qw{emerg alert crit err warning notice info debug};

    # Returns a string of the form "facility.level", each half picked by
    # taking the first element of a freshly shuffled list.
    sub random_loglevel {
        my ($facility) = shuffle @facilities;
        my ($level)    = shuffle @levels;
        return "$facility.$level";
    }
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity};
my %qparms  = ();
my %eparms  = (
    exchange_type => "topic",
    auto_delete   => 1,
);
my $ename = q{gravity.xchecks};
my $count = $ARGV[0];
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

# Refuse to run without a usable message count instead of looping on undef.
die "Usage: $0 COUNT\n"
    if !defined $count || $count !~ m{\A\d+\z};

my $mq     = Net::RabbitMQ->new();
my $chanID = nextchan();

# Net::RabbitMQ takes its option sets as hash references.
$mq->connect( $qserver, \%qparms );
$mq->channel_open($chanID);
$mq->exchange_declare( $chanID, $ename, \%eparms );

for ( 1 .. $count ) {
    my $sec      = 1 + int( rand(10) );    # 1..10 seconds
    my $loglevel = random_loglevel();
    my $message  = qq{$loglevel: This task will last for $sec seconds};

    # The loglevel doubles as the routing key the topic exchange matches on.
    $mq->publish( $chanID, $loglevel, $message, { exchange => $ename } );

    print STDERR qq{Message "$message" sent to exchange $ename\n};
}

$mq->disconnect;
receive_logs_topic.pl
#!/usr/bin/perl
#
# receive_logs_topic.pl - bind a server-named queue to the topic exchange
# with the binding key given on the command line, then process every
# message delivered to it.
#
# Usage: receive_logs_topic.pl BINDING_KEY      (e.g. '*.emerg' or 'auth.*')

use strict;
use warnings;

use Net::RabbitMQ;

die "Usage: $0 BINDING_KEY\n" if !@ARGV;
my ($topic) = @ARGV;

# Keep the process ID in a plain lexical so it interpolates unambiguously.
my $pid = $$;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1;
    sub nextchan { return $nextchan++ }
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity};
my %qparms  = ();
my %eparms  = (
    exchange_type => "topic",
    auto_delete   => 1,
);
my %consume_opts = (
    consumer_tag => "worker_$pid",
    no_ack       => 0,
    exclusive    => 0,
);
my %declare_opts = (
    durable     => 1,
    auto_delete => 1,
    exclusive   => 1,
);
my $qname;
my $ename = q{gravity.xchecks};
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq     = Net::RabbitMQ->new();
my $chanID = nextchan();

# Net::RabbitMQ takes its option sets as hash references.
$mq->connect( $qserver, \%qparms );
$mq->channel_open($chanID);
$mq->exchange_declare( $chanID, $ename, \%eparms );

# An empty queue name is passed; the name returned by queue_declare is the
# one used for binding and consuming below.
$qname = $mq->queue_declare( $chanID, "", \%declare_opts );
$mq->queue_bind( $chanID, $qname, $ename, $topic );

print STDERR qq{Bound to queue $qname for topic $topic\n};

$mq->consume( $chanID, $qname, \%consume_opts );

# NOTE THAT recv() is BLOCKING!!! get wasn't!
while ( my $payload = $mq->recv() ) {
    my $body = $payload->{body};
    my $dtag = $payload->{delivery_tag};

    # Anchor on the sentence itself: a bare (\d+) would capture the digit
    # of a facility name such as "local4" instead of the duration.
    my ($sec) = ( $body =~ m{last for (\d+) seconds} );
    $sec = 0 if not defined $sec;    # unexpected body: do not sleep on undef

    print STDERR qq{Worker $pid: Received from queue $qname: $body\n};
    sleep $sec;

    # no_ack is 0, so each delivery is acknowledged once the work is done.
    $mq->ack( $chanID, $dtag );
    print STDERR qq{Worker $pid: Work done in $sec seconds\n};
}
output
bronto@cooper:~/Lab/gravity/tutorial-5$ Bound to queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw for topic *.emerg Bound to queue amq.gen-kL9Zb4nRaVdPklK3t9awWw for topic auth.* bronto@cooper:~/Lab/gravity/tutorial-5$ ./emit_log_topic.pl 50 Message "local4.crit: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "news.err: This task will last for 1 seconds" sent to exchange gravity.xchecks Message "news.warning: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local4.notice: This task will last for 9 seconds" sent to exchange gravity.xchecks Message "kern.alert: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "user.err: This task will last for 5 seconds" sent to exchange gravity.xchecks Message "local3.notice: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "uucp.notice: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "syslog.err: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "mail.crit: This task will last for 1 seconds" sent to exchange gravity.xchecks Message "local7.notice: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local2.alert: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "local0.notice: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "kern.notice: This task will last for 6 seconds" sent to exchange gravity.xchecks Message "cron.emerg: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "daemon.emerg: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "auth.warning: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "lpr.alert: This task will last for 2 seconds" sent to exchange gravity.xchecks Message "mail.warning: This task will last for 9 seconds" sent to exchange gravity.xchecks Message "user.alert: This task will last for 2 seconds" 
sent to exchange gravity.xchecks Message "local7.err: This task will last for 4 seconds" sent to exchange gravity.xchecks Message "kern.info: This task will last for 9 seconds" sent to exchange gravity.xchecks Message "kern.alert: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "authpriv.err: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "ftp.emerg: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local7.notice: This task will last for 4 seconds" sent to exchange gravity.xchecks Message "ftp.emerg: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "syslog.notice: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "daemon.debug: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "ftp.notice: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "uucp.warning: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local5.debug: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "lpr.notice: This task will last for 5 seconds" sent to exchange gravity.xchecks Message "local0.debug: This task will last for 2 seconds" sent to exchange gravity.xchecks Message "local4.notice: This task will last for 6 seconds" sent to exchange gravity.xchecks Message "syslog.alert: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "local3.err: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "user.alert: This task will last for 5 seconds" sent to exchange gravity.xchecks Message "syslog.warning: This task will last for 4 seconds" sent to exchange gravity.xchecks Message "kern.debug: This task will last for 10 seconds" sent to exchange gravity.xchecks Message "local2.err: This task will last for 9 seconds" sent to exchange gravity.xchecks Message "syslog.debug: This task will last for 10 seconds" sent to 
exchange gravity.xchecks Message "local7.info: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "mail.notice: This task will last for 8 seconds" sent to exchange gravity.xchecks Message "uucp.emerg: This task will last for 3 seconds" sent to exchange gravity.xchecks Message "local0.err: This task will last for 1 seconds" sent to exchange gravity.xchecks Message "kern.emerg: This task will last for 7 seconds" sent to exchange gravity.xchecks Message "daemon.alert: This task will last for 2 seconds" sent to exchange gravity.xchecks Message "user.warning: This task will last for 5 seconds" sent to exchange gravity.xchecks Message "local0.debug: This task will last for 1 seconds" sent to exchange gravity.xchecks Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: cron.emerg: This task will last for 10 seconds Worker 27549: Received from queue amq.gen-kL9Zb4nRaVdPklK3t9awWw: auth.warning: This task will last for 7 seconds bronto@cooper:~/Lab/gravity/tutorial-5$ Worker 27549: Work done in 7 seconds Worker 27548: Work done in 10 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: daemon.emerg: This task will last for 8 seconds Worker 27548: Work done in 8 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: ftp.emerg: This task will last for 8 seconds Worker 27548: Work done in 8 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: ftp.emerg: This task will last for 10 seconds Worker 27548: Work done in 10 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: uucp.emerg: This task will last for 3 seconds Worker 27548: Work done in 3 seconds Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: kern.emerg: This task will last for 7 seconds Worker 27548: Work done in 7 seconds
As you can see, we emitted 50 messages with a random loglevel; the first receiver handled all the messages with level emerg, while the other handled the only message addressed to the auth facility. Of course, a message with loglevel auth.emerg would be delivered to both receivers.
All other messages are lost: no queue is bound to the exchange with a matching pattern, so the exchange simply discards them.