queues.py
__author__ = 'nate'
from kombu import Exchange, Queue
test_exchange = Exchange('test-exchange', type='direct')
data = ['abc', 'def', 'ghi']
test_queues = [Queue(name, test_exchange, routing_key=name) for name in data]
client.py
__author__ = 'nate'
from kombu.pools import producers
from kombu import Connection
from queues import data
from queues import test_exchange
from datetime import datetime
if __name__ == '__main__':
url = 'amqp://nate:[email protected]:5672/test_vhost'
payload = {'content': 'This is test at {now}'.format(now=datetime.now())}
with Connection(url) as conn:
with producers[conn].acquire(block=True) as producer:
producer.publish(payload,
serializer='json',
compression='bzip2',
exchange=test_exchange,
declare=[test_exchange],
routing_key=data[0])
print 'sent : %s' % payload
worker.py
#coding=utf-8
__author__ = 'nate'
from kombu import Connection
from kombu.mixins import ConsumerMixin
from queues import test_queues
class Worker(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=test_queues,
accept=['json', 'pickle', 'msgpack', 'yaml'],
callbacks=[self.work])]
def work(self, body, message):
print 'receive: %s' % body['content']
message.ack()
if __name__ == '__main__':
url = 'amqp://nate:[email protected]:5672/test_vhost'
with Connection(url) as conn:
try:
worker = Worker(conn)
worker.run()
except KeyboardInterrupt:
print 'Exit'