Add documentation for request coalescing in Kafka consumer
This commit is contained in:
parent
c6ca4d23fc
commit
f3487a4a0f
139
content/posts/request-coalescing-kafka-consumer.md
Normal file
139
content/posts/request-coalescing-kafka-consumer.md
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
---
|
||||||
|
title: "Request Coalescing in a Kafka Consumer"
|
||||||
|
date: 2026-05-09
|
||||||
|
draft: false
|
||||||
|
description: "Collapsing redundant upstream API calls in a Kafka consumer with Go's singleflight, applied to concurrent user-change events against a shared rate-limited API."
|
||||||
|
tags: [go, kafka, singleflight, spanner, google-cloud, event-driven]
|
||||||
|
github: ""
|
||||||
|
url: ""
|
||||||
|
---
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
Discord's post on [how they store trillions of messages](https://discord.com/blog/how-discord-stores-trillions-of-messages)
|
||||||
|
covers request coalescing as a way to collapse duplicate reads under concurrent
|
||||||
|
load. The same problem appeared in a Kafka consumer I built for processing
|
||||||
|
user-change events.
|
||||||
|
|
||||||
|
Each event — email, username, address changes — is produced to a Kafka topic
|
||||||
|
keyed by `accountId`. All events for the same account land on the same
|
||||||
|
partition; each pod owns one or more partitions. For every message, the consumer
|
||||||
|
calls an internal API to fetch the current account state and writes the result
|
||||||
|
to Spanner.
|
||||||
|
|
||||||
|
The constraint: the internal API has a **1500 TPS ceiling shared across the
|
||||||
|
entire company**. Without coalescing, events for the same account arriving 2–3
|
||||||
|
seconds apart each trigger a separate API call — for data that hasn't changed
|
||||||
|
between calls. At scale, unnecessary duplicate calls erode the shared budget for
|
||||||
|
every other team consuming the same API.
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
```
|
||||||
|
User change
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌──────────────────────────────────────────────┐
|
||||||
|
│ Upstream pipeline │
|
||||||
|
│ A (intake) → B (announce) → C (business │
|
||||||
|
│ logic + DB write) ~20s to settle │
|
||||||
|
└───────────────────────┬──────────────────────┘
|
||||||
|
│ Internal API
|
||||||
|
│ (source of truth, 1500 TPS shared)
|
||||||
|
│
|
||||||
|
Kafka topic (key = accountId)
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌─────────────────────────┐
|
||||||
|
│ Consumer pod │
|
||||||
|
│ (1+ partitions) │
|
||||||
|
│ │
|
||||||
|
│ msg: account X ────────┤
|
||||||
|
│ msg: account X ───wait─┤──► singleflight("X")
|
||||||
|
│ msg: account X ───wait─┤ │
|
||||||
|
│ │ sleep 30s ← upstream settles
|
||||||
|
│ msg: account Y ────────┤ │
|
||||||
|
│ msg: account Y ───wait─┤──► API call (once per account)
|
||||||
|
│ │ │
|
||||||
|
└─────────────────────────┘ Spanner write
|
||||||
|
│
|
||||||
|
Kafka commit
|
||||||
|
(all coalesced msgs)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Stack
|
||||||
|
|
||||||
|
- **Go** — consumer service
|
||||||
|
- `golang.org/x/sync/singleflight` — per-`accountId` call deduplication
|
||||||
|
- **Apache Kafka** — event stream, partitioned by `accountId`
|
||||||
|
- **Cloud Spanner** — write target; chosen for strong consistency, managed
|
||||||
|
replication, and no operational overhead for a small team
|
||||||
|
|
||||||
|
## Highlights
|
||||||
|
|
||||||
|
**Kafka partitioning is load-bearing, not incidental.**
|
||||||
|
Routing by `accountId` as the message key guarantees all events for one account
|
||||||
|
hit the same partition and are processed by one consumer instance. Without this,
|
||||||
|
two pods could independently fire API calls for the same account at the same
|
||||||
|
time, and singleflight — which is in-process — would not help.
|
||||||
|
|
||||||
|
**The 30-second sleep is deliberate, not a workaround.**
|
||||||
|
The upstream pipeline takes roughly 20 seconds to apply business logic and
|
||||||
|
commit state. The consumer sleeps 30 seconds before calling the API, giving the
|
||||||
|
upstream time to settle. Messages for the same `accountId` that arrive during
|
||||||
|
the sleep join the existing singleflight group and block — they never reach the
|
||||||
|
API independently. When the call completes, all of them share the result.
|
||||||
|
|
||||||
|
**Coalesced messages commit to Kafka together.**
|
||||||
|
At the end of a singleflight group's lifetime, all messages that waited on that
|
||||||
|
call have their Kafka offsets committed in a single batch. This is a natural
|
||||||
|
consequence of the blocking behaviour: no message in the group is acknowledged
|
||||||
|
until the group resolves.
|
||||||
|
|
||||||
|
**Coalescing protects a shared rate limit, not just throughput.**
|
||||||
|
The internal API enforces a 1500 TPS ceiling across the whole company. Each
|
||||||
|
unnecessary duplicate call consumes capacity that other teams depend on.
|
||||||
|
Singleflight makes the consumer a cooperative user of that shared resource
|
||||||
|
rather than a potential source of contention. After deployment, API call volume
|
||||||
|
dropped 36% — measured via consumer metrics sent to Datadog.
|
||||||
|
|
||||||
|
**Different `accountId`s run in parallel.**
|
||||||
|
Singleflight groups are keyed by `accountId`. An in-flight call for account X
|
||||||
|
does not block processing for account Y — each account gets its own group,
|
||||||
|
its own 30-second window, and its own API call.
|
||||||
|
|
||||||
|
## Code
|
||||||
|
|
||||||
|
> Illustrative only — this is not the production implementation.
|
||||||
|
|
||||||
|
```go
|
||||||
|
var group singleflight.Group
|
||||||
|
|
||||||
|
func processMessage(ctx context.Context, accountID string) error {
|
||||||
|
result, err, _ := group.Do(accountID, func() (interface{}, error) {
|
||||||
|
// All messages for the same accountID block here.
|
||||||
|
// Only the first goroutine executes this function.
|
||||||
|
|
||||||
|
// Wait for the upstream pipeline to settle before fetching.
|
||||||
|
time.Sleep(30 * time.Second)
|
||||||
|
|
||||||
|
return fetchFromInternalAPI(ctx, accountID)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := writeToSpanner(ctx, result.(*AccountData)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Caller commits its Kafka offset after this returns.
|
||||||
|
// All goroutines that waited on the same group reach this point
|
||||||
|
// with the same result and commit their offsets together.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Status
|
||||||
|
|
||||||
|
Running in production. API call reduction is tracked as an ongoing metric in Datadog.
|
||||||
Loading…
x
Reference in New Issue
Block a user