Kawa Dev Log #1: Building a Distributed Saga Orchestration Engine

Introduction: The Distributed Transaction Problem

Anyone who’s worked with microservices at scale has faced this nightmare scenario: you’re processing an e-commerce order that needs to reserve inventory, charge payment, and send a confirmation email. Everything seems fine until the payment service goes down after inventory is reserved, leaving you with inconsistent state across services and no clear way to clean up.

Microservices and Consistency Challenges

The promise of microservices is compelling—independent deployments, technology diversity, and clear boundaries. But this comes at a cost: distributed transactions. When a single business operation spans multiple services, maintaining consistency becomes incredibly complex.

Consider this simple order flow:

  1. Inventory Service: Reserve 2 units of Product #123
  2. Payment Service: Charge $99.99 to customer’s card
  3. Email Service: Send order confirmation
  4. Analytics Service: Record purchase event

What happens when step 3 fails? You’ve already charged the customer and reserved inventory. Rolling back requires coordinating with multiple services, each with their own failure modes and recovery semantics.

Why Traditional 2PC Doesn’t Work at Scale

Two-phase commit (2PC) was designed to solve exactly this problem, but it falls apart in distributed systems:

  - The coordinator is a single point of failure: if it crashes between the prepare and commit phases, participants block while holding locks.
  - Locks are held across network round trips, so throughput collapses as latency and participant count grow.
  - Every participant must be up for a transaction to commit, so adding services multiplies the chance of failure.

In practice, 2PC creates more problems than it solves. Modern distributed systems need something better.

The Saga Pattern as a Solution

Enter the Saga pattern—a way to manage distributed transactions through a sequence of local transactions, each with a corresponding compensation action. Instead of locking resources across services, sagas allow you to:

  - Run each step as an ordinary local transaction in the owning service
  - Undo completed steps by executing their compensations when a later step fails
  - Keep services available, since no locks are held across service boundaries

This is exactly what Kawa is designed to orchestrate.

Project Overview: What is Kawa?

Kawa is a distributed saga orchestration engine built from the ground up to handle complex, multi-service transactions with reliability and observability. Think of it as a transaction coordinator that knows how to gracefully handle failures and keep your distributed system consistent.

Distributed Saga Orchestration Engine

At its core, Kawa manages workflow definitions and coordinates their execution across multiple services. Here’s how a typical workflow looks:

name: "ecommerce-order"
description: "Complete order processing with inventory and payment"
timeout: "5m"

steps:
  - id: "reserve_inventory"
    type: "http"
    action:
      method: "POST"
      url: "http://inventory-service/reserve"
      body: 
        product_id: "{{ saga.input.product_id }}"
        quantity: "{{ saga.input.quantity }}"
    compensation:
      method: "POST" 
      url: "http://inventory-service/release"
      body:
        reservation_id: "{{ steps.reserve_inventory.response.reservation_id }}"
    timeout: "30s"
    
  - id: "charge_payment"
    type: "http"
    depends_on: ["reserve_inventory"]
    action:
      method: "POST"
      url: "http://payment-service/charge"
      body:
        amount: "{{ saga.input.amount }}"
        payment_method: "{{ saga.input.payment_method }}"
    compensation:
      method: "POST"
      url: "http://payment-service/refund" 
      body:
        charge_id: "{{ steps.charge_payment.response.charge_id }}"
    timeout: "45s"

Kawa acts as a centralized orchestrator that communicates with your services over standard protocols.

Event Sourcing for Reliability

Every operation in Kawa generates immutable events that tell the complete story of a saga’s execution:

-- From the saga_events table
CREATE TABLE public.saga_events (
    id uuid DEFAULT public.uuid_generate_v4() NOT NULL,
    saga_id uuid NOT NULL,
    sequence_number bigint NOT NULL,
    event_type character varying(50) NOT NULL,
    step_id character varying(100),
    payload jsonb DEFAULT '{}'::jsonb NOT NULL,
    before_state jsonb,
    after_state jsonb,
    duration_ms integer,
    occurred_at timestamp(0) without time zone DEFAULT now() NOT NULL,
    inserted_at timestamp(0) without time zone NOT NULL
);

This event-sourced approach means Kawa can reconstruct the exact state of any saga at any point in time—crucial for debugging, auditing, and recovery scenarios.

WebSocket-Based Client Communication

Rather than requiring language-specific runtimes, Kawa communicates with clients over WebSockets. This makes integration straightforward:

# Client-side step execution handler
defmodule MyApp.KawaClient do
  use GenServer

  def handle_info({:execute_step, saga_id, step_id, input}, state) do
    # execute_workflow_step/2 is the application's own business logic
    result = execute_workflow_step(step_id, input)

    Phoenix.Channel.push(state.socket, "step_result", %{
      saga_id: saga_id,
      step_id: step_id,
      result: result
    })

    {:noreply, state}
  end
end

Why Elixir? The Perfect Fit

Choosing Elixir for Kawa wasn’t arbitrary—it’s the ideal language for building reliable distributed systems.

Actor Model for Natural Concurrency

Elixir’s actor model maps perfectly to saga orchestration. Each saga can run in its own lightweight process, handling hundreds of thousands of concurrent workflows without breaking a sweat:

# From the application supervision tree
defmodule Kawa.Application do
  use Application

  def start(_type, _args) do
    children = [
      KawaWeb.Telemetry,
      Kawa.Repo,
      {Phoenix.PubSub, name: Kawa.PubSub},
      {Finch, name: Kawa.Finch},
      KawaWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: Kawa.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
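
The children above are Kawa's long-lived services; the sagas themselves fit naturally under a DynamicSupervisor, one process per saga. Below is a minimal sketch of that shape—Kawa.SagaSupervisor, Kawa.SagaRegistry, and Kawa.SagaServer are hypothetical names for illustration, not necessarily Kawa's actual modules:

# Hypothetical: one lightweight process per saga, supervised dynamically
defmodule Kawa.SagaServer do
  use GenServer, restart: :transient

  def start_link(saga_id) do
    GenServer.start_link(__MODULE__, saga_id,
      name: {:via, Registry, {Kawa.SagaRegistry, saga_id}})
  end

  @impl true
  def init(saga_id), do: {:ok, %{saga_id: saga_id, status: "pending"}}
end

# Assumes {Registry, keys: :unique, name: Kawa.SagaRegistry} and
# {DynamicSupervisor, name: Kawa.SagaSupervisor} are added to the tree above.
# A crash in one saga process affects only that saga.
{:ok, _pid} = DynamicSupervisor.start_child(Kawa.SagaSupervisor, {Kawa.SagaServer, saga_id})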

OTP Supervisor Trees for Fault Tolerance

The “let it crash” philosophy means individual saga failures don’t bring down the entire system. Supervisors automatically restart failed processes:

# Supervisor configuration ensures resilience
opts = [strategy: :one_for_one, name: Kawa.Supervisor]

Phoenix Channels for Real-Time Communication

Phoenix’s WebSocket implementation is battle-tested and handles thousands of concurrent connections efficiently—perfect for real-time saga coordination.
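
A client-facing channel in this style is only a few lines. The sketch below is illustrative—the topic and event names are assumptions, not Kawa's actual wire protocol:

# Hypothetical channel; topic and event names are illustrative
defmodule KawaWeb.ClientChannel do
  use Phoenix.Channel

  def join("client:" <> client_id, _params, socket) do
    # Mark the client connected so its paused sagas can resume
    Kawa.ClientRegistry.client_connected(client_id, self())
    {:ok, assign(socket, :client_id, client_id)}
  end

  def handle_in("step_result", payload, socket) do
    # Hand the client's step result back to the orchestrator
    Phoenix.PubSub.broadcast(Kawa.PubSub, "saga:#{payload["saga_id"]}", {:step_result, payload})
    {:noreply, socket}
  end
end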

BEAM’s Legendary Reliability

The BEAM virtual machine was designed for 99.9999999% uptime in telecom systems. It’s the foundation that makes Kawa’s reliability guarantees possible.

Current Architecture Deep Dive

Let’s explore how Kawa actually works under the hood.

Database Schema and Event Sourcing

The database schema reflects the core domain entities:

-- Clients register and maintain connections
CREATE TABLE public.clients (
    id uuid DEFAULT public.uuid_generate_v4() NOT NULL,
    name character varying(255) NOT NULL,
    status character varying(20) DEFAULT 'disconnected' NOT NULL,
    last_heartbeat_at timestamp(0) without time zone,
    capabilities jsonb DEFAULT '{}'::jsonb NOT NULL
);

-- Workflow definitions act as templates
CREATE TABLE public.workflow_definitions (
    id uuid DEFAULT public.uuid_generate_v4() NOT NULL,
    name character varying(255) NOT NULL,
    client_id uuid NOT NULL,
    definition jsonb NOT NULL,
    definition_checksum character varying(32) NOT NULL
);

The event sourcing implementation uses PostgreSQL functions to maintain sequence integrity:

-- Auto-increment sequence numbers per saga
CREATE FUNCTION public.next_saga_sequence_number(p_saga_id uuid)
RETURNS bigint AS $$
DECLARE
  next_seq bigint;
BEGIN
  SELECT COALESCE(MAX(sequence_number), 0) + 1
  INTO next_seq
  FROM saga_events
  WHERE saga_id = p_saga_id;
  
  RETURN next_seq;
END;
$$ LANGUAGE plpgsql;
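
From Elixir, the orchestrator can call this function when appending an event. Here is a sketch of how that lookup might be issued through Ecto—the exact call site in Kawa may differ, and concurrent writers would still rely on a unique (saga_id, sequence_number) constraint to catch races:

# Illustrative: ask Postgres for the next per-saga sequence number
{:ok, %{rows: [[seq]]}} =
  Ecto.Adapters.SQL.query(
    Kawa.Repo,
    "SELECT next_saga_sequence_number($1)",
    [Ecto.UUID.dump!(saga_id)]
  )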

WebSocket Connection Management

Client connections are managed through Phoenix channels with automatic reconnection handling:

# Connection management tracks client state
defmodule Kawa.ClientRegistry do
  def client_connected(client_id, _socket_pid) do
    # Resume paused sagas for this client
    Kawa.SagaManager.resume_client_sagas(client_id)
  end
  
  def client_disconnected(client_id) do
    # Pause running sagas for this client
    Kawa.SagaManager.pause_client_sagas(client_id)
  end
end

Saga State Machines and Step Execution

Each saga follows a well-defined state machine with proper status tracking:

@valid_states ~w(pending running completed failed compensating compensated skipped)

@valid_transitions %{
  "pending" => ~w(running skipped),
  "running" => ~w(completed failed),
  "completed" => ~w(compensating),
  "failed" => ~w(compensating),
  "compensating" => ~w(compensated failed),
  "compensated" => [],
  "skipped" => []
}
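
Validating a transition against that table is then a one-liner. A sketch of the guard (the function name is illustrative):

# Illustrative check: reject any transition not listed above
def transition(%{status: current} = step, new_state) when new_state in @valid_states do
  if new_state in Map.fetch!(@valid_transitions, current) do
    {:ok, %{step | status: new_state}}
  else
    {:error, {:invalid_transition, current, new_state}}
  end
end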

State Recovery and Reliability

Reliability isn’t just a feature in Kawa—it’s the foundation everything else is built on.

Event Replay for Crash Recovery

When Kawa restarts, it automatically reconstructs saga state by replaying events:

# Startup recovery process
def recover_active_sagas do
  active_sagas = get_active_sagas()
  
  Enum.each(active_sagas, fn saga ->
    events = get_saga_events(saga.id)
    reconstructed_state = replay_events(events)
    resume_saga(saga.id, reconstructed_state)
  end)
end
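
replay_events/1 can be a plain fold over the ordered event log. A minimal sketch, assuming a step_completed event type (the actual event names may differ):

# Illustrative replay: fold events in sequence order into a state map
defp replay_events(events) do
  events
  |> Enum.sort_by(& &1.sequence_number)
  |> Enum.reduce(%{status: "pending", steps: %{}}, &apply_event/2)
end

# Hypothetical handler for one event type; others follow the same shape
defp apply_event(%{event_type: "step_completed", step_id: step_id, payload: payload}, state) do
  put_in(state, [:steps, step_id], %{status: "completed", response: payload})
end

defp apply_event(_event, state), do: state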

Client Reconnection Handling

When clients reconnect after network failures, Kawa seamlessly resumes where they left off:

def handle_client_reconnection(client_id, _socket_pid) do
  # Find sagas that were paused due to disconnection
  paused_sagas = get_paused_sagas_for_client(client_id)
  
  # Resume execution
  Enum.each(paused_sagas, fn saga ->
    resume_saga_execution(saga.id)
  end)
end

Compensation Guarantees

Failed sagas trigger automatic compensation in reverse order:

def compensate_saga(saga_id) do
  completed_steps = get_completed_steps(saga_id)
  
  # Execute compensations in reverse order
  completed_steps
  |> Enum.reverse()
  |> Enum.each(&execute_compensation/1)
end
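
For HTTP steps, execute_compensation/1 boils down to issuing the step's compensation request—and Finch is already in the supervision tree for exactly this kind of outbound call. A sketch, assuming the step record carries the compensation map from the workflow definition:

# Illustrative: POST the step's compensation action and check the status
defp execute_compensation(step) do
  %{"url" => url, "body" => body} = step.compensation

  request =
    Finch.build(:post, url, [{"content-type", "application/json"}], Jason.encode!(body))

  case Finch.request(request, Kawa.Finch) do
    {:ok, %Finch.Response{status: status}} when status in 200..299 -> :ok
    {:ok, %Finch.Response{status: status}} -> {:error, {:http_status, status}}
    {:error, reason} -> {:error, reason}
  end
end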

Zero-Transaction-Loss Design

The combination of event sourcing, proper database constraints, and careful state management ensures that no transaction is ever lost: every state change is persisted as an immutable event before it takes effect, so a crash at any point leaves enough history to resume the saga or run its compensations.

What’s Next: Future Roadmap

Kawa is still in active development, and there’s an exciting roadmap ahead.

Language-Specific SDKs (Go, Python, Node.js)

While the current WebSocket approach works, native SDKs will make integration even smoother:

// Future Go SDK (illustrative)
import (
    "context"

    "github.com/kawa/go-sdk"
)

func ProcessOrder(ctx context.Context, orderData OrderData) error {
    workflow := kawa.NewWorkflow("ecommerce-order")
    
    workflow.Step("reserve_inventory", func(ctx kawa.StepContext) error {
        return inventoryService.Reserve(orderData.ProductID)
    }).CompensateWith(func(ctx kawa.StepContext) error {
        return inventoryService.Release(ctx.Get("reservation_id"))
    })
    
    return workflow.Execute(ctx, orderData)
}

Web-Based Monitoring Dashboard

A Phoenix LiveView dashboard will provide real-time visibility into saga execution, performance metrics, and failure analysis.

Advanced Retry Policies and Circuit Breakers

Enhanced resilience features including exponential backoff, jitter, and automatic circuit breaking for unhealthy services.
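
The delay calculation for such a policy is small; here is a sketch with illustrative parameter values:

# Illustrative: exponential backoff capped at max_ms, with full jitter
def retry_delay_ms(attempt, base_ms \\ 100, max_ms \\ 30_000) do
  capped = min(base_ms * Integer.pow(2, attempt), max_ms)
  :rand.uniform(capped)
end

# e.g. schedule the next attempt for a failed step:
# Process.send_after(self(), {:retry_step, step_id}, retry_delay_ms(attempt))

Full jitter keeps a burst of failed sagas from retrying against the same unhealthy service in lockstep.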

Distributed Deployment Patterns

Support for running Kawa across multiple data centers with proper consensus and leader election.


Kawa is open source. Check out the repository at https://github.com/jlaamann/kawa