Refactor request coalescing logic in Kafka consumer to improve API call handling and reduce duplicate requests
This commit is contained in:
parent
f3487a4a0f
commit
8b38771cbf
@ -15,50 +15,68 @@ covers request coalescing as a way to collapse duplicate reads under concurrent
|
|||||||
load. The same problem appeared in a Kafka consumer I built for processing
|
load. The same problem appeared in a Kafka consumer I built for processing
|
||||||
user-change events.
|
user-change events.
|
||||||
|
|
||||||
Each event — email, username, address changes — is produced to a Kafka topic
|
Each event represents a change to user-related state and is produced to a Kafka
|
||||||
keyed by `accountId`. All events for the same account land on the same
|
topic keyed by `accountId`. All events for the same account land on the same
|
||||||
partition; each pod owns one or more partitions. For every message, the consumer
|
partition; each pod owns one or more partitions. For every message, the consumer
|
||||||
calls an internal API to fetch the current account state and writes the result
|
calls an internal API to fetch the current account state and writes the result
|
||||||
to Spanner.
|
to Spanner.
|
||||||
|
|
||||||
The constraint: the internal API has a **1500 TPS ceiling shared across the
|
The constraint: the internal API has a **1500 TPS ceiling shared across the
|
||||||
entire company**. Without coalescing, events for the same account arriving 2–3
|
entire company**. Without coalescing, events for the same account arriving 2–3
|
||||||
seconds apart each trigger a separate API call — for data that hasn't changed
|
seconds apart each trigger a separate API call, often before the upstream state
|
||||||
between calls. At scale, unnecessary duplicate calls erode the shared budget for
|
has fully settled. Those calls usually return the same effective account state,
|
||||||
every other team consuming the same API.
|
but each one still consumes part of the shared rate limit.
|
||||||
|
At scale, unnecessary duplicate calls erode the shared budget for every other
|
||||||
|
team consuming the same API.
|
||||||
|
|
||||||
## Architecture
|
## Architecture
|
||||||
|
|
||||||
```
|
```text
|
||||||
User change
|
User-change events
|
||||||
│
|
from upstream pipeline
|
||||||
▼
|
│
|
||||||
┌──────────────────────────────────────────────┐
|
│ upstream state settles after ~20s
|
||||||
│ Upstream pipeline │
|
▼
|
||||||
│ A (intake) → B (announce) → C (business │
|
┌───────────────────────────────┐
|
||||||
│ logic + DB write) ~20s to settle │
|
│ Kafka topic │
|
||||||
└───────────────────────┬──────────────────────┘
|
│ key = accountId │
|
||||||
│ Internal API
|
└───────────────┬───────────────┘
|
||||||
│ (source of truth, 1500 TPS shared)
|
│
|
||||||
│
|
│ same accountId → same partition
|
||||||
Kafka topic (key = accountId)
|
▼
|
||||||
│
|
┌───────────────────────────────┐
|
||||||
▼
|
│ Consumer pod │
|
||||||
┌─────────────────────────┐
|
│ │
|
||||||
│ Consumer pod │
|
│ account X message │
|
||||||
│ (1+ partitions) │
|
│ │ │
|
||||||
│ │
|
│ ▼ │
|
||||||
│ msg: account X ────────┤
|
│ singleflight.Do("account-X") │
|
||||||
│ msg: account X ───wait─┤──► singleflight("X")
|
│ │ │
|
||||||
│ msg: account X ───wait─┤ │
|
│ ├─ later account X │
|
||||||
│ │ sleep 30s ← upstream settles
|
│ │ messages wait here │
|
||||||
│ msg: account Y ────────┤ │
|
│ │ │
|
||||||
│ msg: account Y ───wait─┤──► API call (once per account)
|
│ ▼ │
|
||||||
│ │ │
|
│ wait 30s │
|
||||||
└─────────────────────────┘ Spanner write
|
│ │ │
|
||||||
│
|
│ ▼ │
|
||||||
Kafka commit
|
│ call Internal API once │
|
||||||
(all coalesced msgs)
|
│ │ │
|
||||||
|
│ ▼ │
|
||||||
|
│ write result to Spanner │
|
||||||
|
│ │ │
|
||||||
|
│ ▼ │
|
||||||
|
│ commit Kafka offset │
|
||||||
|
│ │
|
||||||
|
│ account Y uses another group │
|
||||||
|
│ and can run in parallel │
|
||||||
|
└───────────────────────────────┘
|
||||||
|
│
|
||||||
|
├──────────────► Internal API
|
||||||
|
│ source of truth
|
||||||
|
│ 1500 TPS shared
|
||||||
|
│
|
||||||
|
└──────────────► Cloud Spanner
|
||||||
|
durable write target
|
||||||
```
|
```
|
||||||
|
|
||||||
## Stack
|
## Stack
|
||||||
@ -67,40 +85,46 @@ User change
|
|||||||
- `golang.org/x/sync/singleflight` — per-`accountId` call deduplication
|
- `golang.org/x/sync/singleflight` — per-`accountId` call deduplication
|
||||||
- **Apache Kafka** — event stream, partitioned by `accountId`
|
- **Apache Kafka** — event stream, partitioned by `accountId`
|
||||||
- **Cloud Spanner** — write target; chosen for strong consistency, managed
|
- **Cloud Spanner** — write target; chosen for strong consistency, managed
|
||||||
replication, and no operational overhead for a small team
|
replication, and lower operational overhead for a small team
|
||||||
|
|
||||||
## Highlights
|
## Highlights
|
||||||
|
|
||||||
**Kafka partitioning is load-bearing, not incidental.**
|
**Kafka partitioning is load-bearing, not incidental.**
|
||||||
Routing by `accountId` as the message key guarantees all events for one account
|
Routing by `accountId` as the message key guarantees all events for one account
|
||||||
hit the same partition and are processed by one consumer instance. Without this,
|
hit the same partition and are processed by one consumer instance. This matters
|
||||||
two pods could independently fire API calls for the same account at the same
|
because `singleflight` is in-process. It does not deduplicate work across pods.
|
||||||
time, and singleflight — which is in-process — would not help.
|
Without this partitioning strategy, two pods could independently fire API calls
|
||||||
|
for the same account at the same time, and `singleflight` would not help. The
|
||||||
|
Kafka key is therefore part of the coalescing design, not just a stream-routing
|
||||||
|
detail.
|
||||||
|
|
||||||
**The 30-second sleep is deliberate, not a workaround.**
|
**The 30-second sleep is deliberate, not a workaround.**
|
||||||
The upstream pipeline takes roughly 20 seconds to apply business logic and
|
The upstream pipeline takes roughly 20 seconds to apply business logic and
|
||||||
commit state. The consumer sleeps 30 seconds before calling the API, giving the
|
commit state. The consumer waits 30 seconds before calling the API, giving the
|
||||||
upstream time to settle. Messages for the same `accountId` that arrive during
|
upstream time to settle.
|
||||||
the sleep join the existing singleflight group and block — they never reach the
|
Messages for the same `accountId` that arrive during this wait join the existing
|
||||||
API independently. When the call completes, all of them share the result.
|
`singleflight` group and block. They never reach the API independently. When the
|
||||||
|
call completes, all of them share the result.
|
||||||
|
|
||||||
**Coalesced messages commit to Kafka together.**
|
**Coalesced messages become eligible to commit after the shared call resolves.**
|
||||||
At the end of a singleflight group's lifetime, all messages that waited on that
|
Messages waiting on the same `singleflight` call are not acknowledged until the
|
||||||
call have their Kafka offsets committed in a single batch. This is a natural
|
shared API call and Spanner write complete. In the production implementation,
|
||||||
consequence of the blocking behaviour: no message in the group is acknowledged
|
offset handling is coordinated outside the fetch function, so the consumer
|
||||||
until the group resolves.
|
avoids committing messages before the shared result has been durably written.
|
||||||
|
This keeps the processing model simple: fetch once, write once, then acknowledge
|
||||||
|
the messages that depended on that result.
|
||||||
|
|
||||||
**Coalescing protects a shared rate limit, not just throughput.**
|
**Coalescing protects a shared rate limit, not just throughput.**
|
||||||
The internal API enforces a 1500 TPS ceiling across the whole company. Each
|
The internal API enforces a 1500 TPS ceiling across the whole company. Each
|
||||||
unnecessary duplicate call consumes capacity that other teams depend on.
|
unnecessary duplicate call consumes capacity that other teams depend on.
|
||||||
Singleflight makes the consumer a cooperative user of that shared resource
|
`singleflight` makes the consumer a cooperative user of that shared resource
|
||||||
rather than a potential source of contention. After deployment, API call volume
|
rather than a potential source of contention. After deployment, API call volume
|
||||||
dropped 36% — measured via consumer metrics sent to Datadog.
|
dropped 36% — measured via consumer metrics sent to Datadog.
|
||||||
|
|
||||||
**Different `accountId`s run in parallel.**
|
**Different `accountId`s run in parallel.**
|
||||||
Singleflight groups are keyed by `accountId`. An in-flight call for account X
|
`singleflight` groups are keyed by `accountId`. An in-flight call for account X
|
||||||
does not block processing for account Y — each account gets its own group,
|
does not block processing for account Y. Each account gets its own group, its own
|
||||||
its own 30-second window, and its own API call.
|
30-second window, and its own API call.
|
||||||
|
|
||||||
## Code
|
## Code
|
||||||
|
|
||||||
@ -110,30 +134,36 @@ its own 30-second window, and its own API call.
|
|||||||
var group singleflight.Group
|
var group singleflight.Group
|
||||||
|
|
||||||
func processMessage(ctx context.Context, accountID string) error {
|
func processMessage(ctx context.Context, accountID string) error {
|
||||||
result, err, _ := group.Do(accountID, func() (interface{}, error) {
|
result, err, _ := group.Do(accountID, func() (interface{}, error) {
|
||||||
// All messages for the same accountID block here.
|
// All messages for the same accountID block here.
|
||||||
// Only the first goroutine executes this function.
|
// Only the first goroutine executes this function.
|
||||||
|
// Wait for the upstream pipeline to settle before fetching.
|
||||||
// Wait for the upstream pipeline to settle before fetching.
|
timer := time.NewTimer(30 * time.Second)
|
||||||
time.Sleep(30 * time.Second)
|
defer timer.Stop()
|
||||||
|
select {
|
||||||
return fetchFromInternalAPI(ctx, accountID)
|
case <-ctx.Done():
|
||||||
})
|
return nil, ctx.Err()
|
||||||
if err != nil {
|
case <-timer.C:
|
||||||
return err
|
}
|
||||||
}
|
return fetchFromInternalAPI(ctx, accountID)
|
||||||
|
})
|
||||||
if err := writeToSpanner(ctx, result.(*AccountData)); err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
accountData, ok := result.(*AccountData)
|
||||||
// Caller commits its Kafka offset after this returns.
|
if !ok {
|
||||||
// All goroutines that waited on the same group reach this point
|
return fmt.Errorf("unexpected result type: %T", result)
|
||||||
// with the same result and commit their offsets together.
|
}
|
||||||
return nil
|
if err := writeToSpanner(ctx, accountData); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Caller commits its Kafka offset after this returns.
|
||||||
|
// In production, offset handling is coordinated outside this function.
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
## Status
|
## Status
|
||||||
|
|
||||||
Running in production. API call reduction is tracked as an ongoing metric in Datadog.
|
This pattern is currently running in production. API call reduction is tracked
|
||||||
|
as an ongoing Datadog metric.
|
||||||
Loading…
x
Reference in New Issue
Block a user