Redoing RabbitMQ’s tutorial – part 5

The second-to-last tutorial, tutorial 5, focuses on topic routing: «we need to learn about a more complex topic exchange». …

emit_log_topic.pl

#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;
use Data::Dumper ;
use List::Util qw{shuffle} ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ } ;
}

{
  # select a random pair facility.level
  # the facility/priority lists are taken from man syslog(3)
  # OK, OK, this is a bit overkill, I know... blame it on my laziness
  my @facilities = qw{
		       auth authpriv cron daemon ftp kern local0 local1
		       local2 local3 local4 local5 local6 local7
		       lpr mail news syslog user uucp
		   } ;
  my @levels = qw{emerg alert crit err warning notice info debug} ;
  sub random_loglevel {
    my ($facility) = shuffle @facilities ;
    my ($level)    = shuffle @levels ;
    return "$facility.$level" ;
  }
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms  = () ;
my %eparms  = ( exchange_type => "topic", auto_delete => 1 ) ;
my $ename   = q{gravity.xchecks} ;
my $count   = $ARGV[0] // 0 ;   # number of messages to emit; nothing is sent if omitted
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq      = Net::RabbitMQ->new() ;
my $chanID  = nextchan() ;

$mq->connect($qserver, %qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,%eparms) ;

for (my $i = 1 ; $i <= $count ; $i++)
{
    my $sec      = 1+int(rand(10)) ;
    my $loglevel = random_loglevel ;
    my $message  = qq{$loglevel: This task will last for $sec seconds} ;

    $mq->publish($chanID,$loglevel,$message,{ exchange => $ename },) ;

    print STDERR qq{Message "$message" sent to exchange $ename\n} ;
}

$mq->disconnect ;
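
The shuffle-based pick above works, but it reorders a whole list just to use its first element. A lighter sketch, assuming the same facility and level lists, indexes each array with rand directly. It is only an alternative to random_loglevel for illustration, and it does not talk to RabbitMQ at all.

```perl
#!/usr/bin/perl

use strict ;
use warnings ;

# same lists as in emit_log_topic.pl, from man syslog(3)
my @facilities = qw{
    auth authpriv cron daemon ftp kern local0 local1
    local2 local3 local4 local5 local6 local7
    lpr mail news syslog user uucp
} ;
my @levels = qw{emerg alert crit err warning notice info debug} ;

sub random_loglevel {
    # rand @array yields a float in [0, size); an array subscript
    # truncates it to an integer, so every element is equally likely
    my $facility = $facilities[ rand @facilities ] ;
    my $level    = $levels[ rand @levels ] ;
    return "$facility.$level" ;
}

# print a few sample routing keys
print random_loglevel(), "\n" for 1 .. 5 ;
```

The output differs on every run, since the keys are random.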

receive_logs_topic.pl

#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;

# the binding pattern (e.g. '*.emerg' or 'auth.*') is mandatory
die "Usage: $0 <topic-pattern>\n" if not @ARGV > 0 ;
my ($topic) = @ARGV ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ } ;
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms = () ;
my %eparms  = ( exchange_type => "topic", auto_delete => 1 ) ;
my %consume_opts = (
		    consumer_tag => "worker_$$",
		    no_ack       => 0,
		    exclusive    => 0,
		   ) ;
my %declare_opts = ( durable => 1, auto_delete => 1, exclusive => 1 ) ;
my $qname ;
my $ename = q{gravity.xchecks} ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################


my $mq      = Net::RabbitMQ->new() ;
my $chanID  = nextchan() ;

$mq->connect($qserver, %qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,%eparms) ;

$qname = $mq->queue_declare($chanID,"",%declare_opts,) ;

$mq->queue_bind($chanID, $qname, $ename, $topic,) ;
print STDERR qq{Bound to queue $qname for topic $topic\n} ;

$mq->consume($chanID,$qname,%consume_opts) ;

# NOTE THAT recv() is BLOCKING!!! get wasn't!
while ( my $payload = $mq->recv() )
{
    last if not defined $payload ;
    my $body  = $payload->{body} ;
    my $dtag  = $payload->{delivery_tag} ;
    my ($sec) = ( $body =~ m{(\d+)} ) ;
    print STDERR qq{Worker $$: Received from queue $qname: $body\n} ;
    sleep $sec ;
    $mq->ack($chanID,$dtag,) ;
    print STDERR qq{Worker $$: Work done in $sec seconds\n} ;
}

output

bronto@cooper:~/Lab/gravity/tutorial-5$ Bound to queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw for topic *.emerg
Bound to queue amq.gen-kL9Zb4nRaVdPklK3t9awWw for topic auth.*
bronto@cooper:~/Lab/gravity/tutorial-5$ ./emit_log_topic.pl 50
Message "local4.crit: This task will last for 10 seconds" sent to exchange gravity.xchecks
Message "news.err: This task will last for 1 seconds" sent to exchange gravity.xchecks
Message "news.warning: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "local4.notice: This task will last for 9 seconds" sent to exchange gravity.xchecks
Message "kern.alert: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "user.err: This task will last for 5 seconds" sent to exchange gravity.xchecks
Message "local3.notice: This task will last for 3 seconds" sent to exchange gravity.xchecks
Message "uucp.notice: This task will last for 3 seconds" sent to exchange gravity.xchecks
Message "syslog.err: This task will last for 3 seconds" sent to exchange gravity.xchecks
Message "mail.crit: This task will last for 1 seconds" sent to exchange gravity.xchecks
Message "local7.notice: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "local2.alert: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "local0.notice: This task will last for 10 seconds" sent to exchange gravity.xchecks
Message "kern.notice: This task will last for 6 seconds" sent to exchange gravity.xchecks
Message "cron.emerg: This task will last for 10 seconds" sent to exchange gravity.xchecks
Message "daemon.emerg: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "auth.warning: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "lpr.alert: This task will last for 2 seconds" sent to exchange gravity.xchecks
Message "mail.warning: This task will last for 9 seconds" sent to exchange gravity.xchecks
Message "user.alert: This task will last for 2 seconds" sent to exchange gravity.xchecks
Message "local7.err: This task will last for 4 seconds" sent to exchange gravity.xchecks
Message "kern.info: This task will last for 9 seconds" sent to exchange gravity.xchecks
Message "kern.alert: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "authpriv.err: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "ftp.emerg: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "local7.notice: This task will last for 4 seconds" sent to exchange gravity.xchecks
Message "ftp.emerg: This task will last for 10 seconds" sent to exchange gravity.xchecks
Message "syslog.notice: This task will last for 3 seconds" sent to exchange gravity.xchecks
Message "daemon.debug: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "ftp.notice: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "uucp.warning: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "local5.debug: This task will last for 3 seconds" sent to exchange gravity.xchecks
Message "lpr.notice: This task will last for 5 seconds" sent to exchange gravity.xchecks
Message "local0.debug: This task will last for 2 seconds" sent to exchange gravity.xchecks
Message "local4.notice: This task will last for 6 seconds" sent to exchange gravity.xchecks
Message "syslog.alert: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "local3.err: This task will last for 3 seconds" sent to exchange gravity.xchecks
Message "user.alert: This task will last for 5 seconds" sent to exchange gravity.xchecks
Message "syslog.warning: This task will last for 4 seconds" sent to exchange gravity.xchecks
Message "kern.debug: This task will last for 10 seconds" sent to exchange gravity.xchecks
Message "local2.err: This task will last for 9 seconds" sent to exchange gravity.xchecks
Message "syslog.debug: This task will last for 10 seconds" sent to exchange gravity.xchecks
Message "local7.info: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "mail.notice: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "uucp.emerg: This task will last for 3 seconds" sent to exchange gravity.xchecks
Message "local0.err: This task will last for 1 seconds" sent to exchange gravity.xchecks
Message "kern.emerg: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "daemon.alert: This task will last for 2 seconds" sent to exchange gravity.xchecks
Message "user.warning: This task will last for 5 seconds" sent to exchange gravity.xchecks
Message "local0.debug: This task will last for 1 seconds" sent to exchange gravity.xchecks
Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: cron.emerg: This task will last for 10 seconds
Worker 27549: Received from queue amq.gen-kL9Zb4nRaVdPklK3t9awWw: auth.warning: This task will last for 7 seconds
bronto@cooper:~/Lab/gravity/tutorial-5$ Worker 27549: Work done in 7 seconds
Worker 27548: Work done in 10 seconds
Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: daemon.emerg: This task will last for 8 seconds
Worker 27548: Work done in 8 seconds
Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: ftp.emerg: This task will last for 8 seconds
Worker 27548: Work done in 8 seconds
Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: ftp.emerg: This task will last for 10 seconds
Worker 27548: Work done in 10 seconds
Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: uucp.emerg: This task will last for 3 seconds
Worker 27548: Work done in 3 seconds
Worker 27548: Received from queue amq.gen-LLvwxVpO0EuG1Rv5FL5RHw: kern.emerg: This task will last for 7 seconds
Worker 27548: Work done in 7 seconds

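As you can see, we emitted 50 messages with random facility.level routing keys. The first receiver, bound to *.emerg, handled all six messages with level emerg; the second, bound to auth.*, handled the only message addressed to the auth facility. Note that authpriv.err did not match auth.*, because * stands for one whole word and not for part of a word. Of course, a message with routing key auth.emerg would be delivered to both receivers.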

All other messages are lost: no queue is bound with a pattern that matches them, so the exchange drops them.
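
To make the routing above easier to follow, here is a minimal sketch of how a topic pattern is compared with a routing key. The broker does this matching server-side; topic_matches and _match are hypothetical helpers written for this illustration and are not part of Net::RabbitMQ. The sketch assumes the usual topic exchange rules: words are separated by dots, * stands for exactly one word and # for zero or more words (# is not used in the run above).

```perl
#!/usr/bin/perl

use strict ;
use warnings ;

# Hypothetical helper: does $key match the binding $pattern?
sub topic_matches {
    my ($pattern, $key) = @_ ;
    my @p = split /\./, $pattern, -1 ;
    my @k = split /\./, $key, -1 ;
    return _match(\@p, 0, \@k, 0) ;
}

# Compare pattern words from index $i with key words from index $j
sub _match {
    my ($p, $i, $k, $j) = @_ ;

    # pattern exhausted: it is a match only if the key is exhausted too
    return ( $j == @$k ? 1 : 0 ) if $i == @$p ;

    if ($p->[$i] eq '#') {
        # let "#" absorb 0, 1, 2... of the remaining key words
        for my $resume ($j .. scalar @$k) {
            return 1 if _match($p, $i + 1, $k, $resume) ;
        }
        return 0 ;
    }

    return 0 if $j == @$k ;   # key exhausted, pattern is not
    return 0 unless $p->[$i] eq '*' or $p->[$i] eq $k->[$j] ;
    return _match($p, $i + 1, $k, $j + 1) ;
}

# The two bindings used in the run above (the labels are made up)
my %bindings = (
    'receiver 1' => '*.emerg',
    'receiver 2' => 'auth.*',
) ;

for my $key (qw{cron.emerg auth.warning auth.emerg authpriv.err news.err}) {
    my @hits = grep { topic_matches($bindings{$_}, $key) } sort keys %bindings ;
    printf "%-13s -> %s\n", $key, @hits ? join(', ', @hits) : 'dropped' ;
}
```

With these bindings, auth.emerg is listed for both receivers, while authpriv.err and news.err are reported as dropped, which mirrors what happened to most of the 50 messages.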
