[Article] Concepts Behind Broker Networks
By Hasitha Abeykoon
- 29 Sep, 2016
This article focuses on the factors that are considered when designing a scalable message broker. It discusses why the clustering concept came to the world of brokers and how various broker implementations cater to this concept. Comparison between the schemes used by different implementations is done where necessary. In some areas, we can identify common algorithmic approaches implemented with different underlying technologies by different brokers.
One of the main issues in designing message brokers is the combined problem of shared storage and scalability. The concepts, advantages, and disadvantages of the approaches taken by major message brokers in the market are discussed towards the end of the article.
Today, the SOA architecture which dominated middleware is slowly embracing microservices. Message Brokers continue to play an important role providing asynchronous communication between independent microservices; this involvement is also discussed in the final section of this article.
Why networks?
With the advent of the Internet and network infrastructures, many systems moved from conventional monolithic hardware platforms to scalable and virtualized hardware ecosystems. As a result, IaaS platforms came into existence - platforms such as Amazon Web Services (AWS), Windows Azure, Google Compute Engine, Rackspace Open Cloud, and IBM SmartCloud Enterprise. Server software, which needed large amounts of memory and CPU processing power, was written so that it could be set up on multiple physical hardware units and yet behave as a single logical unit.
In that way, systems gained the ability to scale horizontally. This also enabled systems to be highly available regardless of parts being offline for failures or maintenance.
Networking of brokers emerged later
While web services and web applications went relatively quickly down this path, the JMS and AMQP world lagged behind.
One reason was that protocols like AMQP emerged after HTTP; their delivery semantics also make broker systems stateful. Consider a queue: each element in a queue has a time-bound relationship with the others, and this defines the order of elements. Maintaining this order logically across different nodes made broker networks a non-trivial and complex problem: a distributed queue is a concept that is not yet completely solved in computer science. Scaled-out brokers provide only loose ordering between elements. Thus the AMQP 0-8, 0-9-1 and 0-10 specifications did not have broker network concepts; it was only with the AMQP 1.0 specification that the concepts of broker federation came up.
Federation of brokers defined by AMQP 1.0
The latest specification for AMQP is v1.0. Previous versions always referred to a client-server architecture, where producers and consumers connect to a central server node. With AMQP 1.0, this model is generalized to nodes and links.
- Nodes - entities responsible for the safe storage and/or delivery of Messages. Messages can originate from, terminate at, or be relayed by nodes
- Links - unidirectional route between two Nodes. Links attach to a node at a terminus. There are two types of terminus: sources track outgoing messages while targets track incoming messages.
As a message travels through the AMQP network, the responsibility for safe storage and delivery of the message is transferred between the nodes it encounters (link protocol). These definitions have introduced the necessary flexibility for a broker network.
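The node-and-link model can be pictured with a small sketch. The Python below only illustrates the idea that responsibility for a message moves from node to node over unidirectional links; the names `Node`, `Link` and `transfer` are hypothetical and are not taken from the AMQP 1.0 specification or from any client library.

```python
from collections import deque


class Node:
    """Hypothetical AMQP-style node: stores the messages it is responsible for."""

    def __init__(self, name):
        self.name = name
        self.messages = deque()


class Link:
    """Unidirectional route from a source node to a target node."""

    def __init__(self, source, target):
        self.source = source
        self.target = target

    def transfer(self):
        """Move the oldest message; the source releases it only after
        the target has stored it."""
        if not self.source.messages:
            return None
        message = self.source.messages[0]
        self.target.messages.append(message)  # target now holds the message
        self.source.messages.popleft()        # source gives up responsibility
        return message


producer_side = Node("A")
relay = Node("B")
consumer_side = Node("C")
producer_side.messages.extend(["m1", "m2"])

a_to_b = Link(producer_side, relay)
b_to_c = Link(relay, consumer_side)

a_to_b.transfer()  # m1: A -> B
a_to_b.transfer()  # m2: A -> B
b_to_c.transfer()  # m1: B -> C
```

In this sketch a relay node is simply a node that is the target of one link and the source of another.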
Why designing broker networks is hard
Broker networks are by nature stateful: they need to track which message was received by which node and whether its receipt was acknowledged. At the same time, they need to keep track of message ordering. These message states cannot practically be replicated across the cluster on a per-message basis. Nodes in the broker cluster should be aware of each other's actions and existence, but that information cannot be shared very frequently.
Nevertheless, when a node shuts down, or when a message consumer or publisher shuts down or fails over, other nodes should detect the event and act on it. Sometimes they might have to route messages via other nodes.
Most of the distributed algorithms for these assume that the underlying network is reliable, which is not the case in the practical world. Nodes might not be in sync with cluster states and shared events all the time, causing some messages to get out of track. Those messages might be redelivered or not delivered to any live consumer at all. They can end up in a terminal node due to invalid cluster states. Network outages cause “split brain” situations where the broker cluster is split into two individual clusters. Broker states might get individually updated in broker nodes as the system functions as two individual clusters thereafter. When those two clusters (partitions) merge again upon network recovery, individually updated states in partitions are not valid: this makes the whole broker network unstable after the merge.
Another issue is the scalability of the storage layer for persistent messages. Consider a design where all broker nodes use the same RDBMS node for persisting messages. That single database puts an upper limit on the performance we can obtain from the broker cluster, however many broker nodes are added. If broker nodes use separate databases, message order and coordination are disturbed. One can propose using NoSQL databases for persistent storage as a solution to this problem; but many NoSQL databases do not provide full ACID guarantees, especially on consistency. An eventually consistent persistence layer is therefore a limitation when designing a reliable broker network.
Thus, it is evident that the nature of a broker network is governed by the CAP theorem: a good broker network has a balance between consistency, availability, and partition tolerance so that it can be practically used on top of an IaaS platform. A cluster system that maintains all three guarantees does not exist.
Custom implementations evolved
ActiveMQ is an open source message broker released under the Apache 2.0 License. It supports JMS 1.1 and comes with several "cross language" clients. ActiveMQ has scaling capabilities.
ActiveMQ clustering is based on the “competing consumers” design pattern, which describes how the broker receives the messages sent by producers, enqueues them, and distributes them among all the registered consumers. It can be clustered in two modes, “active-active” and “active-passive”.
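The competing consumers pattern can be sketched without any broker at all. The following is a minimal illustration using Python's standard `queue` and `threading` modules, not ActiveMQ code; the function name `run_competing_consumers` is hypothetical. Several consumers pull from one shared queue, and each message goes to exactly one of them.

```python
import queue
import threading


def run_competing_consumers(messages, consumer_count):
    """Deliver each message to exactly one of several competing consumers."""
    shared = queue.Queue()
    for message in messages:
        shared.put(message)

    # One private list per consumer, so no two threads write to the same list.
    received = {consumer_id: [] for consumer_id in range(consumer_count)}

    def consume(consumer_id):
        while True:
            try:
                message = shared.get_nowait()
            except queue.Empty:
                return  # nothing left to compete for
            received[consumer_id].append(message)

    threads = [
        threading.Thread(target=consume, args=(consumer_id,))
        for consumer_id in range(consumer_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return received


result = run_competing_consumers(list(range(100)), consumer_count=3)
```

How the messages are split between the consumers depends on thread scheduling, which mirrors the loose ordering across consumers discussed earlier.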
The key idea behind the active-passive (primary-secondary) mode is that if there is a hardware failure at the broker you are connected to (the primary machine), the file system, or the entire data center, you get immediate failover to the secondary with no message loss.
ActiveMQ supports three types of primary-secondary deployments:
- Using a shared file system - e.g. a SAN. Here, when a message is persisted, it is replicated to several backups over a local high-speed network. Thus, the physical failure of a particular disk array in the SAN does not affect broker functionality. A primary node and any number of secondaries can connect to a SAN virtual drive. When the primary fails and broker clients fail over to a particular secondary, it can serve the messages because it is connected to the same SAN. The issue here is the cost of the SAN storage option.
- Using a shared JDBC store - Here the primary node and all secondary nodes connect to the same JDBC message store. This is viable if the messaging system has low performance requirements and can accept a database that does not scale and is itself a single point of failure.
Both techniques use simple logic to achieve reliable delivery: primary and secondary nodes all try to get an exclusive lock on the shared store (a file lock on the shared file system, or a lock in the database). The primary node gets the lock and works while secondary nodes wait for the lock. If the primary node is killed for any reason, one of the secondaries immediately grabs the exclusive lock and becomes the primary.
The following is the topology before the primary failure.
If the primary loses connection to the database or loses the exclusive lock, it immediately shuts down. Then the following topology is applied.
Note that if the old primary node is restarted, it rejoins as a secondary, letting the node that now owns the exclusive lock continue as primary.
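The lock-based hand-over described above can be sketched as follows. This is an in-memory illustration only: `SharedStoreLock` and `current_primary` are hypothetical names standing in for the exclusive lock on the shared store, and a real deployment relies on the file system or database to release the lock when the holder dies.

```python
class SharedStoreLock:
    """Hypothetical stand-in for the exclusive lock on a shared store."""

    def __init__(self):
        self.holder = None

    def try_acquire(self, broker):
        # Only one broker can hold the lock at any time.
        if self.holder is None:
            self.holder = broker
        return self.holder == broker

    def release(self, broker):
        # Models the lock being freed when its holder dies or shuts down.
        if self.holder == broker:
            self.holder = None


def current_primary(lock, live_brokers):
    """Every live broker tries for the lock; whoever holds it is the primary."""
    for broker in live_brokers:
        if lock.try_acquire(broker):
            return broker
    return None


lock = SharedStoreLock()
first = current_primary(lock, ["broker-1", "broker-2"])   # broker-1 gets the lock
lock.release("broker-1")                                   # broker-1 is killed
second = current_primary(lock, ["broker-2"])               # broker-2 takes over
third = current_primary(lock, ["broker-1", "broker-2"])    # broker-1 restarts
```

After the restart, `broker-1` finds the lock taken and waits as a secondary, which is the behavior described in the text.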
- Using a replicated LevelDB store - Here Apache ZooKeeper is used to elect a primary node out of the broker nodes. The elected primary broker node starts and accepts client connections. The other nodes go into secondary mode, connect to the primary, and synchronize their persistent state with it. All persistent operations are replicated to the connected secondaries. If the primary dies, the secondary with the latest update gets promoted to become the primary. If the failed node is brought online again, it becomes a secondary, letting the current primary continue.
This mechanism uses a quorum value to update the replicas. Before the broker cluster considers a message successfully persisted, the message is written to a quorum of nodes. It is written to the remaining nodes asynchronously.
Thus, when using ActiveMQ in this mode, it's recommended that you run with at least three replica nodes.
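The three-node recommendation follows from simple arithmetic. The sketch below assumes the quorum is a simple majority, (replicas / 2) + 1 with integer division; the function names are illustrative.

```python
def quorum_size(replicas):
    """Majority quorum: more than half of the configured replicas."""
    return replicas // 2 + 1


def tolerated_failures(replicas):
    """How many replicas can be offline while a quorum is still reachable."""
    return replicas - quorum_size(replicas)


for replicas in (2, 3, 5):
    print(replicas, quorum_size(replicas), tolerated_failures(replicas))
```

With two replicas the quorum is two, so losing either node stops the cluster; three replicas is the smallest setup that survives one node going down.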
Network of brokers
If you are building a messaging platform with ActiveMQ that must scale massively, topologies like client-server or hub-and-spoke will not work. Even though the primary-secondary mode gives high availability, it does not scale. Nodes can be scaled vertically by adding more memory and processing power to cater to the messaging requirements, but there are limits. The traffic that the shared storage can handle also imposes an upper bound on performance.
To build a scalable messaging platform while tolerating broker, machine or subnet failures, one needs a network of brokers. Running message producers and consumers across geographic regions/data centers can be architected better using such a network.
- Networks of brokers store and forward messages reliably. Non-durable messages, however, may get lost in the broker network.
- Total message ordering is not preserved in a network of brokers. If a single consumer moves between networked brokers, the total order may be preserved if all messages always follow the consumer, but this can be difficult to guarantee with large message backlogs.
- Broker networks are usually configured to have networkConnectors and transportConnectors.
A network connector (unidirectional by default) is a link between two brokers used for forwarding messages from one broker to another. A transport connector is a socket listening on a port for the broker node.
In other words, if Broker B1 initiates a network connector to Broker B2, then the messages on a channel (queue/topic) on B1 get forwarded to B2 if there is at least one consumer on B2 for the same channel. From an ActiveMQ broker's perspective, there is no difference between a standard client consuming messages and another broker acting on behalf of a client. They are treated in exactly the same manner.
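The demand-driven forwarding rule can be sketched in a few lines. This is a simplified model, not ActiveMQ's implementation: the `Broker` class and the `forward` function are hypothetical, and a real network connector streams messages continuously rather than in one batch.

```python
class Broker:
    """Hypothetical broker holding messages per channel (queue or topic)."""

    def __init__(self, name):
        self.name = name
        self.queues = {}        # channel name -> messages held on this broker
        self.consumers = set()  # channels with at least one local consumer

    def publish(self, channel, message):
        self.queues.setdefault(channel, []).append(message)


def forward(source, target, channel):
    """One-way network connector: forward only if the target has demand."""
    if channel not in target.consumers:
        return 0  # no consumer on the target, so messages stay on the source
    pending = source.queues.get(channel, [])
    moved = len(pending)
    target.queues.setdefault(channel, []).extend(pending)
    source.queues[channel] = []
    return moved


b1, b2 = Broker("B1"), Broker("B2")
b1.publish("orders", "o-1")
moved_without_consumer = forward(b1, b2, "orders")  # B2 has no consumer yet
b2.consumers.add("orders")
moved_with_consumer = forward(b1, b2, "orders")     # now there is demand
```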
Generally, it is bad practice to connect brokers in an ad-hoc manner. This may create multiple routes for messages to flow from the broker where the producer is to the broker where the consumer is. Store-and-forward traffic can increase exponentially as messages flow through any number of different routes.
Designers can get rid of this complexity using other topologies like hub and spoke.
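To get a feel for how quickly routes multiply, the sketch below counts the distinct loop-free routes between two brokers in a fully connected mesh and in a hub-and-spoke layout. It is a plain graph calculation with hypothetical helper names, not a model of ActiveMQ's actual routing.

```python
def count_simple_paths(adjacency, start, goal, visited=None):
    """Count loop-free routes from start to goal in a broker graph."""
    visited = (visited or set()) | {start}
    if start == goal:
        return 1
    return sum(
        count_simple_paths(adjacency, neighbour, goal, visited)
        for neighbour in adjacency[start]
        if neighbour not in visited
    )


def full_mesh(names):
    """Every broker has a connector to every other broker."""
    return {name: [other for other in names if other != name] for name in names}


def hub_and_spoke(hub, spokes):
    """Spoke brokers connect only to the hub."""
    adjacency = {hub: list(spokes)}
    for spoke in spokes:
        adjacency[spoke] = [hub]
    return adjacency


mesh_routes = count_simple_paths(full_mesh(list("ABCDE")), "A", "E")
hub_routes = count_simple_paths(hub_and_spoke("H", list("ABCD")), "A", "D")
```

With five fully meshed brokers there are already 16 loop-free routes between any two of them, while the hub-and-spoke layout leaves exactly one.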
RabbitMQ has become a prominent broker in the industry within a short time. Built on Erlang, RabbitMQ has achieved strong performance results. RabbitMQ has three ways to accomplish scaling: clustering, federation and shovel.
Control data like virtual hosts, exchanges, users, and permissions are mirrored across all nodes in a cluster. What is not replicated by default is messages in queues. Nevertheless, a client connecting to any node in a cluster can see all queues in the cluster, even if they are not physically located on that node. In essence, this connects multiple machines together to form a single logical broker.
Cluster formation can be done using a command line client, using configuration files, or using clustering plugins. Network links between the nodes must be reliable, and all machines should run the same Erlang and RabbitMQ versions.
In this mode, a broker can also be configured to mirror messages across the nodes. This adds high availability to the broker cluster. One node of the cluster will be selected as the primary queue node, while others will serve as secondaries. A node joining as a secondary will not be synced immediately: it will take some time.
RabbitMQ can sync queues automatically or manually. While a queue is being synced explicitly, that queue is not accessible. If a queue primary is shut down while some secondaries are unsynced, RabbitMQ will refuse to fail over to an unsynced node and will choose one of the synced nodes. If, however, the primary node process is killed while all other nodes are in the unsynced state, it will fail over to an unsynced node.
Clients that are consuming from a mirrored queue may wish to know that the queue from which they have been consuming has failed over. When a mirrored queue fails over, knowledge of which messages have been sent to which consumer is lost, and therefore all unacknowledged messages are redelivered with the redelivered flag set.
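Because unacknowledged messages come back after a failover, consumers usually need to tolerate duplicates. The following is a minimal sketch of one way to do that, assuming every message carries a unique identifier; the `(message_id, redelivered)` pair and the `process` function are hypothetical and do not correspond to a specific client API.

```python
def process(deliveries, already_processed=None):
    """Handle deliveries, skipping redelivered messages that were already handled.

    deliveries: iterable of (message_id, redelivered) pairs.
    """
    processed = set(already_processed or ())
    handled = []
    for message_id, redelivered in deliveries:
        if redelivered and message_id in processed:
            continue  # handled before the failover; ignore the duplicate
        processed.add(message_id)
        handled.append(message_id)
    return handled


deliveries = [
    ("m1", False),
    ("m2", False),
    ("m1", True),   # duplicate: redelivered after the failover
    ("m3", True),   # redelivered, but never handled before
]
handled = process(deliveries)
```

The redelivered flag only says that a message may have been seen before, so the check against the set of processed identifiers is what actually prevents double handling.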
RabbitMQ also has some modes to recover from network partitioning.
When connecting clients to the cluster, configuring hostnames and IPs into the client applications adds coupling to the system. Instead, RabbitMQ recommends a more abstracted approach, such as a dynamic DNS service with a very short TTL, a plain TCP load balancer, or some sort of mobile IP achieved with Pacemaker or similar technologies.
Clustering is meant to be used within a LAN. To connect brokers across a WAN, RabbitMQ recommends Shovel or Federation (described below).
The idea behind federation is that if a message consumer is connected to a broker node where the actual queue exists on a different upstream broker node, it allows the system to fetch the messages from the upstream broker node.
Federation links connect to upstreams using RabbitMQ Erlang client itself. That means broker nodes start an AMQP client inside that server node to fetch messages from other nodes. Thus, proper authentication and users should be configured so that it can pull messages from other brokers. Federated links can be secured using TLS (Transport Layer Security) as well.
Messages will be moved between federated queues an arbitrary number of times to follow the consumers.
Queues, exchanges, broker clusters as well as individual broker nodes can be federated: federation can happen between individual broker nodes or broker clusters, and federated exchanges are connected with one-way point-to-point links.
In the deployment below, each exchange links to the other in a symmetric arrangement. A publisher and consumer connected to each broker are illustrated. Both consumers can receive messages published by either publisher.
Conceptually, Shovel is similar to federation, but it works at a lower level. The shovel plugin simply consumes messages from a source broker node and forwards them to a destination broker. It can handle connection failures and similar problems, and it moves messages reliably.
Thus, RabbitMQ provides two main concepts to achieve scalability. Neither of them, however, can offer all three properties of the CAP theorem:
- Clustering - has chosen Consistency and Partition Tolerance (hence a CP system)
- Federation/Shovel - has chosen Availability and Partition Tolerance (hence an AP system)
In WebSphere MQ (IBM MQ), queue managers manage the resources associated with their queues: whenever a queue is created, it is attached to one of the queue managers. An IBM MQ cluster is a cluster of queue managers. Thus, to scale MQ horizontally, you need more than one queue manager and clearly defined connections and channels between the managers.
- One possible and common deployment is to use some MQ nodes as gateway (GW) nodes through which clients send messages. Clients have failover configured between these nodes so that the failure of one GW node does not make the applications unreachable from MQ. The GW queue manager distributes the messages in a round-robin manner to the other queue managers.
- This mechanism provides:
  - Load balancing - messages are stored on different queue managers. QM1 distributes messages between QM2 and QM3.
  - High availability - if one queue manager is down, the other queue manager is used.
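The round-robin distribution with failover can be sketched as below. The `Gateway` class is a hypothetical simplification and not IBM MQ's workload balancing algorithm; it only shows how a gateway might rotate over the queue managers and skip the ones that are down.

```python
from itertools import cycle


class Gateway:
    """Hypothetical gateway queue manager (QM1) distributing to other QMs."""

    def __init__(self, queue_managers):
        self.available = {qm: True for qm in queue_managers}
        self._ring = cycle(queue_managers)
        self._size = len(queue_managers)

    def route(self):
        """Pick the next available queue manager in round-robin order."""
        for _ in range(self._size):
            qm = next(self._ring)
            if self.available[qm]:
                return qm
        raise RuntimeError("no queue manager available")


gateway = Gateway(["QM2", "QM3"])
balanced = [gateway.route() for _ in range(4)]
gateway.available["QM2"] = False          # QM2 goes down
after_failure = [gateway.route() for _ in range(3)]
```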
To establish communication between queue managers in a cluster you need to configure a link using one of the supported communication protocols (static wiring). As part of this configuration, you also need channel initiators and channel listeners just as you do with distributed queuing.
With this clustering solution, messages can get stuck in intermediate queue managers or in transmission queues (QM1 has a transmission queue to buffer messages that are waiting to be distributed).
If QM2 goes down, messages in the transmission queue at QM1 will be transferred to QM3 only if the bind option is "not fixed" rather than "bind on open". Otherwise, messages stay stuck there until QM2 is up again; the bind type is therefore important for availability.
Until QM2 is started again, messages stuck in QM2 are not delivered to any client. To make QM2 itself highly available by automatically starting another QM instance, IBM MQ has options such as multi-instance queue managers, HA clusters (the same QM running on different machines), and shared queues.
How messages are rerouted on subscription failures is another topic to broach, because cluster workload balancing does not account for the availability of receiving applications.
What happens in that case is that QM2’s queue fills up and messages start going to the dead-letter queue (DLQ). As a solution, IBM provides a cluster queue monitoring sample program (AMQSCLM) that can be installed on every queue manager. This agent monitors whether any consumer application is available for that queue on that QM instance. The sample is extensible, and users can add smart routing logic if needed (e.g. slow-consumption rules). The agent updates a repository shared between queue managers, which ultimately changes the message routing between queue managers (cluster workload priority).
In this example, the cluster workload priority of QM3 is marked high by the AMQSCLM agent, and messages coming to QM1 are routed to QM3, leaving QM2 behind. However, in the interval before the agent detects that a service has disconnected, some messages are still routed to QM2. The agent can push them to one of the active queue managers so that messages are not stuck in a node of the cluster.
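The effect of the monitoring agent on routing can be sketched as follows. This is a simplified illustration of priority-based routing, not the sample program itself; the function names and the priority values 1 and 0 are assumptions made for the example.

```python
def update_priorities(consumer_counts, active=1, inactive=0):
    """Give queue managers with attached consumers a higher workload priority."""
    return {
        qm: (active if count > 0 else inactive)
        for qm, count in consumer_counts.items()
    }


def eligible_targets(priorities):
    """Only queue managers with the highest priority receive new messages."""
    top = max(priorities.values())
    return sorted(qm for qm, priority in priorities.items() if priority == top)


# QM2 has lost its consumer application; QM3 still has two consumers.
priorities = update_priorities({"QM2": 0, "QM3": 2})
targets = eligible_targets(priorities)

# Once a consumer reconnects to QM2, both queue managers are eligible again.
targets_after_reconnect = eligible_targets(update_priorities({"QM2": 1, "QM3": 2}))
```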
- This solution works as long as services/subscribers are attached to the cluster in a stable manner. If they connect and disconnect frequently, it breaks down, because the monitoring agents cannot keep up with the changes.
- QM1 is a single point of failure. To solve that, the client can be configured to fail over to another queue manager. When a reply comes back, MQ knows which queue manager to route it to.
- Messages might need different types of SLAs, e.g. real-time messages, audit messages etc. Setting up different clusters and configuring clients and services accordingly is recommended for such scenarios.
- Queues might be unbalanced between queue managers: some QMs might have many queues and some might have fewer. Traffic on different queues might be different, too. In such a scenario, it is better to do load balancing at the channel level rather than at the individual queue level.
Globally distributed messaging system
IBM MQ can be configured as a globally running messaging platform. There are various topologies that can be followed. Typically, if you configure all the nodes to be in a single cluster, queue manager aliases and priority levels are configured in such a way that local messages are routed to local QM nodes. If the local one is down, messages are routed to the remote one.
Following is an example where Queue Managers are distributed in two data centers in New York and London. Note how aliases and priorities are set.
For disaster recovery, MQ is compatible with synchronous data center replication, asynchronous data center replication, and warm standby (no synchronization). Synchronous replication is costly in terms of performance but is reliable. Asynchronous replication is faster, but some information can be lost during failover to the replication site. A standby site with no synchronization focuses on carrying on present traffic without worrying about older messages and states.
Judging from this discussion, we can conclude that IBM MQ has a solid cluster offering that has matured over the years.
The scalability of the message store database
Consider this problem: if the message order should be kept, all messages should be stored in a particular database table in a particular database. If the message publish/subscribe rate is high, this table will naturally become a hot table, as there will be many database operations. If two database server instances are used, message ordering is lost, since no single table sees all the messages. This problem is inherent in the very nature of a message broker cluster.
Using independent message stores
One solution to this is to use several database instances and point different broker instances to different database instances, and distribute queues across databases. Then, there will be only one node to handle the messages of a particular queue. For example, if node MB1 handles messages of queue “foo”, a message read/write request for queue “foo” coming to MB2 will be delegated to MB1 node. This will guarantee message ordering at the cost of performance.
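The delegation to a queue's owning node can be sketched as follows. The `BrokerNode` class and the static `owners` map are hypothetical; real brokers keep this ownership information in cluster metadata, and the dictionary `store` stands in for each node's dedicated database.

```python
class BrokerNode:
    """Hypothetical broker node with its own independent message store."""

    def __init__(self, name, owners, cluster):
        self.name = name
        self.owners = owners    # queue name -> name of the owning node
        self.cluster = cluster  # node name -> BrokerNode
        self.store = {}         # this node's own message store
        cluster[name] = self

    def write(self, queue_name, message):
        """Store the message on the owning node and return that node's name."""
        owner = self.owners[queue_name]
        if owner != self.name:
            # Delegate to the owner: one extra hop, but ordering is preserved.
            return self.cluster[owner].write(queue_name, message)
        self.store.setdefault(queue_name, []).append(message)
        return self.name


owners = {"foo": "MB1"}
cluster = {}
mb1 = BrokerNode("MB1", owners, cluster)
mb2 = BrokerNode("MB2", owners, cluster)

stored_on = mb2.write("foo", "a")   # the request arrives at MB2
mb1.write("foo", "b")               # the request arrives at MB1 directly
```

All messages for "foo" end up in MB1's store in arrival order, and the extra hop from MB2 is the performance cost mentioned above.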
RabbitMQ follows this paradigm. IBM MQ also adopts the same concept, as different Queue Managers have dedicated stores.
Using scalable database solutions
Another solution is to use a NoSQL database like MongoDB or Apache Cassandra. You can scale these databases to cater to the load. Data is replicated across database instances so that if one database node goes down, it can be recovered.
However, NoSQL databases are not perfect. They have inherent problems that arise from their NoSQL and distributed nature. WSO2 MB was released with Apache Cassandra support up until version 2.2; we ran into the issues listed below:
- Cassandra is key-value based. We could not store any relational information there.
- There is no way to query and know how many messages are stored in the database in a distributed manner at a given instance.
- If we need a distributed counter, we cannot update a CounterColumnFamily from different Cassandra database nodes. It will not be atomic and reliable. In order to get a reliable value out of Cassandra, operations should be idempotent to each other. Increasing and decreasing a global value is not idempotent.
- Writing to a Cassandra database is fast compared to regular RDBMS databases. Yet, when writes gain priority, reads from the database tend to time out. Multiple factors affect this behavior. It needs proper tuning, and the tuning process is very specific to the environment.
- Cassandra is resource hungry. It consumes a considerable amount of RAM and CPU, which makes the system not lean.
- Tombstones accumulate when you delete records. To read a message that has not yet been consumed, a read may have to scan over all the accumulated tombstones first. Most probably that read request will time out.
- There are consistency levels you can set in Cassandra. As a broker should always be consistent, we tend to use “QUORUM” for consistency. In this scenario, at least 3 nodes of Cassandra are required even for a small deployment. If a node or its replica goes down, the message cannot be recovered.
- Deployment and maintenance overhead: a Cassandra HA cluster needs at least 3 nodes, and each of them is resource intensive. DB instances must be monitored to make sure they do not run out of resources.
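The point about distributed counters and idempotency can be made concrete with a small sketch. It is plain Python, not Cassandra code, and the names are illustrative: it models a client that retries an operation after a timeout even though the first attempt had actually been applied.

```python
def apply_with_retry(operation, state, retries):
    """Apply the same operation 1 + retries times, as happens when a client
    retries after a timeout although the earlier attempts had succeeded."""
    for _ in range(1 + retries):
        state = operation(state)
    return state


def increment(count):
    """Not idempotent: every repeat changes the result again."""
    return count + 1


def add_message_id(ids):
    """Idempotent: adding the same id again changes nothing."""
    return ids | {"msg-42"}


counter_after = apply_with_retry(increment, 0, retries=2)
ids_after = apply_with_retry(add_message_id, frozenset(), retries=2)
```

One logical increment ends up counted three times, while the set of message ids still contains a single entry. This is why a global message count is hard to keep reliable in such a store.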
Thus, we realized that we could not easily adapt that database to build a scalable yet reliable distributed message broker.
According to eBay’s list of best practices, implementing a reliable message queue on top of Cassandra seems to be error prone. If you do, you gain performance at the cost of reliability, whereas for a broker reliability is priority number one. Thus, this architecture is not suitable for a reliable and fault-tolerant broker.
Using SANs (or shared network drives)
A storage-area network (SAN) is a dedicated high-speed network (or subnetwork) that interconnects and presents shared pools of storage devices to multiple servers. This allows each server to access shared storage as if it were a drive directly attached to the server. Multiple servers can share the same underlying persistent storage, enabling data sharing between servers. Nodes in a message broker cluster can utilize shared storage to make a shared queue. The scaling of the storage should be handled by the SAN infrastructure.
Deploying a SAN infrastructure in-house (and associated consultancy) is rather costly. Sharing a database across nodes is a solution, but this has scaling limitations. Carefully designing broker applications to mitigate the database overhead, keeping the reliability factor to an acceptable level, is a more practical solution.
Broker networks and microservices
The microservices architecture is becoming more popular within enterprises. It is not an entirely new concept, as organizations had already adopted it by spinning up a new environment for each service instead of sharing the same environment among many services; this proto-microservices style enables easy maintenance owing to the lack of inter-relationships between services.
Even though services are built independently to serve a business requirement, inter-process communication (IPC) between services is vital. This is usually the responsibility of the middleware platform that services are built upon. In a typical microservice reference architecture, there is a section for “Messaging Channels”. That is where the enterprise service bus (ESB) and the message broker (MB) fit in.
Inter-service communications may be one-to-one or one-to-many. Another dimension of service invocation is whether the invocation is synchronous or asynchronous.
- Synchronous – The client expects a timely response from the service and might even block while it waits. This is achieved by API calls between services. The standard way of doing this is using a communication bus which is shared by services.
- Asynchronous – The client doesn’t block while waiting for a response, and the response, if any, isn’t necessarily sent immediately. This is achieved using a message broker. A client service publishes a notification message, which is consumed by zero or more interested services. This falls under event-driven architecture (EDA).
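The asynchronous style can be sketched with a tiny in-process publish/subscribe model. `InMemoryBroker` is a hypothetical stand-in for a real message broker, and the topic names are made up for the example; the point is only that publishing returns immediately and that each event reaches zero or more subscribers later.

```python
from collections import defaultdict, deque


class InMemoryBroker:
    """Hypothetical in-process stand-in for a message broker."""

    def __init__(self):
        self.subscribers = defaultdict(list)  # topic -> handlers
        self.pending = deque()                # events accepted, not yet delivered

    def subscribe(self, topic, handler):
        self.subscribers[topic].append(handler)

    def publish(self, topic, event):
        # The publisher returns immediately; it does not wait for any consumer.
        self.pending.append((topic, event))

    def deliver_pending(self):
        """Deliver queued events to every interested subscriber (zero or more)."""
        deliveries = 0
        while self.pending:
            topic, event = self.pending.popleft()
            for handler in self.subscribers.get(topic, []):
                handler(event)
                deliveries += 1
        return deliveries


broker = InMemoryBroker()
trip_log, billing_log = [], []
broker.subscribe("trip.created", trip_log.append)
broker.subscribe("trip.created", billing_log.append)

broker.publish("trip.created", {"trip": 1})
broker.publish("driver.idle", {"driver": 7})   # nobody is subscribed to this topic
received_before_delivery = list(trip_log)

delivered = broker.deliver_pending()
```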
Below is an example for inter-service communication using EDA. It shows communication between different services involving dispatching a cab to a passenger by a dispatcher.
Interestingly, the message broker itself can be another microservice cluster made up of many broker nodes (a broker cluster). Different services can subscribe to the “messaging service” offered by the broker cluster, through which they can publish and consume messages. Looking closer, the physical connection from a container bearing a service can go to any container bearing a message broker node. Thus, the message broker cluster should be intelligent enough to route the required messages to the required subscriber across message broker nodes in a reliable way.
This mechanism brings up beneficial features. The broker cluster would not have a single point of failure and also it would have scalability. It is convenient for maintenance as well. All of these come with a cost of designing and implementing a message broker service that works across the platform in a reliable and fault tolerant manner. Thus, Message Broker continues to play an important role in microservices architecture as one of the IPC mechanisms.
As explained here, designing and implementing a message broker network is inherently hard. One reason is the asynchronous nature of message publishing and subscribing. The other is the reliability it needs to provide for upper-layer applications. At the same time, when it comes to the software market, performance is also a factor to consider. Unfortunately, these non-functional requirements conflict with each other and cannot all be fully achieved together.
| Broker | Strength area | Scaling mechanism | Persistent storage |
|---|---|---|---|
| ActiveMQ | Open source with many features | Shared storage / broker-to-broker communication | File-based storage; RDBMS support via adaptors |
| RabbitMQ | Performance with the Erlang language and raw AMQP clients | Clustering / federation / shovel | File-based storage (custom written) |
| IBM MQ | Spec compliance and a mature product with many features | Clustering of queue managers, with queue monitoring agents | File-based storage (custom written) |
Thus, over time, people have implemented various message broker products, each with its own strengths. Clustering has made them scalable; at the same time, it has added a new dimension to the queuing problem, because the network is an unreliable medium. It has also widened the range of technologies used by message broker software and their strength areas.
Finally, a distributed queue is a problem in computer science which is not solved entirely. CAP theorem (Consistency - Availability - Partition Tolerance) introduces a natural balance between competing factors; thus research, algorithms, and implementations on this area will continue to evolve with time.
- Hasitha Abeykoon
- Associate Technical Lead