Sol - An MQTT broker from scratch. Part 1 - The protocol
posted on 2019 Mar 03
It’s been a while that for my daily work I deal with IoT architectures and
research best patterns to develop such systems, including diving through
standards and protocols like MQTT;
as I always been craving for new ideas to learn and refine my programming
skills, I thought that going a little deeper on the topic would’ve been cool
and useful too. So once again I git init a low-level project on my box
pushing myself a little further by sharing my steps.
Sol will be a C project, a super-simple MQTT broker targeting Linux
platform which will support version 3.1.1 of the protocol, skipping older
versions for now, very similar to a lightweight Mosquitto (which is already a
lightweight piece of software anyway), and with the abundance of MQTT clients
out there, testing will be fairly easy. The final result will hopefully serve
as a base for something more clean and with more features, what we’re going to
create have to be considered a minimal implementation, an MVP. As a side note,
the name decision is a 50/50 for the elegance i feel for short names and the
martian day (The Martian docet). Or maybe it stands for Scrappy Ol’
Loser. Tastes.
Note: the project won’t compile till the very end of the series, following
all steps, to test single parts and modules I suggest to provide a main by
yourself and stop to make experiments, change parts etc.
Step by step, I usually init my C projects in order to have all sources
in a single folder:
Here the repository on GitHub.
I’ll try to describe step by step my journey into the development of the software,
without being too verbose, and listing lot of code directly with brief
explanations of its purpose. The best way is still to write it down,
compile and/or play/modify it.
This will be a series of posts, each one tackling and mostly implementing a single
concept/module of the project:
Part 1 - Protocol, lays the foundations to handle the MQTT protocol packets
I’d like to underline that the resulting software will be a fully functioning
broker, but with large space for improvements and optimization as well as code
quality improvements and probably, with some hidden features as well (aka bugs
:P).
General architecture
In essence a broker is a middleware, a software that accepts input from
multiple clients (producers) and forward it to a set of recipient clients
(consumers) using an abstraction to define and manage these groups of clients
in the form of a channel, or topic, as it’s called by the protocol standards.
Much like an IRC channel or equivalent in a generic chat, each consumer client
can subscribe to topics in order to receive all messages published by other
clients to those topics.
The first idea coming to mind is a server built on top of a data structure
of some kind that allow to easily manage these topics and connected
clients, being them producers or consumers. Each message received by a client
must be forwarded to all other connected clients that are subscribed to the
specified topic of the message.
Let’s try this way, a TCP server and a module to handle binary communication
through the wire. There are many ways to implement a server, threads, fork
processes and multiplexing I/O, which is the way I’d like to explore the most.
We’ll start with a single-threaded multiplexing I/O server, with the possibility
on future to scale it out using threads, epoll interface for multiplexing in
fact is thread-safe by implementation.
The MQTT protocol
First of all, we have to model some structures to handle MQTT packets, by
following specifications on the official
documentation.
Starting from the opcode table and the MQTT header, according to the docs,
every packet consists of 3 parts:
Fixed Header (mandatory)
Variable Header (optional)
Payload (optional)
The Fixed Header part consists of the first byte for command type and flags,
and a second to fifth byte to store the remaining length of the packet.
Flags are not all mandatory, just the 4 bits block MQTT Control Type, the
others are:
Dup flag, used when a message is sent more than one time
QoS level, can be AT_MOST_ONCE, AT_LEAST_ONCE and EXACTLY_ONCE, 0,
1, 2 respectively
Retain flag, if a message should be retained, in other words, when a
message is published on a topic, it is saved and future connecting clients
will receive it. It can be updated with another retained message.
So fire up Vim (or your favourite editor) and start writing mqtt.h header
file containing the Control Packet Types and a struct to handle the Fixed
Header:
src/mqtt.h
The first 2 #define refers to fixed sizes of the MQTT Fixed Header and of
every type of MQTT ACK packets, set for convenience, we’ll use those later.
As shown, we leverage the union, a value that may have any of several
representations withing the same position in memory, to represent a byte. In
other words, inside unions, in contrast to normal struct, there can be only
one field with a value. Their position in memory are shared, this way using
bitfields we can effectively manipulate single bits or portions of a byte.
The first Control Packet we’re going to define is the CONNECT. It’ s the first
packet that must be sent when a client establish a new connection and it must
be exactly one, more than one CONNECT per client must be treated as a
violation of the protocol and the client must be dropped.
For each CONNECT, a CONNACK packet must be sent in response.
src/mqtt.h
From now on, the definition of other packets are trivial by reproducing the
pattern, accordingly to the documentation of MQTT v3.1.1.
We proceed with SUBSCRIBE, UNSUBSCRIBE and PUBLISH. SUBSCRIBE is the
only packet with a dedicated packet definition SUBACK, the other can be
defined as generic ACK, and type-named using typedef for semantic
separation.
src/mqtt.h
The remaining ACK packets, namely:
PUBACK
PUBREC
PUBREL
PUBCOMP
UNSUBACK
PINGREQ
PINGRESP
DISCONNECT
can be obtained by typedef’ing struct ack, just for semantic separation of
concerns. The last one, DISCONNECT, is not really an ACK but the format is
the same.
src/mqtt.h
We can finally define a generic MQTT packet as a union of the previously
defined packets.
src/mqtt.h
We proceed now with the definition of some public functions, here in the header
we want to collect only those functions and structure that should be used by
other modules.
To handle the communication using the MQTT protocol we need essentially 4 functions,
2 for each direction of the interaction between server and client:
A packing function (serializing or marshaling, I won’t dive here in a
dissertation on the correct usage of these terms)
An unpacking function (deserializing/unmarshaling)
Supported by 2 functions to handle the encoding and decoding of the
Remaining Length in the Fixed Header part.
src/mqtt.h
We also add some utility functions to build packets and to release
heap-alloc’ed ones, nothing special here.
src/mqtt.h
Fine. We have a decent header module that define all that we need for handling
the communication using the protocol. Let’s now implement those functions.
First of all we define some “private” helpers, to pack and unpack each MQTT
packet, these will be called by the previously defined “public” functions
unpack_mqtt_packet and pack_mqtt_packet.
src/mqtt.c
Packing and unpacking
Before continuing with the implementation of all defined functions on
src/mqtt.h, we need to implement some helpers functions to ease the pack and
unpack process of each received packet and also ready for send forged MQTT
packet.
Let’s go fast here, it’s just simple serialization/deserialization respecting
the network byte order (endianness, usually network byte order refers to
Big-endian order, while the majority of machines follow Little-endian
convention) of the packets.
src/pack.h
And the corresponding implementation
src/pack.c
This allow us to handle incoming stream of bytes and forge them to respond to
connected clients. Let’s move one.
Back to mqtt module
After the creation of pack module we should include it into the mqtt source:
src/mqtt.c
The first step will be the implementation of the Fixed Header Remaining Length
functions. The MQTT documentation suggests a pseudo-code implementation in one
of the first paragraphs, we’ll stick to that, it’s quiet simple and clear.
We’ll see why and how after the first byte of the Fixed Header, the next 1 or
2 or 3 or 4 bytes are used to encode the remaining bytes of the packet.
The Remaining Length is the number of bytes remaining within the current
packet, including data in the variable header and the payload. The Remaining
Length does not include the bytes used to encode the Remaining Length.
The Remaining Length is encoded using a variable length encoding scheme which
uses a single byte for values up to 127. Larger values are handled as
follows. The least significant seven bits of each byte encode the data, and
the most significant bit is used to indicate that there are following bytes
in the representation. Thus each byte encodes 128 values and a “continuation
bit”. The maximum number of bytes in the Remaining Length field is four.
No need for further explanation, the MQTT documentation is crystal clear.
src/mqtt.c
Now we can read the first header byte and the total length of the packet. Let’s
move on with the unpacking of the CONNECT packet.
It’s the packet with more flags and the second one in length behind only the
PUBLISH packet.
It consists in:
A fixed header with MQTT Control packet type 1 on the first 4 most
significant bits (MSB from now on) and unused (reserved for future
implementation) for the 4 least significant bits (LSB).
The Remaining Length of the packet
The Variable Header which consists of four fields:
Protocol Name
Protocol Level
Connect Flags
Keep Alive
The Protocol Name is a UTF-8 encoded string that represents the protocol name
“MQTT”, capitalized. The string, its offset and length will not be changed by
future versions of the MQTT specification.
For version 3.1.1 the Protocol Name is ‘M’ ‘Q’ ‘T’ ‘T’, 4 bytes in total, we
will ignore for now what is the name for older versions.
Connect flags byte contains some indications on the behaviour of the client and
the presence or absence of fields in the payload:
Username flag
Password flag
Will retain
Will QoS
Will flag
Clean Session
The last bit is reserved for future implementations. All other flags are
intended as booleans, on the payload part of the packet, according to those
flags there are also the corresponding fields, so let’s say we have username
and password at true, on the payload we’ll find a 2 bytes field representing
username length, followed by the username itself, and the same for the password.
To clarify the concept, let’s suppose we receive a CONNECT packet with:
username and password flag to 1
username = “hello”
password = “nacho”
client ID = “danzan”
Field
size (bytes)
offset (byte position)
Description
Packet type
1 (4 bits)
0
Connect type
Length
1
1
32 bytes length, being it < 127 bytes, it requires only 1 byte
Protocol name length
2
2
4 bytes length
Protocol name (MQTT)
4
4
‘M’ ‘Q’ ‘T’ ‘T’
Protocol level
1
8
For version 3.1.1 the level is 4
Connect flags
1
9
Username, password, will retain, will QoS, will flag, clean session
Keepalive
2
10
16-bit word, maximum value is 18 hr 12 min 15 seconds
Client ID length
2
12
2 bytes, 6 is the length of the Client ID (danzan)
Client ID
6
14
‘d’ ‘a’ ‘n’ ‘z’ ‘a’ ‘n’
Username length
2
20
2 bytes, 5 is the length of the username (hello)
Username
5
22
‘h’ ‘e’ ‘l’ ‘l’ ‘o’
Password length
2
27
2 bytes, 5 is the length of the password (nacho)
Password
5
29
‘n’ ‘a’ ‘c’ ‘h’ ‘o’
Having will flags to 0 there’s no need to decode those fields, as they don’t
even appear. We have as a result a total length 34 bytes packet including fixed
header.
src/mqtt.c
The PUBLISH packet now:
Packet identifier MSB and LSB are present in the packet if and only if the QoS
level is > 0, with a QoS set to at most once there’s no need for a packet ID.
Payload length is calculated by subtracting the Remaining Length with all the
other fields already unpacked.
src/mqtt.c
Subscribe and unsubscribe packets are fairly similar, they reflect the
PUBLISH packet, but for payload they have a list of tuple consisting in a
pair (topic, QoS). Their implementation is practically identical, with the
only difference in the payload part, where UNSUBSCRIBE doesn’t specify QoS
for each topic.
src/mqtt.c
And finally the ACK. In MQTT doesn’t exists generic acks, but the structure
of most of the ack-like packets is practically the same for each one, formed by
a header and a packet ID.
These are:
PUBACK
PUBREC
PUBREL
PUBCOMP
UNSUBACK
src/mqtt.c
We have now all needed helpers functions to implement our only exposed function
on the header mqtt_mqtt_packet. It ended up being a fairly short and simple
function, we’ll use a static array to map all helper functions, making it O(1)
the selection of the correct unpack function based on the Control Packet type.
To be noted that in case of a disconnect, a pingreq or pingresp packet we only
need a single byte, with remaining length 0.
src/mqtt.c
The first part ends here, at this point we have two modules, one of utility for
general serialization operations and one to handle the protocol itself accordingly
to the standard defined by OASIS.
Just git commit and git push. Cya.
Part-2 will deal with the
networking utilities needed to setup our communication layer and thus the server.