Späť na blog

Adaptive Concurrency Limits: Prestaňte Hádať Veľkosti Thread Poolov

|
| concurrency, performance, java, go, netflix, rate-limiting, adaptive

Na konkurencnych limitoch som sa popalil viac krat, nez by som chcel priznat. “Nastav thread pool na 200.” Prečo 200? “Tak to máme vždy.” O dva týždne: latency spikes lebo 200 je príliš vysoké pre túto závislosť.

Netflix adaptive concurrency limits dynamicky upravujú podľa skutočného správania systému. Koniec s hádaním.

Testované na: Java 21, concurrency-limits library 0.4, Spring Boot 3.2

Problém s Fixnými Limitmi

Statická Konfigurácia

# "Štandardná" konfigurácia
server:
  tomcat:
    threads:
      max: 200

spring:
  task:
    execution:
      pool:
        max-size: 100

Prečo Fixné Limity Zlyhávajú

Scenár 1: Limit príliš vysoký
- Závislosť sa spomalí (100ms → 2s)
- 200 threadov všetky blokované čakajúc
- Memory pressure, GC pauzy
- Kaskádové zlyhanie

Scenár 2: Limit príliš nízky
- Závislosť je rýchla (5ms)
- Len 50 threadov, mohlo by zvládnuť 4x viac
- Premrhaná kapacita, zbytočné čakanie vo fronte

Správny limit závisí na:
- Aktuálnej latencii závislostí
- Dostupnom CPU
- Sieťových podmienkach
- Čase dňa

Adaptive Concurrency: Little’s Law Znova

Algoritmus

Little's Law: L = λ × W

L = počet concurrent requestov
λ = request rate (throughput)
W = priemerná latencia

Ak poznáme optimálnu latenciu (W_optimal), vieme vypočítať optimálne L:
L_optimal = λ × W_optimal

Ako latencia rastie (W ↑), mali by sme znížiť konkurenciu (L ↓)
Ako latencia klesá (W ↓), môžeme zvýšiť konkurenciu (L ↑)

AIMD (Additive Increase, Multiplicative Decrease)

Algoritmus:
1. Začni s nízkym limitom (napr. 10)
2. Ak requesty uspejú s dobrou latenciou:
   → limit = limit + 1 (aditívne zvýšenie)
3. Ak latencia degraduje alebo nastanú chyby:
   → limit = limit × 0.9 (multiplikatívne zníženie)
4. Opakuj kontinuálne

Výsledok: Limit automaticky nájde optimálnu hodnotu

Implementácia

Netflix concurrency-limits Knižnica

// pom.xml
// <dependency>
//     <groupId>com.netflix.concurrency-limits</groupId>
//     <artifactId>concurrency-limits-core</artifactId>
//     <version>0.4.1</version>
// </dependency>

import com.netflix.concurrency.limits.Limiter;
import com.netflix.concurrency.limits.limit.AIMDLimit;
import com.netflix.concurrency.limits.limiter.SimpleLimiter;

public class AdaptiveLimiter {

    private final Limiter<Void> limiter;

    public AdaptiveLimiter() {
        // AIMD limit s min 10, max 200
        var limit = AIMDLimit.newBuilder()
            .initialLimit(20)
            .minLimit(10)
            .maxLimit(200)
            .backoffRatio(0.9)  // Zníž o 10% pri zlyhaní
            .build();

        this.limiter = SimpleLimiter.newBuilder()
            .limit(limit)
            .build();
    }

    public <T> T execute(Supplier<T> action) {
        Optional<Limiter.Listener> listener = limiter.acquire(null);

        if (listener.isEmpty()) {
            throw new RejectedExecutionException("Limit reached");
        }

        try {
            T result = action.get();
            listener.get().onSuccess();  // Limit sa môže zvýšiť
            return result;
        } catch (Exception e) {
            listener.get().onDropped();  // Limit sa zníži
            throw e;
        }
    }

    public int currentLimit() {
        return ((SimpleLimiter<?>) limiter).getLimit();
    }
}

Gradient-Based Limit

// Sofistikovanejší: upravuje podľa gradientu latencie
import com.netflix.concurrency.limits.limit.Gradient2Limit;

var limit = Gradient2Limit.newBuilder()
    .initialLimit(20)
    .minLimit(10)
    .maxLimit(200)
    .smoothing(0.2)        // Vyhlaď merania latencie
    .longWindow(600)       // Dlhodobý baseline (vzorky)
    .rttTolerance(1.5)     // Povoľ 50% zvýšenie latencie pred limitovaním
    .build();

// Gradient2 sleduje:
// - Dlhodobú priemernú latenciu (baseline)
// - Krátkodobú latenciu (aktuálnu)
// - Upravuje limit podľa gradientu (aktuálna/baseline)

Spring Boot Integrácia

// ConcurrencyConfig.java
@Configuration
public class ConcurrencyConfig {

    @Bean
    public Limiter<String> httpClientLimiter() {
        var limit = Gradient2Limit.newBuilder()
            .initialLimit(20)
            .minLimit(5)
            .maxLimit(100)
            .build();

        return SimpleLimiter.<String>newBuilder()
            .named("http-client")
            .limit(limit)
            .metricRegistry(new SpectatorMetricRegistry())
            .build();
    }
}

// HttpClientWrapper.java
@Component
public class AdaptiveHttpClient {

    private final RestTemplate restTemplate;
    private final Limiter<String> limiter;

    public <T> T get(String url, Class<T> responseType) {
        Optional<Limiter.Listener> listener = limiter.acquire(url);

        if (listener.isEmpty()) {
            throw new ServiceUnavailableException("Service overloaded");
        }

        long start = System.nanoTime();
        try {
            T result = restTemplate.getForObject(url, responseType);
            long rtt = System.nanoTime() - start;
            listener.get().onSuccess();
            return result;
        } catch (Exception e) {
            if (isServerError(e)) {
                listener.get().onDropped();
            } else {
                listener.get().onIgnore();  // Client error, neupravuj limit
            }
            throw e;
        }
    }
}

Go Implementácia

// adaptive_limiter.go
package limiter

import (
    "sync"
    "sync/atomic"
    "time"
)

type AdaptiveLimiter struct {
    limit       int64
    inFlight    int64
    minLimit    int64
    maxLimit    int64
    backoff     float64
    mu          sync.Mutex
    latencies   []time.Duration
    windowSize  int
}

func NewAdaptiveLimiter(initial, min, max int64) *AdaptiveLimiter {
    return &AdaptiveLimiter{
        limit:      initial,
        minLimit:   min,
        maxLimit:   max,
        backoff:    0.9,
        windowSize: 100,
        latencies:  make([]time.Duration, 0, 100),
    }
}

func (l *AdaptiveLimiter) Acquire() bool {
    for {
        current := atomic.LoadInt64(&l.inFlight)
        limit := atomic.LoadInt64(&l.limit)

        if current >= limit {
            return false
        }

        if atomic.CompareAndSwapInt64(&l.inFlight, current, current+1) {
            return true
        }
    }
}

func (l *AdaptiveLimiter) Release(latency time.Duration, success bool) {
    atomic.AddInt64(&l.inFlight, -1)

    l.mu.Lock()
    defer l.mu.Unlock()

    l.latencies = append(l.latencies, latency)
    if len(l.latencies) > l.windowSize {
        l.latencies = l.latencies[1:]
    }

    if success && l.isLatencyGood() {
        // Aditívne zvýšenie
        newLimit := atomic.LoadInt64(&l.limit) + 1
        if newLimit <= l.maxLimit {
            atomic.StoreInt64(&l.limit, newLimit)
        }
    } else if !success {
        // Multiplikatívne zníženie
        newLimit := int64(float64(atomic.LoadInt64(&l.limit)) * l.backoff)
        if newLimit >= l.minLimit {
            atomic.StoreInt64(&l.limit, newLimit)
        }
    }
}

func (l *AdaptiveLimiter) isLatencyGood() bool {
    if len(l.latencies) < 10 {
        return true
    }
    // Porovnaj aktuálnu vs baseline
    // Implementácia: porovnaj p99 s baseline p99
    return true
}

Benchmarky

Test Setup

// Simulovaná služba s variabilnou latenciou
@GetMapping("/api")
public Response api() {
    // Simuluj latenciu podľa záťaže
    int activeRequests = activeCounter.get();
    int baseLatency = 10;
    int latencyPerRequest = activeRequests > 50 ? (activeRequests - 50) * 2 : 0;

    Thread.sleep(baseLatency + latencyPerRequest);
    return new Response("ok");
}

Výsledky: Fixný vs Adaptívny

Load test: 500 RPS po dobu 10 minút, závislosť degraduje v 5. minúte

Fixný limit = 200:
  Pred degradáciou (0-5 min):
    p50: 12ms, p99: 45ms, chyby: 0%
  Po degradácii (5-10 min):
    p50: 850ms, p99: 4200ms, chyby: 35%
    Thread pool vyčerpaný, kaskádové zlyhanie

Adaptívny limit (10-200):
  Pred degradáciou (0-5 min):
    p50: 12ms, p99: 42ms, chyby: 0%
    Limit stabilizovaný na: 180
  Po degradácii (5-10 min):
    p50: 85ms, p99: 280ms, chyby: 5%
    Limit klesol na: 25
    Systém zostal stabilný!

Monitoring

Prometheus Metriky

// Custom metriky
@Bean
MeterBinder adaptiveLimiterMetrics(Limiter<?> limiter) {
    return registry -> {
        Gauge.builder("adaptive_limiter.limit", limiter,
            l -> ((SimpleLimiter<?>) l).getLimit())
            .register(registry);

        Gauge.builder("adaptive_limiter.inflight", limiter,
            l -> ((SimpleLimiter<?>) l).getInflight())
            .register(registry);
    };
}

Grafana Dashboard

# Aktuálny limit
adaptive_limiter_limit

# Inflight requesty
adaptive_limiter_inflight

# Využitie
adaptive_limiter_inflight / adaptive_limiter_limit

# Zmeny limitu v čase (pre tuning)
changes(adaptive_limiter_limit[5m])

Kedy Použiť Adaptívne Limity

Dobrá Zhoda

✅ HTTP client volania k závislostiam
✅ Database connection pools
✅ Message queue consumers
✅ Akékoľvek volanie externej služby

Prečo: Externé systémy majú variabilnú latenciu

Nie Dobrá Zhoda

❌ Request rate limiting (použi token bucket)
❌ Memory-bound operácie (použi fixný pool)
❌ CPU-bound operácie (použi podľa počtu CPU)

Prečo: Tieto majú predvídateľnú, fixnú kapacitu

Checklist

## Adaptive Concurrency Setup

### Implementácia
- [ ] Pridaj concurrency-limits knižnicu
- [ ] Vyber algoritmus (AIMD vs Gradient2)
- [ ] Nastav min/max hranice vhodne
- [ ] Obal externé volania limiterom

### Konfigurácia
- [ ] Počiatočný limit: začni nízko (10-20)
- [ ] Min limit: dosť pre health checky (5-10)
- [ ] Max limit: rozumná horná hranica (100-500)

### Monitoring
- [ ] Sleduj aktuálny limit v čase
- [ ] Sleduj inflight count
- [ ] Alert keď limit dosiahne min (problémy závislosti)

### Testovanie
- [ ] Load test s degradáciou závislosti
- [ ] Over že limit klesá pod stresom
- [ ] Over že limit sa zotaví po zotavení

Záver

Prestaňte hádať concurrency limity:

  1. Fixné limity zlyhávajú keď sa podmienky zmenia
  2. Adaptívne limity sa upravujú podľa skutočnej latencie
  3. AIMD algoritmus je jednoduchý a efektívny
  4. Gradient2 je sofistikovanejší pre komplexné scenáre

Nechajte algoritmus nájsť optimálny limit za vás.


Súvisiace články

Súvisiace články

Citujte tento článok

Ak na článok odkazujete, pridajte pôvodnú URL a uveďte autora.

Michal Drozd. "Adaptive Concurrency Limits: Prestaňte Hádať Veľkosti Thread Poolov". https://www.michal-drozd.com/sk/blog/adaptive-concurrency-limits/ (Publikované 11. apríla 2025).