Building a Distributed Messaging System

June 3rd, 2013

Introduction

Groupon’s architecture is composed of many services connected via either messages or HTTP REST APIs. These services manage functions such as processing your orders, shipping your Groupon Goods, or offering $5 off of your next order! Terabytes of data are exchanged daily among these services, and part of this flows through our messaging platform.

Two use cases originally motivated us to build a distributed messaging platform:

  1. Events created by one application are interesting to multiple parties, which need to be notified about the event in real-time. For example, when an order is created, the Financial Engineering service needs to update the company’s accounting records. Other systems may also need to hear about the order: the Groupon Goods service may need to trigger a shipment, or the Groupon Getaways service may need to book your hotel.
  2. Some systems are message-driven in nature. For example, our Merchant Data Service provides a system for discovery, exploration, and analysis of the world’s merchants, places, and venues. This system has individual components for data crawling, content aggregation, and searching. High-volume data is streamed top-down through the system via messaging.

We therefore built MessageBus, a distributed messaging system based on HornetQ from JBoss. HornetQ is a high-performance open source messaging system popular in the community. MessageBus is a cluster of HornetQ servers running in standalone mode, coordinated by load balancers and client APIs.

Like out-of-the-box HornetQ, MessageBus provides a publish-subscribe service where each queue can be listened to by multiple subscribers. Each subscriber receives their own copy of the data. Additional subscribers can be plugged in without interrupting the service or interfering with existing subscribers.

MessageBus Architecture

While the publish-subscribe model is supported by a single HornetQ server, we anticipated rapid adoption within Groupon and needed a plan for scaling the system. Hence we looked for a distributed solution and considered HornetQ’s native clustering feature (referred to as HornetQ Cluster below). Unfortunately, our tests comparing vanilla HornetQ with HornetQ Cluster indicated that HornetQ Cluster greatly compromised throughput (see Benchmarks).

To address the performance drawbacks, we designed an infrastructure that’s coordinated by load balancers and a client API.

Summary of the design:

  1. The publisher publishes to one broker at a time, chosen by the load balancer.
  2. The subscriber discovers an available broker through the load balancer and obtains from it the list of brokers it should listen to (the consumer list). The consumer then connects to all of those brokers simultaneously. It fetches the list periodically and updates its connections, so the consumer list on the brokers can be changed on the fly.
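
The two roles above can be modeled in a few lines. This is a minimal in-memory sketch, not MessageBus code: `Broker`, `LoadBalancer`, and `Subscriber` are hypothetical names, and the round-robin choice inside the load balancer is an assumption, since the selection policy is not described here.

```python
from collections import deque
from itertools import cycle


class Broker:
    """Stand-in for one standalone HornetQ server (hypothetical model)."""

    def __init__(self, name):
        self.name = name
        self.queue = deque()


class LoadBalancer:
    """Picks one broker per publish; round-robin is an assumption."""

    def __init__(self, brokers):
        self._next = cycle(brokers)

    def pick(self):
        return next(self._next)


class Subscriber:
    """Stays attached to every broker on its list and drains each one."""

    def __init__(self, brokers):
        self.brokers = list(brokers)
        self.received = []

    def poll(self):
        for broker in self.brokers:
            while broker.queue:
                self.received.append((broker.name, broker.queue.popleft()))


brokers = [Broker("hornetq-1"), Broker("hornetq-2")]
lb = LoadBalancer(brokers)
subscriber = Subscriber(brokers)

# Each publish lands on exactly one broker, chosen by the load balancer.
for message in ["order-1", "order-2", "order-3"]:
    lb.pick().queue.append(message)

# The subscriber still sees every message because it listens to all brokers.
subscriber.poll()
print(subscriber.received)
```

Because the subscriber drains brokers one at a time, messages can arrive in a different order than they were published; the sketch makes no ordering guarantee across brokers.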

[Figure: MessageBus architecture]

When a publisher sends message 1 (green line), it may be routed to HornetQ 2, which is listened to by the subscriber. The publisher then sends another message (magenta line), which is routed to HornetQ 1 and delivered to the subscriber through a different connection. The subscriber listens to all N hosts simultaneously to receive real time messages from the publisher.

When a single node fails, the load balancer quickly detects the failure through heartbeats and directs traffic away from the node. The consumer client continues to consume from the available nodes. Engineering is informed of the failure and performs maintenance. If the HornetQ host cannot be recovered, its data is manually copied from the DRBD backup machine to a new host, which is then added to the cluster.
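
The failover behavior can be illustrated with a small simulation. It is a sketch under stated assumptions: the `alive` flag stands in for a real heartbeat probe, and the class and method names are hypothetical rather than taken from any load balancer product.

```python
class Broker:
    """Hypothetical broker with a flag that a heartbeat probe would read."""

    def __init__(self, name):
        self.name = name
        self.alive = True
        self.messages = []


class LoadBalancer:
    """Routes publishes only to brokers that passed the last health check."""

    def __init__(self, brokers):
        self.brokers = brokers
        self.healthy = list(brokers)
        self._turn = 0

    def heartbeat(self):
        # Re-check every broker and keep only the ones that respond.
        self.healthy = [b for b in self.brokers if b.alive]

    def pick(self):
        if not self.healthy:
            raise RuntimeError("no healthy broker available")
        broker = self.healthy[self._turn % len(self.healthy)]
        self._turn += 1
        return broker


b1, b2 = Broker("hornetq-1"), Broker("hornetq-2")
lb = LoadBalancer([b1, b2])

lb.pick().messages.append("m1")   # routed to hornetq-1
b1.alive = False                  # hornetq-1 fails
lb.heartbeat()                    # the next health check notices
for m in ["m2", "m3"]:
    lb.pick().messages.append(m)  # both are routed to hornetq-2

print(b1.messages, b2.messages)
```

In this model `m1` stays on the failed host, which mirrors the situation described above: those messages become available again only once the host is recovered or its data is restored from the backup.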

With this design, we are able to control publishing and consuming separately, making adding/removing brokers easy.

To add a new broker to the cluster, we:

  1. Add the new broker to consumer lists on existing brokers, including the new one. This is done by updating a text file on the brokers. Existing consumer clients will automatically pick up the new broker, which doesn’t contain any messages yet.
  2. Change the load balancer to direct traffic to the new broker. At this time all clients are already consuming from the new broker, and new messages can be consumed in real-time.
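
The order of these two steps matters, and a short sketch shows why. The `Cluster` class and its fields are hypothetical; the point is only the invariant that no broker should be reachable by publishers before consumers are listening to it.

```python
class Cluster:
    """Tracks which brokers consumers listen to and which publishers can reach."""

    def __init__(self, brokers):
        self.consumer_list = list(brokers)  # brokers consumers connect to
        self.lb_pool = list(brokers)        # brokers the load balancer routes to

    def unwatched_brokers(self):
        """Brokers that can receive messages but have no consumers attached."""
        return [b for b in self.lb_pool if b not in self.consumer_list]

    def add_broker(self, broker):
        # Step 1: consumers start listening to the still-empty new broker.
        self.consumer_list.append(broker)
        assert not self.unwatched_brokers()
        # Step 2: only now is publisher traffic routed to it.
        self.lb_pool.append(broker)
        assert not self.unwatched_brokers()


cluster = Cluster(["hornetq-1", "hornetq-2"])
cluster.add_broker("hornetq-3")

# Reversing the steps leaves a window where messages sit unconsumed.
reversed_order = Cluster(["hornetq-1"])
reversed_order.lb_pool.append("hornetq-2")
print(reversed_order.unwatched_brokers())  # ['hornetq-2']
```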

To remove a broker, we:

  1. Remove the broker from the load-balancer to prevent new events from being published to it, but leave the consumers be.
  2. Verify that all messages have been consumed from this broker, then remove the broker from the consumer lists on remaining brokers.
  3. Retire the host.
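
The removal steps can be sketched the same way. This is an illustrative model only: `try_remove` and the `pending` counter are hypothetical, and in practice the drain check would be done against the real broker rather than a field on an object.

```python
class Broker:
    """Hypothetical stand-in for a HornetQ host with a message backlog."""

    def __init__(self, name, pending):
        self.name = name
        self.pending = pending  # messages published but not yet consumed


def try_remove(broker, lb_pool, consumer_list):
    """Run the removal steps; return True once the broker can be retired."""
    # Step 1: stop routing new publishes here, but keep consumers attached.
    if broker in lb_pool:
        lb_pool.remove(broker)
    # Step 2: the consumer list is only changed after the backlog is drained.
    if broker.pending > 0:
        return False
    if broker in consumer_list:
        consumer_list.remove(broker)
    # Step 3: retiring the host itself is outside this sketch.
    return True


old = Broker("hornetq-1", pending=2)
keep = Broker("hornetq-2", pending=0)
lb_pool = [old, keep]
consumer_list = [old, keep]

first_attempt = try_remove(old, lb_pool, consumer_list)   # backlog remains
old.pending = 0                                           # consumers catch up
second_attempt = try_remove(old, lb_pool, consumer_list)  # safe to retire
print(first_attempt, second_attempt)  # False True
```

The function is safe to call repeatedly, which fits a manual procedure where an operator re-checks the backlog until it reaches zero.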

Certain messages are business critical and have stringent data persistence requirements. At the time we conceived of MessageBus, HornetQ did not provide a data replication feature, so we selected DRBD to provide this functionality. (DRBD is a block device mirroring solution; see drbd.org for more information.) This ensures that each message is replicated to the backup host before the publishing call returns “success”. As of this writing, HornetQ 2.3.0 has been released and supports data replication, though it does non-blocking replication and has a weaker backup guarantee.

Benchmarks

In this section we compare benchmark results for the out-of-the-box HornetQ Cluster and our MessageBus cluster.

With HornetQ Cluster, every broker distributes messages to the rest of the hosts in the cluster, as illustrated below:

[Figure: message distribution in HornetQ Cluster]

When a message reaches HornetQ 1, it’s either re-routed to HornetQ 2 (magenta message), HornetQ 3 (green message), or persisted locally (blue message). This distribution is done in a round-robin manner. While this ensures that hosts get an even share of the incoming messages, additional traffic is incurred and the performance is significantly affected. With this design, most publications are finished only after they’ve been handled by two hosts.
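
To see how much extra traffic this implies, the round-robin distribution can be simulated. This is a simplified model, assuming the receiving node takes an equal turn in the rotation alongside its peers; `forwarded_fraction` is a hypothetical helper, not part of HornetQ.

```python
def forwarded_fraction(cluster_size, publishes=1200):
    """Share of messages that leave the receiving node under round-robin."""
    forwarded = 0
    for i in range(publishes):
        target = i % cluster_size  # node 0 is the node that received the publish
        if target != 0:
            forwarded += 1         # this message is handled by a second host
    return forwarded / publishes


for size in (2, 3, 4):
    print(size, round(forwarded_fraction(size), 3))
```

Under this model the forwarded share is (N - 1) / N for a cluster of N nodes: half of all messages with two nodes, two thirds with three, and it keeps growing as nodes are added.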

The test is run on a cluster of two brokers, each with this hardware setup:

  • CPU: 2 x Intel E5645 (2.4 GHz, 6 cores, Hyper Threaded, 12MB cache, 5.86GT/s QPI)
  • Memory: 64GB DDR3-1333
  • Storage: 4 x 1TB SATA, 3Gb/s, 7200 RPM, 64MB Cache, software RAID 10
  • NIC: 1Gbps Ethernet

We tested two scenarios and measured the number of messages published per second.

Scenario 1: 50 publishers publish 10,000 messages each.

Scenario 2: 50 publishers publish 10,000 messages each while 50 consumers are consuming from the same queue.

[Figure: benchmark results]

MessageBus achieves 238% of HornetQ Cluster’s throughput in scenario 1 and 197% in scenario 2. It’s also interesting to note that HornetQ Cluster yields almost the same throughput in both scenarios, indicating that server load is no longer the bottleneck.

JUDCon Talk

I will be presenting MessageBus at JUDCon 2013 in Boston. If you have registered for the conference, please come and listen to my talk to learn more about our system. If you use products from JBoss, then JUDCon is the conference in the community and you should check out the agenda and consider attending. Note that there is a registration fee of $150, which increases to $250 once the conference commences on June 9, 2013.

John B. Hynes Veterans Memorial Convention Center
900 Boylston Street, Boston, MA, 02115

Time 2:30pm–3:20pm EST
Date Tue 11th June 2013

More details of the talk.

Future Work

MessageBus has become an important component of Groupon’s infrastructure, with over 100 topics and 200 subscribers deployed in production. Five million messages are published daily to two clusters with 9 nodes, with traffic spikes of up to 2 million messages per hour. But there is still room to improve.

  • Introduction of a RESTful API. MessageBus currently provides a Java client and a Ruby client, which covers the great majority of Groupon’s use cases. However, occasionally there are still demands for a true language-neutral solution. The biggest challenge for a RESTful interface is the consumer – the REST client needs to buffer messages from all hosts and manage their life cycle. It also needs to determine the number of consumers dynamically based on query rate, so that it can efficiently answer queries without starving stale messages in its buffer.
  • Aggregated monitoring. We use JMX to monitor metrics of each HornetQ broker. But a common requirement is to get the number of messages in a queue across all nodes. There is an ongoing effort to set up a centralized monitor to aggregate metrics reported by individual nodes.
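
The aggregation itself is straightforward once per-node numbers are collected in one place. The sketch below assumes each node’s readings have already been gathered into a dictionary; the node names, queue names, and counts are made up for illustration, and how the values are read from each broker is left out.

```python
def total_queue_depth(per_node_counts, queue_name):
    """Sum one queue's message count over every node that reported it."""
    return sum(counts.get(queue_name, 0) for counts in per_node_counts.values())


# Made-up sample readings, keyed by node and then by queue name.
readings = {
    "hornetq-1": {"orders": 120, "shipments": 4},
    "hornetq-2": {"orders": 95},
    "hornetq-3": {"orders": 0, "shipments": 11},
}

orders_depth = total_queue_depth(readings, "orders")
shipments_depth = total_queue_depth(readings, "shipments")
print(orders_depth, shipments_depth)  # 215 15
```

A node that does not report a queue is counted as zero here; a real monitor would probably want to distinguish "empty" from "not reporting".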

We are currently polishing up the code base so we can release MessageBus as an open source project. Once it’s finished I will follow up with a post. Check back in coming weeks for access to the source code and everything! In the meantime email messagebus-team@groupon.com for more information.
