MQTT Broker
A custom MQTT broker developed to handle communications between IoT devices and cloud services.
Built using the mqtt_cpp open-source project.
Rationale
Initially, WebSocket-based, high-performance and highly scalable services were developed to handle the communications. Unfortunately, WebSocket libraries are not readily available in the embedded C space, and hence the sensor firmware development company recommended using the popular MQTT protocol.
We set up an AWS IoT Core service and also developed a very simple listener daemon in the cloud. This scheme works very well, but it opened up questions about potential data loss, since MQTT is a fairly rudimentary pub/sub protocol and does not provide the strong lossless message delivery guarantees that streaming platforms like Kafka introduced.
To get around these limitations, we decided to implement our own MQTT broker, which would take care of message persistence and provide the strong lossless delivery guarantee we were looking for.
Implementation
We followed the same overall design used by the mqtt_cpp project's code samples and test suite: a common broker implementation that provides the various MQTT protocol handlers, and two wrapping services - TLS and non-TLS.
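A minimal sketch of what the non-TLS wrapper looks like, following the shape of the mqtt_cpp server samples, is shown below. Handler registration is elided and exact signatures vary between mqtt_cpp versions, so treat it as illustrative rather than the actual service code.

#include <mqtt_server_cpp.hpp>

#include <boost/asio.hpp>

int main() {
    boost::asio::io_context ioc;

    // Plain (non-TLS) listener on the standard MQTT port; the TLS wrapper
    // follows the same pattern with a TLS-enabled server and an
    // OpenSSL/Asio context (see the Security section).
    auto server = MQTT_NS::server<>(
        boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 1883),
        ioc);

    server.set_error_handler(
        [](MQTT_NS::error_code /*ec*/) { /* log and keep accepting */ });

    server.set_accept_handler(
        [](auto&& /*endpoint*/) {
            // The shared broker implementation would register the MQTT
            // protocol handlers (connect, publish, subscribe, ...) on the
            // accepted endpoint here, as the mqtt_cpp test broker does.
        });

    server.listen();
    ioc.run();
}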
An advantage of the custom broker implementation is that there is no need for each message from the IoT device to carry along the device identifier (a GUID assigned to the device by the device manufacturer or firmware). The device identifier is specified as the clientId when connecting to the MQTT broker. This information is normally not available to a subscriber, but since we are handling the messages on the broker, we have access to the clientId.
Persistence
All messages received by the broker are persisted to the same backend MongoDB database that the rest of the cloud services use.
Subscriptions
All subscriptions by clients are saved to the mqtt.subscription collection. This enables support for cleanSession: false when a client establishes a connection to the broker. The structure of a persisted subscription is as shown:
{
  "_id" : "5d958499c88c526e030306c1",
  "topic" : "/local/+/message",
  "clientId" : "listener:db:local",
  "wildcard" : "+",
  "qos" : 1,
  "created" : "2019-10-03T05:18:17.408Z"
}
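For illustration, a subscription document of this shape could be written with the MongoDB C++ driver (mongocxx) roughly as follows. The helper name, the connection handling, and the assumption that mqtt is the database and subscription the collection are ours, not necessarily the broker's actual code.

#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/types.hpp>
#include <mongocxx/client.hpp>
#include <mongocxx/collection.hpp>
#include <mongocxx/instance.hpp>
#include <mongocxx/uri.hpp>

#include <chrono>
#include <string>

using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;

// Illustrative helper: persist a subscription so it survives reconnects
// (cleanSession: false) and broker restarts.
void persist_subscription(mongocxx::collection& subscriptions,
                          const std::string& client_id,
                          const std::string& topic,
                          int qos) {
    // Pre-parse the topic for wildcard characters so wildcard subscribers
    // can be found later without regex queries (empty when no wildcard).
    std::string wildcard;
    if (topic.find('#') != std::string::npos) wildcard = "#";
    else if (topic.find('+') != std::string::npos) wildcard = "+";

    subscriptions.insert_one(make_document(
        kvp("topic", topic),
        kvp("clientId", client_id),
        kvp("wildcard", wildcard),
        kvp("qos", qos),
        kvp("created", bsoncxx::types::b_date{std::chrono::system_clock::now()})));
}

int main() {
    mongocxx::instance inst{};  // one driver instance per process
    mongocxx::client conn{mongocxx::uri{"mongodb://localhost:27017"}};
    auto subscriptions = conn["mqtt"]["subscription"];
    persist_subscription(subscriptions, "listener:db:local", "/local/+/message", 1);
}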
Wildcard Topics
Wildcard topics (+ and #) are supported by pre-parsing the topic name and storing a wildcard property in the document. When retrieving the subscriber list for an incoming message, subscriptions with wildcard patterns are fetched and then matched against the message topic. It would be possible to build a more efficient query by translating the wildcard topic pattern into an appropriate Mongo regex query, but for our use case it would be overkill to implement topic patterns as a Mongo regex DSL.
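As an illustration of the in-memory match step, a minimal MQTT topic filter matcher (single-level + and multi-level # wildcards) might look like this; it is a sketch, not the broker's actual matching code.

#include <sstream>
#include <string>
#include <vector>

// Split a topic or topic filter into its '/'-separated levels.
static std::vector<std::string> split_levels(const std::string& s) {
    std::vector<std::string> levels;
    std::string level;
    std::istringstream in(s);
    while (std::getline(in, level, '/')) levels.push_back(level);
    return levels;
}

// Return true if 'topic' matches the subscription 'filter',
// honoring '+' (exactly one level) and '#' (all remaining levels).
bool topic_matches(const std::string& filter, const std::string& topic) {
    const auto f = split_levels(filter);
    const auto t = split_levels(topic);

    std::size_t i = 0;
    for (; i < f.size(); ++i) {
        if (f[i] == "#") return true;              // matches the rest
        if (i >= t.size()) return false;           // topic ran out of levels
        if (f[i] != "+" && f[i] != t[i]) return false;
    }
    return i == t.size();                          // both fully consumed
}

With the sample subscription above, topic_matches("/local/+/message", "/local/device42/message") returns true, so a message from any device under /local reaches the subscriber.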
Messages
Messages received by the broker are persisted to the mqtt.message collection. These documents have a TTL index set to purge automatically after 30 days. The structure of a persisted message is as shown:
{
  "_id" : "5d99d33f2a0886257e0014c8",
  "topic" : "local",
  "data" : "66.6666,-88.8888",
  "clientId" : "MQTT_FX_Client",
  "qos" : 0,
  "retain" : true,
  "created" : "2019-10-06T11:42:55.030Z",
  "host" : "rakesh-mbp.local"
}
where data is the message payload. In our case we only receive CSV messages, hence the payload is stored as a string (the utf_8 type in BSON). For more general-purpose use, it would be saved as the binary type.
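Under the same assumptions as the subscription sketch above, persisting an incoming message (with the connection's clientId stamped on the document, as described under Implementation) and creating the 30-day TTL index could look roughly like this; function names, parameters, and error handling are illustrative.

#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/types.hpp>
#include <mongocxx/collection.hpp>

#include <chrono>
#include <string>

using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;

// Create the TTL index once at startup: documents are purged 30 days
// after their 'created' timestamp.
void ensure_message_ttl(mongocxx::collection& messages) {
    messages.create_index(
        make_document(kvp("created", 1)),
        make_document(kvp("expireAfterSeconds", 60 * 60 * 24 * 30)));
}

// Persist one received message; 'client_id' comes from the MQTT session,
// so the payload itself does not need to carry the device GUID.
void persist_message(mongocxx::collection& messages,
                     const std::string& client_id,
                     const std::string& topic,
                     const std::string& payload,   // CSV payload stored as a string
                     int qos,
                     bool retain,
                     const std::string& host) {
    messages.insert_one(make_document(
        kvp("topic", topic),
        kvp("data", payload),
        kvp("clientId", client_id),
        kvp("qos", qos),
        kvp("retain", retain),
        kvp("created", bsoncxx::types::b_date{std::chrono::system_clock::now()}),
        kvp("host", host)));
}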
Published Messages
All published messages are persisted to the mqtt.publishedMessage collection. This allows tracking of messages that have been delivered to a client, and also ensures that every subscriber (even if offline for a while) receives every message that was published to the topics of interest. These documents have a TTL index set to purge after 7 days. The structure of a published message looks like the following:
{
  "_id" : "5d9b88935796f916e56c0cb4",
  "clientId" : "MQTT_FX_Client",
  "topic" : "test",
  "messageId" : "5d9b885dffe3e3b8b9c2b1f9",
  "host" : "rakesh-mbp.local",
  "created" : "2019-10-07T18:48:51.142Z"
}
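A sketch of recording a delivery, under the same driver and database-layout assumptions as the earlier snippets; messageId is stored here as the hex string shown in the sample document and refers to the _id of the document in mqtt.message.

#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <bsoncxx/types.hpp>
#include <mongocxx/collection.hpp>

#include <chrono>
#include <string>

using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;

// Record that 'message_id' has been delivered to 'client_id', so it is
// never re-delivered to that client.
void record_delivery(mongocxx::collection& published,
                     const std::string& client_id,
                     const std::string& topic,
                     const std::string& message_id,
                     const std::string& host) {
    published.insert_one(make_document(
        kvp("clientId", client_id),
        kvp("topic", topic),
        kvp("messageId", message_id),
        kvp("host", host),
        kvp("created", bsoncxx::types::b_date{std::chrono::system_clock::now()})));
}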
Clustering
All state - client subscriptions, messages, published messages, etc. - is captured in the backend Mongo database. This allows clustering of the brokers, since messages received on one broker can also be delivered to clients connected to other brokers in the cluster (assuming they are subscribed to the same topic).
When the broker receives either a ping or a publish request, it queries the database for messages published from other hosts in the cluster to topics the client has subscribed to. Any messages that were not previously published to the client are published at this stage.
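A hedged sketch of that catch-up step for a single, non-wildcard subscription, again assuming mongocxx and the collections described above (the broker's actual queries and delivery path may differ):

#include <bsoncxx/builder/basic/document.hpp>
#include <bsoncxx/builder/basic/kvp.hpp>
#include <mongocxx/collection.hpp>
#include <mongocxx/cursor.hpp>

#include <set>
#include <string>

using bsoncxx::builder::basic::kvp;
using bsoncxx::builder::basic::make_document;

// On a ping or publish from 'client_id', find messages on 'topic' that have
// not yet been recorded in mqtt.publishedMessage for that client and deliver
// them. Wildcard subscriptions would additionally go through the topic
// matcher shown earlier.
void deliver_missed_messages(mongocxx::collection& messages,
                             mongocxx::collection& published,
                             const std::string& client_id,
                             const std::string& topic) {
    // Ids of messages already delivered to this client for the topic.
    std::set<std::string> delivered;
    auto published_cursor = published.find(
        make_document(kvp("clientId", client_id), kvp("topic", topic)));
    for (const auto& doc : published_cursor) {
        auto id = doc["messageId"].get_string().value;
        delivered.insert(std::string(id.data(), id.size()));
    }

    // Re-publish anything the client has not seen yet, then record it.
    auto message_cursor = messages.find(make_document(kvp("topic", topic)));
    for (const auto& doc : message_cursor) {
        std::string id = doc["_id"].get_oid().value.to_string();
        if (delivered.count(id)) continue;
        // publish to the client over its MQTT session, then:
        // record_delivery(published, client_id, topic, id, host);  // see sketch above
    }
}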
Note: When subscribing to a topic for the first time, only a message with the retain flag set is delivered. Messages published to the topic before the subscription are not.
Security
As noted previously, the implementation supports operating with or without TLS enabled. If TLS support is required, the following types of certificates are needed:
- Server
  - Root CA. Certificate for the root certificate authority.
  - Server certificate. PEM-format certificate identifying the server and the DNS name for the server.
  - Server key. Certificate key file for the server.
- Client
  - Root CA. Same as the one used with the server.
  - Client certificate. PEM-format certificate for the client. Based on OpenSSL documentation and local testing, this seems optional.
  - Client key. Certificate key file for the client. As with the client certificate, this seems optional.
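For reference, loading the server-side material above into an OpenSSL/Asio context might look like the following sketch; the file paths are placeholders (in our stack they come from etcd, as described under Configuration), and the verification mode reflects the "client certificate optional" observation above. A context of this kind is what the TLS-enabled wrapper would be constructed with.

#include <boost/asio/ssl.hpp>

// Build the server-side TLS context from the certificate files described
// above. Paths shown here are placeholders.
boost::asio::ssl::context make_tls_context() {
    boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12);

    // Server certificate (PEM) and its private key.
    ctx.use_certificate_chain_file("/etc/mqtt/server.crt");
    ctx.use_private_key_file("/etc/mqtt/server.key",
                             boost::asio::ssl::context::pem);

    // Root CA used to verify clients that present a certificate.
    ctx.load_verify_file("/etc/mqtt/rootCA.crt");

    // Request a client certificate but do not fail the handshake when the
    // client does not present one.
    ctx.set_verify_mode(boost::asio::ssl::verify_peer);

    return ctx;
}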
Configuration
Configuration such as database connection information, certificate files, and TLS settings is managed via etcd, as with the rest of the services in the cloud stack.
- /iot/mqtt/tls - Specify true|false to enable or disable TLS. Default false.
- /iot/mqtt/cacert - Path to the Root CA certificate file.
- /iot/mqtt/serverCert - Path to the server certificate file.
- /iot/mqtt/serverKey - Path to the server certificate key file.