For a while now my daily work has involved IoT architectures and researching
the best patterns to develop such systems, including diving into standards and
protocols like MQTT. As I’m always craving new ideas to learn and to refine my
programming skills, I thought that going a little deeper on the topic would be
cool and useful too. So once again I git init a low-level project on my box to
deepen my knowledge of the MQTT protocol, and I wanted to push myself a little
further by sharing my steps. Instead of the usual key-value store, which
happens to be my favourite pet project when it comes to learning a new language
or dusting off my low-level system programming, this time it will be an MQTT
broker.
Sol will be a C project, a super-simple MQTT broker for Linux that supports
version 3.1.1 of the protocol, skipping older versions for now. It will be
very similar to a lightweight mosquitto (which is already a lightweight piece
of software anyway), and with the abundance of MQTT clients out there, testing
will be easier too. The final result will be a base for something cleaner and
with more features; what we’re going to create has to be considered a minimal
implementation, an MVP. As a side note, the name is a 50/50 split between my
taste for short, elegant names and the Martian day (The Martian docet). Or
maybe it stands for Shitty Obnoxious Laxative. Tastes.
Note: the project won’t compile until the very end of the series, once all the
steps have been followed. To test single parts and modules, I suggest providing
a main of your own and stopping to experiment, change parts and so on.
Going step by step, I usually init my C projects so that all sources live in a
single folder:
Here is the repository on GitHub. I’ll try to describe my journey into the
development of the software step by step, without being too verbose, listing
a lot of code directly with a brief explanation of its purpose. The best way
to learn still remains to write it down, compile it and play with it or
modify it.
This will be a series of posts, each one tackling and mostly implementing a single
concept/module of the project:
I’d like to underline that the resulting software will be a fully functioning
broker, but with plenty of room for improvement and optimization, for better
code quality and, probably, with some hidden features as well (aka bugs).
In essence a broker is a middleware, a piece of software that accepts input
from multiple clients (producers) and forwards it to a set of recipient clients
(consumers), using an abstraction to define and manage these groups of clients
in the form of a channel, or topic as the protocol standard calls it. Much
like an IRC channel or its equivalent in a generic chat, each consumer client
can subscribe to topics in order to receive all messages published by other
clients to those topics.
The first idea that comes to mind is a server built on top of a data structure
of some kind that makes it easy to manage these topics and the connected
clients, be they producers or consumers. Each message received from a client
must be forwarded to every connected client that is subscribed to the topic
specified in the message.
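To make the idea concrete, here is a minimal sketch of such a structure. It assumes fixed-size arrays and hypothetical names (struct topic, topic_subscribe, topic_publish) that are not taken from Sol, and it only prints where a real broker would write to a socket:

```c
#include <stdio.h>
#include <string.h>

#define MAX_TOPICS      8
#define MAX_SUBSCRIBERS 8

/* Hypothetical topic record: a name plus the ids of the subscribed clients */
struct topic {
    char name[64];
    int subscribers[MAX_SUBSCRIBERS];
    int nsubs;
};

static struct topic topics[MAX_TOPICS];
static int ntopics;

/* Find a topic by name, creating it if missing; NULL when the table is full */
static struct topic *topic_get(const char *name) {
    for (int i = 0; i < ntopics; i++)
        if (strcmp(topics[i].name, name) == 0)
            return &topics[i];
    if (ntopics == MAX_TOPICS)
        return NULL;
    struct topic *t = &topics[ntopics++];
    snprintf(t->name, sizeof(t->name), "%s", name);
    t->nsubs = 0;
    return t;
}

/* Register client_id as a consumer of the topic; 0 on success, -1 on failure */
int topic_subscribe(const char *name, int client_id) {
    struct topic *t = topic_get(name);
    if (!t || t->nsubs == MAX_SUBSCRIBERS)
        return -1;
    t->subscribers[t->nsubs++] = client_id;
    return 0;
}

/* Forward msg to every subscriber of the topic, return how many received it */
int topic_publish(const char *name, const char *msg) {
    struct topic *t = topic_get(name);
    if (!t)
        return 0;
    for (int i = 0; i < t->nsubs; i++)
        printf("to client %d on %s: %s\n", t->subscribers[i], name, msg);
    return t->nsubs;
}
```

A linear scan is fine for a sketch; a real broker would more likely use a hash table or a trie keyed by topic name.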
Let’s try this way: a TCP server and a module to handle binary communication
over the wire. There are many ways to implement a server: threads, forked
processes and multiplexed I/O, which is the one I’d like to explore the most.
We’ll start with a single-threaded server based on I/O multiplexing, with the
possibility of scaling it out with threads in the future; the epoll interface
is in fact thread-safe by implementation.
The MQTT protocol
First of all, we have to model some structures to handle MQTT packets,
following the specifications in the official documentation.
Starting from the opcode table and the MQTT header, according to the docs every packet
consists of 3 parts:
Fixed Header (mandatory)
Variable Header (optional)
Payload (optional)
The Fixed Header consists of a first byte for the command type and flags,
followed by 1 to 4 bytes (the second to fifth byte of the packet) that store
the Remaining Length of the packet.
Not all flags are mandatory, only the 4-bit MQTT Control Packet Type block;
the others are:
Dup flag, used when a message is sent more than once
QoS level, which can be AT_MOST_ONCE, AT_LEAST_ONCE or EXACTLY_ONCE, that is
0, 1 and 2 respectively
Retain flag, which tells whether a message should be retained; in other words,
when a message is published on a topic, it is saved and clients connecting in
the future will receive it. It can be updated with another retained message.
So fire up Vim (or your favourite editor) and start writing the mqtt.h header
file, containing the Control Packet Types and a struct to handle the Fixed
Header.
The first 2 #define refer to the fixed sizes of the MQTT Fixed Header and of
every type of MQTT ACK packet; they are set for convenience and we’ll use them later.
As shown, we leverage the union, a value that may have any of several
representations within the same position in memory, to represent a byte. In
other words, inside a union, in contrast to a normal struct, only one field
holds a value at a time. The fields share the same position in memory, so by
using bitfields we can effectively manipulate single bits or portions of a byte.
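A minimal sketch of such a header follows. The names (MQTT_HEADER_LEN, MQTT_ACK_LEN, union mqtt_header and the two helpers) and the values of the defines are assumptions made for illustration. The packet type values come from the MQTT v3.1.1 specification. Bitfield layout is implementation-defined, so the union assumes a compiler that allocates bitfields starting from the least significant bit, as GCC and Clang do on little-endian machines; the shift-and-mask helpers are the portable alternative.

```c
#include <stdio.h>

#define MQTT_HEADER_LEN 2   /* assumed: 1 byte of type/flags + 1 byte of length */
#define MQTT_ACK_LEN    4   /* assumed: fixed header + 2 bytes of packet id */

/* Control Packet Types, values from the MQTT v3.1.1 specification */
enum packet_type {
    CONNECT = 1, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP,
    SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT
};

/* First byte of the Fixed Header, viewed as a raw byte or as its fields */
union mqtt_header {
    unsigned char byte;
    struct {
        unsigned retain : 1;    /* bit 0 */
        unsigned qos : 2;       /* bits 1-2 */
        unsigned dup : 1;       /* bit 3 */
        unsigned type : 4;      /* bits 4-7 */
    } bits;
};

/* Portable equivalents that do not depend on the bitfield layout */
static unsigned header_type(unsigned char b) { return b >> 4; }
static unsigned header_qos(unsigned char b)  { return (b >> 1) & 0x03; }
```

For instance, the byte 0x3B (binary 0011 1011) describes a PUBLISH with dup set, QoS 1 and retain set.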
The first Control Packet we’re going to define is CONNECT. It is the first
packet that must be sent when a client establishes a new connection, and it
must be sent exactly once: more than one CONNECT per client must be treated as
a violation of the protocol and the client must be dropped.
Each CONNECT must be answered with a CONNACK.
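As a sketch of how these two packets could be modelled, assuming hypothetical field names and the same bitfield layout caveat as any bitfield-based struct (fields allocated from the least significant bit, as GCC and Clang do on little-endian machines):

```c
#include <stddef.h>

union mqtt_header {
    unsigned char byte;
    struct { unsigned retain : 1, qos : 2, dup : 1, type : 4; } bits;
};

struct mqtt_connect {
    union mqtt_header header;
    /* Connect flags byte, bit positions as in the v3.1.1 specification */
    union {
        unsigned char byte;
        struct {
            unsigned reserved : 1;       /* bit 0 */
            unsigned clean_session : 1;  /* bit 1 */
            unsigned will : 1;           /* bit 2 */
            unsigned will_qos : 2;       /* bits 3-4 */
            unsigned will_retain : 1;    /* bit 5 */
            unsigned password : 1;       /* bit 6 */
            unsigned username : 1;       /* bit 7 */
        } bits;
    } flags;
    /* Payload fields; the optional ones stay NULL when their flag is 0 */
    struct {
        unsigned short keepalive;
        unsigned char *client_id;
        unsigned char *username;
        unsigned char *password;
        unsigned char *will_topic;
        unsigned char *will_message;
    } payload;
};

struct mqtt_connack {
    union mqtt_header header;
    union {
        unsigned char byte;
        struct {
            unsigned session_present : 1;
            unsigned reserved : 7;
        } bits;
    } flags;
    unsigned char rc;   /* return code, 0 means connection accepted */
};
```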
From now on, defining the other packets is trivial: just reproduce the
pattern, according to the MQTT v3.1.1 documentation.
We proceed with SUBSCRIBE, UNSUBSCRIBE and PUBLISH. SUBSCRIBE is the only
packet whose acknowledgement needs a dedicated definition, SUBACK; the others
can be defined as a generic ACK and named using typedef for semantic
separation.
The remaining ACK packets, namely:
can be obtained by typedef’ing struct ack, just for semantic separation of
concerns. The last one, DISCONNECT, is not really an ACK but the format is
the same.
We can finally define a generic MQTT packet as a union of the previously
defined packets.
We proceed now with the definition of some public functions; here in the header
we want to collect only those functions and structures that should be used by
other modules.
To handle communication over the MQTT protocol we essentially need 4 functions,
2 for each direction of the interaction between server and client:
A packing function (serializing or marshalling, I won’t dive here into a
dissertation on the correct usage of the terms)
An unpacking function (deserializing/unmarshalling)
Supported by 2 functions to handle the encoding and decoding of the
Remaining Length in the Fixed Header part.
We also add some utility functions to build packets and to release
heap-allocated ones, nothing special here.
Fine. We have a decent header module that defines all we need to handle
communication using the protocol. Let’s now implement those functions.
First of all we define some “private” helpers to pack and unpack each MQTT
packet; these will be called by the previously defined “public” functions
unpack_mqtt_packet and pack_mqtt_packet.
Packing and unpacking
Before continuing with the implementation of all the functions declared in
src/mqtt.h, we need to implement some helper functions to ease packing and
unpacking, both for each received packet and for building MQTT packets ready
to be sent.
Let’s go fast here, it’s just simple serialization/deserialization of the
packets respecting network byte order (network byte order usually means
big-endian, while the majority of machines follow the little-endian
convention).
And the corresponding implementation
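A minimal sketch of what such helpers could look like. The names (pack_u8, unpack_u16 and so on) are hypothetical, and the code uses explicit shifts rather than htons/ntohs, so the result doesn't depend on the endianness of the host. Each function advances the cursor it is given.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Read one byte and advance the cursor */
uint8_t unpack_u8(const uint8_t **buf) {
    uint8_t val = **buf;
    (*buf)++;
    return val;
}

/* Read a big-endian 16-bit integer and advance the cursor by 2 */
uint16_t unpack_u16(const uint8_t **buf) {
    uint16_t val = (uint16_t)(((*buf)[0] << 8) | (*buf)[1]);
    *buf += 2;
    return val;
}

/* Read a big-endian 32-bit integer and advance the cursor by 4 */
uint32_t unpack_u32(const uint8_t **buf) {
    uint32_t val = ((uint32_t)(*buf)[0] << 24) | ((uint32_t)(*buf)[1] << 16) |
                   ((uint32_t)(*buf)[2] << 8) | (uint32_t)(*buf)[3];
    *buf += 4;
    return val;
}

/* Write one byte and advance the cursor */
void pack_u8(uint8_t **buf, uint8_t val) {
    **buf = val;
    (*buf)++;
}

/* Write a 16-bit integer in big-endian order and advance the cursor by 2 */
void pack_u16(uint8_t **buf, uint16_t val) {
    (*buf)[0] = (uint8_t)(val >> 8);
    (*buf)[1] = (uint8_t)(val & 0xFF);
    *buf += 2;
}

/* Write a 32-bit integer in big-endian order and advance the cursor by 4 */
void pack_u32(uint8_t **buf, uint32_t val) {
    (*buf)[0] = (uint8_t)(val >> 24);
    (*buf)[1] = (uint8_t)(val >> 16);
    (*buf)[2] = (uint8_t)(val >> 8);
    (*buf)[3] = (uint8_t)(val & 0xFF);
    *buf += 4;
}

/* Copy len raw bytes and advance the cursor by len */
void pack_bytes(uint8_t **buf, const uint8_t *src, size_t len) {
    memcpy(*buf, src, len);
    *buf += len;
}
```

None of these helpers checks bounds: the caller is expected to know, from the Remaining Length, how many bytes are available.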
This allows us to handle incoming streams of bytes and to build the ones
needed to respond to connected clients. Let’s move on.
Back to src/mqtt.c
After the creation of the pack module we should include it in the mqtt source:
The first step will be the implementation of the Fixed Header Remaining Length
functions. The MQTT documentation suggests a pseudo-code implementation in one
of its first paragraphs; we’ll stick to that, as it’s fairly simple and clear.
We’ll see why and how, after the first byte of the Fixed Header, the next 1 to
4 bytes are used to encode the remaining bytes of the packet.
The Remaining Length is the number of bytes remaining within the current
packet, including data in the variable header and the payload. The Remaining
Length does not include the bytes used to encode the Remaining Length.
The Remaining Length is encoded using a variable length encoding scheme which
uses a single byte for values up to 127. Larger values are handled as
follows. The least significant seven bits of each byte encode the data, and
the most significant bit is used to indicate that there are following bytes
in the representation. Thus each byte encodes 128 values and a “continuation
bit”. The maximum number of bytes in the Remaining Length field is four.
No need for further explanation, the MQTT documentation is crystal clear.
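The scheme quoted above can be sketched as follows. The function names and signatures are assumptions, not necessarily the ones used in Sol, and the decoder counts bytes instead of checking the multiplier as the specification's pseudo-code does:

```c
#include <stddef.h>
#include <stdint.h>

#define MAX_LEN_BYTES 4
#define MAX_REMAINING_LENGTH 268435455  /* 128^4 - 1, largest 4-byte value */

/*
 * Encode len into buf using 7 bits of data per byte plus a continuation bit.
 * Returns the number of bytes written (1 to 4), or 0 if len is too large.
 */
int mqtt_encode_length(uint8_t *buf, size_t len) {
    int bytes = 0;
    if (len > MAX_REMAINING_LENGTH)
        return 0;
    do {
        uint8_t d = (uint8_t)(len % 128);
        len /= 128;
        if (len > 0)
            d |= 0x80;      /* more bytes follow */
        buf[bytes++] = d;
    } while (len > 0);
    return bytes;
}

/*
 * Decode a Remaining Length starting at *buf and advance the cursor past it.
 * Reads at most MAX_LEN_BYTES bytes, even on malformed input.
 */
size_t mqtt_decode_length(const uint8_t **buf) {
    size_t value = 0, multiplier = 1;
    int bytes = 0;
    uint8_t c;
    do {
        c = **buf;
        (*buf)++;
        value += (size_t)(c & 0x7F) * multiplier;
        multiplier *= 128;
    } while ((c & 0x80) != 0 && ++bytes < MAX_LEN_BYTES);
    return value;
}
```

As a worked case, 321 is encoded as 0xC1 0x02: 321 mod 128 is 65 (0x41) with the continuation bit set, followed by 321 div 128, which is 2.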
Now we can read the first header byte and the total length of the packet. Let’s
move on with the unpacking of the CONNECT packet.
It’s the packet with the most flags and the second longest one.
It consists of:
A fixed header with MQTT Control Packet type 1 in the 4 most significant bits
(MSB from now on), while the 4 least significant bits (LSB) are unused
(reserved for future implementations).
The Remaining Length of the packet
The Variable Header, which consists of four fields:
Protocol Name
Protocol Level
Connect Flags
Keep Alive
The Protocol Name is a UTF-8 encoded string that represents the protocol name
“MQTT”, capitalized. The string, its offset and length will not be changed by
future versions of the MQTT specification.
For version 3.1.1 the Protocol Name is ‘M’ ‘Q’ ‘T’ ‘T’, 4 bytes in total; for
now we’ll ignore the name used by older versions.
The Connect Flags byte contains some indications on the behaviour of the client
and on the presence or absence of fields in the payload:
The last bit is reserved for future implementations. All the other flags are
meant as booleans, and the payload part of the packet contains the fields
corresponding to the flags that are set. So let’s say we have username and
password set to true: in the payload we’ll find a 2-byte field representing
the username length, followed by the username itself, and the same for the
password.
To clarify the concept, let’s suppose we receive a CONNECT packet with:
username and password flags set to 1
username = “hello”
password = “nacho”
client ID = “danzan”
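Field (offset as byte position, size): description
Packet type (offset 0, 1 byte): control packet type 1 in the upper 4 bits
Length (offset 1, 1 byte): 32 bytes length; being at most 127 bytes, it requires only 1 byte
Protocol name length (offset 2, 2 bytes): 4 bytes length
Protocol name (MQTT) (offset 4, 4 bytes): ‘M’ ‘Q’ ‘T’ ‘T’
Protocol level (offset 8, 1 byte): for version 3.1.1 the level is 4
Connect flags (offset 9, 1 byte): username, password, will retain, will QoS, will flag, clean session
Keepalive (offset 10, 2 bytes): 16-bit word, maximum value is 18 hr 12 min 15 seconds
Client ID length (offset 12, 2 bytes): 6 is the length of the Client ID (danzan)
Client ID (offset 14, 6 bytes): ‘d’ ‘a’ ‘n’ ‘z’ ‘a’ ‘n’
Username length (offset 20, 2 bytes): 5 is the length of the username (hello)
Username (offset 22, 5 bytes): ‘h’ ‘e’ ‘l’ ‘l’ ‘o’
Password length (offset 27, 2 bytes): 5 is the length of the password (nacho)
Password (offset 29, 5 bytes): ‘n’ ‘a’ ‘c’ ‘h’ ‘o’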
With the will flags set to 0 there’s no need to decode those fields, as they
don’t even appear. As a result we have a packet of 34 bytes in total, including
the fixed header.
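The arithmetic can be checked by building the example byte by byte. This is only a sketch: build_example_connect and put_string are hypothetical helpers, the keepalive of 60 seconds is an arbitrary value and the clean session flag is assumed to be 0, since the example doesn't specify them.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Append a 2-byte big-endian length followed by the string bytes */
static uint8_t *put_string(uint8_t *p, const char *s) {
    size_t len = strlen(s);
    *p++ = (uint8_t)(len >> 8);
    *p++ = (uint8_t)(len & 0xFF);
    memcpy(p, s, len);
    return p + len;
}

/* Build the example CONNECT into buf (at least 34 bytes), return its size */
size_t build_example_connect(uint8_t *buf) {
    uint8_t *p = buf;
    *p++ = 0x10;                 /* type 1 (CONNECT) in the 4 MSB */
    *p++ = 32;                   /* Remaining Length, fits in one byte */
    p = put_string(p, "MQTT");   /* protocol name: 2 + 4 bytes */
    *p++ = 4;                    /* protocol level for v3.1.1 */
    *p++ = 0xC0;                 /* flags: username and password set */
    *p++ = 0x00;                 /* keepalive MSB */
    *p++ = 0x3C;                 /* keepalive LSB: 60 s, arbitrary */
    p = put_string(p, "danzan"); /* client ID: 2 + 6 bytes */
    p = put_string(p, "hello");  /* username: 2 + 5 bytes */
    p = put_string(p, "nacho");  /* password: 2 + 5 bytes */
    return (size_t)(p - buf);
}
```

The variable header takes 10 bytes (6 + 1 + 1 + 2) and the payload 22 bytes (8 + 7 + 7), which gives the Remaining Length of 32; the 2 bytes of fixed header bring the total to 34.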
The PUBLISH packet now:
The packet identifier MSB and LSB are present in the packet if and only if the
QoS level is > 0; with QoS set to at most once there’s no need for a packet ID.
The payload length is calculated by subtracting from the Remaining Length the
length of all the other fields already unpacked.
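A sketch of that calculation, assuming a hypothetical struct publish_view whose pointers refer to the input buffer, and a buf that points right after the Fixed Header:

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical view over a PUBLISH body; nothing is copied */
struct publish_view {
    const uint8_t *topic;
    uint16_t topic_len;
    uint16_t pkt_id;         /* left at 0 when QoS is 0 */
    const uint8_t *payload;
    size_t payload_len;
};

/* Returns 0 on success, -1 if the body is shorter than its fields claim */
int unpack_publish(const uint8_t *buf, size_t remaining_len, unsigned qos,
                   struct publish_view *out) {
    size_t consumed = 2;                     /* topic length field */
    if (remaining_len < consumed)
        return -1;
    out->topic_len = (uint16_t)((buf[0] << 8) | buf[1]);
    consumed += out->topic_len;
    if (remaining_len < consumed)
        return -1;
    out->topic = buf + 2;
    out->pkt_id = 0;
    if (qos > 0) {                           /* packet id only with QoS 1 or 2 */
        if (remaining_len < consumed + 2)
            return -1;
        out->pkt_id = (uint16_t)((buf[consumed] << 8) | buf[consumed + 1]);
        consumed += 2;
    }
    /* Whatever is left of the Remaining Length is the payload */
    out->payload = buf + consumed;
    out->payload_len = remaining_len - consumed;
    return 0;
}
```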
SUBSCRIBE and UNSUBSCRIBE packets are fairly similar. They mirror the PUBLISH
packet, but their payload is a list of tuples, each one a pair (topic, QoS).
Their implementation is practically identical, the only difference being in
the payload, where UNSUBSCRIBE doesn’t specify a QoS for each topic.
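Both payloads can be walked with the same loop. The sketch below assumes hypothetical names (struct sub_tuple, unpack_subscribe), an arbitrary cap of 16 tuples and a buf that points right after the Fixed Header:

```c
#include <stddef.h>
#include <stdint.h>

#define MAX_TUPLES 16

/* One entry of the payload; topic points into the input buffer */
struct sub_tuple {
    const uint8_t *topic;
    uint16_t topic_len;
    uint8_t qos;             /* always 0 for UNSUBSCRIBE */
};

/*
 * with_qos is 1 for SUBSCRIBE and 0 for UNSUBSCRIBE.
 * Returns the number of tuples found, or -1 on a malformed body.
 */
int unpack_subscribe(const uint8_t *buf, size_t remaining_len, int with_qos,
                     uint16_t *pkt_id, struct sub_tuple *tuples) {
    size_t pos = 2;
    int count = 0;
    if (remaining_len < 2)
        return -1;
    *pkt_id = (uint16_t)((buf[0] << 8) | buf[1]);
    while (pos < remaining_len) {
        if (count == MAX_TUPLES || remaining_len - pos < 2)
            return -1;
        uint16_t len = (uint16_t)((buf[pos] << 8) | buf[pos + 1]);
        pos += 2;
        if (remaining_len - pos < (size_t)len + (with_qos ? 1 : 0))
            return -1;
        tuples[count].topic = buf + pos;
        tuples[count].topic_len = len;
        pos += len;
        tuples[count].qos = with_qos ? buf[pos++] : 0;
        count++;
    }
    return count;
}
```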
And finally the ACK. MQTT has no generic ack, but the structure of most of the
ack-like packets is practically the same, formed by a header and a packet ID.
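A sketch of that shared shape, with hypothetical names (struct mqtt_ack, pack_ack, unpack_ack) and the type and flags kept as plain fields for brevity instead of a header union:

```c
#include <stdint.h>

#define MQTT_ACK_LEN 4   /* header byte + length byte + 2 bytes of packet id */

/* Generic ack: control packet type, header flags and packet identifier */
struct mqtt_ack {
    uint8_t type;
    uint8_t flags;
    uint16_t pkt_id;
};

/* Same layout, different names, purely for semantic separation */
typedef struct mqtt_ack mqtt_puback;
typedef struct mqtt_ack mqtt_pubrec;
typedef struct mqtt_ack mqtt_pubrel;
typedef struct mqtt_ack mqtt_pubcomp;
typedef struct mqtt_ack mqtt_unsuback;

/* Serialize an ack into exactly MQTT_ACK_LEN bytes */
void pack_ack(uint8_t out[MQTT_ACK_LEN], const struct mqtt_ack *ack) {
    out[0] = (uint8_t)((ack->type << 4) | (ack->flags & 0x0F));
    out[1] = 2;                               /* Remaining Length */
    out[2] = (uint8_t)(ack->pkt_id >> 8);     /* packet id MSB */
    out[3] = (uint8_t)(ack->pkt_id & 0xFF);   /* packet id LSB */
}

/* Parse MQTT_ACK_LEN bytes; returns 0 on success, -1 on a wrong length */
int unpack_ack(const uint8_t in[MQTT_ACK_LEN], struct mqtt_ack *ack) {
    if (in[1] != 2)
        return -1;
    ack->type = in[0] >> 4;
    ack->flags = in[0] & 0x0F;
    ack->pkt_id = (uint16_t)((in[2] << 8) | in[3]);
    return 0;
}
```

For example, a PUBREL (type 6, whose header flags are fixed to 0010 by the specification) with packet ID 0x1234 serializes to 0x62 0x02 0x12 0x34.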
We now have all the helper functions needed to implement the only unpacking
function exposed by the header, unpack_mqtt_packet. It ended up being a fairly
short and simple function: we’ll use a static array to map all the helper
functions, making the selection of the correct unpack function based on the
Control Packet type O(1).
Note that in the case of a DISCONNECT, a PINGREQ or a PINGRESP packet we only
need the header byte, as the Remaining Length is 0.
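The dispatch could look like the following sketch. The signature of unpack_mqtt_packet here is an assumption made for illustration, the handlers are stubs that only report which packet type they were called for, and the Remaining Length is assumed to fit in a single byte:

```c
#include <stddef.h>
#include <stdint.h>

enum { CONNECT = 1, PUBLISH = 3, PUBACK = 4, SUBSCRIBE = 8,
       PINGREQ = 12, PINGRESP = 13, DISCONNECT = 14 };

typedef int unpack_handler(const uint8_t *body, size_t len);

/* Stub handlers: real ones would fill a packet structure from body */
static int on_connect(const uint8_t *b, size_t n)   { (void)b; (void)n; return CONNECT; }
static int on_publish(const uint8_t *b, size_t n)   { (void)b; (void)n; return PUBLISH; }
static int on_ack(const uint8_t *b, size_t n)       { (void)b; (void)n; return PUBACK; }
static int on_subscribe(const uint8_t *b, size_t n) { (void)b; (void)n; return SUBSCRIBE; }

/* One slot per possible 4-bit type; unset slots stay NULL */
static unpack_handler *const handlers[16] = {
    [CONNECT] = on_connect,
    [PUBLISH] = on_publish,
    [PUBACK] = on_ack,
    [SUBSCRIBE] = on_subscribe,
};

/* Returns the type of the packet that was handled, or -1 if unsupported */
int unpack_mqtt_packet(const uint8_t *buf, size_t buflen) {
    if (buflen < 2)
        return -1;
    unsigned type = buf[0] >> 4;
    /* Header-only packets: nothing to unpack after the Fixed Header */
    if (type == PINGREQ || type == PINGRESP || type == DISCONNECT)
        return (int)type;
    if (!handlers[type])
        return -1;
    /* Assumes a 1-byte Remaining Length, so the body starts at offset 2 */
    return handlers[type](buf + 2, buflen - 2);
}
```

Indexing the array with the 4-bit type replaces a chain of comparisons with a single lookup, which is where the O(1) selection comes from.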
The first part ends here. At this point we have two modules, a utility one for
general serialization operations and one to handle the protocol itself
according to the standard defined by OASIS.