Rýchly štart
Inštalácia
# Docker (odporúčané pre development)
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=secret \
rabbitmq:3.13-management
# Ubuntu/Debian
curl -1sLf 'https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey' | sudo gpg --dearmor -o /usr/share/keyrings/rabbitmq.gpg
sudo apt-get install rabbitmq-server
# Povoliť management plugin
rabbitmq-plugins enable rabbitmq_management
Prístupové body
| Služba | Predvolená URL/Port | Popis |
|---|
| AMQP Protokol | localhost:5672 | Klientske pripojenia |
| Management UI | http://localhost:15672 | Webové rozhranie |
| Management API | http://localhost:15672/api | REST API |
| Predvolené prihlasovacie údaje | guest/guest | Len lokálne, zmeniť v produkcii |
CLI príkazy (rabbitmqctl)
Správa clustra
# Skontrolovať stav clustra
rabbitmqctl cluster_status
# Pripojiť sa ku clustru
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# Zmeniť typ nodu v clustri
rabbitmqctl change_cluster_node_type disc # alebo ram
# Odstrániť nod z clustra
rabbitmqctl forget_cluster_node rabbit@node2
# Vynútený boot (použiť keď sú cluster metadata poškodené)
rabbitmqctl force_boot
Správa používateľov
# Vypísať používateľov
rabbitmqctl list_users
# Pridať používateľa
rabbitmqctl add_user username password
# Zmazať používateľa
rabbitmqctl delete_user username
# Zmeniť heslo
rabbitmqctl change_password username newpassword
# Nastaviť user tags (role)
rabbitmqctl set_user_tags username administrator
rabbitmqctl set_user_tags username monitoring management
# Nastaviť oprávnenia (vhost, configure, write, read regex)
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
rabbitmqctl set_permissions -p /production username "^app-.*" "^app-.*" "^app-.*"
# Vypísať oprávnenia
rabbitmqctl list_permissions -p /
rabbitmqctl list_user_permissions username
Virtual Hosty (vhosts)
# Vypísať vhosty
rabbitmqctl list_vhosts
# Pridať vhost
rabbitmqctl add_vhost /production
# Zmazať vhost (zmaže všetky exchanges, queues, bindings)
rabbitmqctl delete_vhost /old-vhost
# Nastaviť limity vhostu
rabbitmqctl set_vhost_limits -p / '{"max-connections": 1000}'
rabbitmqctl set_vhost_limits -p / '{"max-queues": 500}'
Správa front
# Vypísať fronty s detailmi
rabbitmqctl list_queues name messages consumers memory state
# Vypísať všetky info o frontách
rabbitmqctl list_queues name messages_ready messages_unacknowledged
# Vyprázdniť frontu
rabbitmqctl purge_queue queue_name -p /
# Zmazať frontu
rabbitmqctl delete_queue queue_name -p /
Správa exchanges
# Vypísať exchanges
rabbitmqctl list_exchanges name type durable auto_delete
# Zmazať exchange
rabbitmqctl delete_exchange exchange_name -p /
Pripojenia a kanály
# Vypísať pripojenia
rabbitmqctl list_connections name peer_host peer_port state channels
# Vypísať kanály
rabbitmqctl list_channels connection name number messages_unacknowledged
# Zatvoriť pripojenie
rabbitmqctl close_connection "<connection_name>" "Údržba"
# Zatvoriť všetky pripojenia
rabbitmqctl close_all_connections "Údržba clustra"
Health checks
# Health check nodu
rabbitmqctl node_health_check
# Skontrolovať či nod beží
rabbitmqctl status
# Kontrola disk space
rabbitmqctl check_running
rabbitmqctl check_local_alarms
# Kontrola pamäte
rabbitmqctl environment | grep memory
Údržba
# Zastaviť aplikáciu (Erlang VM ďalej beží)
rabbitmqctl stop_app
# Spustiť aplikáciu
rabbitmqctl start_app
# Úplný restart
rabbitmqctl shutdown
systemctl start rabbitmq-server
# Rotovať logy
rabbitmqctl rotate_logs
# Reset nodu (POZOR: zmaže všetky dáta)
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
Konfigurácia (rabbitmq.conf)
Základné nastavenia
# Sieť & Listenery
listeners.tcp.default = 5672
listeners.ssl.default = 5671
# Management plugin
management.tcp.port = 15672
management.ssl.port = 15671
# Limity pamäte
vm_memory_high_watermark.relative = 0.6 # 60% dostupnej RAM
vm_memory_high_watermark.absolute = 2GB
# Limity disk priestoru
disk_free_limit.absolute = 5GB
disk_free_limit.relative = 1.5 # 1.5x RAM
# Heartbeat
heartbeat = 60
# Channel max
channel_max = 2048
# Frame max (max veľkosť správy)
frame_max = 131072 # 128KB
# Predvolený vhost
default_vhost = /
# Predvolený používateľ
default_user = guest
default_pass = guest
default_user_tags.administrator = true
# Nastavenia front
queue_master_locator = min-masters # Vyvážiť queue masters
queue_index_embed_msgs_below = 4096
# Lazy queues (diskové)
queue_mode = lazy
# Message TTL (milisekundy)
message_ttl = 3600000 # 1 hodina
# Consumer timeout
consumer_timeout = 1800000 # 30 minút
# Limity pripojení a kanálov na vhost
vhost_limits.max_connections = 1024
vhost_limits.max_queues = 512
Clustering
# Názov clustra
cluster_name = production-cluster
# Spracovanie cluster partícií
cluster_partition_handling = autoheal # alebo pause_minority
# Network tick time (cluster heartbeat)
net_ticktime = 60
TLS/SSL
listeners.ssl.default = 5671
ssl_options.cacertfile = /path/to/ca_certificate.pem
ssl_options.certfile = /path/to/server_certificate.pem
ssl_options.keyfile = /path/to/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
ssl_options.versions.1 = tlsv1.3
ssl_options.versions.2 = tlsv1.2
Management API
Autentifikácia
# Basic auth
curl -u admin:secret http://localhost:15672/api/overview
# Získať všetky vhosty
curl -u admin:secret http://localhost:15672/api/vhosts
Časté endpointy
| Metóda | Endpoint | Popis |
|---|
GET | /api/overview | Prehľad clustra |
GET | /api/nodes | Zoznam všetkých nodov |
GET | /api/connections | Zoznam pripojení |
GET | /api/channels | Zoznam kanálov |
GET | /api/queues | Zoznam všetkých front |
GET | /api/queues/{vhost}/{name} | Detail fronty |
GET | /api/exchanges | Zoznam exchanges |
POST | /api/queues/{vhost}/{name} | Vytvoriť frontu |
DELETE | /api/queues/{vhost}/{name} | Zmazať frontu |
POST | /api/exchanges/{vhost}/{name} | Vytvoriť exchange |
POST | /api/bindings/{vhost}/e/{ex}/q/{queue} | Vytvoriť binding |
Príklady
# Získať detail fronty
curl -u admin:secret http://localhost:15672/api/queues/%2F/my-queue
# Vytvoriť frontu
curl -u admin:secret -X PUT \
-H "content-type:application/json" \
-d '{"durable":true,"auto_delete":false}' \
http://localhost:15672/api/queues/%2F/new-queue
# Vyprázdniť frontu
curl -u admin:secret -X DELETE \
http://localhost:15672/api/queues/%2F/my-queue/contents
# Získať správy (nedeštruktívne)
curl -u admin:secret -X POST \
-H "content-type:application/json" \
-d '{"count":10,"ackmode":"ack_requeue_true","encoding":"auto"}' \
http://localhost:15672/api/queues/%2F/my-queue/get
# Publikovať správu
curl -u admin:secret -X POST \
-H "content-type:application/json" \
-d '{"properties":{},"routing_key":"my-queue","payload":"ahoj","payload_encoding":"string"}' \
http://localhost:15672/api/exchanges/%2F/amq.default/publish
Klientske knižnice
Python (pika)
import pika
# Pripojenie
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='localhost',
port=5672,
credentials=pika.PlainCredentials('admin', 'secret'),
heartbeat=60,
blocked_connection_timeout=300
)
)
channel = connection.channel()
# Vytvoriť frontu
channel.queue_declare(
queue='tasks',
durable=True,
arguments={'x-max-length': 10000}
)
# Publikovať správu
channel.basic_publish(
exchange='',
routing_key='tasks',
body='Hello World',
properties=pika.BasicProperties(
delivery_mode=2, # persistent
content_type='application/json',
expiration='3600000' # 1 hodina TTL
)
)
# Konzumovať správy
def callback(ch, method, properties, body):
print(f"Prijatá správa {body}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=10)
channel.basic_consume(queue='tasks', on_message_callback=callback)
channel.start_consuming()
Java (Spring AMQP)
@Configuration
public class RabbitConfig {
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory("localhost");
factory.setUsername("admin");
factory.setPassword("secret");
factory.setRequestedHeartBeat(60);
factory.setConnectionTimeout(30000);
return factory;
}
@Bean
public Queue tasksQueue() {
return QueueBuilder.durable("tasks")
.maxLength(10000L)
.ttl(3600000) // 1 hodina
.build();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
RabbitTemplate template = new RabbitTemplate(factory);
template.setMandatory(true);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
}
// Publisher
@Service
public class Publisher {
@Autowired
private RabbitTemplate template;
public void send(String message) {
template.convertAndSend("tasks", message);
}
}
// Consumer
@Service
public class Consumer {
@RabbitListener(queues = "tasks", concurrency = "5-10")
public void process(String message) {
System.out.println("Prijatá správa: " + message);
}
}
Node.js (amqplib)
const amqp = require('amqplib');
async function main() {
// Pripojiť
const conn = await amqp.connect({
protocol: 'amqp',
hostname: 'localhost',
port: 5672,
username: 'admin',
password: 'secret',
heartbeat: 60
});
const channel = await conn.createChannel();
// Vytvoriť frontu
await channel.assertQueue('tasks', {
durable: true,
arguments: { 'x-max-length': 10000 }
});
// Publikovať
channel.sendToQueue('tasks', Buffer.from('Ahoj'), {
persistent: true,
expiration: '3600000' // 1 hodina
});
// Konzumovať
channel.prefetch(10);
channel.consume('tasks', async (msg) => {
console.log('Prijatá správa:', msg.content.toString());
channel.ack(msg);
});
}
main().catch(console.error);
Typy Exchange a vzory
Direct Exchange
# Use case: Point-to-point routing
# Routing: Presná zhoda routing key
rabbitmqadmin declare exchange name=logs type=direct durable=true
rabbitmqadmin declare queue name=error-logs durable=true
rabbitmqadmin declare binding source=logs destination=error-logs routing_key=error
Fanout Exchange
# Use case: Broadcast na všetky fronty
# Routing: Ignoruje routing key
rabbitmqadmin declare exchange name=notifications type=fanout durable=true
rabbitmqadmin declare queue name=email-queue durable=true
rabbitmqadmin declare queue name=sms-queue durable=true
rabbitmqadmin declare binding source=notifications destination=email-queue
rabbitmqadmin declare binding source=notifications destination=sms-queue
Topic Exchange
# Use case: Routing podľa vzoru
# Routing: Wildcard matching (* = jedno slovo, # = žiadne alebo viac slov)
rabbitmqadmin declare exchange name=events type=topic durable=true
rabbitmqadmin declare binding source=events destination=user-created routing_key="user.created.*"
rabbitmqadmin declare binding source=events destination=all-errors routing_key="*.*.error"
# Use case: Routing podľa hlavičiek správ
# Routing: Zhoda kľúč-hodnota hlavičiek
rabbitmqadmin declare exchange name=tasks type=headers durable=true
rabbitmqadmin declare binding source=tasks destination=high-priority \
arguments='{"x-match":"all","priority":"high","type":"critical"}'
Monitoring & Metriky
Kľúčové metriky na sledovanie
| Metrika | Príkaz | Varovný prah |
|---|
| Využitie pamäte | rabbitmqctl status | grep memory | >70% limitu |
| Voľný disk | rabbitmqctl environment | grep disk_free | <1.5x RAM |
| Pripojenia | rabbitmqctl list_connections | wc -l | >80% limitu |
| Fronty | rabbitmqctl list_queues | wc -l | >1000 front |
| Nepotvrdené správy | rabbitmqctl list_queues messages_unacknowledged | Rastúce v čase |
| Message Rate | Pozrieť Management UI | Trvalé >10k msg/s |
| File Descriptory | rabbitmqctl status | >80% ulimitu |
Prometheus metriky
# Povoliť Prometheus plugin
rabbitmq-plugins enable rabbitmq_prometheus
# Metrics endpoint
curl http://localhost:15692/metrics
# Kľúčové metriky:
# - rabbitmq_queue_messages
# - rabbitmq_queue_messages_ready
# - rabbitmq_queue_messages_unacknowledged
# - rabbitmq_connections
# - rabbitmq_channels
# - rabbitmq_erlang_vm_memory_bytes
Production Best Practices
Vysoká dostupnosť
| Praktika | Konfigurácia | Benefit |
|---|
| Quorum Queues | x-queue-type: quorum | Bezpečnosť dát, replikácia |
| Lazy Queues | x-queue-mode: lazy | Spracovanie miliónov správ |
| Veľkosť clustra | 3 alebo 5 nodov (nepárny počet) | Odolnosť voči zlyhaniu |
| Load Balancer | HAProxy/nginx pred RabbitMQ | Distribúcia pripojení |
| Publisher Confirms | Povoliť v klientovi | Garantované doručenie |
Konfigurácia fronty
# Production-ready fronta
channel.queue_declare(
queue='tasks',
durable=True, # Prežije reštart
arguments={
'x-queue-type': 'quorum', # HA fronta
'x-max-length': 100000, # Limit veľkosti fronty
'x-message-ttl': 86400000, # 24h TTL
'x-dead-letter-exchange': 'dlx', # DLX pre zlyhané správy
'x-dead-letter-routing-key': 'failed.tasks',
'x-max-priority': 10 # Podpora priority
}
)
| Nastavenie | Produkčná hodnota | Dôvod |
|---|
| Prefetch Count | 10-50 | Vyvážiť throughput & fairness |
| Connection Pool | 5-10 na aplikáciu | Znovu použiť pripojenia |
| Channel Pool | 1 na vlákno | Kanály nie sú thread-safe |
| Heartbeat | 60s | Detekovať mŕtve pripojenia |
| Publisher Confirms | Povolené | Garancia trvanlivosti |
| Consumer ACK Mode | Manuálny | Kontrola spracovania |
Security checklist
# 1. Zmeniť predvolené prihlasovacie údaje
rabbitmqctl change_password guest "$(openssl rand -base64 32)"
rabbitmqctl delete_user guest # V produkcii
# 2. Vytvoriť app-špecifických používateľov
rabbitmqctl add_user myapp "$(openssl rand -base64 32)"
rabbitmqctl set_permissions -p / myapp "^myapp-.*" "^myapp-.*" "^myapp-.*"
# 3. Povoliť TLS
# Pridať do rabbitmq.conf:
# listeners.ssl.default = 5671
# ssl_options.cacertfile = /path/to/ca.pem
# ssl_options.certfile = /path/to/cert.pem
# ssl_options.keyfile = /path/to/key.pem
# 4. Limitovať zdroje vhostu
rabbitmqctl set_vhost_limits -p / '{"max-connections": 1024, "max-queues": 500}'
# 5. Povoliť audit logging
rabbitmq-plugins enable rabbitmq_event_exchange
Troubleshooting
Časté problémy
# 1. Connection refused
# Skontrolovať: Beží RabbitMQ?
systemctl status rabbitmq-server
rabbitmqctl status
# 2. Memory alarm
# Skontrolovať využitie pamäte
rabbitmqctl environment | grep memory
# Vyprázdniť zbytočné fronty
rabbitmqctl list_queues name messages | sort -k2 -nr
rabbitmqctl purge_queue <large_queue>
# 3. Disk alarm
# Skontrolovať disk space
df -h
rabbitmqctl environment | grep disk_free_limit
# Vyčistiť staré logy
rabbitmqctl rotate_logs
# 4. Authentication failed
# Vypísať používateľov a oprávnenia
rabbitmqctl list_users
rabbitmqctl list_permissions -p /
# 5. Fronta zaseknutá/nespracováva
# Skontrolovať consumerov
rabbitmqctl list_queues name consumers
rabbitmqctl list_consumers
# Skontrolovať blokovanie
rabbitmqctl list_connections state
# 6. Správy sa nesmerujú
# Skontrolovať bindingy
rabbitmqctl list_bindings
# Povoliť tracing (dočasne)
rabbitmqctl trace_on -p /
rabbitmqctl trace_off -p /
Debug logging
# Povoliť debug logging
rabbitmqctl set_log_level debug
# Skontrolovať logy
tail -f /var/log/rabbitmq/[email protected]
# Vrátiť na info level
rabbitmqctl set_log_level info
Rýchle referenčné tabuľky
Vlastnosti správ
| Vlastnosť | Typ | Popis | Príklad |
|---|
delivery_mode | int | 1=transient, 2=persistent | 2 |
priority | int | 0-255 (ak fronta podporuje) | 5 |
expiration | string | TTL v milisekundách | "60000" |
content_type | string | MIME type | "application/json" |
reply_to | string | Callback fronta | "rpc-replies" |
correlation_id | string | ID požiadavky | "req-12345" |
message_id | string | Unikátne ID | "msg-uuid" |
timestamp | int | Unix timestamp | 1640000000 |
Argumenty fronty
| Argument | Typ | Popis |
|---|
x-message-ttl | int | Message TTL (ms) |
x-expires | int | Auto-delete fronty po (ms) |
x-max-length | int | Max veľkosť fronty (správy) |
x-max-length-bytes | int | Max veľkosť fronty (bajty) |
x-dead-letter-exchange | string | DLX názov |
x-dead-letter-routing-key | string | DLX routing key |
x-max-priority | int | Max priorita (0-255) |
x-queue-mode | string | lazy alebo default |
x-queue-type | string | classic alebo quorum |
Časté porty
| Port | Služba |
|---|
| 4369 | Erlang Port Mapper Daemon (epmd) |
| 5672 | AMQP |
| 5671 | AMQPS (TLS) |
| 15672 | Management HTTP |
| 15671 | Management HTTPS |
| 15692 | Prometheus metriky |
| 25672 | Inter-node komunikácia |
Zdroje