State Machine Snapshots laden und speichern

Im letzten Teil habe ich die Finite State Machine (FSM) gebaut, die Commands verarbeitet und den Zustand berechnet. Um diesen Zustand speichern und wiederherstellen zu können, füge ich jetzt Snapshots des aktuellen Zustands hinzu. Ein Snapshot ist eine Momentaufnahme des aktuellen FSM-Zustands, die gespeichert und später wiederhergestellt werden kann. Um weniger Datenverlust bei einem Serverneustart zu haben, speichere ich in einem späteren Teil auch die Commands. Allerdings bleiben Snapshots weiterhin wichtig, um den Zustand schnell wiederherzustellen, ohne alle Commands erneut anwenden zu müssen. Dadurch müssen nach dem Laden eines Snapshots nur noch die Commands angewendet werden, die nach dem Snapshot eingegangen sind.

Teil der Serie
  1. Die State Machine – Aus Commands wird ein deterministischer Zustand
  2. State Machine Snapshots laden und speichern

Ich erweitere in diesem Artikel die Finite State Machine aus dem letzten Teil. Als erstes erweitere ich die Commands um ein CommandID Feld, um sie eindeutig identifizieren zu können. Das CommandID-Feld ist wichtig, damit beim Wiederherstellen des Snapshots klar ist, bis zu welchem Command der Zustand bereits angewendet wurde. So können nach dem Laden des Snapshots nur die noch nicht angewendeten Commands erneut ausgeführt werden.

type Command struct {
    CommandID uint64
    // ...f
}

Zusätzlich erweitere ich die FSM um ein LastAppliedCommandID Feld, das in der Apply Methode aktualisiert wird, wenn ein Command erfolgreich angewendet wurde.

type FSM struct {
    // lastAppliedCommandID is the ID of the last command applied to this FSM.
    lastAppliedCommandID uint64
    // ...
}

func (f *FSM) Apply(cmd *Command) any {
    // ...
    f.lastAppliedCommandID = cmd.CommandID
    // ...
}

Snapshot erstellen

Die Methode CreateSnapshot erstellt einen Zustandssnapshot, der später wiederhergestellt werden kann. In der Hashicorp Raft Implementierung wird dafür der State geclont und über ein etwas komplexeres Verfahren asynchron gespeichert. Die verwendete Memory Datenbank MemDB kann sich direkt clonen und danach speichern, sodass der Zustand nicht mehr verändert wird.

Ich mache es hier einfacher und serialisiere den Zustand direkt. Der Snapshot enthält die lastAppliedCommandID und den aktuellen Zustand der FSM, der als JSON serialisiert wird.

type Snapshot struct {
	// lastAppliedCommandID is the ID of the last command applied before this snapshot was created.
	LastAppliedCommandID string `json:"last_applied_command_id"`
	// State is the serialized state of the FSM at the time of the snapshot.
	State json.RawMessage `json:"state"`
}

Der State in meinem Beispiel ist ein Node-Tree, also eine Baumstruktur aus Nodes. Daher muss ich nur das Root-Element serialisieren, um den gesamten Zustand zu speichern. Um zu verhindern, dass sich der Zustand während der Snapshot-Erstellung ändert, sperre ich die FSM mit einem Mutex.

func (f *FSM) CreateSnapshot() *Snapshot {
	f.lock.Lock()
	defer f.lock.Unlock()

	state := f.parentNode
	serializedState, err := json.Marshal(state)
	if err != nil {
		panic(fmt.Sprintf("failed to serialize FSM state: %v", err))
	}

	return &Snapshot{
		LastAppliedCommandID: fmt.Sprintf("%d", f.lastAppliedCommandID),
		State:                serializedState,
	}
}

Snapshot wiederherstellen

Die Methode RestoreSnapshot stellt den Zustand der FSM aus einem Snapshot wieder her. Dabei wird der aktuelle Zustand durch den im Snapshot gespeicherten Zustand ersetzt. Die lastAppliedCommandID wird auf den Wert des Snapshots gesetzt, um zu wissen, bis zu welchem Command der Zustand gültig ist.

func (f *FSM) RestoreSnapshot(snapshot *Snapshot) error {
	f.lock.Lock()
	defer f.lock.Unlock()

	var state Node
	if err := json.Unmarshal(snapshot.State, &state); err != nil {
		return fmt.Errorf("failed to unmarshal FSM state: %v", err)
	}

	f.parentNode = &state
	f.nodes = make(map[string]*Node)
	f.nodes[state.ID] = &state

	// Add all children to the FSM nodes map
	f.addChildrenToMap(&state)

	// Reset the LastAppliedCommandID to the one from the snapshot
	lastAppliedID, err := strconv.ParseInt(snapshot.LastAppliedCommandID, 10, 64)
	if err != nil {
		return fmt.Errorf("invalid lastAppliedCommandID in snapshot: %v", err)
	}
	f.lastAppliedCommandID = lastAppliedID

	return nil
}

Die Methode addChildrenToMap fügt alle Children des Root-Nodes zur nodes Map hinzu, sodass die FSM wieder schnellen Zugriff auf alle Nodes hat.

func (f *FSM) addChildrenToMap(node *Node) {
	for _, child := range node.Children {
		f.nodes[child.ID] = child
		f.addChildrenToMap(child)
	}
}

Anwendung

Die FSM kann jetzt Snapshots erstellen und wiederherstellen. Hier ist ein Beispiel, wie die FSM verwendet werden kann.

Als Erstes erstelle ich eine neue FSM und lade, wenn vorhanden, den letzten Snapshot.

package main

import (
    "encoding/json"
    "fmt"
    "os"
)

func main() {
	fsm := NewFSM()

	// Load snapshot if it exists
	snapshotFileName := "fsm_snapshot.json"
	if _, err := os.Stat(snapshotFileName); err == nil {
		// Read the snapshot file
		rawSnapshot, err := os.ReadFile(snapshotFileName)
		if err != nil {
			log.Fatalf("Error reading snapshot file: %v", err)
		}

		snapshot := &Snapshot{}
		if err := json.Unmarshal(rawSnapshot, snapshot); err != nil {
			log.Fatalf("Error unmarshalling snapshot: %v", err)
		}
		if err := fsm.RestoreSnapshot(snapshot); err != nil {
			log.Fatalf("Error restoring snapshot: %v", err)
		}
		fmt.Println("Restored FSM from snapshot.")
	}
}

Wenn der Server herunterfährt, möchte ich den aktuellen Zustand in einem Snapshot speichern.

func main() {
    // ...

    defer func() {
        snapshot := fsm.CreateSnapshot()
        rawSnapshot, err := json.Marshal(snapshot)
        if err != nil {
            log.Fatalf("Error marshalling snapshot: %v", err)
        }
        if err := os.WriteFile(snapshotFileName, rawSnapshot, 0644); err != nil {
            log.Fatalf("Error writing snapshot file: %v", err)
        }
        fmt.Println("FSM snapshot saved.")
    }()
}

Denselben Code kann ich auch an einen Timer hängen, um regelmäßig Snapshots zu erstellen (z.B. alle 10 Minuten). Durch das lock in der CreateSnapshot Methode wird sichergestellt, dass es keine Konflikte gibt.

Anschließend kann, wie im letzten Teil beschrieben, die FSM Commands anwenden, um den Zustand zu verändern. Hier ein Beispiel, wie ein Node erstellt wird:

func main() {
    // ...

    cmd1 := &Command{
        CommandID:    1,
        Operation:    CreateOperation,
        TargetNodeID: "root",
        NewNodeID:    "node1",
    }
    response := fsm.Apply(cmd1)
    if err, ok := response.(error); ok {
        log.Printf("Error applying command: %v", err)
    } else {
        fmt.Printf("Created new node: %v\n", response)
    }
}