
Transactional Outbox: How to Solve the Dual Write Problem Without 2PC

My first outbox felt like overkill, until it saved a deploy. “We'll save the order and send an email.” One sentence that cost me three months of debugging. Customers complained that they never received an order confirmation, yet the order was in the system. Or the other way round: they got the email, but the order was nowhere to be found.

The problem? Dual write: two independent systems (a database plus email or a message queue) with no shared transaction. The solution? The Transactional Outbox pattern.

Tested on: PostgreSQL 14+, Node.js 20+, TypeScript 5.x. Proven in production on systems handling 50k+ messages per day.

What is the Dual Write problem

Look at this code:

async function createOrder(order: Order): Promise<void> {
  // Step 1: save to the DB
  await db.orders.insert(order);

  // Step 2: publish an event to Kafka/RabbitMQ
  await messageQueue.publish('order.created', order);

  // Step 3: send a notification
  await emailService.send(order.customerEmail, 'Order confirmed');
}

What can go wrong?

  1. DB OK, Kafka FAIL: the order exists, but no other system knows about it
  2. DB OK, Kafka OK, Email FAIL: the customer never gets a confirmation
  3. The DB commit times out after the Kafka publish: the message is already sent, but the transaction is rolled back
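A minimal in-memory simulation makes scenario 1 concrete. insertOrder, publish and the two arrays are hypothetical stand-ins for the database and the broker, not a real client; the point is that nothing undoes step 1 when step 2 throws.

```typescript
// Hypothetical in-memory stand-ins for the database and the message broker.
const ordersTable: string[] = [];
const publishedEvents: string[] = [];

function insertOrder(orderId: string): void {
  ordersTable.push(orderId); // "durable" as soon as it returns
}

function publish(orderId: string, brokerUp: boolean): void {
  if (!brokerUp) throw new Error('broker unavailable');
  publishedEvents.push(`order.created:${orderId}`);
}

// Dual write: two independent writes with no shared transaction.
function createOrderDualWrite(orderId: string, brokerUp: boolean): void {
  insertOrder(orderId);       // step 1 succeeds
  publish(orderId, brokerUp); // step 2 may throw, and step 1 is not undone
}

try {
  createOrderDualWrite('order-1', false);
} catch (err) {
  console.error((err as Error).message);
}

// The order exists, but no event was ever published (scenario 1).
console.log(ordersTable, publishedEvents);
```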

Race condition in practice

Even worse, things can go wrong when every call succeeds:

// Request 1 (slow network)
await db.orders.insert(order);  // t=0ms
// ... network delay ...
await kafka.publish(event);      // t=500ms

// Request 2 (fast)
await db.orders.update(order);  // t=100ms
await kafka.publish(updateEvent); // t=150ms

// Kafka receives: updateEvent (t=150ms), createEvent (t=500ms)
// The consumer sees UPDATE before CREATE!

Why 2PC is not the answer

Two-Phase Commit (2PC) solves distributed transactions in theory:

Coordinator: "Prepare?"
  DB: "Ready"
  Kafka: "Ready"
Coordinator: "Commit!"
  DB: "Committed"
  Kafka: "Committed"
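The exchange above can be sketched as a coordinator function. This is an illustrative model only: Participant and its synchronous prepare/commit/rollback methods are hypothetical, and a real 2PC implementation also needs durable decision logging, timeouts and recovery.

```typescript
// Hypothetical participant interface; real resources would have to expose these operations.
interface Participant {
  name: string;
  prepare(): boolean; // phase 1 vote: true = "Ready", false = abort
  commit(): void;
  rollback(): void;
}

// Coordinator: phase 1 collects votes, phase 2 commits only if every vote was "Ready".
function twoPhaseCommit(participants: Participant[]): boolean {
  for (const p of participants) {
    if (!p.prepare()) {
      // A single "no" vote aborts the transaction for every participant
      for (const q of participants) q.rollback();
      return false;
    }
  }
  for (const p of participants) p.commit();
  return true;
}

// Usage: recording participants make the message flow visible.
const log: string[] = [];
function participant(name: string, vote: boolean): Participant {
  return {
    name,
    prepare: () => { log.push(`${name}: prepare`); return vote; },
    commit: () => { log.push(`${name}: commit`); },
    rollback: () => { log.push(`${name}: rollback`); },
  };
}

const committed = twoPhaseCommit([participant('db', true), participant('kafka', false)]);
console.log(committed, log);
```

Between a successful prepare and the coordinator's decision, a participant has to keep its resources locked. If the coordinator dies at that moment, the participant is stuck, which is what the availability and complexity problems are about.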

Problems:

  1. Kafka does not support 2PC, and neither do most message brokers
  2. Performance: every transaction blocks on the slowest participant
  3. Availability: if one system goes down, everything stops
  4. Complexity: coordinator failures, timeouts, recovery

Transactional Outbox Pattern

Instead of writing to two systems, write ONLY to the database:

┌─────────────────────────────────────┐
│           PostgreSQL                │
│  ┌─────────────┐  ┌──────────────┐  │
│  │   orders    │  │   outbox     │  │
│  │             │  │              │  │
│  │ id, data... │  │ id, payload  │  │
│  │             │  │ processed_at │  │
│  └─────────────┘  └──────────────┘  │
└─────────────────────────────────────┘

         ▼ (async worker)
    ┌─────────┐     ┌─────────┐
    │  Kafka  │     │  Email  │
    └─────────┘     └─────────┘

How it works:

  1. In a single DB transaction, save the order and the outbox event
  2. A separate worker reads the outbox and sends the events to external systems
  3. The worker marks the event as processed
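The three steps can be modelled in memory to show why the pattern holds. InMemoryDb, createOrder and relay are hypothetical names for this sketch: a transaction changes both tables or neither, and the relay marks an event as processed only after publishing it, which is exactly where at-least-once delivery comes from.

```typescript
// Hypothetical in-memory model of the outbox flow; not a real database.
interface OrderRow { id: string; total: number; }
interface OutboxRow { id: string; eventType: string; payload: unknown; processedAt: Date | null; }
interface Tables { orders: OrderRow[]; outbox: OutboxRow[]; }

class InMemoryDb {
  tables: Tables = { orders: [], outbox: [] };

  // All-or-nothing: fn works on a copy that replaces the tables only if fn does not throw.
  transaction<T>(fn: (t: Tables) => T): T {
    const draft: Tables = {
      orders: this.tables.orders.map((o) => ({ ...o })),
      outbox: this.tables.outbox.map((r) => ({ ...r })),
    };
    const result = fn(draft);
    this.tables = draft; // "commit": both tables change together
    return result;
  }
}

// Step 1: the order and its outbox event in one transaction.
function createOrder(db: InMemoryDb, id: string, total: number, crashBeforeCommit = false): void {
  db.transaction((t) => {
    t.orders.push({ id, total });
    t.outbox.push({ id: `evt-${id}`, eventType: 'order.created', payload: { orderId: id, total }, processedAt: null });
    if (crashBeforeCommit) throw new Error('simulated crash before commit');
  });
}

// Steps 2 and 3: publish unprocessed events, then mark them processed.
// If publish throws, the marks roll back and those events are sent again later.
function relay(db: InMemoryDb, publish: (row: OutboxRow) => void): void {
  db.transaction((t) => {
    for (const row of t.outbox) {
      if (row.processedAt !== null) continue;
      publish(row);
      row.processedAt = new Date();
    }
  });
}

const db = new InMemoryDb();
try {
  createOrder(db, 'o-1', 42, true);
} catch {
  // Rolled back: neither the order nor its event was stored
}
createOrder(db, 'o-2', 10);

const sent: string[] = [];
relay(db, (row) => { sent.push(row.id); });
relay(db, (row) => { sent.push(row.id); }); // nothing left to send
console.log(db.tables.orders.map((o) => o.id), sent);
```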

Implementation in TypeScript

Outbox table

CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    processed_at TIMESTAMP WITH TIME ZONE,
    retry_count INT DEFAULT 0,
    last_error TEXT
);

-- Index for the worker (unprocessed events only)
CREATE INDEX idx_outbox_unprocessed
ON outbox(created_at)
WHERE processed_at IS NULL;

-- Index for the cleanup job
CREATE INDEX idx_outbox_processed
ON outbox(processed_at)
WHERE processed_at IS NOT NULL;
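The business logic below calls this.outbox.insertInTransaction(tx, event), which is not shown. A minimal sketch, assuming a hypothetical Queryable type that matches the query(text, values) method of a node-postgres client; OutboxRepository is likewise an illustrative name. The payload goes through JSON.stringify explicitly because node-postgres converts JavaScript arrays into PostgreSQL arrays rather than JSON.

```typescript
// Hypothetical structural type; a pg PoolClient should be compatible with it.
interface Queryable {
  query(text: string, values?: unknown[]): Promise<unknown>;
}

interface OutboxEvent {
  aggregateType: string;
  aggregateId: string;
  eventType: string;
  payload: unknown;
}

class OutboxRepository {
  // Must receive the same client (transaction) as the business write,
  // otherwise the order and the event are no longer atomic.
  async insertInTransaction(tx: Queryable, event: OutboxEvent): Promise<void> {
    await tx.query(
      `INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
       VALUES ($1, $2, $3, $4)`,
      [event.aggregateType, event.aggregateId, event.eventType, JSON.stringify(event.payload)]
    );
    // id, created_at, processed_at and retry_count come from the column defaults
  }
}
```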

Business logic with the Outbox

interface OutboxEvent {
  aggregateType: string;
  aggregateId: string;
  eventType: string;
  payload: unknown;
}

class OrderService {
  constructor(
    private db: Database, // hypothetical wrapper exposing transaction(); pg's Pool has no such method
    private outbox: OutboxRepository
  ) {}

  async createOrder(order: CreateOrderInput): Promise<Order> {
    return await this.db.transaction(async (tx) => {
      // 1. Save the order
      const savedOrder = await tx.orders.insert(order);

      // 2. Save the event to the outbox, in the same transaction!
      await this.outbox.insertInTransaction(tx, {
        aggregateType: 'Order',
        aggregateId: savedOrder.id,
        eventType: 'order.created',
        payload: {
          orderId: savedOrder.id,
          customerId: savedOrder.customerId,
          items: savedOrder.items,
          total: savedOrder.total,
        }
      });

      // 3. A second event for the email
      await this.outbox.insertInTransaction(tx, {
        aggregateType: 'Order',
        aggregateId: savedOrder.id,
        eventType: 'notification.send',
        payload: {
          type: 'email',
          to: order.customerEmail,
          template: 'order-confirmation',
          data: { orderId: savedOrder.id }
        }
      });

      return savedOrder;
    });
  }
}
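node-postgres' Pool has no transaction() method, so this.db.transaction(...) assumes a wrapper or query builder. With plain pg, the same atomicity comes from running both inserts on one checked-out client between BEGIN and COMMIT. A minimal sketch; withTransaction, TxPool and TxClient are hypothetical names, and pg's Pool and PoolClient should satisfy the two interfaces structurally.

```typescript
// Hypothetical minimal types; pg's Pool/PoolClient should be structurally compatible.
interface TxClient {
  query(text: string, values?: unknown[]): Promise<unknown>;
  release(): void;
}
interface TxPool {
  connect(): Promise<TxClient>;
}

// Runs fn inside BEGIN/COMMIT on a single pooled connection and rolls back on any error.
async function withTransaction<T>(pool: TxPool, fn: (tx: TxClient) => Promise<T>): Promise<T> {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    const result = await fn(client);
    await client.query('COMMIT');
    return result;
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release(); // always return the connection to the pool
  }
}
```

The order insert and insertInTransaction(tx, event) both run on the tx client passed to fn, so they commit or roll back together.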

Outbox Worker with LISTEN/NOTIFY

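PostgreSQL LISTEN/NOTIFY delivers real-time notifications without polling. A NOTIFY issued inside a transaction is delivered only after that transaction commits, so the worker is never notified about an event that was rolled back: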

-- Trigger for new outbox events
CREATE OR REPLACE FUNCTION notify_outbox_insert()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('outbox_events', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tr_outbox_notify
    AFTER INSERT ON outbox
    FOR EACH ROW
    EXECUTE FUNCTION notify_outbox_insert();

The worker in TypeScript:

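import { Pool, Client, PoolClient } from 'pg';

const MAX_RETRIES = 5;
// Fallback polling: picks up failed events and notifications missed while disconnected
const POLL_INTERVAL_MS = 5_000;

class OutboxWorker {
  private listenerClient: Client;
  private processingPool: Pool;
  private isRunning = false;
  private pollTimer?: ReturnType<typeof setInterval>;

  constructor(
    private handlers: Map<string, EventHandler>,
    connectionString: string
  ) {
    this.listenerClient = new Client(connectionString);
    this.processingPool = new Pool({
      connectionString,
      max: 5  // Parallel processing
    });
  }

  async start(): Promise<void> {
    this.isRunning = true;

    await this.listenerClient.connect();

    // Register the handler before LISTEN so no notification slips through
    this.listenerClient.on('notification', (msg) => {
      if (msg.channel === 'outbox_events' && msg.payload) {
        // Catch here: a rejection inside an event listener would otherwise go unhandled
        this.processEvent(msg.payload).catch((err) =>
          console.error('Outbox event processing failed', err)
        );
      }
    });
    await this.listenerClient.query('LISTEN outbox_events');

    // Process events left unprocessed before startup
    await this.processBacklog();

    // NOTIFY is never re-sent, so failed events are retried only by this poll
    this.pollTimer = setInterval(() => {
      if (this.isRunning) {
        this.processBacklog().catch((err) =>
          console.error('Outbox backlog poll failed', err)
        );
      }
    }, POLL_INTERVAL_MS);

    console.log('Outbox worker started');
  }

  private async processEvent(eventId: string): Promise<void> {
    const client = await this.processingPool.connect();

    try {
      // A row lock lives only as long as its transaction; without BEGIN,
      // FOR UPDATE is released as soon as the SELECT finishes
      await client.query('BEGIN');

      const result = await client.query(`
        SELECT * FROM outbox
        WHERE id = $1
        AND processed_at IS NULL
        FOR UPDATE SKIP LOCKED
      `, [eventId]);

      if (result.rows.length === 0) {
        await client.query('COMMIT');
        return; // Already processed, or locked by another worker
      }

      const event = result.rows[0];
      const handler = this.handlers.get(event.event_type);

      if (!handler) {
        console.error(`No handler for event type: ${event.event_type}`);
        await client.query('COMMIT');
        return;
      }

      try {
        // The row stays locked while the handler runs, so keep handlers fast
        await handler.handle(event.payload);

        // Mark as processed
        await client.query(
          'UPDATE outbox SET processed_at = NOW() WHERE id = $1',
          [eventId]
        );
      } catch (error) {
        const message = error instanceof Error ? error.message : String(error);

        // Record the failure; the event is retried on the next poll
        await client.query(`
          UPDATE outbox
          SET retry_count = retry_count + 1,
              last_error = $2
          WHERE id = $1
        `, [eventId, message]);

        // Dead letter after MAX_RETRIES failed attempts
        if (event.retry_count + 1 >= MAX_RETRIES) {
          await this.moveToDeadLetter(client, eventId);
        }
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  // Assumes a hypothetical outbox_dead_letter table with the same columns as outbox
  private async moveToDeadLetter(client: PoolClient, eventId: string): Promise<void> {
    await client.query(
      'INSERT INTO outbox_dead_letter SELECT * FROM outbox WHERE id = $1',
      [eventId]
    );
    await client.query('DELETE FROM outbox WHERE id = $1', [eventId]);
  }

  private async processBacklog(): Promise<void> {
    const result = await this.processingPool.query(`
      SELECT id FROM outbox
      WHERE processed_at IS NULL
      ORDER BY created_at
      LIMIT 100
    `);

    for (const row of result.rows) {
      await this.processEvent(row.id);
    }
  }

  async stop(): Promise<void> {
    this.isRunning = false;
    clearInterval(this.pollTimer);
    await this.listenerClient.query('UNLISTEN outbox_events');
    await this.listenerClient.end();
    await this.processingPool.end();
  }
}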
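The worker records retry_count and last_error but does not space retries out in time, while the checklist below asks for retry with exponential backoff. A minimal sketch, assuming a hypothetical next_attempt_at column that the failure branch sets and that the worker's SELECT filters on (WHERE next_attempt_at <= NOW()); the constants are illustrative defaults, not recommendations.

```typescript
// Hypothetical defaults; tune them to the downstream system.
const BASE_DELAY_MS = 1_000;
const MAX_DELAY_MS = 5 * 60_000;

// Exponential backoff: base * 2^retryCount, capped so delays stop growing at maxMs.
function backoffDelayMs(retryCount: number, baseMs = BASE_DELAY_MS, maxMs = MAX_DELAY_MS): number {
  return Math.min(maxMs, baseMs * 2 ** retryCount);
}

// Value to store in the hypothetical next_attempt_at column after a failed attempt.
function nextAttemptAt(retryCount: number, now: Date = new Date()): Date {
  return new Date(now.getTime() + backoffDelayMs(retryCount));
}
```

Passing the retry count from before the increment makes the first retry wait BASE_DELAY_MS. In production you would usually add random jitter so that many failed events do not all retry at the same instant; it is left out here to keep the function deterministic.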

Event Handlers

interface EventHandler {
  handle(payload: unknown): Promise<void>;
}

class KafkaEventHandler implements EventHandler {
  constructor(private kafka: KafkaProducer) {}

  async handle(payload: OrderCreatedPayload): Promise<void> {
    await this.kafka.send({
      topic: 'orders',
      messages: [{ value: JSON.stringify(payload) }]
    });
  }
}

class EmailEventHandler implements EventHandler {
  constructor(private emailService: EmailService) {}

  async handle(payload: NotificationPayload): Promise<void> {
    await this.emailService.sendTemplate(
      payload.to,
      payload.template,
      payload.data
    );
  }
}

// Register handlers
const handlers = new Map<string, EventHandler>();
handlers.set('order.created', new KafkaEventHandler(kafka));
handlers.set('notification.send', new EmailEventHandler(emailService));

const worker = new OutboxWorker(handlers, DATABASE_URL);
await worker.start();
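KafkaEventHandler publishes without a message key. Kafka guarantees ordering only within a partition, and its default partitioning sends messages with the same key to the same partition, so keying by the order id keeps one order's events in sequence. A sketch against a minimal producer interface modelled on kafkajs' producer.send({ topic, messages }); the KafkaProducer and KafkaMessage types and the class name are hypothetical here so the example stays self-contained.

```typescript
// Minimal shapes modelled on kafkajs' producer.send(); hypothetical for this sketch.
interface KafkaMessage { key?: string; value: string; }
interface KafkaProducer {
  send(record: { topic: string; messages: KafkaMessage[] }): Promise<unknown>;
}

interface OrderCreatedPayload { orderId: string; customerId: string; total: number; }

class KeyedKafkaEventHandler {
  constructor(private kafka: KafkaProducer, private topic = 'orders') {}

  async handle(payload: OrderCreatedPayload): Promise<void> {
    await this.kafka.send({
      topic: this.topic,
      // Same key, same partition: events of one order keep their relative order
      messages: [{ key: payload.orderId, value: JSON.stringify(payload) }],
    });
  }
}
```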

Guaranteed event ordering

The outbox pattern lets you preserve ordering for a single aggregate, as long as its events are processed sequentially:

// Inside one transaction: the events are written in this order
await this.outbox.insertInTransaction(tx, { eventType: 'order.created', ... });
await this.outbox.insertInTransaction(tx, { eventType: 'order.confirmed', ... });
await this.outbox.insertInTransaction(tx, { eventType: 'order.shipped', ... });

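The worker processes them in created_at order. Caveat: NOW() returns the start time of the current transaction, so all three events above get the same created_at; a strict order needs a tie-breaker such as a sequence column: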

SELECT * FROM outbox
WHERE aggregate_id = $1
AND processed_at IS NULL
ORDER BY created_at
FOR UPDATE SKIP LOCKED
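Ordering also depends on how the worker consumes rows: if events of one aggregate are handled in parallel, or SKIP LOCKED skips a locked earlier event and returns a later one, the later event can overtake it. A minimal sketch of per-aggregate sequential processing; PendingEvent, its seq field (standing in for a hypothetical monotonically increasing column such as a BIGSERIAL) and processInOrder are illustrative names. Different aggregates run concurrently, but within one aggregate the first failure stops the rest so they wait for the next attempt.

```typescript
interface PendingEvent {
  id: string;
  aggregateId: string;
  seq: number; // hypothetical tie-breaker column, increasing per insert
  eventType: string;
}

// Returns the ids of events that were handled successfully.
async function processInOrder(
  events: PendingEvent[],
  handle: (e: PendingEvent) => Promise<void>
): Promise<string[]> {
  // Group pending events by aggregate
  const byAggregate = new Map<string, PendingEvent[]>();
  for (const e of events) {
    const group = byAggregate.get(e.aggregateId) ?? [];
    group.push(e);
    byAggregate.set(e.aggregateId, group);
  }

  const processed: string[] = [];
  await Promise.all(
    Array.from(byAggregate.values()).map(async (group) => {
      group.sort((a, b) => a.seq - b.seq);
      for (const e of group) {
        try {
          await handle(e);
          processed.push(e.id);
        } catch {
          return; // keep the remaining events of this aggregate for a later attempt
        }
      }
    })
  );
  return processed;
}
```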

Idempotency and at-least-once delivery

The outbox guarantees at-least-once delivery: every event is delivered at least once, and possibly more than once. Consumers therefore have to be idempotent:

class OrderCreatedConsumer {
  async handle(event: OrderCreatedEvent): Promise<void> {
    await this.db.transaction(async (tx) => {
      // Idempotency check inside the same transaction as the business change.
      // A UNIQUE constraint on processedEvents.eventId is still needed so that
      // two concurrent deliveries cannot both pass this check.
      const existing = await tx.processedEvents.findOne({
        eventId: event.id
      });

      if (existing) {
        console.log(`Event ${event.id} already processed, skipping`);
        return;
      }

      // Process event
      await this.processOrder(tx, event);

      // Mark as processed, atomically with the business change
      await tx.processedEvents.insert({ eventId: event.id });
    });
  }
}
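A check-then-insert can race when the same event is redelivered concurrently. A common alternative is to let a unique constraint decide: INSERT ... ON CONFLICT DO NOTHING reports zero affected rows for a duplicate. A minimal sketch, assuming a hypothetical processed_events(event_id PRIMARY KEY) table and a Queryable type shaped like a pg client's query result (rowCount). It also assumes the outbox row id travels with the message, for example in the payload or a header, which the handlers shown earlier do not do yet.

```typescript
// Hypothetical structural type; pg's query() resolves to a result with rowCount.
interface Queryable {
  query(text: string, values?: unknown[]): Promise<{ rowCount: number | null }>;
}

// Claims the event id inside the consumer's transaction.
// Returns false if the event was already recorded, so the caller can skip it.
async function markProcessedOnce(tx: Queryable, eventId: string): Promise<boolean> {
  const result = await tx.query(
    'INSERT INTO processed_events (event_id) VALUES ($1) ON CONFLICT (event_id) DO NOTHING',
    [eventId]
  );
  return result.rowCount === 1;
}
```

Inside the consumer's transaction, call markProcessedOnce(tx, event.id) first and return early when it yields false. If the business change later fails, the transaction rolls back and the claim disappears with it, so a redelivery can try again.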

Production Checklist

## Outbox Pattern Checklist

### Database
- [ ] Outbox table with indexes for the worker
- [ ] LISTEN/NOTIFY trigger
- [ ] Monitoring of the outbox table size
- [ ] Cleanup job for old processed events

### Worker
- [ ] Graceful shutdown (finishes in-flight events)
- [ ] Health check endpoint
- [ ] Retry s exponential backoff
- [ ] Dead letter queue pre failed eventy
- [ ] Metrics: processing time, queue depth, error rate

### Monitoring
- [ ] Alert: outbox queue > N unprocessed events
- [ ] Alert: event processing time > X ms
- [ ] Alert: retry_count > threshold
- [ ] Dashboard: throughput, latency, errors

### High Availability
- [ ] Multiple worker instances (SKIP LOCKED)
- [ ] Worker restart policy (K8s, systemd)
- [ ] Database connection pool sizing
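For the graceful-shutdown item: the worker's stop() does not explicitly track events that are still being handled. One way to do that is to register every in-flight task and wait for them before closing connections. A minimal sketch; InFlightTracker is a hypothetical helper, and in the worker you would wrap each processEvent call with run() and await drain() at the start of stop().

```typescript
// Hypothetical helper that tracks in-flight work so shutdown can wait for it.
class InFlightTracker {
  private inFlight = new Set<Promise<void>>();
  private accepting = true;

  // Starts the task unless shutdown has begun; returns false if it was refused.
  run(task: () => Promise<void>): boolean {
    if (!this.accepting) return false;
    const p: Promise<void> = task()
      .catch((err) => { console.error('Outbox task failed', err); })
      .then(() => { this.inFlight.delete(p); });
    this.inFlight.add(p);
    return true;
  }

  // Stops accepting new work and waits for everything already started.
  async drain(): Promise<void> {
    this.accepting = false;
    await Promise.all(Array.from(this.inFlight));
  }

  get size(): number {
    return this.inFlight.size;
  }
}
```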

When not to use the Outbox

  1. You don't need guaranteed delivery: fire-and-forget is fine
  2. Single service: you don't need distributed transactions
  3. You already have an event store: Event Sourcing already solves this problem
  4. Real-time requirements under 100 ms: the asynchronous hop through LISTEN/NOTIFY and the worker adds latency

Alternatives

Change Data Capture (CDC)

Debezium reads the PostgreSQL WAL and generates events from it:

PostgreSQL WAL → Debezium → Kafka

Pros: no code changes, no outbox table.
Cons: extra infrastructure; the events are DB-level row changes, not domain events.

Transaction Log Tailing

Similar to CDC, but a custom implementation.

Conclusion

Transactional Outbox is a battle-tested pattern for reliable messaging. Key principles:

  1. One write: everything goes to the database in a single transaction
  2. Async processing: a worker reads the outbox and sends the events
  3. Idempotent consumers: at-least-once delivery requires deduplication
  4. Monitoring: queue depth is your best indicator of problems

For Node.js projects I recommend PostgreSQL with LISTEN/NOTIFY: low latency without constant polling.

FAQ

What if the worker crashes during processing?

The event keeps processed_at = NULL, and the worker processes it again after a restart. That is why consumers must be idempotent.

How do I scale workers?

FOR UPDATE SKIP LOCKED enables parallel processing: each worker locks and processes a different event. Watch out for ordering: if you need strict ordering, make sure each aggregate is handled by a single worker.
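One way to get a single worker per aggregate while still running several workers is to partition by a stable hash of aggregate_id: each worker only handles the aggregates that map to its index. A minimal sketch; workerFor is a hypothetical helper, and FNV-1a is just one simple, stable hash (computed here over UTF-16 code units rather than UTF-8 bytes).

```typescript
// 32-bit FNV-1a over the string's UTF-16 code units; stable across processes.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Every event of one aggregate maps to the same worker index.
function workerFor(aggregateId: string, workerCount: number): number {
  if (!Number.isInteger(workerCount) || workerCount < 1) {
    throw new RangeError('workerCount must be a positive integer');
  }
  return fnv1a(aggregateId) % workerCount;
}
```

Changing workerCount remaps many aggregates at once, so while scaling up or down, let in-flight events drain before the new assignment takes effect.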

How many events should the outbox hold?

Delete processed events after 7 to 30 days, depending on how long you need them for debugging. Unprocessed events should number in the hundreds at most; if there are thousands, something is wrong.
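The cleanup job can delete in small batches so that a single huge DELETE does not hold locks and generate a burst of WAL; the partial index on processed_at supports the inner SELECT. A minimal sketch, assuming a hypothetical Queryable type shaped like a pg client; cleanupProcessedOutbox and the batch size are illustrative.

```typescript
// Hypothetical structural type; pg's query() resolves to a result with rowCount.
interface Queryable {
  query(text: string, values?: unknown[]): Promise<{ rowCount: number | null }>;
}

// Deletes processed rows older than retentionDays, batchSize rows per statement.
// Returns the total number of deleted rows.
async function cleanupProcessedOutbox(db: Queryable, retentionDays: number, batchSize = 1_000): Promise<number> {
  let total = 0;
  for (;;) {
    const result = await db.query(
      `DELETE FROM outbox
       WHERE id IN (
         SELECT id FROM outbox
         WHERE processed_at < NOW() - make_interval(days => $1::int)
         LIMIT $2
       )`,
      [retentionDays, batchSize]
    );
    const deleted = result.rowCount ?? 0;
    total += deleted;
    if (deleted < batchSize) return total; // last, partial batch: nothing older is left
  }
}
```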

Why not Kafka Transactions?

Kafka Transactions provide exactly-once semantics within Kafka, not between the database and Kafka. The outbox pattern covers DB → anywhere.



Cite this article

If you reference this article, include the original URL and credit the author.

Michal Drozd. "Transactional Outbox: Ako vyriešiť Dual Write problém bez 2PC". https://www.michal-drozd.com/sk/blog/transactional-outbox/ (Published March 27, 2025).