Sunday, August 7, 2011

Experimenting with the Kombu framework

The Kombu framework is an excellent way to work with RabbitMQ/AMQP message brokers. Its documentation shows several ways of doing so, each of which involves instantiating the exchange, the queue, and the connection to the RabbitMQ (AMQP) broker (see http://packages.python.org/kombu/introduction.html). The article at http://blogs.digitar.com/jjww/2009/01/rabbits-and-warrens/ shows how to use py-amqplib to talk to RabbitMQ hosts, but Kombu, with its support for multiple back-ends, provides a much more elegant approach.

Here's a simple piece of code that we can use to talk to our queue, assuming our Celery configuration settings are defined as well. In this example, we use only the Queue class to consume messages. We bind the default queue and exchange to the channel and then register a callback that dumps each message to stdout.

from celery.conf import settings
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue

connection = BrokerConnection(settings.BROKER_HOST, settings.BROKER_USER, settings.BROKER_PASSWORD, settings.BROKER_VHOST)

# Open a channel on the RabbitMQ connection
channel = connection.channel()

default_exchange = Exchange("default", "direct", durable=True)
default_queue = Queue("default", exchange=default_exchange, routing_key="default")
bound_default_queue = default_queue(channel)

# Queue.consume() hands the callback the raw message object
def process_msg(msg):
    print("%s" % repr(msg))

bound_default_queue.consume(callback=process_msg)

while True:
    connection.drain_events()

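We can do the same by creating a Consumer instance and calling its consume() method to register it. Note that Consumer callbacks receive two arguments, the decoded body and the message object, rather than the message alone: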

from celery.conf import settings
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer

connection = BrokerConnection(settings.BROKER_HOST, settings.BROKER_USER, settings.BROKER_PASSWORD, settings.BROKER_VHOST)

# Open a channel on the RabbitMQ connection
channel = connection.channel()

default_exchange = Exchange("default", "direct", durable=True)

default_queue = Queue("default", exchange=default_exchange, routing_key="default")

# Consumer callbacks receive the decoded body and the message object
def process_msg(body, msg):
    print("body %s, msg %s" % (repr(body), repr(msg)))

consumer = Consumer(channel, default_queue, callbacks=[process_msg])
consumer.consume()

while True:
    connection.drain_events()
