5000 Unacked správ a stúpa: Zastav RabbitMQ consumer meltdowny v CI
RabbitMQ unacked message explózie ma naučili, že “funguje to v stagingu” neznamená nič ak netestuješ acknowledgment behavior. “Prečo queue rastie? Consumer beží.” Zízali sme do RabbitMQ management UI ukazujúceho messages_unacknowledged stúpajúce z 50 na 500 na 2000, kým messages_ready zostávalo nízke. Použitie pamäte na brokeri stúpalo. Consumer proces vyzeral zdravo - CPU normálne, žiadne crashe, logy ukazovali “správy sa úspešne spracúvajú.”
Potom prišiel redeploy. V momente keď sa consumer restartoval, všetkých 2000 unacked správ sa redeliverlo. Čas spracovania sa zdvojnásobil kvôli duplicate detection logike. Polovica zlyhala validáciou a išla späť do queue znova. Nechtiac sme vytvorili redelivery storm - tie isté správy sa cyklili systémom, každý pokus spotreboval zdroje, žiadna z nich sa skutočne nedokončila.
Root cause bol trápne jednoduchý: niekto “dočasne” zvýšil prefetch na 500 aby to “lepšie zvládalo burst traffic.” V kombinácii s pomalým spracovaním (external API call + database write na správu), to znamenalo že každá consumer inštancia mohla držať 500 správ unacked. Keď sa spracovanie spomalilo (API timeouty), unacked count explodoval. Keď sa consumer restartoval, RabbitMQ správne redeliveroval všetky unacked správy, ale naša idempotencia nebola perfektná, čo viedlo k stormu.
Čo robilo toto zvlášť frustrujúce bolo, že naše load testovanie vyzeralo v poriadku. Testovali sme throughput, latenciu, error rates - ale nikdy sme netestovali čo sa stane s unacked správami počas processing delays alebo restartov. RabbitMQ ack kontrakt je implicitný: musíš acknúť alebo nacknúť v rozumnom čase, alebo vytvoríš memory/redelivery bombu. Ale nikto toto nevaliduje v CI.
Prostredie: RabbitMQ 3.12+, Spring AMQP consumeri, prefetch=500, externé API závislosti
Pochopenie Unacked Message problému
Ako RabbitMQ Acknowledgment funguje
Šťastná cesta:
┌─────────┐ deliver ┌──────────┐
│ RabbitMQ│ ─────────────────▶│ Consumer │
│ Queue │ │ │
└─────────┘ └──────────┘
1. Správa delivered, označená "unacknowledged"
2. Consumer spracuje (zapíše DB, zavolá API)
3. Consumer pošle ACK
4. RabbitMQ odstráni z queue permanentne
Unacked count: normálne nízky (in-flight spracovanie)
Čo sa skutočne deje s vysokým prefetch + pomalým spracovaním:
┌─────────┐ ┌──────────┐
│ RabbitMQ│ ─────────────────▶│ Consumer │
│ Queue │ prefetch=500 │ (pomalý) │
└─────────┘ └──────────┘
RabbitMQ deliveruje 500 správ okamžite (prefetch limit)
Consumer spracovanie každej trvá 5 sekúnd (API call + DB write)
Unacked count = 500
Ak sa spracovanie spomalí na 10s (API timeout):
- Consumer môže acknúť len ~50/s
- Nové správy prichádzajú rýchlosťou 100/s
- Unacked stále rastie!
Ak consumer crashne/restartuje:
- Všetky 500 unacked → redelivered
- Ak nie je idempotentné → duplicity/errory
- Ak niektoré zlyhajú validáciou → nack requeue=true → loop forever
Redelivery Storm pattern
1. Vysoký unacked count sa nabuilduje (500+ správ)
2. Consumer crashne alebo sa redeployne
3. RabbitMQ redeliveruje všetky unacked správy
4. Consumer ich spracuje znova:
- Niektoré duplicity (už v DB) → nack requeue=false → DLQ
- Niektoré zlyhajú validáciou → nack requeue=true → späť do queue
- Niektoré uspejú → ack
5. Tie čo išli späť do queue (#4) sa redeliverujú
6. Cyklus sa opakuje
Výsledok: tie isté správy sa cyklia, spotrebúvajú CPU/memory/API kvótu
Toto je neviditeľné v štandardných metrikách pokiaľ nie je príliš neskoro.
Bežné scenáre (debugoval som všetky)
1. “Dočasné” zvýšenie prefetch
# Predtým: rozumné
spring:
rabbitmq:
listener:
simple:
prefetch: 10 # Max 10 unacked per consumer
# Potom: katastrofa
spring:
rabbitmq:
listener:
simple:
prefetch: 500 # "lepšie zvládne bursty!"
Funguje skvelo… pokiaľ sa spracovanie nespomalí. Potom 500 unacked per consumer × 10 consumerov = 5000 unacked správ = RabbitMQ memory spike.
2. Ack až po “všetkom”
@RabbitListener(queues = "billing.jobs")
public void process(BillingJob job) {
// 1. Validuj
validate(job);
// 2. Zápis do DB
repository.save(job);
// 3. Zavolaj externé API (môže byť pomalé!)
externalService.notify(job); // ← Toto trvá 5-30s
// 4. Aktualizuj analytics
analytics.track(job);
// ACK sa deje tu automaticky (Spring default)
}
Ak je externé API pomalé, unacked sa builduje. Ak zlyhá, správa je lost (už acked). Lepšie: acknúť po idempotent pointe (DB write).
3. Poison Message Loop
@RabbitListener(queues = "orders")
public void process(Order order) {
try {
processOrder(order);
// auto-ack
} catch (ValidationException e) {
// Správa ide späť do queue (nack requeue=true)
// Redeliveruje sa okamžite
// Zlyhá validáciou znova
// Nekonečný loop!
}
}
Bez DLX (Dead Letter Exchange) alebo retry limitov, poison správy sa cyklia navždy.
Ack Contract prístup
Definuj explicitné očakávania pre consumer behavior:
# ack_contract.yml
version: 1
queues:
billing.jobs:
vhost: /
max_unacked: 200 # Total unacked naprieč všetkými consumermi
max_unacked_per_consumer: 50 # Per consumer inštancia
max_redeliver_ratio: 0.02 # Max 2% správ redelivered
sample_duration: 60s # Testuj 1 minútu
sample_interval: 1s # Kontroluj každú sekundu
Tento kontrakt hovorí:
- max_unacked: Nie viac ako 200 správ čakajúcich na ack v akomkoľvek čase
- max_unacked_per_consumer: Každý consumer môže držať max 50 unacked (validuje prefetch)
- max_redeliver_ratio: Menej ako 2% správ sa redeliveruje (chytí poison loops)
Implementácia: Testuj reálne Consumer behavior
Kľúčový insight: musíš testovať s reálnym RabbitMQ brokerom a tvojím reálnym consumer kódom. Mocking nechytí ack timing problémy.
Docker Compose pre CI
# docker-compose.rabbit.yml
services:
rabbitmq:
image: rabbitmq:3.12-management
ports:
- "5672:5672"
- "15672:15672" # Management UI/API
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 5s
timeout: 5s
retries: 3
Test Scenár: Publish + Consume + Meraj
// ContractTest.java
@SpringBootTest
@Testcontainers
class AckContractTest {
@Container
static RabbitMQContainer rabbit = new RabbitMQContainer("rabbitmq:3.12-management");
@Test
void verifyAckContract() throws Exception {
// 1. Spusti consumer (tvoj reálny consumer kód)
startConsumer();
// 2. Publishni workload (vrátane niektorých poison správ)
publishTestWorkload(1000);
// 3. Nechaj bežať 60s
Thread.sleep(60_000);
// 4. Validuj že unacked zostal v hraniciach
RabbitStats stats = fetchQueueStats("billing.jobs");
assertThat(stats.unacked).isLessThan(200);
assertThat(stats.redeliverRatio).isLessThan(0.02);
}
private RabbitStats fetchQueueStats(String queue) {
// Zavolaj RabbitMQ Management API:
// GET http://localhost:15672/api/queues/%2F/billing.jobs
// Vráti: messages_ready, messages_unacknowledged, message_stats.*
}
}
Toto testuje:
- Reálne ack timing (nie mocked)
- Reálne processing delays
- Reálne error handling (poison správy)
- Reálne prefetch správanie
CI Integrácia (GitHub Actions)
name: rabbitmq-ack-contract
on: [pull_request]
jobs:
ack:
runs-on: ubuntu-latest
services:
rabbitmq:
image: rabbitmq:3.12-management
ports:
- 5672:5672
- 15672:15672
options: >-
--health-cmd "rabbitmq-diagnostics -q ping"
--health-interval 5s
--health-timeout 5s
--health-retries 3
steps:
- uses: actions/checkout@v4
- uses: actions/setup-java@v4
with:
distribution: temurin
java-version: '21'
- uses: actions/setup-go@v5
with:
go-version: '1.22'
- name: Spusti consumer v pozadí
run: |
./gradlew bootRun &
sleep 10 # Počkaj na start consumera
- name: Publishni test workload
run: |
# Pošli 1000 správ vrátane poison prípadov
./gradlew publishTestMessages
- name: Over ack kontrakt
run: |
go run ./cmd/ackcontract \
-mgmt http://localhost:15672 \
-user guest \
-pass guest \
-queue billing.jobs \
-max-unacked 200 \
-max-unacked-per-consumer 50 \
-max-redeliver-ratio 0.02 \
-duration 60s \
-interval 1s \
-report ack_report.json
- name: Uploadni report
if: always()
uses: actions/upload-artifact@v4
with:
name: ack-report
path: ack_report.json
Toto ti dá reálny end-to-end test ack behaviouru v CI, chytajúci problémy ako:
- Prefetch príliš vysoký
- Ack príliš neskoro
- Poison message loops
- Chýbajúca DLX konfigurácia
Runtime Monitorovanie (Produkcia)
CI chytí design problémy. Produkcia potrebuje kontinuálne monitorovanie:
# Unacked správy (malo by zostávať nízke)
rabbitmq_queue_messages_unacknowledged{queue="billing.jobs"}
# Redeliver ratio (malo by byť blízko 0)
rate(rabbitmq_queue_messages_redelivered_total[5m])
/ rate(rabbitmq_queue_messages_delivered_total[5m])
# Alert na spike
rabbitmq_queue_messages_unacknowledged > 500
# Alert na redelivery storm
rate(rabbitmq_queue_messages_redelivered_total[5m])
/ rate(rabbitmq_queue_messages_delivered_total[5m])
> 0.05 # Viac ako 5% redelivered
Keď Unacked exploduje v produkcii
Krok 1: Diagnostikuj
# Skontroluj queue stats
rabbitmqctl list_queues name messages_unacknowledged consumers
# Skontroluj consumer prefetch
rabbitmqctl list_consumers | grep billing.jobs
# Skontroluj poison správy (vysoký redeliver count)
# Cez Management UI: Queue → Get Messages → Skontroluj "redelivered"
Krok 2: Okamžitá mitigácia
# Možnosť 1: Zníž prefetch (vyžaduje consumer restart)
# Aktualizuj config: prefetch: 10
# Možnosť 2: Presuň poison správy do DLQ manuálne
# Cez Management UI: Purge alebo presuň správy
# Možnosť 3: Scaluj consumery (ak je bottleneck processing kapacita)
kubectl scale deployment billing-consumer --replicas=10
Krok 3: Oprav Root Cause
Bežné fixy:
Vysoký Unacked:
// Predtým: ack po všetkom
@RabbitListener(queues = "billing.jobs", ackMode = "AUTO")
public void process(Job job) {
db.save(job);
slowApi.call(job); // Drží správu unacked počas pomalého volania
}
// Potom: manuálny ack po idempotent pointe
@RabbitListener(queues = "billing.jobs", ackMode = "MANUAL")
public void process(Job job, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
db.save(job); // Idempotent
channel.basicAck(tag, false); // Ack tu!
try {
slowApi.call(job); // Ak toto zlyhá, správa už acked (bezpečné)
} catch (Exception e) {
// Handle ale nenackuj (správa už odstránená z queue)
}
}
Poison Message Loop:
// Pridaj DLX (Dead Letter Exchange) konfiguráciu
@Bean
Queue billingQueue() {
return QueueBuilder.durable("billing.jobs")
.withArgument("x-dead-letter-exchange", "billing.dlx")
.withArgument("x-dead-letter-routing-key", "billing.failed")
.build();
}
// Handle validation errors správne
@RabbitListener(queues = "billing.jobs", ackMode = "MANUAL")
public void process(Job job, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
validate(job);
processJob(job);
channel.basicAck(tag, false);
} catch (ValidationException e) {
// Nerequeue invalidné správy
channel.basicNack(tag, false, false); // requeue=false → ide do DLX
}
}
Checklist
## RabbitMQ Ack Contract Checklist
### Pred deploymentom
- [ ] Prefetch nastavený podľa skutočného processing času (nie "cíti sa dobre")
- [ ] Ack po idempotent pointe, nie po celom flow
- [ ] DLX nakonfigurované pre poison message handling
- [ ] Validation errory → nack requeue=false (nie nekonečný loop)
### CI Kontrakt
- [ ] Testuj s reálnym RabbitMQ (nie mocked)
- [ ] Zahrňuj poison správy v test workload
- [ ] Validuj max_unacked počas 60s behu
- [ ] Validuj redeliver_ratio < 2%
### Produkčné Monitorovanie
- [ ] Alert na unacked > threshold
- [ ] Alert na redeliver_ratio > 5%
- [ ] Dashboard ukazujúci unacked trend
- [ ] Trackuj consumer count (chytí crashe)
### Keď Unacked vystrelie
- [ ] Skontroluj prefetch setting (príliš vysoký?)
- [ ] Skontroluj processing time (externé API pomalé?)
- [ ] Skontroluj poison správy (validation loop?)
- [ ] Zníž prefetch + redeploy ak treba
- [ ] Presuň poison správy do DLQ manuálne ak stuck
Záver
RabbitMQ acknowledgment system je klamne jednoduchý: consume správu, spracuj ju, pošli ack. Ale v praxi, timing toho ack a handling zlyhaní vytvára komplexný kontrakt ktorý väčšina tímov explicitne netestuje.
Ack Contracts spravia z “fungovalo to v stagingu” reprodukovateľnú, testovateľnú garantiu. Spustením reálnych consumerov proti reálnemu RabbitMQ v CI a meraním unacked správ, redeliver ratios a consumer stability, chytíš bugy ktoré sa objavujú len pod produkčným loadom.
Kľúčový insight: ack timing nie je implementačný detail, je to SLO. Treatuj ho tak.
Súvisiace články
- Database Connection Pool Exhaustion - Podobný resource exhaustion pattern
- Circuit Breaker Anti-Patterns - Failure handling patterny
Súvisiace články
Jedna partition na 99% CPU: Zastav Kafka hotspoty skôr ako dorazia do produkcie
Všetky partitiony vyzerajú vyvážené v testovaní, potom príde produkčný traffic a jedna partition sa roztopí. Vinník: tvoj partition key má otrásnú kardinalitu a nikto si toho nevšimol.
Polia zmizli ale nič nespadlo: Zachyť Schema Evolution bugy pred produkciou
Producer upgradol Protobuf, consumer ešte na starej verzii. Žiadne errory, žiadne warningy—len tichá strata dát v produkcii. Tvoja schema evolúcia rozbila backward compatibility a CI si toho nevšimlo.
pg_waldump WAL Forenzika: Rekonštrukcia Čo Sa Stalo s Tvojimi Dátami
Niečo zmazalo riadky z produkcie ale nikto nepriznáva že spustil DELETE. Použi pg_waldump na analýzu WAL súborov a rekonštruuj presne čo sa stalo a kedy.
RSS Contracts: Ako prestat zabijat Java pody v Kubernetes (OOMKilled) testovanim RSS ako API
Cgroup RSS budgety, CI sampling a runtime headroom ti chytia JVM memory regresie skor, nez trafia produkciu.
Citujte tento článok
Ak na článok odkazujete, pridajte pôvodnú URL a uveďte autora.