Skip to content

使用kombu连接RabbitMQ

Published: at 00:00
queues.py
__author__ = 'nate'

from kombu import Exchange, Queue

test_exchange = Exchange('test-exchange', type='direct')
data = ['abc', 'def', 'ghi']
test_queues = [Queue(name, test_exchange, routing_key=name) for name in data]
client.py
__author__ = 'nate'

from kombu.pools import producers
from kombu import Connection
from queues import data
from queues import test_exchange
from datetime import datetime

if __name__ == '__main__':
	url = 'amqp://nate:[email protected]:5672/test_vhost'
	payload = {'content': 'This is test at {now}'.format(now=datetime.now())}
	with Connection(url) as conn:
		with producers[conn].acquire(block=True) as producer:
			producer.publish(payload,
							 serializer='json',
							 compression='bzip2',
							 exchange=test_exchange,
							 declare=[test_exchange],
							 routing_key=data[0])

			print 'sent : %s' % payload
worker.py
#coding=utf-8
__author__ = 'nate'

from kombu import Connection
from kombu.mixins import ConsumerMixin
from queues import test_queues


class Worker(ConsumerMixin):

	def __init__(self, connection):
		self.connection = connection

	def get_consumers(self, Consumer, channel):
		return [Consumer(queues=test_queues,
						 accept=['json', 'pickle', 'msgpack', 'yaml'],
						 callbacks=[self.work])]

	def work(self, body, message):
		print 'receive: %s' % body['content']
		message.ack()

if __name__ == '__main__':
	url = 'amqp://nate:[email protected]:5672/test_vhost'
	with Connection(url) as conn:
		try:
			worker = Worker(conn)
			worker.run()
		except KeyboardInterrupt:
			print 'Exit'