Fubaredness Is Contagious

by Dmitriy Samovskiy


Ruby + AMQP + RabbitMQ Example

June 24th, 2008 · 4 Comments

In this post I would like to show how to exchange messages over the AMQP protocol from Ruby, using RabbitMQ as the broker. I posted the original version of this script to the rabbitmq-discuss mailing list back in September 2007.

Prerequisites:

  • RabbitMQ broker configured, up and running on 127.0.0.1 (localhost) on port 5672 (standard AMQP port).
  • Apache Qpid Ruby library installed on Ruby's load path, for example via RUBYLIB or the -I switch used in the script's first line (svn co http://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/ruby)
  • AMQP specification XML from the official AMQP site, saved as /etc/amqp0-8.xml
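
The prerequisites above can be sanity-checked before running anything. The following is only a minimal sketch using Ruby's standard library, not part of the original script: the helper names broker_reachable? and spec_version are made up for illustration, and it assumes that the spec XML has a root element named amqp carrying major and minor attributes.

```ruby
require 'socket'
require 'rexml/document'

# Hypothetical helper: true if a TCP connection to host:port can be opened.
# This only proves that something is listening, not that it speaks AMQP.
def broker_reachable?(host = '127.0.0.1', port = 5672, timeout = 2)
  Socket.tcp(host, port, connect_timeout: timeout) { true }
rescue SystemCallError, SocketError, IOError
  false
end

# Hypothetical helper: returns [major, minor] as strings, read from the root
# element of the spec XML, or nil if the text does not look like an AMQP spec.
# Assumption: the root element is <amqp major="..." minor="...">.
def spec_version(xml_text)
  root = REXML::Document.new(xml_text).root
  return nil unless root && root.name == 'amqp'
  [root.attributes['major'], root.attributes['minor']]
rescue REXML::ParseException
  nil
end

if __FILE__ == $0
  spec_path = '/etc/amqp0-8.xml'
  puts "broker reachable: #{broker_reachable?}"
  if File.readable?(spec_path)
    puts "spec version: #{spec_version(File.read(spec_path)).inspect}"
  else
    puts "#{spec_path} is missing or unreadable"
  end
end
```

If the version reported for the spec file is not the one the broker expects, the connection is likely to fail during the handshake, so it is worth checking this first.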

The full script, disttailf.rb, follows; you can also download it from here.

#!/usr/bin/ruby -I/usr/local/qpid-svn/qpid/ruby
#
#
__doc__ = %q(
 
disttailf.rb - distributed "tail -f"
 
Aggregates "tail -f" output from multiple machines and multiple files
into a single RabbitMQ pubsub queue (kind of splunk's log consolidation
function)
 
Usage:
Producer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] file ...
Consumer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] -c
 
)
 
require 'qpid'
require 'socket'
 
def consumer(client, ch)
    myqueue = ch.queue_declare()
    ch.queue_bind(:queue=>myqueue.queue, :exchange=>'amq.topic',
                    :routing_key=>'disttailf.#')
    cons = ch.basic_consume(:queue=>myqueue.queue, :no_ack => true)
    ruby_queue = client.queue(cons.consumer_tag)
 
    while true
        raise "Rabbitmq broker disconnected" if client.closed?
        begin
          msg = ruby_queue.pop(true)  # non-blocking pop
          puts "== #{msg.content.headers[:headers]} " \
                "#{msg.routing_key.split('.')[-1]}"
          puts msg.content.body
        rescue
          sleep(0.5)
        end
    end
end
 
def producer(client, ch, filenames)
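    # use the short host name (first label), so a fully qualified name
    # such as host.example.com does not end up as "com"
    rkey = "disttailf." + Socket.gethostname.split('.')[0]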
    tail_f(filenames) do |filename, line|
        h = {'sent' => Time.now.to_i, 'filename' => filename }
        c = Qpid::Content.new({:headers=>h}, line)
        ch.basic_publish(:routing_key=>rkey, :content=>c,
                            :exchange=>'amq.topic')
        puts "#{filename}: #{line}"
    end
end
 
def tail_f(filenames, &block)
    filedict = Hash.new
    filenames.each { |f| filedict[f] = open_or_nil(f) }
    reopen_counter = 0
    while true
        if reopen_counter > 120
            reopen_counter = 0
            filenames.reject { |f| filedict[f] }.each {
                |f| filedict[f] = open_or_nil(f) }
        end
 
        filedict.values.reject { |f| not f }.each do |f|
            begin
                raise "trunc" unless File.stat(f.path).size >= f.tell
            rescue
                $stderr << "#{f.path}: removed or truncated\n"
                f.close
                filedict[f.path] = nil
                next
            end
 
            begin
              block.call(f.path,f.readline) while true
            rescue EOFError
              true
            end
        end
 
        reopen_counter += 1
        sleep(0.5)
    end # while true
end
 
def open_or_nil(filename)
    begin
        File.open(filename)
    rescue
        nil
    end
end
 
if __FILE__ == $0
    require 'getoptlong'
 
    server = '127.0.0.1'
    port = 5672
    specxml = '/etc/amqp0-8.xml'
    acts_as_consumer = false
 
    opts = GetoptLong.new(
        ['--server', '-s', GetoptLong::REQUIRED_ARGUMENT],
        ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
        ['--specxml', '-x', GetoptLong::REQUIRED_ARGUMENT],
        ['--consume', '-c', GetoptLong::NO_ARGUMENT])
    opts.each do |opt,arg|
      case opt
        when '--server'
            server = arg
        when '--port'
            port = arg.to_i
        when '--specxml'
            specxml = arg
        when '--consume'
            acts_as_consumer = true
      end
    end
 
    # set up connection to rabbitmq broker
    client = Qpid::Client.new(server, port, Spec.load(specxml))
    client.start({ "LOGIN" => "guest", "PASSWORD" => "guest" })
    ch = client.channel(1)
    ch.channel_open()
 
    if acts_as_consumer
        consumer(client, ch)
    else
        if ARGV.length == 0
            puts __doc__
            raise "List of file names is empty - nothing to do"
        end
        producer(client, ch, ARGV)
    end
 
end
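
The consumer binds its queue to amq.topic with the pattern disttailf.#, while each producer publishes with a routing key of the form disttailf.<host>. As a rough illustration of how a topic exchange decides whether such a key matches a binding, here is a standalone sketch of the matching rules as the AMQP specification describes them: words are separated by dots, * matches exactly one word, and # matches zero or more words. The names topic_match? and match_words are made up for this example, and this is not how RabbitMQ or Qpid actually implement it.

```ruby
# Hypothetical helper: true if routing_key matches the binding pattern
# under AMQP topic-exchange rules.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

# Recursive word-by-word comparison of pattern words against key words.
def match_words(pat, key)
  # An exhausted pattern matches only an exhausted key.
  return key.empty? if pat.empty?

  head, *rest = pat
  case head
  when '#'
    # '#' may absorb zero or more words: try every possible split point.
    (0..key.length).any? { |n| match_words(rest, key[n..-1]) }
  when '*'
    # '*' must consume exactly one word.
    !key.empty? && match_words(rest, key[1..-1])
  else
    # A literal word must be equal to the next key word.
    key.first == head && match_words(rest, key[1..-1])
  end
end

if __FILE__ == $0
  puts topic_match?('disttailf.#', 'disttailf.web01')   # => true
  puts topic_match?('disttailf.#', 'other.web01')       # => false
  puts topic_match?('disttailf.*', 'disttailf.a.b')     # => false
end
```

Because # absorbs any number of words, the disttailf.# binding would receive messages from every producer regardless of how many dot-separated words follow the prefix; a consumer interested in a single host could bind with a narrower pattern instead.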

Tags: rabbitmq · ruby

4 responses so far ↓

  • 1 Dmitriy // Aug 23, 2008 at 6:49 am

    Hi,

    I am looking for basic.get operation in QPID Ruby.
    Can you provide an example on that, please?


    Regards
    Dmitriy

  • 2 Dmitriy // Aug 23, 2008 at 10:21 am

    I will try to write an example with basic.get in the next couple of weeks…

  • 3 Brian // Aug 24, 2008 at 3:53 pm

    Dmitriy,

    I get the following error when I run this:

    [root@newobj002 mq]# ./tail.rb -x amqp.0-8.xml -c
    CONNECTION CLOSED: 501, FRAME_ERROR - cannot decode <>, 60, 20
    writer Qpid::Closed
    ./qpid/queue.rb:41:in `pop': Qpid::Closed (Qpid::Closed)
    from ./qpid/peer.rb:208:in `invoke'
    from ./qpid/peer.rb:190:in `method_missing'
    from ./tail.rb:25:in `consumer'
    from ./tail.rb:126

    I tried 0-9 spec and then get the following:

    [root@newobj002 mq]# ./tail.rb -x amqp.0-9.xml -c
    ./qpid/spec.rb:242: warning: Object#type is deprecated; use Object#class
    Codec::EOF
    ./qpid/codec.rb:247:in `read'
    ./qpid/codec.rb:255:in `unpack'
    ./qpid/codec.rb:198:in `long'
    ./qpid/codec.rb:217:in `longstr'
    ./qpid/codec.rb:176:in `send'
    ./qpid/codec.rb:176:in `decode'
    ./qpid/connection.rb:141:in `decode'
    ./qpid/connection.rb:141:in `map'
    ./qpid/connection.rb:141:in `decode'
    ./qpid/connection.rb:102:in `decode'
    ./qpid/connection.rb:61:in `read'
    ./qpid/peer.rb:90:in `reader'
    ./qpid/peer.rb:74:in `send'
    ./qpid/peer.rb:74:in `spawn'
    ./qpid/peer.rb:72:in `initialize'
    ./qpid/peer.rb:72:in `new'
    ./qpid/peer.rb:72:in `spawn'
    ./qpid/peer.rb:55:in `start'
    ./qpid/client.rb:83:in `start'
    ./tail.rb:121
    deadlock 0x7f8cff2b2698: sleep:- - ./qpid/queue.rb:36
    deadlock 0x7f8cff992e18: sleep:- (main) - ./qpid/queue.rb:36
    deadlock 0x7f8cff2b1c98: sleep:- - ./qpid/queue.rb:36
    ./qpid/peer.rb:72:in `pop': Thread(0x7f8cff2b1fe0): deadlock (fatal)
    from ./qpid/peer.rb:208:in `invoke'
    from ./qpid/peer.rb:190:in `method_missing'
    from ./tail.rb:123

    Any ideas?

    The RabbitMQ I’m running is the latest RPM from the RabbitMQ website.

  • 4 Dmitriy // Aug 24, 2008 at 4:56 pm

    Brian,

    Are you sure you are using the official AMQP spec, not the one that comes with Qpid? The filename is amqp0-8.xml and it is available from http://jira.amqp.org/confluence/display/AMQP/Download
