In this post I would like to show how to exchange messages over the AMQP protocol from Ruby, using RabbitMQ as the broker. I posted the original version of this script to the rabbitmq-discuss mailing list back in September 2007.
Prerequisites:
- RabbitMQ broker configured, up and running on 127.0.0.1 (localhost) on port 5672 (standard AMQP port).
- Apache Qpid Ruby library installed on Ruby's load path, via RUBYLIB or the -I switch (svn co http://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/ruby)
- AMQP specification XML from the official AMQP site saved as /etc/amqp0-8.xml
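Before running the script it can help to confirm that something is actually listening on the broker port from the first prerequisite. The following is a minimal sketch using only Ruby's standard library; `broker_reachable?` is a hypothetical helper name, not part of Qpid or RabbitMQ, and it only checks that the TCP port accepts connections, not that an AMQP handshake succeeds.

```ruby
require 'socket'

# Hypothetical helper: returns true if a TCP connection to host:port can be
# opened within `timeout` seconds. It proves only that something is
# listening; it does not speak AMQP.
def broker_reachable?(host = '127.0.0.1', port = 5672, timeout = 2)
  # With a block, Socket.tcp closes the socket and returns the block's value.
  Socket.tcp(host, port, connect_timeout: timeout) { true }
rescue SystemCallError, SocketError, IOError
  # Connection refused, timed out, or the host name did not resolve.
  false
end

if __FILE__ == $PROGRAM_NAME
  if broker_reachable?
    puts 'Something is listening on 127.0.0.1:5672'
  else
    puts 'Nothing is listening on 127.0.0.1:5672'
  end
end
```

If this reports that nothing is listening, the script would fail at connection time, so it is worth checking the broker first.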
You can also download this script from here.
#!/usr/bin/ruby -I/usr/local/qpid-svn/qpid/ruby
#
#
__doc__ = %q(
disttailf.rb - distributed "tail -f"

Aggregates "tail -f" output from multiple machines and multiple files
into a single RabbitMQ pubsub queue (kind of splunk's log consolidation
function)

Usage:
  Producer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] file ...
  Consumer: disttailf.rb [-s broker_host] [-p broker_port] [-x spec_xml] -c
)

require 'qpid'
require 'socket'

def consumer(client, ch)
  # Declare a private queue and bind it to every disttailf.* routing key.
  myqueue = ch.queue_declare()
  ch.queue_bind(:queue=>myqueue.queue, :exchange=>'amq.topic', :routing_key=>'disttailf.#')
  cons = ch.basic_consume(:queue=>myqueue.queue, :no_ack => true)
  ruby_queue = client.queue(cons.consumer_tag)
  while true
    raise "Rabbitmq broker disconnected" if client.closed?
    begin
      # Non-blocking pop raises when the queue is empty; we then sleep and retry.
      msg = ruby_queue.pop(non_block=true)
      puts "== #{msg.content.headers[:headers]} " \
           "#{msg.routing_key.split('.')[-1]}"
      puts msg.content.body
    rescue
      sleep(0.5)
    end
  end
end

def producer(client, ch, filenames)
  # Routing key is "disttailf.<short host name>". Take the first label so that
  # a fully qualified name such as web01.example.com yields "web01", not "com".
  rkey = "disttailf." + Socket.gethostname.split('.')[0]
  tail_f(filenames) do |filename, line|
    h = {'sent' => Time.now.to_i, 'filename' => filename }
    c = Qpid::Content.new({:headers=>h}, line)
    ch.basic_publish(:routing_key=>rkey, :content=>c, :exchange=>'amq.topic')
    puts "#{filename}: #{line}"
  end
end

def tail_f(filenames, &block)
  filedict = Hash.new
  filenames.each { |f| filedict[f] = open_or_nil(f) }
  reopen_counter = 0
  while true
    # About once a minute (120 iterations * 0.5 s), retry files that are not open.
    if reopen_counter > 120
      reopen_counter = 0
      filenames.reject { |f| filedict[f] }.each { |f| filedict[f] = open_or_nil(f) }
    end
    filedict.values.compact.each do |f|
      begin
        # A file shorter than our read position was truncated or replaced.
        raise "trunc" unless File.stat(f.path).size >= f.tell
      rescue
        $stderr << "#{f.path}: removed or truncated\n"
        f.close
        filedict[f.path] = nil
        next
      end
      begin
        # Hand every new line to the caller until we hit end of file.
        block.call(f.path, f.readline) while true
      rescue EOFError
        true
      end
    end
    reopen_counter += 1
    sleep(0.5)
  end # while true
end

def open_or_nil(filename)
  begin
    File.open(filename)
  rescue
    nil
  end
end

if __FILE__ == $0
  require 'getoptlong'

  server = '127.0.0.1'
  port = 5672
  specxml = '/etc/amqp0-8.xml'
  acts_as_consumer = false

  opts = GetoptLong.new(
    ['--server', '-s', GetoptLong::REQUIRED_ARGUMENT],
    ['--port', '-p', GetoptLong::REQUIRED_ARGUMENT],
    ['--specxml', '-x', GetoptLong::REQUIRED_ARGUMENT],
    ['--consume', '-c', GetoptLong::NO_ARGUMENT])
  opts.each do |opt,arg|
    case opt
    when '--server'
      server = arg
    when '--port'
      port = arg.to_i
    when '--specxml'
      specxml = arg
    when '--consume'
      acts_as_consumer = true
    end
  end

  # set up connection to rabbitmq broker
  client = Qpid::Client.new(server, port, spec=Spec.load(specxml))
  client.start({ "LOGIN" => "guest", "PASSWORD" => "guest" })
  ch = client.channel(1)
  ch.channel_open()
  if acts_as_consumer
    consumer(client, ch)
  else
    if ARGV.length == 0
      puts __doc__
      raise "List of file names is empty - nothing to do"
    end
    producer(client, ch, ARGV)
  end
end
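The consumer receives every producer's output because it binds its queue to amq.topic with the pattern 'disttailf.#', while each producer publishes with a routing key of the form "disttailf.<host>". As an illustration of how such a pattern is matched, here is a small self-contained sketch of AMQP topic matching, where `*` stands for exactly one dot-separated word and `#` for zero or more. The helper names `topic_match?` and `match_words` are hypothetical and exist only for this example; the real matching happens inside the broker, and the host name web01 is made up.

```ruby
# Returns true if routing key `key` matches binding pattern `pattern` under
# AMQP topic-exchange rules: words are separated by dots, `*` matches exactly
# one word and `#` matches zero or more words.
def topic_match?(pattern, key)
  match_words(pattern.split('.', -1), key.split('.', -1))
end

# Recursive helper working on arrays of words.
def match_words(pat, words)
  # An exhausted pattern matches only an exhausted key.
  return words.empty? if pat.empty?

  head, *rest = pat
  case head
  when '#'
    # `#` may consume 0..words.length words; try every split point.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    # `*` must consume exactly one word.
    !words.empty? && match_words(rest, words.drop(1))
  else
    # A literal word must equal the next word of the key.
    words.first == head && match_words(rest, words.drop(1))
  end
end

if __FILE__ == $PROGRAM_NAME
  puts topic_match?('disttailf.#', 'disttailf.web01')
  puts topic_match?('disttailf.#', 'other.web01')
end
```

To watch a single machine only, a consumer could bind with a narrower pattern such as 'disttailf.web01' instead of 'disttailf.#'.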
4 responses so far ↓
1 Dmitriy // Aug 23, 2008 at 6:49 am
Hi,
I am looking for basic.get operation in QPID Ruby.
Can you provide an example on that, please?
–
Regards
Dmitriy
2 Dmitriy // Aug 23, 2008 at 10:21 am
I will try to write an example with basic.get in the next couple of weeks…
3 Brian // Aug 24, 2008 at 3:53 pm
Dmitriy,
I get the following error when I run this:
[root@newobj002 mq]# ./tail.rb -x amqp.0-8.xml -c
CONNECTION CLOSED: 501, FRAME_ERROR - cannot decode <>, 60, 20
writer Qpid::Closed
./qpid/queue.rb:41:in `pop’: Qpid::Closed (Qpid::Closed)
from ./qpid/peer.rb:208:in `invoke’
from ./qpid/peer.rb:190:in `method_missing’
from ./tail.rb:25:in `consumer’
from ./tail.rb:126
I tried 0-9 spec and then get the following:
[root@newobj002 mq]# ./tail.rb -x amqp.0-9.xml -c
./qpid/spec.rb:242: warning: Object#type is deprecated; use Object#class
Codec::EOF
./qpid/codec.rb:247:in `read’
./qpid/codec.rb:255:in `unpack’
./qpid/codec.rb:198:in `long’
./qpid/codec.rb:217:in `longstr’
./qpid/codec.rb:176:in `send’
./qpid/codec.rb:176:in `decode’
./qpid/connection.rb:141:in `decode’
./qpid/connection.rb:141:in `map’
./qpid/connection.rb:141:in `decode’
./qpid/connection.rb:102:in `decode’
./qpid/connection.rb:61:in `read’
./qpid/peer.rb:90:in `reader’
./qpid/peer.rb:74:in `send’
./qpid/peer.rb:74:in `spawn’
./qpid/peer.rb:72:in `initialize’
./qpid/peer.rb:72:in `new’
./qpid/peer.rb:72:in `spawn’
./qpid/peer.rb:55:in `start’
./qpid/client.rb:83:in `start’
./tail.rb:121
deadlock 0x7f8cff2b2698: sleep:- - ./qpid/queue.rb:36
deadlock 0x7f8cff992e18: sleep:- (main) - ./qpid/queue.rb:36
deadlock 0x7f8cff2b1c98: sleep:- - ./qpid/queue.rb:36
./qpid/peer.rb:72:in `pop’: Thread(0x7f8cff2b1fe0): deadlock (fatal)
from ./qpid/peer.rb:208:in `invoke’
from ./qpid/peer.rb:190:in `method_missing’
from ./tail.rb:123
Any ideas?
The RabbitMQ I'm running is the latest RPM from the RabbitMQ website.
4 Dmitriy // Aug 24, 2008 at 4:56 pm
Brian,
Are you sure you are using the official AMQP spec, not the one that comes with Qpid? The file name is amqp0-8.xml and it is available from http://jira.amqp.org/confluence/display/AMQP/Download