Init 1

Category: unix

Time-series adventures

posted on 2024 Apr 13

Databases, and how they interact with the filesystem at a low level, have always been a fascinating topic for me. I have implemented countless in-memory key-value stores at various levels of abstraction and stages of development, in multiple languages: Scala, Python, Elixir, Go. Having gradually gone into more and more detail, it was finally time to try to write something a bit more challenging.

A relational database was too big a step to begin with. Since persistence was the first big problem I expected to face almost immediately, I wanted something that could be built on top of a simple but extremely versatile and powerful structure: the log. In my mind the system should basically have been a variation of a Kafka commit log, minus the forwarding of binary chunks to connected consumers (sendfile and how Kafka works internally is another quite interesting topic I explored and would like to dig into a bit more in the future). A time-series database seemed interesting and a good fit for my initial thoughts.

The next decision was the programming language to write it in. I considered a few choices:

  • Elixir/Erlang - a marvel, a fantastic back-end language, and the BEAM is a piece of art; I use Elixir daily at work though, so I craved a change of scenery
  • Go - great design, simplicity and productivity like no other, but also a bit boring
  • Rust - a different beast from anything else, it requires discipline and perseverance to retain productivity. I like it, but because of its very nature and focus on safety I always feel kind of constrained, fighting the compiler more than the problem I’m solving (I have to put some more effort into it to make it click completely). I will certainly pick it up again for something else in the future.
  • C++ - Nope, this was a joke, didn’t consider this at all

Eventually I always find myself coming back to a place of comfort with C. I can’t really say why; it’s a powerful tool with many dangers, and when I used it for my toy projects in the past I almost never felt confident enough to say “this could be used in a prod environment”. I fought with myself over it for many years, learning Rust and flirting with Go, but I eventually gave up and embraced my comfort place. They’re great languages: I used Go at work for a while, and Rust seems a good C++ replacement most of the time, provided you use it consistently (the borrow checker and lifetimes can be hellish to deal with and re-learn when out of shape).

With C, I reckon it all boils down to its compactness: it is conceptually simple and leaves very little to the imagination about what’s happening under the hood. I’m perfectly aware of how easy it is to introduce memory-related bugs, and implementing the same dynamic array for each project can get old pretty fast; but what matters to me, ultimately, is having fun, and C provides me with that.

Design

Conceptually it’s not a particularly efficient or smart architecture. I approached the implementation on a best-effort basis, the idea being to delay as much as possible the need for anything more complex than basic data structures such as arrays.

The two main segments are just fixed-size arrays where each position stores a pointer to the first element of a dynamic array. This makes it possible to store all the data points included in the time range that the segment covers.

Segments

Each time a new record is to be inserted, it is first appended as a new entry in the Write-Ahead Log (WAL), and only then is it stored in the in-memory segment where it belongs. The WAL acts as a disaster recovery mechanism: it is solely responsible for storing incoming records so that they can be read back to re-populate the segment on restart.
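The write path just described can be sketched in a few lines. This is only an illustration under simplifying assumptions, not the code from the repository: wal_record, wal_append and wal_replay are hypothetical names, and the WAL is modelled as a plain file of fixed-size records written in host byte order.

```c
#include <stdint.h>
#include <stdio.h>

/* Hypothetical WAL entry: a timestamp in nanoseconds and a value. */
typedef struct {
    uint64_t timestamp;
    double value;
} wal_record;

/* Append one record to the WAL and flush the stream before returning, so
 * the caller updates the in-memory segment only after the write succeeded.
 * Returns 0 on success, -1 on failure. */
int wal_append(FILE *wal, uint64_t timestamp, double value) {
    wal_record r = {.timestamp = timestamp, .value = value};
    if (fwrite(&r, sizeof(r), 1, wal) != 1)
        return -1;
    return fflush(wal) == 0 ? 0 : -1;
}

/* Read back up to max records from the beginning of the WAL, as would be
 * done on restart to re-populate a segment.
 * Returns the number of records read. */
size_t wal_replay(FILE *wal, wal_record *out, size_t max) {
    rewind(wal);
    return fread(out, sizeof(*out), max, wal);
}
```

Note that fflush only hands the data to the operating system; a real WAL would most likely also need something like fsync to survive a power loss, and a portable on-disk encoding rather than a raw struct dump.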

Disk_1

In short:

  • two segments of 15 minutes each
    • each position represents a second, so 900 is the length of the segments
    • each second points to a dynamic array storing positions based on the microsecond portion of the timestamp of the point
  • each time a new point is inserted, it is first stored in a WAL on disk in order to be able to recover in case of a crash
  • once the main segment is full, or a timestamp too far in the future is received (i.e. more than 15 minutes past the first point stored)
    • the tail segment gets persisted on disk and becomes immutable, WAL is cleared
    • the head segment gets persisted on disk and becomes immutable, WAL is cleared
    • the head segment becomes the new tail segment
    • a new head in-memory segment is generated
  • immutable segments on disk are paired with an index file to read records from the past
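To make the in-memory layout more concrete, here is a minimal sketch of a segment as described in the list: 900 one-second slots, each owning a dynamic array of points. All the names (segment, point_vec, segment_slot, segment_insert) are made up for the example and do not come from the repository; points are simply appended in arrival order, and the flush-to-disk part is left out.

```c
#include <stdint.h>
#include <stdlib.h>

#define SEGMENT_SLOTS 900 /* 15 minutes, one slot per second */

/* Hypothetical point: sub-second part of the timestamp plus the value. */
typedef struct {
    uint64_t nsec;
    double value;
} point;

/* Dynamic array of the points that fall within the same second. */
typedef struct {
    point *items;
    size_t length;
    size_t capacity;
} point_vec;

typedef struct {
    uint64_t base_sec; /* second of the first point stored */
    point_vec slots[SEGMENT_SLOTS];
} segment;

/* Returns the slot index for a timestamp expressed in seconds, or -1 when
 * it falls outside of the range covered by the segment. */
int segment_slot(const segment *s, uint64_t sec) {
    if (sec < s->base_sec || sec - s->base_sec >= SEGMENT_SLOTS)
        return -1;
    return (int)(sec - s->base_sec);
}

/* Appends a point to the slot of its second, growing the array on demand.
 * Returns 0 on success, -1 if out of range or on allocation failure. */
int segment_insert(segment *s, uint64_t timestamp_ns, double value) {
    int slot = segment_slot(s, timestamp_ns / 1000000000ULL);
    if (slot < 0)
        return -1;
    point_vec *v = &s->slots[slot];
    if (v->length == v->capacity) {
        size_t capacity = v->capacity ? v->capacity * 2 : 4;
        point *items = realloc(v->items, capacity * sizeof(*items));
        if (!items)
            return -1;
        v->items = items;
        v->capacity = capacity;
    }
    v->items[v->length++] = (point){timestamp_ns % 1000000000ULL, value};
    return 0;
}
```

A -1 from segment_insert is the signal that, in the design above, would trigger persisting the tail segment and rotating head and tail.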

Although there are plenty of improvements, fixes and other things to take care of, this function is roughly at the heart of the logic (it also contains a secondary path that flushes to disk based on the WAL size, which is not of interest here).



/*
 * Set a record in a timeseries.
 *
 * This function sets a record with the specified timestamp and value in the
 * given timeseries. The function handles the storage of records in memory and
 * on disk to ensure data integrity and efficient usage of resources.
 *
 * @param ts A pointer to the Timeseries structure representing the timeseries.
 * @param timestamp The timestamp of the record to be set, in nanoseconds.
 * @param value The value of the record to be set.
 * @return 0 on success, -1 on failure.
 */
int ts_insert(Timeseries *ts, uint64_t timestamp, double_t value) {
    // Extract seconds and nanoseconds from timestamp
    uint64_t sec = timestamp / (uint64_t)1e9;
    uint64_t nsec = timestamp % (uint64_t)1e9;

    char pathbuf[MAX_PATH_SIZE];
    snprintf(pathbuf, sizeof(pathbuf), "%s/%s/%s", BASE_PATH, ts->db_data_path,
             ts->name);

    // If the WAL size limit is reached we dump both chunks to disk and
    // create 2 new ones
    if (wal_size(&ts->head.wal) >= TS_FLUSH_SIZE) {
        uint64_t base = ts->prev.base_offset > 0 ? ts->prev.base_offset
                                                 : ts->head.base_offset;
        size_t partition_nr = ts->partition_nr == 0 ? 0 : ts->partition_nr - 1;

        if (ts->partitions[partition_nr].clog.base_timestamp < base) {
            if (partition_init(&ts->partitions[ts->partition_nr], pathbuf,
                               base) < 0) {
                return -1;
            }
            partition_nr = ts->partition_nr;
            ts->partition_nr++;
        }

        // Dump chunks into disk and create new ones
        if (partition_flush_chunk(&ts->partitions[partition_nr], &ts->prev) < 0)
            return -1;
        if (partition_flush_chunk(&ts->partitions[partition_nr], &ts->head) < 0)
            return -1;

        // Reset both head and prev in-memory chunks
        ts_deinit(ts);
    }
    // Out-of-order (ooo) record: let it crash for now if the timestamp is
    // out of bounds for the prev chunk
    if (sec < ts->head.base_offset) {
        // If the chunk is empty, it also means the base offset is 0, we set
        // it here with the first record inserted
        if (ts->prev.base_offset == 0)
            ts_chunk_init(&ts->prev, pathbuf, sec, 0);

        // Persist to disk for disaster recovery
        wal_append_record(&ts->prev.wal, timestamp, value);

        // If we successfully insert the record, we can return
        if (ts_chunk_record_fit(&ts->prev, sec) == 0)
            return ts_chunk_set_record(&ts->prev, sec, nsec, value);
    }

    if (ts->head.base_offset == 0)
        ts_chunk_init(&ts->head, pathbuf, sec, 1);

    // Persist to disk for disaster recovery
    wal_append_record(&ts->head.wal, timestamp, value);
    // Check if the timestamp is in range of the current chunk, otherwise
    // create a new in-memory segment
    if (ts_chunk_record_fit(&ts->head, sec) < 0) {
        // Flush the prev chunk to persistence
        if (partition_flush_chunk(&ts->partitions[ts->partition_nr],
                                  &ts->prev) < 0)
            return -1;
        // Clean up the prev chunk and delete its WAL
        ts_chunk_destroy(&ts->prev);
        wal_delete(&ts->prev.wal);
        // Set the current head as new prev
        ts->prev = ts->head;
        // Reset the current head as new head
        ts_chunk_destroy(&ts->head);
        wal_delete(&ts->head.wal);
    }
    // Insert it into the head chunk
    return ts_chunk_set_record(&ts->head, sec, nsec, value);
}

The current state

At the current stage of development it’s still a very crude core set of features, but it seems to be working as expected, with definitely many edge cases and assertions still to sort out; the heart of the DB is there, and it can be built into a dynamic library to be used on a server. The repository can be found at https://github.com/codepr/roach.

Main features

  • Fixed size records: to keep things simple each record is represented by just a timestamp with nanosecond precision and a double
  • In-memory segments: Data is stored in time series format, allowing efficient querying and retrieval based on timestamp, with the last slice of data kept in memory, composed of two segments (currently covering 15 minutes of data each)
    • The last 15 minutes of data
    • The previous 15 minutes for records out of order, totaling 30 minutes
  • Commit Log: Persistence is achieved using a commit log at the base, ensuring durability of data on disk.
  • Write-Ahead Log (WAL): In-memory segments are managed using a write-ahead log, providing durability and recovery in case of crashes or failures.

What’s in the road map

  • Duplicate points policy
  • CRC32 of records for data integrity
  • Adopt an arena for memory allocations
  • Memory mapped indexes, above a threshold enable binary search
  • Schema definitions
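As an illustration of the CRC32 item, a checksum over the bytes of a record could be computed with a plain bitwise CRC-32. This is only a sketch of the standard algorithm (reflected polynomial 0xEDB88320, the same variant used by zlib), not something that exists in the project yet; crc32_bytes is a hypothetical name.

```c
#include <stddef.h>
#include <stdint.h>

/* Bitwise CRC-32 over an arbitrary buffer. Slower than a table-driven
 * version but short and dependency-free. */
uint32_t crc32_bytes(const void *data, size_t len) {
    const uint8_t *p = data;
    uint32_t crc = 0xFFFFFFFFu;
    for (size_t i = 0; i < len; i++) {
        crc ^= p[i];
        for (int bit = 0; bit < 8; bit++) {
            /* mask is all ones when the lowest bit is set, zero otherwise */
            uint32_t mask = 0u - (crc & 1u);
            crc = (crc >> 1) ^ (0xEDB88320u & mask);
        }
    }
    return crc ^ 0xFFFFFFFFu;
}
```

The checksum would be stored next to each record when it is written and recomputed when it is read back, so that a mismatch flags a corrupted entry.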

Timeseries library APIs

  • tsdb_init(1) creates a new database
  • tsdb_close(1) closes the database
  • ts_create(3) creates a new Timeseries in a given database
  • ts_get(2) retrieves an existing Timeseries from a database
  • ts_insert(3) inserts a new point into the Timeseries
  • ts_find(3) finds a point inside the Timeseries
  • ts_range(4) finds a range of points in the Timeseries, returning a vector with the results
  • ts_close(1) closes a Timeseries

Writing a Makefile

A simple Makefile to build the library as a .so file that can be linked to any project as an external lightweight dependency or used alone.



CC=gcc
CFLAGS=-Wall -Wextra -Werror -Wunused -std=c11 -pedantic -ggdb -D_DEFAULT_SOURCE=200809L -Iinclude -Isrc
LDFLAGS=-L. -ltimeseries

LIB_SOURCES=src/timeseries.c src/partition.c src/wal.c src/disk_io.c src/binary.c src/logging.c src/persistent_index.c src/commit_log.c
LIB_OBJECTS=$(LIB_SOURCES:.c=.o)

libtimeseries.so: $(LIB_OBJECTS)
	$(CC) -shared -o $@ $(LIB_OBJECTS)
%.o: %.c
	$(CC) $(CFLAGS) -fPIC -c $< -o $@
clean:
	@rm -f $(LIB_OBJECTS) libtimeseries.so

Building the library is now simple, just a single command, make. Linking it to a main is a one-liner:

gcc -o my_project main.c -I/path/to/timeseries/include -L/path/to/timeseries -ltimeseries

LD_LIBRARY_PATH=/path/to/timeseries ./my_project runs the main (the variable must point to the directory that contains libtimeseries.so, not to the file itself). A basic example of interaction with the library:



#include "timeseries.h"
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>

int main(void) {
    // Initialize the database
    Timeseries_DB *db = tsdb_init("testdb");
    if (!db)
        abort();
    // Create a timeseries, retention is not implemented yet
    Timeseries *ts = ts_create(db, "temperatures", 0, DP_IGNORE);
    if (!ts)
        abort();
    // Insert records into the timeseries; ts is already a pointer, so it is
    // passed as-is, matching the ts_insert signature shown earlier
    ts_insert(ts, 1710033421702081792, 25.5);
    ts_insert(ts, 1710033422047657984, 26.0);
    // Find a record by timestamp
    Record r;
    int result = ts_find(ts, 1710033422047657984, &r);
    if (result == 0)
        printf("Record found: timestamp=%" PRIu64 ", value=%.2f\n",
               r.timestamp, r.value);
    else
        printf("Record not found.\n");
    // Release the timeseries
    ts_close(ts);
    // Close the database
    tsdb_close(db);

    return 0;
}

A server draft

Event-based server (relying on ev, at least initially), with TCP as the main transport protocol and a custom text-based protocol inspired by RESP but simpler:

  • $ string type
  • ! error type
  • # array type
  • : integer type
  • ; float type
  • \r\n delimiter

With the following encoding:

<type><length>\r\n<payload>\r\n

For example a simple hello string would be

$5\r\nHello\r\n
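A minimal sketch of how the string type could be encoded according to this format. The function name encode_string is hypothetical, and only the $ type is covered.

```c
#include <stdio.h>
#include <string.h>

/* Encodes payload as "$<length>\r\n<payload>\r\n" into dst.
 * Returns the number of bytes written (excluding the terminating NUL),
 * or -1 if dst is too small to hold the whole message. */
int encode_string(char *dst, size_t dstlen, const char *payload) {
    int n = snprintf(dst, dstlen, "$%zu\r\n%s\r\n", strlen(payload), payload);
    if (n < 0 || (size_t)n >= dstlen)
        return -1;
    return n;
}
```

Calling encode_string(buf, sizeof(buf), "Hello") with a large enough buffer fills it with the 11 bytes of $5\r\nHello\r\n. Since the payload is treated as a C string, this sketch cannot carry embedded NUL bytes; a binary-safe version would take the payload length as an explicit argument.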

Simple query language

Definition of a simple, text-based format for clients to interact with the server, allowing them to send commands and receive responses.

Basic outline

  • Text-Based Format: Use a text-based format where each command and response is represented as a single line of text.
  • Commands: Define a set of commands that clients can send to the server to perform various operations such as inserting data, querying data, and managing the database.
  • Responses: Define the format of responses that the server sends back to clients after processing commands. Responses should provide relevant information or acknowledge the completion of the requested operation.

Core commands

Define the basic operations in a SQL-like query language

  • CREATE creates a database or a timeseries

    CREATE <database name>

    CREATE <timeseries name> INTO <database name> [<retention period>] [<duplication policy>]

  • INSERT insertion of point(s) in a timeseries

    INSERT <timeseries name> INTO <database name> <timestamp | *> <value>, ...

  • SELECT query a timeseries, selection of point(s) and aggregations

    SELECT <timeseries name> FROM <database name> AT/RANGE <start_timestamp> TO <end_timestamp> WHERE value [>|<|=|<=|>=|!=] <literal> AGGREGATE [AVG|MIN|MAX] BY <literal>

  • DELETE delete a timeseries or a database

    DELETE <database name>

    DELETE <timeseries name> FROM <database name>

Flow:

  1. Client Sends Command: Clients send commands to the server in the specified text format.

  2. Server Parses Command: The server parses the received command and executes the corresponding operation on the timeseries database.

  3. Server Sends Response: After processing the command, the server sends a response back to the client indicating the result of the operation or providing requested data.

  4. Client Processes Response: Clients receive and process the response from the server, handling success or error conditions accordingly.
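The parsing step of this flow could start by dispatching on the first keyword of the line. The sketch below covers only that first step, under the assumption that commands are upper-case and whitespace-separated; command_type and parse_command are names invented for the example.

```c
#include <string.h>

typedef enum {
    CMD_CREATE,
    CMD_INSERT,
    CMD_SELECT,
    CMD_DELETE,
    CMD_UNKNOWN
} command_type;

/* Returns the command type based on the first whitespace-delimited token
 * of the line, CMD_UNKNOWN if it matches none of the core commands. */
command_type parse_command(const char *line) {
    static const struct {
        const char *name;
        command_type type;
    } commands[] = {
        {"CREATE", CMD_CREATE},
        {"INSERT", CMD_INSERT},
        {"SELECT", CMD_SELECT},
        {"DELETE", CMD_DELETE},
    };
    /* Length of the first token, up to the first blank or line ending */
    size_t len = strcspn(line, " \t\r\n");
    for (size_t i = 0; i < sizeof(commands) / sizeof(commands[0]); i++) {
        if (strlen(commands[i].name) == len &&
            strncmp(line, commands[i].name, len) == 0)
            return commands[i].type;
    }
    return CMD_UNKNOWN;
}
```

From here, each command type would have its own routine to parse the remaining tokens (names, timestamps, values) and to build the response sent back to the client.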
