Write-Ahead Logs (WAL) für Commands mit Bbolt in Go

In der Serie habe ich eine Finite State Machine (FSM) implementiert, die Commands verarbeitet und den Zustand in Snapshots speichert. Änderungen zwischen zwei Snapshots gehen aktuell verloren, wenn der Server abstürzt.

Die FSM lege ich für diesen Artikel beiseite und entwickle ein Write Ahead Log (WAL), der später im Service alle Commands speichert, die an die FSM gesendet werden und somit auch unabhängig von der FSM funktioniert.

Events/Commands speichern

Ein Write-Ahead Log (WAL) ist ein System, bei dem neue Commands fortlaufend an eine Logdatei angehängt werden. Alle Nachrichten werden durch nummeriert und sind so wieder auffindbar. Um sicherzustellen, dass nichts kaputt geht, wenn der Server z.B. während des Schreibens abstürzt, ist allerdings etwas Komplexität nötig. Diese Komplexität überspringe ich hier und verwende stattdessen die Etcd bbolt Datenbank, die auch in einem zentralen Bestandteil von Kubernetes verwendet wird und sehr stabil ist.

Bbolt ist eine sehr schnelle Key-Value-Datenbank, die das Schreiben und Lesen von Daten selbst verwaltet. Der Key wird unsere Command ID sein und das Value die Nachricht selbst.

import bolt "go.etcd.io/bbolt"

db, err := bolt.Open("journal.db", 0600, nil)
if err != nil {
  return err
}
defer db.Close()

In Bbolt werden Daten in sogenannten Buckets organisiert, die vergleichbar mit Tabellen in einer relationalen Datenbank sind. Zum Start muss der Bucket erstellt werden, wenn er noch nicht existiert. Dafür eröffne ich eine Transaktion und erstelle den Bucket.

db.Update(func(tx *bolt.Tx) error {
    _, err := tx.CreateBucketIfNotExists([]byte("log"))
    return err
})

Angelehnt an die Figma Architektur nenne ich meinen Command Log Journal. Dieses beinhaltet alle Commands, die empfangen wurden. Die, im vorherigen Teil, erstellten Snapshots enthalten den Zustand bis zu einem bestimmten Command. Ab diesem Zeitpunkt kann ich mit der zuletzt verarbeiteten ID alle neueren Commands aus dem Journal anwenden.

Für das Journal erstelle ich eine Struktur, die Bbolt kapselt.

package journal

import (
    "go.etcd.io/bbolt"
)

type Journal struct {
    db *bbolt.DB
}

func NewJournal(path string) (*Journal, error) {
	db, err := bbolt.Open(path, 0600, nil)
	if err != nil {
		return nil, err
	}

	// Create the journal bucket if it doesn't exist
	err = db.Update(func(tx *bbolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("log"))
		return err
	})
	if err != nil {
		return nil, err
	}

	// Initialize the Journal
	return &Journal{db: db}, nil
}

// Close the database
func (j *Journal) Close() error {
    return j.db.Close()
}

Commands in das Journal schreiben

Für das Schreiben in das Journal gibt es eine praktische Bbolt Funktion, die automatisch in einem Bucket hochzählt und so eine eindeutige ID für jeden neuen Eintrag generiert. Diese ID kann dann auch in der FSM verwendet werden, um den Zustand nachzuvollziehen. Diese ID bestimmt die Reihenfolge. Als Alternative wäre es möglich, wie bei Redis Streams, einen Timestamp-Sequence oder etwas ganz eigenes zu verwenden.

func (j *Journal) Apply(cmd *Command) error {
	return j.db.Update(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket([]byte("log"))
		if bucket == nil {
			return fmt.Errorf("bucket not found")
		}

		// Get the next sequence ID
		id, err := bucket.NextSequence()
		if err != nil {
			return err
		}

		cmd.CommandID = id

		// Marshal the command to JSON or any other format
		data, err := json.Marshal(cmd)
		if err != nil {
			return err
		}

		// Store the command in the bucket with the ID as key
		return bucket.Put([]byte(fmt.Sprintf("%d", id)), data)
	})
}

Vor dem schreiben des Commands setze ich die CommandID, die automatisch von Bbolt generiert wird. Diese ID kann später in der FSM verwendet werden, um den Zustand zu verfolgen und Snapshots zu erstellen. Da in unserem Beispiel der letzte Command immer gewinnt, legt der Server die Reihenfolge der Commands fest und kann diese nummerieren.

Bbolt ist agnostisch bezüglich des Datenformats. Wenn Geschwindigkeit oder Speicherplatz wichtig sind, kann hier z.B. auch ein Protobuf- oder MessagePack-Format verwendet werden.

Commands aus dem Journal lesen

Das Journal ist damit im Prinzip fertig, da die Commands in der richtigen Reihenfolge gespeichert werden. Um die Commands zu lesen, öffne ich eine Transaktion und iteriere über alle Einträge im Bucket. Dabei kann ich die CommandID und den Command selbst auslesen. Der große Vorteil von Bbolt ist, dass die Keys automatisch aufsteigend sortiert sind. Ich kann bei einer bestimmten CommandID anfangen und alle neueren Commands lesen, die nach diesem Command eingegangen sind.

Damit ich nicht alle Commands im Speicher halten muss, gebe ich eine Callback-Funktion an, die für jeden Command aufgerufen wird.

func (j *Journal) ReadCommandsAfter(commandID uint64, fn func(*Command) error) error {
	startKey := fmt.Sprintf("%d", commandID+1)Eigent

	return j.db.View(func(tx *bbolt.Tx) error {
		bucket := tx.Bucket([]byte("log"))
		if bucket == nil {
			return fmt.Errorf("bucket not found")
		}

		c := bucket.Cursor()
		for k, v := c.Seek([]byte(startKey)); k != nil; k, v = c.Next() {
			var cmd Command
			if err := json.Unmarshal(v, &cmd); err != nil {
				return fmt.Errorf("failed to unmarshal command: %v", err)
			}

			// Call the provided function with the command
			if err := fn(&cmd); err != nil {
				return err // Stop iteration if the function returns an error
			}
		}

		return nil
	})
}

Mit der Funktion Cursor().Seek(startKey) springe ich direkt zum gewünschten Key oder zum nächsten größeren Key. Dadurch kann ich alle Commands ab einem bestimmten CommandID lesen, ohne alle vorherigen Commands zu laden. Mit c.Next() iteriere ich dann über alle weiteren Commands.

Anwendung

Die Anwendung des Journals ist einfach. Ich erstelle ein neues Journal und wende Commands an und kann sie später lesen.

package main

func main() {
    journal, err := journal.NewJournal("journal.db")
    if err != nil {
        panic(err)
    }
    defer journal.Close()

    // Beispiel Command
    cmd := &Command{
        Operation: CreateOperation,
        TargetNodeID: "root",
        NewNodeID: "node1",
    }

    // Command anwenden und ins Journal schreiben
    if err := journal.Apply(cmd); err != nil {
        panic(err)
    }

    // Der Command hat jetzt eine CommandID, die automatisch von Bbolt generiert wurde.
    fmt.Printf("Command applied with ID: %d\n", cmd.CommandID)

    // Send command to FSM...
}

Wenn der Dienst neu startet, kann ich wie bisher den FSM-Snapshot laden. Anschließend kann ich die zuletzt verarbeitete CommandID aus dem Snapshot lesen und alle neueren Commands aus dem Journal lesen und anwenden.

fsm := NewFSM()
fsm.RestoreSnapshot(snapshot)

lastCommandID := fsm.GetLastCommandID()
err = journal.ReadCommandsAfter(lastCommandID, func(cmd *Command) error {
    response := fsm.Apply(cmd)
    if err, ok := response.(error); ok {
        return err
    }
    return nil
})
if err != nil {
    panic(err)
}

// Start the server ...