I recently had the pleasure of trying to figure out how to get a headers exchange working with RabbitMQ. Unfortunately, there is almost no documentation on how to do this! All of the other exchange types are covered quite well in the tutorials, etc., but headers exchanges are apparently the red-headed stepchildren of the exchange world. From what I gathered in this thread, nobody really likes headers exchanges because they aren’t very flexible. But they seemed pretty good for what I needed, so I dug through the pika source and cobbled together an understanding of how they work.
Here’s how you set one up and consume from one using pika. For simplicity I use the BlockingConnection type here; the asynchronous SelectConnection is preferred when using pika, and the code below can easily be adapted to use it:
headers_emitter.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='testing', type='headers')

# Header key/value pairs collected so far; they are sent with every message.
fields = {}

try:
    while True:
        data = raw_input('> ')
        # Input containing '=' sets a header instead of sending a message.
        if '=' in data:
            # Split on the first '=' only, so values may themselves contain '='.
            key, val = data.split('=', 1)
            fields[key] = val
            continue
        # The routing key is ignored by headers exchanges, so leave it empty.
        channel.basic_publish(exchange='testing',
                              routing_key='',
                              body=data,
                              properties=pika.BasicProperties(headers=fields))
        print ' [x] Sent {0} with headers: {1}'.format(data, fields)
except KeyboardInterrupt:
    print 'Bye'
finally:
    connection.close()
headers_receiver.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='testing', type='headers')

# Let the server pick a name for a private, exclusive queue.
result = channel.queue_declare(exclusive=True)
if not result:
    print "Queue didn't declare properly!"
    sys.exit(1)
queue_name = result.method.queue

# The header criteria go in 'arguments'; 'x-match' selects any/all matching.
channel.queue_bind(exchange='testing',
                   queue=queue_name,
                   routing_key='',
                   arguments={'ham': 'good', 'x-match': 'any'})

def callback(ch, method, properties, body):
    # The headers sent by the emitter arrive in properties.headers.
    print "{headers}:{body}".format(headers=properties.headers, body=body)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

try:
    channel.start_consuming()
except KeyboardInterrupt:
    print 'Bye'
finally:
    connection.close()
These two little scripts are just toys, but they show how you can add key/value pairs to the headers of your RabbitMQ messages. headers_emitter.py interprets anything you type with an ‘=’ character in it as a key/value assignment to add to the headers; anything else is sent as a message body. In headers_receiver.py, you specify which fields are to be matched in the ‘arguments’ argument to queue_bind. Just add the key ‘x-match’ with the value ‘any’ or ‘all’ to decide whether all of the keys need to match, or just any one of them (this part is pretty intuitive).
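To make the ‘any’ versus ‘all’ behaviour concrete, here is a small pure-Python sketch of the matching rule. It is only my simplified model of what the broker does, not RabbitMQ's actual implementation: the function name headers_match is made up for illustration, and I am assuming that binding keys starting with ‘x-’ are control arguments rather than match criteria and that ‘x-match’ defaults to ‘all’ when omitted.

```python
def headers_match(binding_args, message_headers):
    """Simplified model (an assumption, not broker code) of headers matching."""
    message_headers = message_headers or {}
    # Assumed default: a binding without 'x-match' behaves like 'all'.
    mode = binding_args.get('x-match', 'all')
    # Assumed: keys starting with 'x-' are control arguments, not criteria.
    criteria = dict((k, v) for k, v in binding_args.items()
                    if not k.startswith('x-'))
    # One boolean per criterion: is the key present with exactly this value?
    hits = [k in message_headers and message_headers[k] == v
            for k, v in criteria.items()]
    if mode == 'any':
        return any(hits)
    return all(hits)


any_binding = {'ham': 'good', 'x-match': 'any'}
all_binding = {'ham': 'good', 'eggs': 'green', 'x-match': 'all'}

print(headers_match(any_binding, {'ham': 'good', 'spam': 'no'}))  # True
print(headers_match(any_binding, {'ham': 'bad'}))                 # False
print(headers_match(all_binding, {'ham': 'good'}))                # False
print(headers_match(all_binding, {'ham': 'good', 'eggs': 'green'}))  # True
```

Note that the comparison is plain equality on each value, which is exactly the limitation people complain about.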
Apparently the complaints about headers exchanges stem from the fact that no rich comparison is allowed on the header fields. So you can’t say something like ‘hams = good | bad’ or ‘id_nbr > 100’; a value can only match exactly or not match. You can get some semblance of the either/or by making several bindings from the exchange to your queue. So in the ‘hams = good | bad’ case, you can call channel.queue_bind(arguments = {'hams': 'good'}, …) and channel.queue_bind(arguments = {'hams': 'bad'}, …), and all messages with either good or bad in the hams key will be sent to that queue. But unless you want to make a binding for every number greater than some value, ‘id_nbr > 100’ is just not possible. Of course, you can catch all messages and then filter in your client based on the value of id_nbr, but depending on the volume of messages, that may or may not be practical.
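Both workarounds can be sketched in a few lines. This is a rough illustration rather than anything pika provides: bind_any_of, filtering_callback and id_above_100 are names I made up, and the sketch assumes a channel object whose queue_bind accepts the same keyword arguments used in headers_receiver.py.

```python
def bind_any_of(channel, exchange, queue, key, values):
    """Emulate 'key = v1 | v2 | ...' by creating one binding per value."""
    for value in values:
        channel.queue_bind(exchange=exchange,
                           queue=queue,
                           routing_key='',
                           arguments={key: value, 'x-match': 'all'})


def filtering_callback(predicate, handler):
    """Wrap a consumer callback so only messages passing predicate reach it."""
    def callback(ch, method, properties, body):
        # properties.headers may be None when a message carries no headers.
        headers = properties.headers or {}
        if predicate(headers):
            handler(ch, method, properties, body)
    return callback


def id_above_100(headers):
    """Client-side stand-in for the impossible binding 'id_nbr > 100'."""
    # Values typed into headers_emitter.py are strings, so convert first;
    # a missing or non-numeric id_nbr simply fails the filter.
    try:
        return int(headers.get('id_nbr')) > 100
    except (TypeError, ValueError):
        return False
```

In headers_receiver.py you would then call bind_any_of(channel, 'testing', queue_name, 'hams', ['good', 'bad']) in place of the single queue_bind, and pass filtering_callback(id_above_100, callback) to basic_consume instead of callback. Keep in mind that the filter only runs after the message has already been delivered, so it saves no bandwidth.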