知识共享许可协议
本作品采用知识共享署名-非商业性使用-禁止演绎 3.0 未本地化版本许可协议进行许可。

queues.py
__author__ = 'nate'

from kombu import Exchange, Queue

# Direct-type exchange that the example publisher and consumer both import.
test_exchange = Exchange(name='test-exchange', type='direct')

# Each entry serves as both a queue name and that queue's routing key.
data = ['abc', 'def', 'ghi']

# One queue per entry in ``data``, each attached to ``test_exchange``.
test_queues = [
    Queue(name=name, exchange=test_exchange, routing_key=name)
    for name in data
]
client.py
__author__ = 'nate'

from kombu.pools import producers
from kombu import Connection
from queues import data
from queues import test_exchange
from datetime import datetime

if __name__ == '__main__':
    # Publish a single timestamped test message using the first entry of
    # ``data`` as the routing key.
    # NOTE(review): the broker user name and password are embedded in this
    # URL; consider loading it from configuration instead of source.
    url = 'amqp://nate:123123@10.211.55.61:5672/test_vhost'
    payload = {'content': 'This is test at {now}'.format(now=datetime.now())}
    with Connection(url) as conn:
        # Take a producer from the pool for this connection, blocking until
        # one is available; it is released when the ``with`` block exits.
        with producers[conn].acquire(block=True) as producer:
            # Only the exchange is declared here, not the queues.
            # NOTE(review): presumably the queues are declared by the
            # consumer side; confirm the worker is started first, otherwise
            # a message may have no queue to be routed to.
            producer.publish(payload,
                             serializer='json',
                             compression='bzip2',
                             exchange=test_exchange,
                             declare=[test_exchange],
                             routing_key=data[0])

            # A parenthesized single-argument print is valid on both
            # Python 2 and Python 3 and produces the same output.
            print('sent : %s' % payload)
worker.py
#coding=utf-8
__author__ = 'nate'

from kombu import Connection
from kombu.mixins import ConsumerMixin
from queues import test_queues


class Worker(ConsumerMixin):
    """Consumer that prints the ``content`` entry of each received message."""

    def __init__(self, connection):
        """Keep the broker connection on the instance.

        ``ConsumerMixin`` expects a ``connection`` attribute (see the kombu
        documentation for the mixin).
        """
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        """Return one consumer over ``test_queues`` that calls :meth:`work`.

        ``Consumer`` is the factory passed in by the mixin; ``channel`` is
        not used here.
        """
        # SECURITY NOTE(review): 'pickle' is among the accepted content
        # types. Unpickling a message from an untrusted publisher can execute
        # arbitrary code; the example client publishes JSON only, so consider
        # narrowing this list.
        return [Consumer(queues=test_queues,
                         accept=['json', 'pickle', 'msgpack', 'yaml'],
                         callbacks=[self.work])]

    def work(self, body, message):
        """Print ``body['content']`` and then acknowledge ``message``."""
        # A parenthesized single-argument print is valid on both Python 2
        # and Python 3 and produces the same output.
        print('receive: %s' % body['content'])
        message.ack()

if __name__ == '__main__':
    # Consume from the broker until the process is interrupted (Ctrl-C).
    # NOTE(review): the broker user name and password are embedded in this
    # URL; consider loading it from configuration instead of source.
    url = 'amqp://nate:123123@10.211.55.61:5672/test_vhost'
    with Connection(url) as conn:
        try:
            worker = Worker(conn)
            worker.run()
        except KeyboardInterrupt:
            # A parenthesized single-argument print is valid on both
            # Python 2 and Python 3 and produces the same output.
            print('Exit')