Thoughts and Notes on Software Development

Delaying Message Processing in RabbitMQ

If you search for “how to delay message processing in RabbitMQ”, you'll most likely run into two possible solutions for it.

Both solutions presented above are valid solutions, but I ended up not implementing any of those solutions, and instead went with a solution that is configurable via the consumer application. First, my reasons for not going with the established solutions listed above.

So, the solution I went with was to add a PublishDate via the message headers and then the consumer can delay message processing based on this date value.

Adding a PublishDate header value is easy, you add it to the Properties.Headers dictionary before publishing the message.

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

properties.Headers = new Dictionary<string, object>();
properties.Headers.Add("PublishDate", DateTime.Now.ToString());

channel.BasicPublish(exchange: "",
    routingKey: "task_queue",
    basicProperties: properties,
    body: body);

Note that I'm adding the PublishDate value as a string, instead of a DateTime value. For some reason, adding it to the dictionary as a DateTime value causes an error. I don't remember what the error was, something about an invalid table value, so I just went with a string value.

On the consumer side, you will need to add code to retrieve the Publish Date from the headers.

consumer.Received += (model, ea) =>
{
    byte[] publishDateHeader = (byte[])ea.BasicProperties.Headers["PublishDate"];
    DateTime publishDate = Convert.ToDateTime(Encoding.UTF8.GetString(publishDateHeader));
    // Now you can delay message processing based on the publish date value

    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine(" [x] Received {0}", message);

    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

Note that I'm first casting the header value to a byte array, before converting it to a string, then finally to a DateTime value. For some reason, adding a string as a custom header turns it into a byte array. Thankfully somebody else ran into this issue before and shared a solution for it.

With a PublishDate value available, you can now delay message processing however you would like. In my case, I opted to compare the PublishDate value to the DateTime.Now value, which allowed me to check how old the message was. For example, if a message was 5 minutes old, it has been delayed enough and gets processed right away. If the message was only a minute old, the consumer thread will wait until such time that the message was now 5 minutes old, before it processes it.

There are some drawbacks to this approach, namely, you will have to go through the Publisher/Consumer classes to add the code for handling a PublishDate header value. Depending on how your queues are structured and how many publisher-consumer class files you have, you could end up with changes to multiple files just to add this feature. On the flip side though, if only one queue needs this “delayed message processing” feature, then you'll have minimal changes while your other queues continue as is. There are probably more pros and cons to this approach that I haven't thought of. Still I prefer the flexibility with this approach as I only must worry about editing a consumer's config file and it allows me to run multiple consumers each with their own specific message processing setting.

Have you had to design a solution to delay message processing in RabbitMQ? If so, I am curious to hear what approach you went with and why. Please do share in the comments below or send me an email and we can discuss.

Tags: #CSharp #DotNet #RabbitMQ

Discuss... or leave a comment below.