Transactional Outbox: Ako vyriešiť Dual Write problém bez 2PC
Prvy outbox mi prisiel ako overkill, kym nezachranil deploy. “Uložíme objednávku a pošleme email.” Jedna veta, ktorá mi spôsobila tri mesiace debugovania. Zákazníci sa sťažovali, že nedostali potvrdenie objednávky, ale objednávka v systéme bola. Alebo opačne - dostali email, ale objednávka nikde.
Problém? Dual Write - dva nezávislé systémy (databáza + email/message queue) bez spoločnej transakcie. Riešenie? Transactional Outbox pattern.
Testované na: PostgreSQL 14+, Node.js 20+, TypeScript 5.x. Produkčne overené na systémoch s 50k+ správami denne.
Čo je Dual Write problém
Pozri sa na tento kód:
async function createOrder(order: Order): Promise<void> {
// Krok 1: Ulož do DB
await db.orders.insert(order);
// Krok 2: Pošli event do Kafka/RabbitMQ
await messageQueue.publish('order.created', order);
// Krok 3: Pošli notifikáciu
await emailService.send(order.customerEmail, 'Order confirmed');
}
Čo sa môže pokaziť?
- DB OK, Kafka FAIL - objednávka existuje, ale žiadny iný systém o nej nevie
- DB OK, Kafka OK, Email FAIL - zákazník nedostane potvrdenie
- DB timeout po Kafka - správa odoslaná, ale transakcia rollbacknutá
Race condition v praxi
Ešte horšie - aj keď všetko prebehne:
// Request 1 (pomalý network)
await db.orders.insert(order); // t=0ms
// ... network delay ...
await kafka.publish(event); // t=500ms
// Request 2 (rýchly)
await db.orders.update(order); // t=100ms
await kafka.publish(updateEvent); // t=150ms
// Kafka dostane: updateEvent (t=150ms), createEvent (t=500ms)
// Consumer vidí UPDATE pred CREATE!
Prečo 2PC nie je odpoveď
Two-Phase Commit (2PC) teoreticky rieši distributed transactions:
Coordinator: "Prepare?"
DB: "Ready"
Kafka: "Ready"
Coordinator: "Commit!"
DB: "Committed"
Kafka: "Committed"
Problémy:
- Kafka nepodporuje 2PC - väčšina message brokers nie
- Performance - blokovanie na najpomalší systém
- Dostupnosť - ak jeden systém padne, všetko stojí
- Komplexita - coordinator failures, timeouts, recovery
Transactional Outbox Pattern
Namiesto písania do dvoch systémov, píš LEN do databázy:
┌─────────────────────────────────────┐
│ PostgreSQL │
│ ┌─────────────┐ ┌──────────────┐ │
│ │ orders │ │ outbox │ │
│ │ │ │ │ │
│ │ id, data... │ │ id, payload │ │
│ │ │ │ processed_at │ │
│ └─────────────┘ └──────────────┘ │
└─────────────────────────────────────┘
│
▼ (async worker)
┌─────────┐ ┌─────────┐
│ Kafka │ │ Email │
└─────────┘ └─────────┘
Princíp:
- V jednej DB transakcii ulož order + outbox event
- Separátny worker číta outbox a posiela do externých systémov
- Worker označí event ako spracovaný
Implementácia v TypeScript
Outbox tabuľka
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
processed_at TIMESTAMP WITH TIME ZONE,
retry_count INT DEFAULT 0,
last_error TEXT
);
-- Index pre worker
CREATE INDEX idx_outbox_unprocessed
ON outbox(created_at)
WHERE processed_at IS NULL;
-- Index pre cleanup
CREATE INDEX idx_outbox_processed
ON outbox(processed_at)
WHERE processed_at IS NOT NULL;
Business logika s Outbox
interface OutboxEvent {
aggregateType: string;
aggregateId: string;
eventType: string;
payload: unknown;
}
class OrderService {
constructor(
private db: Pool,
private outbox: OutboxRepository
) {}
async createOrder(order: CreateOrderInput): Promise<Order> {
return await this.db.transaction(async (tx) => {
// 1. Ulož order
const savedOrder = await tx.orders.insert(order);
// 2. Ulož event do outbox - v tej istej transakcii!
await this.outbox.insertInTransaction(tx, {
aggregateType: 'Order',
aggregateId: savedOrder.id,
eventType: 'order.created',
payload: {
orderId: savedOrder.id,
customerId: savedOrder.customerId,
items: savedOrder.items,
total: savedOrder.total,
}
});
// 3. Ďalší event pre email
await this.outbox.insertInTransaction(tx, {
aggregateType: 'Order',
aggregateId: savedOrder.id,
eventType: 'notification.send',
payload: {
type: 'email',
to: order.customerEmail,
template: 'order-confirmation',
data: { orderId: savedOrder.id }
}
});
return savedOrder;
});
}
}
Outbox Worker s LISTEN/NOTIFY
PostgreSQL LISTEN/NOTIFY umožňuje real-time notifikácie bez pollingu:
-- Trigger na nové outbox eventy
CREATE OR REPLACE FUNCTION notify_outbox_insert()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('outbox_events', NEW.id::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER tr_outbox_notify
AFTER INSERT ON outbox
FOR EACH ROW
EXECUTE FUNCTION notify_outbox_insert();
Worker v TypeScript:
import { Pool, Client } from 'pg';
class OutboxWorker {
private listenerClient: Client;
private processingPool: Pool;
private isRunning = false;
constructor(
private handlers: Map<string, EventHandler>,
connectionString: string
) {
this.listenerClient = new Client(connectionString);
this.processingPool = new Pool({
connectionString,
max: 5 // Paralelné spracovanie
});
}
async start(): Promise<void> {
this.isRunning = true;
await this.listenerClient.connect();
await this.listenerClient.query('LISTEN outbox_events');
this.listenerClient.on('notification', async (msg) => {
if (msg.channel === 'outbox_events' && msg.payload) {
await this.processEvent(msg.payload);
}
});
// Spracuj existujúce nespracované eventy pri štarte
await this.processBacklog();
console.log('Outbox worker started');
}
private async processEvent(eventId: string): Promise<void> {
const client = await this.processingPool.connect();
try {
// Lock riadok pre exclusive spracovanie
const result = await client.query(`
SELECT * FROM outbox
WHERE id = $1
AND processed_at IS NULL
FOR UPDATE SKIP LOCKED
`, [eventId]);
if (result.rows.length === 0) {
return; // Už spracované iným workerom
}
const event = result.rows[0];
const handler = this.handlers.get(event.event_type);
if (!handler) {
console.error(`No handler for event type: ${event.event_type}`);
return;
}
try {
await handler.handle(event.payload);
// Označ ako spracované
await client.query(`
UPDATE outbox
SET processed_at = NOW()
WHERE id = $1
`, [eventId]);
} catch (error) {
// Retry logika
await client.query(`
UPDATE outbox
SET retry_count = retry_count + 1,
last_error = $2
WHERE id = $1
`, [eventId, error.message]);
// Dead letter po X pokusoch
if (event.retry_count >= 5) {
await this.moveToDeadLetter(client, event);
}
}
} finally {
client.release();
}
}
private async processBacklog(): Promise<void> {
const result = await this.processingPool.query(`
SELECT id FROM outbox
WHERE processed_at IS NULL
ORDER BY created_at
LIMIT 100
`);
for (const row of result.rows) {
await this.processEvent(row.id);
}
}
async stop(): Promise<void> {
this.isRunning = false;
await this.listenerClient.query('UNLISTEN outbox_events');
await this.listenerClient.end();
await this.processingPool.end();
}
}
Event Handlers
interface EventHandler {
handle(payload: unknown): Promise<void>;
}
class KafkaEventHandler implements EventHandler {
constructor(private kafka: KafkaProducer) {}
async handle(payload: OrderCreatedPayload): Promise<void> {
await this.kafka.send({
topic: 'orders',
messages: [{ value: JSON.stringify(payload) }]
});
}
}
class EmailEventHandler implements EventHandler {
constructor(private emailService: EmailService) {}
async handle(payload: NotificationPayload): Promise<void> {
await this.emailService.sendTemplate(
payload.to,
payload.template,
payload.data
);
}
}
// Registrácia handlers
const handlers = new Map<string, EventHandler>();
handlers.set('order.created', new KafkaEventHandler(kafka));
handlers.set('notification.send', new EmailEventHandler(emailService));
const worker = new OutboxWorker(handlers, DATABASE_URL);
await worker.start();
Garantované poradie eventov
Outbox pattern garantuje poradie pre jeden aggregate:
// V transakcii - poradie garantované
await this.outbox.insert(tx, { eventType: 'order.created', ... });
await this.outbox.insert(tx, { eventType: 'order.confirmed', ... });
await this.outbox.insert(tx, { eventType: 'order.shipped', ... });
Worker spracuje v poradí created_at:
SELECT * FROM outbox
WHERE aggregate_id = $1
AND processed_at IS NULL
ORDER BY created_at
FOR UPDATE SKIP LOCKED
Idempotencia a at-least-once delivery
Outbox garantuje at-least-once delivery (event sa doručí minimálne raz). Consumers musia byť idempotentní:
class OrderCreatedConsumer {
async handle(event: OrderCreatedEvent): Promise<void> {
// Idempotency check
const existing = await this.db.processedEvents.findOne({
eventId: event.id
});
if (existing) {
console.log(`Event ${event.id} already processed, skipping`);
return;
}
// Process event
await this.db.transaction(async (tx) => {
await this.processOrder(tx, event);
// Mark as processed
await tx.processedEvents.insert({ eventId: event.id });
});
}
}
Production Checklist
## Outbox Pattern Checklist
### Databáza
- [ ] Outbox tabuľka s indexami pre worker
- [ ] LISTEN/NOTIFY trigger
- [ ] Monitoring veľkosti outbox tabuľky
- [ ] Cleanup job pre staré processed eventy
### Worker
- [ ] Graceful shutdown (dokončí rozpracované)
- [ ] Health check endpoint
- [ ] Retry s exponential backoff
- [ ] Dead letter queue pre failed eventy
- [ ] Metrics: processing time, queue depth, error rate
### Monitoring
- [ ] Alert: outbox queue > N nespracovaných
- [ ] Alert: event processing time > X ms
- [ ] Alert: retry_count > threshold
- [ ] Dashboard: throughput, latency, errors
### High Availability
- [ ] Multiple worker instances (SKIP LOCKED)
- [ ] Worker restart policy (K8s, systemd)
- [ ] Database connection pool sizing
Kedy Outbox nepoužívať
- Nepotrebuješ guaranteed delivery - fire-and-forget je OK
- Single service - nepotrebuješ distributed transactions
- Máš event store - Event Sourcing už rieši tento problém
- Real-time requirements < 100ms - LISTEN/NOTIFY má latenciu
Alternatívy
Change Data Capture (CDC)
Debezium číta WAL log a generuje eventy:
PostgreSQL WAL → Debezium → Kafka
Výhody: Žiadna zmena kódu, žiadna outbox tabuľka Nevýhody: Infraštruktúra navyše, eventy sú DB-level (nie domain events)
Transaction Log Tailing
Podobné ako CDC, ale custom implementácia.
Záver
Transactional Outbox je battle-tested pattern pre reliable messaging. Kľúčové princípy:
- Jeden write - všetko do databázy v jednej transakcii
- Async processing - worker číta a posiela
- Idempotent consumers - at-least-once delivery vyžaduje deduplikáciu
- Monitoring - queue depth je tvoj najlepší indikátor problémov
Pre Node.js projekty odporúčam PostgreSQL s LISTEN/NOTIFY - minimálna latencia bez pollingu.
FAQ
Čo ak worker spadne počas spracovania?
Event zostane processed_at = NULL, worker ho pri reštarte spracuje znova. Preto musia byť consumers idempotentní.
Ako škálovať workers?
FOR UPDATE SKIP LOCKED umožňuje paralelné spracovanie. Každý worker locku a spracuje iný event. Pozor na poradie - ak potrebuješ strict ordering, použi jeden worker per aggregate.
Koľko eventov držať v outbox?
Processed eventy maž po 7-30 dňoch (podľa potreby debugovania). Unprocessed by nemali byť viac ako stovky - ak sú tisíce, máš problém.
Prečo nie Kafka Transactions?
Kafka Transactions riešia exactly-once v rámci Kafky, nie medzi DB a Kafkou. Outbox pattern rieši DB → anywhere.
Súvisiace články
- Zero-downtime migrácie PostgreSQL - Ako bezpečne migrovať outbox tabuľku
- Soft Delete past - Prečo archivovať staré outbox eventy
Súvisiace články
Soft Delete past: Prečo is_deleted zabíja tvoju databázu (a čo s tým)
Praktický rozbor prečo soft delete po rokoch rozbije výkon databázy. Benchmarky, partitioning riešenie a migračný checklist.
Idempotencia API: Ako navrhnúť endpointy odolné voči retry
Kompletný návod na implementáciu idempotentných API. Od Idempotency-Key cez Redis locking až po stavový diagram spracovania.
Dvojité Účtovanie z Idempotency Keys: Pasca Replica Lag
Perfektná idempotency logika, ale zákazníci sú stále účtovaní dvakrát. Príčina: kontrola idempotency keys voči read replice ktorá je sekundy za primary počas špičiek.
Architectural Linting: Automatizovaná ochrana proti spaghetti kódu
Ako vynútiť architektonické pravidlá v CI/CD. Dependency Cruiser pre JS/TS, ArchUnit pre Java a praktické príklady konfigurácie.
Citujte tento článok
Ak na článok odkazujete, pridajte pôvodnú URL a uveďte autora.