Sol - An MQTT broker from scratch. Part 2 - Networking
posted on 2019 Mar 04
Let’s continue from where we left off: in part 1
we defined and roughly modeled the MQTT v3.1.1 protocol, and our src/mqtt.c
module now has all the unpacking functions. We still have to add the remaining
build helpers and the packing functions to serialize packets for output.
As a side note, we’re probably not going to write perfectly memory-efficient
code, but again, premature optimization is the root of all evil;
there will be plenty of time to improve the quality of the software in future
revisions.
Build, pack and send.
For now we only need the CONNACK, SUBACK and PUBLISH packet builders; the
other ACK-like packets can be created in the same manner with a single
function, which is why we used typedef for the different ACK types.
union mqtt_header *mqtt_packet_header(unsigned char) will cover the packet
Fixed Header as well as the PINGREQ, PINGRESP and DISCONNECT packets
struct mqtt_ack *mqtt_packet_ack(unsigned char, unsigned short) will be
used to build:
PUBACK
PUBREC
PUBREL
PUBCOMP
UNSUBACK
The remaining packets will each have a dedicated function. There are probably
better ways to reuse code and to model this, but for now let’s stick with
something that works; as previously stated, time to optimize and refactor will come.
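As a rough illustration, the two shared builders could look like the sketch below. The structure layouts here are assumptions standing in for the definitions from part 1, not the post’s exact code:

```c
/* Hypothetical minimal mirror of the mqtt.h types: a fixed header byte
 * and a generic ACK carrying a packet identifier. Field names are
 * assumptions, not the post's exact definitions. */
union mqtt_header {
    unsigned char byte;
    struct {
        unsigned retain : 1;
        unsigned qos : 2;
        unsigned dup : 1;
        unsigned type : 4;
    } bits;
};

struct mqtt_ack {
    union mqtt_header header;
    unsigned short pkt_id;
};

/* One builder covers PINGREQ, PINGRESP and DISCONNECT: they are just a
 * fixed header with no payload. */
union mqtt_header *mqtt_packet_header(unsigned char byte) {
    static union mqtt_header header;
    header.byte = byte;
    return &header;
}

/* One builder covers PUBACK, PUBREC, PUBREL, PUBCOMP and UNSUBACK:
 * same layout, different type bits. */
struct mqtt_ack *mqtt_packet_ack(unsigned char byte, unsigned short pkt_id) {
    static struct mqtt_ack ack;
    ack.header.byte = byte;
    ack.pkt_id = pkt_id;
    return &ack;
}
```

Note the returned pointers refer to static storage, a shortcut discussed a bit further down.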
src/mqtt.c
We move on to the packing functions now. Essentially they mirror the unpacking
ones, acting in the opposite direction: we start from structs and unions and build
a bytearray, ready to be written out over a socket.
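For instance, serializing a CONNACK is just the reverse of unpacking one. The sketch below assumes a minimal CONNACK layout (field names are hypothetical, not the post’s exact code):

```c
#include <stdlib.h>

/* Assumed minimal CONNACK layout: fixed header byte, then 2 bytes of
 * variable header (session present flag + return code). */
struct mqtt_connack {
    unsigned char byte;            /* fixed header */
    unsigned char session_present; /* connect acknowledge flags */
    unsigned char rc;              /* return code */
};

/* Serialize a CONNACK into a freshly allocated 4-byte buffer:
 * fixed header, remaining length (always 2), then the two VH bytes. */
unsigned char *pack_mqtt_connack(const struct mqtt_connack *connack) {
    unsigned char *packed = malloc(4);
    if (!packed)
        return NULL;
    packed[0] = connack->byte;            /* e.g. CONNACK type << 4 */
    packed[1] = 0x02;                     /* remaining length */
    packed[2] = connack->session_present;
    packed[3] = connack->rc;
    return packed;
}
```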
One thing to underline: where you see functions returning a pointer to a static
structure, bear in mind that this trick is only OK with small structures that can
be instantiated on the stack, and in a single-threaded context. Moving to a
multithreaded environment this approach will surely bite us, because every
packet pointer built like this will point to the same area of memory, causing
conflicts and unpredictable results (different from the dreadful undefined
behaviour, the ultimate frightening beast of C/C++). So for future improvements
it will probably be better to refactor these parts to malloc some bytes for these
structures.
Let’s map them like we did before with the unpacking functions: an array
whose positions reflect the packet types. To make the source a little more
concise we can group pack and unpack handlers into a structure, so it’ll be
possible to use a single array, as they share the same positions.
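A sketch of that grouping, with stub handlers and assumed signatures (the real handlers take the raw buffer and the packet union from mqtt.h):

```c
#include <stddef.h>

/* Hypothetical handler signatures, simplified for illustration. */
typedef int (*mqtt_unpack_handler)(const unsigned char *);
typedef int (*mqtt_pack_handler)(unsigned char *);

/* Pack and unpack share the same index (the control packet type from
 * the fixed header), so a single table can hold both. */
struct mqtt_handlers {
    mqtt_unpack_handler unpack;
    mqtt_pack_handler pack;
};

/* Dummy handlers standing in for the real CONNECT/CONNACK ones. */
static int unpack_stub(const unsigned char *buf) { (void)buf; return 1; }
static int pack_stub(unsigned char *buf) { (void)buf; return 2; }

/* Positions reflect the MQTT control packet type (1 = CONNECT,
 * 2 = CONNACK, 3 = PUBLISH, ...); entries left NULL have no handler. */
static const struct mqtt_handlers handlers[16] = {
    [1] = { unpack_stub, pack_stub },
    [2] = { unpack_stub, pack_stub },
};

int dispatch_unpack(unsigned char type, const unsigned char *buf) {
    return handlers[type].unpack ? handlers[type].unpack(buf) : -1;
}
```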
src/mqtt.c
The server
The server we’re gonna create will be a single-threaded TCP server with
multiplexed I/O using the epoll interface. Epoll is the most recent
multiplexing mechanism on Linux, added with kernel 2.5.44 after select and
poll, and the most performant with a high number of connections; its
counterpart on BSD and BSD-like systems (e.g. macOS) is kqueue.
We’re gonna need some functions to manage our socket descriptors.
src/network.h
Just some well-known helper functions to create and bind a socket to listen for
new connections, and to set a socket in non-blocking mode (a requirement to use
epoll multiplexing at its best).
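The non-blocking helper is essentially the classic fcntl idiom; a minimal sketch:

```c
#include <fcntl.h>
#include <unistd.h>

/* Put a descriptor in non-blocking mode: read/write calls return
 * immediately with EAGAIN/EWOULDBLOCK instead of blocking, which is
 * what edge-triggered epoll expects. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
        return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
```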
I don’t like having to manually manage every stream of bytes flowing into and
out of the host, so these two functions never fail to appear in every C codebase
dealing with TCP communication:
ssize_t send_bytes(int, const unsigned char *, size_t), used to send all
bytes out at once in a while loop until no bytes are left, handling the EAGAIN
and EWOULDBLOCK error codes
ssize_t recv_bytes(int, unsigned char *, size_t), which reads an arbitrary
number of bytes in a while loop, again correctly handling the EAGAIN and
EWOULDBLOCK error codes
src/network.h
And the implementation in network.c, omitting includes to spare some space.
src/network.c
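Since the listing is omitted here, this is a hedged sketch of what the two helpers look like; the post’s actual implementation may differ in details (e.g. the flags passed to send):

```c
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>

/* Send the whole buffer, looping until every byte is out; on
 * EAGAIN/EWOULDBLOCK (non-blocking socket not ready) just retry. */
ssize_t send_bytes(int fd, const unsigned char *buf, size_t len) {
    size_t total = 0;
    while (total < len) {
        ssize_t n = send(fd, buf + total, len - total, MSG_NOSIGNAL);
        if (n == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                continue;      /* kernel buffer full, try again */
            return -1;         /* real error */
        }
        total += n;
    }
    return total;
}

/* Read up to len bytes; stop on EOF (peer closed) or when the
 * non-blocking socket has nothing left to deliver. */
ssize_t recv_bytes(int fd, unsigned char *buf, size_t len) {
    size_t total = 0;
    while (total < len) {
        ssize_t n = recv(fd, buf + total, len - total, 0);
        if (n == -1) {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;         /* nothing more to read for now */
            return -1;
        }
        if (n == 0)
            break;             /* connection closed by peer */
        total += n;
    }
    return total;
}
```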
Basic closure system
To make the usage of the epoll API easier and more comfortable, and since this
project doesn’t require especially complex operations, I built a simple
abstraction on top of the multiplexing interface that makes it possible to
register callback functions to be executed when events occur.
There are a lot of examples of epoll usage on the web, but the majority just
show the basic pattern: register a set of socket descriptors, start a loop to
monitor them for incoming events, and each time a descriptor is ready for
reading or writing, call a function to handle it. That is surely a neat
implementation, but a bit constrained. The solution I decided to use
leverages the union epoll_data:
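For reference, this is the union as defined in the epoll(7) manpage:

```c
#include <stdint.h>

typedef union epoll_data {
    void     *ptr;
    int       fd;
    uint32_t  u32;
    uint64_t  u64;
} epoll_data_t;
```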
As shown, there is a void *, an int commonly used to store the descriptor we
were talking about, and two integers of different sizes. I preferred to use a
custom structure carrying the descriptor plus some other context fields,
specifically a function pointer and its optional arguments. We register a
pointer to this structure through the void *ptr member. This way,
every time an event occurs, we have access to the very same structure pointer
we registered, including the associated file descriptor.
There are two types of callbacks which can be defined: the common ones, which
are triggered by events, and the periodic ones, which are executed
automatically at every tick of a defined time interval. So let’s wrap the epoll
loop into a dedicated structure; we’ll do the same for the callback functions,
defining a structure with some fields useful for the execution of the
callback.
Sequential diagram, for each cycle of epoll_wait on incoming events
We’re going to declare two structures and a function pointer:
struct evloop, a wrapper around the epoll instance, encapsulating all
needed properties
struct closure, which abstracts a callback and a sort of context, with
arguments and a serialized payload for the results
void callback(struct evloop *, void *), the heart of the closure: the
prototype of the function we’re gonna pass as callback.
Plus, we’ll declare and implement in the .c file some creation, deletion and
managing functions.
src/network.h
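A plausible sketch of the closure declarations; the field names and the payload size here are assumptions, not the post’s exact layout:

```c
#include <stddef.h>

struct evloop;  /* wrapper around the epoll instance, defined below */

/* The callback prototype: it receives the loop itself and an opaque
 * pointer to the closure's arguments. */
typedef void callback(struct evloop *, void *);

/* A callback plus its context: the descriptor it watches, the function
 * to run with its arguments, and a buffer for a serialized reply. */
struct closure {
    int fd;
    void *obj;                    /* optional bound object */
    void *args;                   /* callback arguments */
    callback *call;               /* function to execute on events */
    unsigned char payload[2048];  /* serialized result, size assumed */
};
```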
After some declarations in the header for the network utilities, we can move on
to the implementation of the functions.
We start with simple creation, init and deletion of the previously declared
evloop structure, consisting of a file descriptor for the epoll loop, a
number of events to monitor, a timeout in milliseconds defining how long
we’ll block the loop, the status of the loop (which will probably contain error
codes for faulting cases) and finally a dynamic array of periodic tasks to be
executed.
src/network.c
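A hedged sketch of that creation/deletion pair, with a trimmed-down evloop (the periodic-tasks array is elided and field names are assumptions):

```c
#include <stdlib.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Minimal assumed evloop layout; the real one also tracks periodic tasks. */
struct evloop {
    int epollfd;                 /* the epoll instance descriptor */
    int max_events;              /* how many events to monitor per wait */
    int timeout;                 /* epoll_wait blocking time in ms */
    int status;                  /* error codes on faulting cases */
    struct epoll_event *events;  /* buffer filled by epoll_wait */
};

struct evloop *evloop_create(int max_events, int timeout) {
    struct evloop *loop = malloc(sizeof(*loop));
    if (!loop)
        return NULL;
    loop->max_events = max_events;
    loop->timeout = timeout;
    loop->status = 0;
    loop->events = calloc(max_events, sizeof(struct epoll_event));
    /* Since kernel 2.6.8 the size argument of epoll_create is ignored,
     * but it must be positive. */
    loop->epollfd = epoll_create(max_events);
    return loop;
}

void evloop_free(struct evloop *loop) {
    if (!loop)
        return;
    close(loop->epollfd);
    free(loop->events);
    free(loop);
}
```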
Now, the epoll API is extensively documented in its manpage, but we’ll need 3
functions to add, remove and modify monitored descriptors and triggered events,
using the EPOLLET flag in order to use epoll in edge-triggered mode (the
default is level-triggered, see the manpage). This avoids, in a future
multithreaded implementation, waking up all threads at once every time a new
event is triggered and one or more descriptors are ready to read or write
(the thundering herd problem), but that is another story, also explained
clearly on the man page.
src/network.c
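A sketch of those three wrappers; function names and the data-pointer convention are assumptions consistent with the closure approach described above:

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Register a new descriptor: EPOLLET for edge-triggered delivery,
 * EPOLLONESHOT so the fd is disabled after each event until rearmed.
 * 'data' is the pointer stored in epoll_data.ptr (our closure). */
int epoll_add(int efd, int fd, int evs, void *data) {
    struct epoll_event ev;
    ev.data.ptr = data;
    ev.events = evs | EPOLLET | EPOLLONESHOT;
    return epoll_ctl(efd, EPOLL_CTL_ADD, fd, &ev);
}

/* Rearm an already-registered descriptor with a new event mask. */
int epoll_mod(int efd, int fd, int evs, void *data) {
    struct epoll_event ev;
    ev.data.ptr = data;
    ev.events = evs | EPOLLET | EPOLLONESHOT;
    return epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev);
}

/* Stop monitoring a descriptor. */
int epoll_del(int efd, int fd) {
    return epoll_ctl(efd, EPOLL_CTL_DEL, fd, NULL);
}
```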
Two things to be noted:
First, as previously stated, the main structure epoll_event contains a
union epoll_data inside, which accepts a file descriptor or a void *
pointer. We’ll use the latter; this way we’ll be able to pass around more
information and use our custom closure, with the file descriptor stored
inside the pointed-to structure.
Second, our add and mod functions accept as third parameter a set of
events, mostly EPOLLIN or EPOLLOUT, but they add EPOLLONESHOT to them;
in other words, after an event is fired for a descriptor, that descriptor will
be disabled until manually rearmed.
This way, every time an event is triggered, the descriptor must be manually
rearmed for read or write events. This is done to maintain some degree of
control over low-level event triggering and to leave a door open for a
future multithreaded implementation; this great
article
explains wonderfully the advantages (and the broken parts) of epoll and
why it’s better to use the EPOLLONESHOT flag.
We move forward now to implement the basic closure system and the wait loop for
read and write events, as well as periodic timed callbacks.
src/network.c
Of all the defined functions, evloop_wait is the most interesting: it starts an
epoll_wait loop and, after error checking, proceeds to apply the callback
registered with each ready fd, distinguishing periodic tasks auto-triggered on
a time basis from normal callbacks for read/write events.
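As a rough, hedged sketch of that core loop (structure layouts are the assumed minimal versions from earlier; the periodic-task branch is elided, and the stop-on-status behaviour is an assumption for illustration):

```c
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Assumed minimal layouts; the real structures carry more fields. */
struct evloop {
    int epollfd;
    int max_events;
    int timeout;
    int status;   /* non-zero stops the loop / records errors */
    struct epoll_event *events;
};

struct closure {
    int fd;
    void *args;
    void (*call)(struct evloop *, void *);
};

/* Core of the loop: block on epoll_wait, then run the callback stored
 * in each ready event's closure pointer. */
int evloop_wait(struct evloop *el) {
    while (el->status == 0) {
        int nfds = epoll_wait(el->epollfd, el->events,
                              el->max_events, el->timeout);
        if (nfds < 0) {
            el->status = -errno;  /* record the fault and bail out */
            break;
        }
        for (int i = 0; i < nfds; i++) {
            struct closure *c = el->events[i].data.ptr;
            if (c && c->call)
                c->call(el, c->args);
        }
    }
    return el->status;
}

/* Example callback: a closure that just asks the loop to stop. */
static void stop_cb(struct evloop *el, void *args) {
    (void)args;
    el->status = 1;
}
```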
The codebase is growing; we have added another module, and currently it should
look like this:
Part 3 awaits, with the implementation of the server module.