Späť na blog

5000 Unacked správ a stúpa: Zastav RabbitMQ consumer meltdowny v CI

|
| rabbitmq, debugging, testing, ci, message-queue

RabbitMQ unacked message explózie ma naučili, že “funguje to v stagingu” neznamená nič ak netestuješ acknowledgment behavior. “Prečo queue rastie? Consumer beží.” Zízali sme do RabbitMQ management UI ukazujúceho messages_unacknowledged stúpajúce z 50 na 500 na 2000, kým messages_ready zostávalo nízke. Použitie pamäte na brokeri stúpalo. Consumer proces vyzeral zdravo - CPU normálne, žiadne crashe, logy ukazovali “správy sa úspešne spracúvajú.”

Potom prišiel redeploy. V momente keď sa consumer restartoval, všetkých 2000 unacked správ sa redeliverlo. Čas spracovania sa zdvojnásobil kvôli duplicate detection logike. Polovica zlyhala validáciou a išla späť do queue znova. Nechtiac sme vytvorili redelivery storm - tie isté správy sa cyklili systémom, každý pokus spotreboval zdroje, žiadna z nich sa skutočne nedokončila.

Root cause bol trápne jednoduchý: niekto “dočasne” zvýšil prefetch na 500 aby to “lepšie zvládalo burst traffic.” V kombinácii s pomalým spracovaním (external API call + database write na správu), to znamenalo že každá consumer inštancia mohla držať 500 správ unacked. Keď sa spracovanie spomalilo (API timeouty), unacked count explodoval. Keď sa consumer restartoval, RabbitMQ správne redeliveroval všetky unacked správy, ale naša idempotencia nebola perfektná, čo viedlo k stormu.

Čo robilo toto zvlášť frustrujúce bolo, že naše load testovanie vyzeralo v poriadku. Testovali sme throughput, latenciu, error rates - ale nikdy sme netestovali čo sa stane s unacked správami počas processing delays alebo restartov. RabbitMQ ack kontrakt je implicitný: musíš acknúť alebo nacknúť v rozumnom čase, alebo vytvoríš memory/redelivery bombu. Ale nikto toto nevaliduje v CI.

Prostredie: RabbitMQ 3.12+, Spring AMQP consumeri, prefetch=500, externé API závislosti

Pochopenie Unacked Message problému

Ako RabbitMQ Acknowledgment funguje

Šťastná cesta:
┌─────────┐     deliver      ┌──────────┐
│ RabbitMQ│ ─────────────────▶│ Consumer │
│  Queue  │                   │          │
└─────────┘                   └──────────┘
             1. Správa delivered, označená "unacknowledged"
             2. Consumer spracuje (zapíše DB, zavolá API)
             3. Consumer pošle ACK
             4. RabbitMQ odstráni z queue permanentne

Unacked count: normálne nízky (in-flight spracovanie)
Čo sa skutočne deje s vysokým prefetch + pomalým spracovaním:
┌─────────┐                   ┌──────────┐
│ RabbitMQ│ ─────────────────▶│ Consumer │
│  Queue  │  prefetch=500     │ (pomalý) │
└─────────┘                   └──────────┘

RabbitMQ deliveruje 500 správ okamžite (prefetch limit)
Consumer spracovanie každej trvá 5 sekúnd (API call + DB write)
Unacked count = 500

Ak sa spracovanie spomalí na 10s (API timeout):
- Consumer môže acknúť len ~50/s
- Nové správy prichádzajú rýchlosťou 100/s
- Unacked stále rastie!

Ak consumer crashne/restartuje:
- Všetky 500 unacked → redelivered
- Ak nie je idempotentné → duplicity/errory
- Ak niektoré zlyhajú validáciou → nack requeue=true → loop forever

Redelivery Storm pattern

1. Vysoký unacked count sa nabuilduje (500+ správ)
2. Consumer crashne alebo sa redeployne
3. RabbitMQ redeliveruje všetky unacked správy
4. Consumer ich spracuje znova:
   - Niektoré duplicity (už v DB) → nack requeue=false → DLQ
   - Niektoré zlyhajú validáciou → nack requeue=true → späť do queue
   - Niektoré uspejú → ack
5. Tie čo išli späť do queue (#4) sa redeliverujú
6. Cyklus sa opakuje

Výsledok: tie isté správy sa cyklia, spotrebúvajú CPU/memory/API kvótu

Toto je neviditeľné v štandardných metrikách pokiaľ nie je príliš neskoro.

Bežné scenáre (debugoval som všetky)

1. “Dočasné” zvýšenie prefetch

# Predtým: rozumné
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 10  # Max 10 unacked per consumer

# Potom: katastrofa
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 500  # "lepšie zvládne bursty!"

Funguje skvelo… pokiaľ sa spracovanie nespomalí. Potom 500 unacked per consumer × 10 consumerov = 5000 unacked správ = RabbitMQ memory spike.

2. Ack až po “všetkom”

@RabbitListener(queues = "billing.jobs")
public void process(BillingJob job) {
    // 1. Validuj
    validate(job);

    // 2. Zápis do DB
    repository.save(job);

    // 3. Zavolaj externé API (môže byť pomalé!)
    externalService.notify(job);  // ← Toto trvá 5-30s

    // 4. Aktualizuj analytics
    analytics.track(job);

    // ACK sa deje tu automaticky (Spring default)
}

Ak je externé API pomalé, unacked sa builduje. Ak zlyhá, správa je lost (už acked). Lepšie: acknúť po idempotent pointe (DB write).

3. Poison Message Loop

@RabbitListener(queues = "orders")
public void process(Order order) {
    try {
        processOrder(order);
        // auto-ack
    } catch (ValidationException e) {
        // Správa ide späť do queue (nack requeue=true)
        // Redeliveruje sa okamžite
        // Zlyhá validáciou znova
        // Nekonečný loop!
    }
}

Bez DLX (Dead Letter Exchange) alebo retry limitov, poison správy sa cyklia navždy.

Ack Contract prístup

Definuj explicitné očakávania pre consumer behavior:

# ack_contract.yml
version: 1

queues:
  billing.jobs:
    vhost: /
    max_unacked: 200              # Total unacked naprieč všetkými consumermi
    max_unacked_per_consumer: 50  # Per consumer inštancia
    max_redeliver_ratio: 0.02     # Max 2% správ redelivered
    sample_duration: 60s          # Testuj 1 minútu
    sample_interval: 1s           # Kontroluj každú sekundu

Tento kontrakt hovorí:

  • max_unacked: Nie viac ako 200 správ čakajúcich na ack v akomkoľvek čase
  • max_unacked_per_consumer: Každý consumer môže držať max 50 unacked (validuje prefetch)
  • max_redeliver_ratio: Menej ako 2% správ sa redeliveruje (chytí poison loops)

Implementácia: Testuj reálne Consumer behavior

Kľúčový insight: musíš testovať s reálnym RabbitMQ brokerom a tvojím reálnym consumer kódom. Mocking nechytí ack timing problémy.

Docker Compose pre CI

# docker-compose.rabbit.yml
services:
  rabbitmq:
    image: rabbitmq:3.12-management
    ports:
      - "5672:5672"
      - "15672:15672"  # Management UI/API
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 5s
      timeout: 5s
      retries: 3

Test Scenár: Publish + Consume + Meraj

// ContractTest.java
@SpringBootTest
@Testcontainers
class AckContractTest {

    @Container
    static RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.12-management");

    @Test
    void verifyAckContract() throws Exception {
        // 1. Spusti consumer (tvoj reálny consumer kód)
        startConsumer();

        // 2. Publishni workload (vrátane niektorých poison správ)
        publishTestWorkload(1000);

        // 3. Nechaj bežať 60s
        Thread.sleep(60_000);

        // 4. Validuj že unacked zostal v hraniciach
        RabbitStats stats = fetchQueueStats("billing.jobs");
        assertThat(stats.unacked).isLessThan(200);
        assertThat(stats.redeliverRatio).isLessThan(0.02);
    }

    private RabbitStats fetchQueueStats(String queue) {
        // Zavolaj RabbitMQ Management API:
        // GET http://localhost:15672/api/queues/%2F/billing.jobs
        // Vráti: messages_ready, messages_unacknowledged, message_stats.*
    }
}

Toto testuje:

  • Reálne ack timing (nie mocked)
  • Reálne processing delays
  • Reálne error handling (poison správy)
  • Reálne prefetch správanie

CI Integrácia (GitHub Actions)

name: rabbitmq-ack-contract

on: [pull_request]

jobs:
  ack:
    runs-on: ubuntu-latest
    services:
      rabbitmq:
        image: rabbitmq:3.12-management
        ports:
          - 5672:5672
          - 15672:15672
        options: >-
          --health-cmd "rabbitmq-diagnostics -q ping"
          --health-interval 5s
          --health-timeout 5s
          --health-retries 3

    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-java@v4
        with:
          distribution: temurin
          java-version: '21'

      - uses: actions/setup-go@v5
        with:
          go-version: '1.22'

      - name: Spusti consumer v pozadí
        run: |
          ./gradlew bootRun &
          sleep 10  # Počkaj na start consumera

      - name: Publishni test workload
        run: |
          # Pošli 1000 správ vrátane poison prípadov
          ./gradlew publishTestMessages

      - name: Over ack kontrakt
        run: |
          go run ./cmd/ackcontract \
            -mgmt http://localhost:15672 \
            -user guest \
            -pass guest \
            -queue billing.jobs \
            -max-unacked 200 \
            -max-unacked-per-consumer 50 \
            -max-redeliver-ratio 0.02 \
            -duration 60s \
            -interval 1s \
            -report ack_report.json

      - name: Uploadni report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: ack-report
          path: ack_report.json

Toto ti dá reálny end-to-end test ack behaviouru v CI, chytajúci problémy ako:

  • Prefetch príliš vysoký
  • Ack príliš neskoro
  • Poison message loops
  • Chýbajúca DLX konfigurácia

Runtime Monitorovanie (Produkcia)

CI chytí design problémy. Produkcia potrebuje kontinuálne monitorovanie:

# Unacked správy (malo by zostávať nízke)
rabbitmq_queue_messages_unacknowledged{queue="billing.jobs"}

# Redeliver ratio (malo by byť blízko 0)
rate(rabbitmq_queue_messages_redelivered_total[5m])
/ rate(rabbitmq_queue_messages_delivered_total[5m])

# Alert na spike
rabbitmq_queue_messages_unacknowledged > 500

# Alert na redelivery storm
rate(rabbitmq_queue_messages_redelivered_total[5m])
/ rate(rabbitmq_queue_messages_delivered_total[5m])
> 0.05  # Viac ako 5% redelivered

Keď Unacked exploduje v produkcii

Krok 1: Diagnostikuj

# Skontroluj queue stats
rabbitmqctl list_queues name messages_unacknowledged consumers

# Skontroluj consumer prefetch
rabbitmqctl list_consumers | grep billing.jobs

# Skontroluj poison správy (vysoký redeliver count)
# Cez Management UI: Queue → Get Messages → Skontroluj "redelivered"

Krok 2: Okamžitá mitigácia

# Možnosť 1: Zníž prefetch (vyžaduje consumer restart)
# Aktualizuj config: prefetch: 10

# Možnosť 2: Presuň poison správy do DLQ manuálne
# Cez Management UI: Purge alebo presuň správy

# Možnosť 3: Scaluj consumery (ak je bottleneck processing kapacita)
kubectl scale deployment billing-consumer --replicas=10

Krok 3: Oprav Root Cause

Bežné fixy:

Vysoký Unacked:

// Predtým: ack po všetkom
@RabbitListener(queues = "billing.jobs", ackMode = "AUTO")
public void process(Job job) {
    db.save(job);
    slowApi.call(job);  // Drží správu unacked počas pomalého volania
}

// Potom: manuálny ack po idempotent pointe
@RabbitListener(queues = "billing.jobs", ackMode = "MANUAL")
public void process(Job job, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    db.save(job);  // Idempotent
    channel.basicAck(tag, false);  // Ack tu!

    try {
        slowApi.call(job);  // Ak toto zlyhá, správa už acked (bezpečné)
    } catch (Exception e) {
        // Handle ale nenackuj (správa už odstránená z queue)
    }
}

Poison Message Loop:

// Pridaj DLX (Dead Letter Exchange) konfiguráciu
@Bean
Queue billingQueue() {
    return QueueBuilder.durable("billing.jobs")
        .withArgument("x-dead-letter-exchange", "billing.dlx")
        .withArgument("x-dead-letter-routing-key", "billing.failed")
        .build();
}

// Handle validation errors správne
@RabbitListener(queues = "billing.jobs", ackMode = "MANUAL")
public void process(Job job, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
    try {
        validate(job);
        processJob(job);
        channel.basicAck(tag, false);
    } catch (ValidationException e) {
        // Nerequeue invalidné správy
        channel.basicNack(tag, false, false);  // requeue=false → ide do DLX
    }
}

Checklist

## RabbitMQ Ack Contract Checklist

### Pred deploymentom
- [ ] Prefetch nastavený podľa skutočného processing času (nie "cíti sa dobre")
- [ ] Ack po idempotent pointe, nie po celom flow
- [ ] DLX nakonfigurované pre poison message handling
- [ ] Validation errory → nack requeue=false (nie nekonečný loop)

### CI Kontrakt
- [ ] Testuj s reálnym RabbitMQ (nie mocked)
- [ ] Zahrňuj poison správy v test workload
- [ ] Validuj max_unacked počas 60s behu
- [ ] Validuj redeliver_ratio < 2%

### Produkčné Monitorovanie
- [ ] Alert na unacked > threshold
- [ ] Alert na redeliver_ratio > 5%
- [ ] Dashboard ukazujúci unacked trend
- [ ] Trackuj consumer count (chytí crashe)

### Keď Unacked vystrelie
- [ ] Skontroluj prefetch setting (príliš vysoký?)
- [ ] Skontroluj processing time (externé API pomalé?)
- [ ] Skontroluj poison správy (validation loop?)
- [ ] Zníž prefetch + redeploy ak treba
- [ ] Presuň poison správy do DLQ manuálne ak stuck

Záver

RabbitMQ acknowledgment system je klamne jednoduchý: consume správu, spracuj ju, pošli ack. Ale v praxi, timing toho ack a handling zlyhaní vytvára komplexný kontrakt ktorý väčšina tímov explicitne netestuje.

Ack Contracts spravia z “fungovalo to v stagingu” reprodukovateľnú, testovateľnú garantiu. Spustením reálnych consumerov proti reálnemu RabbitMQ v CI a meraním unacked správ, redeliver ratios a consumer stability, chytíš bugy ktoré sa objavujú len pod produkčným loadom.

Kľúčový insight: ack timing nie je implementačný detail, je to SLO. Treatuj ho tak.


Súvisiace články

Súvisiace články

Citujte tento článok

Ak na článok odkazujete, pridajte pôvodnú URL a uveďte autora.

Michal Drozd. "5000 Unacked správ a stúpa: Zastav RabbitMQ consumer meltdowny v CI". https://www.michal-drozd.com/sk/blog/rabbitmq-ack-contracts/ (Publikované 22. novembra 2025).