State Machine Snapshots laden und speichern
Im letzten Teil habe ich die Finite State Machine (FSM) gebaut, die Commands verarbeitet und den Zustand berechnet. Um diesen Zustand speichern und wiederherstellen zu können, füge ich jetzt Snapshots des aktuellen Zustands hinzu. Ein Snapshot ist eine Momentaufnahme des aktuellen FSM-Zustands, die gespeichert und später wiederhergestellt werden kann. Um weniger Datenverlust bei einem Serverneustart zu haben, speichere ich in einem späteren Teil auch die Commands. Allerdings bleiben Snapshots weiterhin wichtig, um den Zustand schnell wiederherzustellen, ohne alle Commands erneut anwenden zu müssen. Dadurch müssen nach dem Laden eines Snapshots nur noch die Commands angewendet werden, die nach dem Snapshot eingegangen sind.
- Die State Machine – Aus Commands wird ein deterministischer Zustand
- State Machine Snapshots laden und speichern
Ich erweitere in diesem Artikel die Finite State Machine aus dem letzten Teil. Als erstes erweitere ich die Commands um ein CommandID
Feld, um sie eindeutig identifizieren zu können. Das CommandID-Feld ist wichtig, damit beim Wiederherstellen des Snapshots klar ist, bis zu welchem Command der Zustand bereits angewendet wurde. So können nach dem Laden des Snapshots nur die noch nicht angewendeten Commands erneut ausgeführt werden.
type Command struct {
CommandID uint64
// ...f
}
Zusätzlich erweitere ich die FSM um ein LastAppliedCommandID
Feld, das in der Apply
Methode aktualisiert wird, wenn ein Command erfolgreich angewendet wurde.
type FSM struct {
// lastAppliedCommandID is the ID of the last command applied to this FSM.
lastAppliedCommandID uint64
// ...
}
func (f *FSM) Apply(cmd *Command) any {
// ...
f.lastAppliedCommandID = cmd.CommandID
// ...
}
Snapshot erstellen
Die Methode CreateSnapshot
erstellt einen Zustandssnapshot, der später wiederhergestellt werden kann. In der Hashicorp Raft Implementierung wird dafür der State geclont und über ein etwas komplexeres Verfahren asynchron gespeichert. Die verwendete Memory Datenbank MemDB kann sich direkt clonen und danach speichern, sodass der Zustand nicht mehr verändert wird.
Ich mache es hier einfacher und serialisiere den Zustand direkt. Der Snapshot enthält die lastAppliedCommandID
und den aktuellen Zustand der FSM, der als JSON serialisiert wird.
type Snapshot struct {
// lastAppliedCommandID is the ID of the last command applied before this snapshot was created.
LastAppliedCommandID string `json:"last_applied_command_id"`
// State is the serialized state of the FSM at the time of the snapshot.
State json.RawMessage `json:"state"`
}
Der State in meinem Beispiel ist ein Node-Tree, also eine Baumstruktur aus Nodes. Daher muss ich nur das Root-Element serialisieren, um den gesamten Zustand zu speichern. Um zu verhindern, dass sich der Zustand während der Snapshot-Erstellung ändert, sperre ich die FSM mit einem Mutex.
func (f *FSM) CreateSnapshot() *Snapshot {
f.lock.Lock()
defer f.lock.Unlock()
state := f.parentNode
serializedState, err := json.Marshal(state)
if err != nil {
panic(fmt.Sprintf("failed to serialize FSM state: %v", err))
}
return &Snapshot{
LastAppliedCommandID: fmt.Sprintf("%d", f.lastAppliedCommandID),
State: serializedState,
}
}
Snapshot wiederherstellen
Die Methode RestoreSnapshot
stellt den Zustand der FSM aus einem Snapshot wieder her. Dabei wird der aktuelle Zustand durch den im Snapshot gespeicherten Zustand ersetzt. Die lastAppliedCommandID
wird auf den Wert des Snapshots gesetzt, um zu wissen, bis zu welchem Command der Zustand gültig ist.
func (f *FSM) RestoreSnapshot(snapshot *Snapshot) error {
f.lock.Lock()
defer f.lock.Unlock()
var state Node
if err := json.Unmarshal(snapshot.State, &state); err != nil {
return fmt.Errorf("failed to unmarshal FSM state: %v", err)
}
f.parentNode = &state
f.nodes = make(map[string]*Node)
f.nodes[state.ID] = &state
// Add all children to the FSM nodes map
f.addChildrenToMap(&state)
// Reset the LastAppliedCommandID to the one from the snapshot
lastAppliedID, err := strconv.ParseInt(snapshot.LastAppliedCommandID, 10, 64)
if err != nil {
return fmt.Errorf("invalid lastAppliedCommandID in snapshot: %v", err)
}
f.lastAppliedCommandID = lastAppliedID
return nil
}
Die Methode addChildrenToMap
fügt alle Children des Root-Nodes zur nodes
Map hinzu, sodass die FSM wieder schnellen Zugriff auf alle Nodes hat.
func (f *FSM) addChildrenToMap(node *Node) {
for _, child := range node.Children {
f.nodes[child.ID] = child
f.addChildrenToMap(child)
}
}
Anwendung
Die FSM kann jetzt Snapshots erstellen und wiederherstellen. Hier ist ein Beispiel, wie die FSM verwendet werden kann.
Als Erstes erstelle ich eine neue FSM und lade, wenn vorhanden, den letzten Snapshot.
package main
import (
"encoding/json"
"fmt"
"os"
)
func main() {
fsm := NewFSM()
// Load snapshot if it exists
snapshotFileName := "fsm_snapshot.json"
if _, err := os.Stat(snapshotFileName); err == nil {
// Read the snapshot file
rawSnapshot, err := os.ReadFile(snapshotFileName)
if err != nil {
log.Fatalf("Error reading snapshot file: %v", err)
}
snapshot := &Snapshot{}
if err := json.Unmarshal(rawSnapshot, snapshot); err != nil {
log.Fatalf("Error unmarshalling snapshot: %v", err)
}
if err := fsm.RestoreSnapshot(snapshot); err != nil {
log.Fatalf("Error restoring snapshot: %v", err)
}
fmt.Println("Restored FSM from snapshot.")
}
}
Wenn der Server herunterfährt, möchte ich den aktuellen Zustand in einem Snapshot speichern.
func main() {
// ...
defer func() {
snapshot := fsm.CreateSnapshot()
rawSnapshot, err := json.Marshal(snapshot)
if err != nil {
log.Fatalf("Error marshalling snapshot: %v", err)
}
if err := os.WriteFile(snapshotFileName, rawSnapshot, 0644); err != nil {
log.Fatalf("Error writing snapshot file: %v", err)
}
fmt.Println("FSM snapshot saved.")
}()
}
Denselben Code kann ich auch an einen Timer hängen, um regelmäßig Snapshots zu erstellen (z.B. alle 10 Minuten). Durch das lock
in der CreateSnapshot
Methode wird sichergestellt, dass es keine Konflikte gibt.
Anschließend kann, wie im letzten Teil beschrieben, die FSM Commands anwenden, um den Zustand zu verändern. Hier ein Beispiel, wie ein Node erstellt wird:
func main() {
// ...
cmd1 := &Command{
CommandID: 1,
Operation: CreateOperation,
TargetNodeID: "root",
NewNodeID: "node1",
}
response := fsm.Apply(cmd1)
if err, ok := response.(error); ok {
log.Printf("Error applying command: %v", err)
} else {
fmt.Printf("Created new node: %v\n", response)
}
}