From 8b38771cbffc9bf941318096f7150033cb05141c Mon Sep 17 00:00:00 2001 From: Nik Afiq Date: Mon, 11 May 2026 21:25:49 +0900 Subject: [PATCH] Refactor request coalescing logic in Kafka consumer to improve API call handling and reduce duplicate requests --- .../request-coalescing-kafka-consumer.md | 178 ++++++++++-------- 1 file changed, 104 insertions(+), 74 deletions(-) diff --git a/content/posts/request-coalescing-kafka-consumer.md b/content/posts/request-coalescing-kafka-consumer.md index 0dc6c04..9985663 100644 --- a/content/posts/request-coalescing-kafka-consumer.md +++ b/content/posts/request-coalescing-kafka-consumer.md @@ -15,50 +15,68 @@ covers request coalescing as a way to collapse duplicate reads under concurrent load. The same problem appeared in a Kafka consumer I built for processing user-change events. -Each event — email, username, address changes — is produced to a Kafka topic -keyed by `accountId`. All events for the same account land on the same +Each event represents a change to user-related state and is produced to a Kafka +topic keyed by `accountId`. All events for the same account land on the same partition; each pod owns one or more partitions. For every message, the consumer calls an internal API to fetch the current account state and writes the result to Spanner. The constraint: the internal API has a **1500 TPS ceiling shared across the entire company**. Without coalescing, events for the same account arriving 2–3 -seconds apart each trigger a separate API call — for data that hasn't changed -between calls. At scale, unnecessary duplicate calls erode the shared budget for -every other team consuming the same API. +seconds apart each trigger a separate API call, often before the upstream state +has fully settled. Those calls usually return the same effective account state, +but each one still consumes part of the shared rate limit. +At scale, unnecessary duplicate calls erode the shared budget for every other +team consuming the same API. ## Architecture -``` -User change - │ - ▼ -┌──────────────────────────────────────────────┐ -│ Upstream pipeline │ -│ A (intake) → B (announce) → C (business │ -│ logic + DB write) ~20s to settle │ -└───────────────────────┬──────────────────────┘ - │ Internal API - │ (source of truth, 1500 TPS shared) - │ - Kafka topic (key = accountId) - │ - ▼ - ┌─────────────────────────┐ - │ Consumer pod │ - │ (1+ partitions) │ - │ │ - │ msg: account X ────────┤ - │ msg: account X ───wait─┤──► singleflight("X") - │ msg: account X ───wait─┤ │ - │ │ sleep 30s ← upstream settles - │ msg: account Y ────────┤ │ - │ msg: account Y ───wait─┤──► API call (once per account) - │ │ │ - └─────────────────────────┘ Spanner write - │ - Kafka commit - (all coalesced msgs) +```text +User-change events +from upstream pipeline + │ + │ upstream state settles after ~20s + ▼ +┌───────────────────────────────┐ +│ Kafka topic │ +│ key = accountId │ +└───────────────┬───────────────┘ + │ + │ same accountId → same partition + ▼ +┌───────────────────────────────┐ +│ Consumer pod │ +│ │ +│ account X message │ +│ │ │ +│ ▼ │ +│ singleflight.Do("account-X") │ +│ │ │ +│ ├─ later account X │ +│ │ messages wait here │ +│ │ │ +│ ▼ │ +│ wait 30s │ +│ │ │ +│ ▼ │ +│ call Internal API once │ +│ │ │ +│ ▼ │ +│ write result to Spanner │ +│ │ │ +│ ▼ │ +│ commit Kafka offset │ +│ │ +│ account Y uses another group │ +│ and can run in parallel │ +└───────────────────────────────┘ + │ + ├──────────────► Internal API + │ source of truth + │ 1500 TPS shared + │ + └──────────────► Cloud Spanner + durable write target ``` ## Stack @@ -67,40 +85,46 @@ User change - `golang.org/x/sync/singleflight` — per-`accountId` call deduplication - **Apache Kafka** — event stream, partitioned by `accountId` - **Cloud Spanner** — write target; chosen for strong consistency, managed - replication, and no operational overhead for a small team + replication, and lower operational overhead for a small team ## Highlights **Kafka partitioning is load-bearing, not incidental.** Routing by `accountId` as the message key guarantees all events for one account -hit the same partition and are processed by one consumer instance. Without this, -two pods could independently fire API calls for the same account at the same -time, and singleflight — which is in-process — would not help. +hit the same partition and are processed by one consumer instance. This matters +because `singleflight` is in-process. It does not deduplicate work across pods. +Without this partitioning strategy, two pods could independently fire API calls +for the same account at the same time, and `singleflight` would not help. The +Kafka key is therefore part of the coalescing design, not just a stream-routing +detail. **The 30-second sleep is deliberate, not a workaround.** The upstream pipeline takes roughly 20 seconds to apply business logic and -commit state. The consumer sleeps 30 seconds before calling the API, giving the -upstream time to settle. Messages for the same `accountId` that arrive during -the sleep join the existing singleflight group and block — they never reach the -API independently. When the call completes, all of them share the result. +commit state. The consumer waits 30 seconds before calling the API, giving the +upstream time to settle. +Messages for the same `accountId` that arrive during this wait join the existing +`singleflight` group and block. They never reach the API independently. When the +call completes, all of them share the result. -**Coalesced messages commit to Kafka together.** -At the end of a singleflight group's lifetime, all messages that waited on that -call have their Kafka offsets committed in a single batch. This is a natural -consequence of the blocking behaviour: no message in the group is acknowledged -until the group resolves. +**Coalesced messages become eligible to commit after the shared call resolves.** +Messages waiting on the same `singleflight` call are not acknowledged until the +shared API call and Spanner write complete. In the production implementation, +offset handling is coordinated outside the fetch function, so the consumer +avoids committing messages before the shared result has been durably written. +This keeps the processing model simple: fetch once, write once, then acknowledge +the messages that depended on that result. **Coalescing protects a shared rate limit, not just throughput.** The internal API enforces a 1500 TPS ceiling across the whole company. Each unnecessary duplicate call consumes capacity that other teams depend on. -Singleflight makes the consumer a cooperative user of that shared resource +`singleflight` makes the consumer a cooperative user of that shared resource rather than a potential source of contention. After deployment, API call volume dropped 36% — measured via consumer metrics sent to Datadog. **Different `accountId`s run in parallel.** -Singleflight groups are keyed by `accountId`. An in-flight call for account X -does not block processing for account Y — each account gets its own group, -its own 30-second window, and its own API call. +`singleflight` groups are keyed by `accountId`. An in-flight call for account X +does not block processing for account Y. Each account gets its own group, its own +30-second window, and its own API call. ## Code @@ -110,30 +134,36 @@ its own 30-second window, and its own API call. var group singleflight.Group func processMessage(ctx context.Context, accountID string) error { - result, err, _ := group.Do(accountID, func() (interface{}, error) { - // All messages for the same accountID block here. - // Only the first goroutine executes this function. - - // Wait for the upstream pipeline to settle before fetching. - time.Sleep(30 * time.Second) - - return fetchFromInternalAPI(ctx, accountID) - }) - if err != nil { - return err - } - - if err := writeToSpanner(ctx, result.(*AccountData)); err != nil { - return err - } - - // Caller commits its Kafka offset after this returns. - // All goroutines that waited on the same group reach this point - // with the same result and commit their offsets together. - return nil + result, err, _ := group.Do(accountID, func() (interface{}, error) { + // All messages for the same accountID block here. + // Only the first goroutine executes this function. + // Wait for the upstream pipeline to settle before fetching. + timer := time.NewTimer(30 * time.Second) + defer timer.Stop() + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-timer.C: + } + return fetchFromInternalAPI(ctx, accountID) + }) + if err != nil { + return err + } + accountData, ok := result.(*AccountData) + if !ok { + return fmt.Errorf("unexpected result type: %T", result) + } + if err := writeToSpanner(ctx, accountData); err != nil { + return err + } + // Caller commits its Kafka offset after this returns. + // In production, offset handling is coordinated outside this function. + return nil } ``` ## Status -Running in production. API call reduction is tracked as an ongoing metric in Datadog. \ No newline at end of file +This pattern is currently running in production. API call reduction is tracked +as an ongoing Datadog metric. \ No newline at end of file