Redoing RabbitMQ’s tutorial – part 4

The scope of RabbitMQ's tutorial 4 is subscribing to only a subset of the messages.

The following shows an implementation of that tutorial (sort of) in Perl with Net::RabbitMQ. As before, the code is just sketched out and definitely not an example of style: it only aims to show how things work. Once I get all the tutorials sorted out in Perl, I'll build on these sketches to create something "real".

Enjoy!

emit_log_direct.pl

#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;
use Data::Dumper ;
use List::Util qw{shuffle} ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ } ;
}

{
  # select a random severity
  # OK, OK, this is a bit overkill, I know... blame it on my laziness
  my @severities = qw{info warning error} ;
  sub random_severity {
    my ($severity) = shuffle @severities ;
    return $severity ;
  }
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms  = () ;
my %eparms  = ( exchange_type => "direct", auto_delete => 1 ) ;
my $ename   = q{gravity.xchecks} ;
my $count   = $ARGV[0] ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################

my $mq      = Net::RabbitMQ->new() ;
my $chanID  = nextchan() ;

$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,\%eparms) ;

for (my $i = 1 ; $i <= $count ; $i++)
{
    my $sec      = 1+int(rand(10)) ;
    my $severity = random_severity ;
    my $message  = qq{$severity: This task will last for $sec seconds} ;

    $mq->publish($chanID,$severity,$message,{ exchange => $ename },) ;

    print STDERR qq{Message "$message" sent to exchange $ename\n} ;
}

$mq->disconnect ;

receive_logs_direct.pl

#!/usr/bin/perl

use strict ;
use warnings ;

use Net::RabbitMQ ;

die "Usage: $0 severity [severity ...]\n" if not @ARGV ;
my @severities = @ARGV ;

{
    # closure to return a new channel ID every time we call nextchan
    my $nextchan = 1 ;
    sub nextchan { return $nextchan++ } ;
}

### BEGIN CONFIGURABLE PARAMETERS ######################################
my $qserver = q{gravity} ;
my %qparms = () ;
my %eparms  = ( exchange_type => "direct", auto_delete => 1 ) ;
my %consume_opts = (
		    consumer_tag => "worker_$$",
		    no_ack       => 0,
		    exclusive    => 0,
		   ) ;
my %declare_opts = ( durable => 1, auto_delete => 1, exclusive => 1 ) ;
my $qname ;
my $ename = q{gravity.xchecks} ;
### NO CONFIGURABLE PARAMETERS BELOW THIS LINE #########################


my $mq      = Net::RabbitMQ->new() ;
my $chanID  = nextchan() ;

$mq->connect($qserver, \%qparms) ;
$mq->channel_open($chanID) ;
$mq->exchange_declare($chanID,$ename,\%eparms) ;

$qname = $mq->queue_declare($chanID,"",\%declare_opts,) ;

foreach my $severity (@severities) {
  $mq->queue_bind($chanID, $qname, $ename, $severity,) ;
  print STDERR qq{Bound to queue $qname for severity $severity\n} ;
}

$mq->consume($chanID,$qname,\%consume_opts) ;

# NOTE: recv() is BLOCKING! get() wasn't.
while ( my $payload = $mq->recv() )
{
    last if not defined $payload ;
    my $body  = $payload->{body} ;
    my $dtag  = $payload->{delivery_tag} ;
    my ($sec) = ( $body =~ m{(\d+)} ) ;
    print STDERR qq{Worker $$: Received from queue $qname: $body\n} ;
    sleep $sec ;
    $mq->ack($chanID,$dtag,) ;
    print STDERR qq{Worker $$: Work done in $sec seconds\n} ;
}

output

bronto@cooper:~/Lab/gravity/tutorial-4$ ./receive_logs_direct.pl error & ./receive_logs_direct.pl info error warning &
[3] 21437
[4] 21438
bronto@cooper:~/Lab/gravity/tutorial-4$ Bound to queue amq.gen-6ILylE6ZHkEpWWWuG3tbqg for severity error
Bound to queue amq.gen-G0ieutjZgEyV6YaRkIgMog for severity info
Bound to queue amq.gen-G0ieutjZgEyV6YaRkIgMog for severity error
Bound to queue amq.gen-G0ieutjZgEyV6YaRkIgMog for severity warning

bronto@cooper:~/Lab/gravity/tutorial-4$ ./emit_log_direct.pl 5
Message "warning: This task will last for 9 seconds" sent to exchange gravity.xchecks
Message "error: This task will last for 3 seconds" sent to exchange gravity.xchecks
Message "warning: This task will last for 8 seconds" sent to exchange gravity.xchecks
Message "warning: This task will last for 7 seconds" sent to exchange gravity.xchecks
Message "warning: This task will last for 9 seconds" sent to exchange gravity.xchecks
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: warning: This task will last for 9 seconds
Worker 21437: Received from queue amq.gen-6ILylE6ZHkEpWWWuG3tbqg: error: This task will last for 3 seconds
bronto@cooper:~/Lab/gravity/tutorial-4$ Worker 21437: Work done in 3 seconds
Worker 21438: Work done in 9 seconds
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: error: This task will last for 3 seconds
Worker 21438: Work done in 3 seconds
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: warning: This task will last for 8 seconds
Worker 21438: Work done in 8 seconds
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: warning: This task will last for 7 seconds
Worker 21438: Work done in 7 seconds
Worker 21438: Received from queue amq.gen-G0ieutjZgEyV6YaRkIgMog: warning: This task will last for 9 seconds
Worker 21438: Work done in 9 seconds

As you can see, one of the two receivers processed the only "error" message immediately, and then remained quiet for the rest of the time, while the other receiver processed everything, including the error message processed by its mate.
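
The behaviour above follows from how a direct exchange routes messages: every queue bound with a key equal to the message's routing key gets its own copy. Below is a minimal sketch of that rule in plain Perl, with no broker and no Net::RabbitMQ involved. The names bind_queue, publish, errors_only and everything are made up for the illustration; the two queue names stand in for the server-generated amq.gen-... ones.

```perl
#!/usr/bin/perl

use strict ;
use warnings ;

# Hypothetical in-memory model of a direct exchange (no broker involved):
# %bindings maps each binding key to the list of queues bound with that key,
# %queues maps each queue name to the messages it has received so far.
my %bindings ;
my %queues ;

sub bind_queue {
    my ($queue, @keys) = @_ ;
    $queues{$queue} //= [] ;
    push @{ $bindings{$_} }, $queue for @keys ;
}

# A direct exchange copies the message to every queue whose binding key
# equals the routing key; if nothing matches, the message is dropped.
# Returns the number of queues the message was routed to.
sub publish {
    my ($routing_key, $message) = @_ ;
    my @targets = @{ $bindings{$routing_key} // [] } ;
    push @{ $queues{$_} }, $message for @targets ;
    return scalar @targets ;
}

# Same bindings as the two receivers in the run above
bind_queue('errors_only', qw{error}) ;
bind_queue('everything',  qw{info warning error}) ;

# Same severities, in the same order, as the five messages emitted above
publish($_, "$_: some task") for qw{warning error warning warning warning} ;

foreach my $queue (sort keys %queues) {
    printf "%-11s got %d message(s)\n", $queue, scalar @{ $queues{$queue} } ;
}
```

With the same bindings and the same five severities as in the run above, the errors_only queue ends up with one message and the everything queue with five, which is what the two workers showed.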
