From f3487a4a0fc90e01367e4b681b610911c4fc4359 Mon Sep 17 00:00:00 2001
From: Nik Afiq
Date: Mon, 11 May 2026 21:10:23 +0900
Subject: [PATCH] Add documentation for request coalescing in Kafka consumer

---
 .../request-coalescing-kafka-consumer.md | 139 ++++++++++++++++++
 1 file changed, 139 insertions(+)
 create mode 100644 content/posts/request-coalescing-kafka-consumer.md

diff --git a/content/posts/request-coalescing-kafka-consumer.md b/content/posts/request-coalescing-kafka-consumer.md
new file mode 100644
index 0000000..0dc6c04
--- /dev/null
+++ b/content/posts/request-coalescing-kafka-consumer.md
@@ -0,0 +1,139 @@
+---
+title: "Request Coalescing in a Kafka Consumer"
+date: 2026-05-09
+draft: false
+description: "Collapsing redundant upstream API calls in a Kafka consumer with Go's singleflight, applied to concurrent user-change events against a shared rate-limited API."
+tags: [go, kafka, singleflight, spanner, google-cloud, event-driven]
+github: ""
+url: ""
+---
+
+## Overview
+
+Discord's post on [how they store trillions of messages](https://discord.com/blog/how-discord-stores-trillions-of-messages)
+covers request coalescing as a way to collapse duplicate reads under concurrent
+load. The same problem appeared in a Kafka consumer I built for processing
+user-change events.
+
+Each event (an email, username, or address change) is produced to a Kafka topic
+keyed by `accountId`. All events for the same account land on the same
+partition; each pod owns one or more partitions. For every message, the consumer
+calls an internal API to fetch the current account state and writes the result
+to Spanner.
+
+The constraint: the internal API has a **1500 TPS ceiling shared across the
+entire company**. Without coalescing, events for the same account arriving 2–3
+seconds apart each trigger a separate API call for data that hasn't changed
+between calls. At scale, unnecessary duplicate calls erode the shared budget for
+every other team consuming the same API.
+
+## Architecture
+
+```
+User change
+     │
+     ▼
+┌──────────────────────────────────────────────┐
+│  Upstream pipeline                           │
+│  A (intake) → B (announce) → C (business     │
+│  logic + DB write)   ~20s to settle          │
+└───────────────────────┬──────────────────────┘
+                        │  Internal API
+                        │  (source of truth, 1500 TPS shared)
+                        │
+          Kafka topic (key = accountId)
+                        │
+                        ▼
+           ┌─────────────────────────┐
+           │  Consumer pod           │
+           │  (1+ partitions)        │
+           │                         │
+           │  msg: account X ────────┤
+           │  msg: account X ───wait─┤──► singleflight("X")
+           │  msg: account X ───wait─┤        │
+           │                         │    sleep 30s  ← upstream settles
+           │  msg: account Y ────────┤        │
+           │  msg: account Y ───wait─┤──► API call (once per account)
+           │                         │        │
+           └─────────────────────────┘  Spanner write
+                                              │
+                                        Kafka commit
+                                    (all coalesced msgs)
+```
+
+## Stack
+
+- **Go**: consumer service
+- `golang.org/x/sync/singleflight`: per-`accountId` call deduplication
+- **Apache Kafka**: event stream, partitioned by `accountId`
+- **Cloud Spanner**: write target; chosen for strong consistency, managed
+  replication, and little operational overhead for a small team
+
+## Highlights
+
+**Kafka partitioning is load-bearing, not incidental.**
+Routing by `accountId` as the message key guarantees all events for one account
+hit the same partition and are processed by one consumer instance. Without this,
+two pods could independently fire API calls for the same account at the same
+time, and singleflight (which only works in-process) would not help.
+
+**The 30-second sleep is deliberate, not a workaround.**
+The upstream pipeline takes roughly 20 seconds to apply business logic and
+commit state. The consumer sleeps 30 seconds before calling the API, giving the
+upstream time to settle. Messages for the same `accountId` that arrive during
+the sleep join the in-flight singleflight call and block; they never reach the
+API independently. When the call completes, all of them share the result.
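To see the join-and-share behaviour described above without waiting 30 seconds or pulling in `golang.org/x/sync`, here is a minimal sketch. The `coalescer` type is a hypothetical, stdlib-only stand-in for `singleflight.Group` (it skips panic handling and the other features of the real package). The `countFetches` helper, the 200 ms settle window, and the counter that stands in for the internal API are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// call tracks one in-flight fetch for a single key.
type call struct {
	done chan struct{} // closed when the fetch has finished
	val  string
	err  error
}

// coalescer is a hypothetical, simplified stand-in for singleflight.Group.
type coalescer struct {
	mu    sync.Mutex
	calls map[string]*call
}

// Do runs fn at most once per key at a time. Callers that arrive while a
// call for the same key is in flight wait for it and share its result.
func (c *coalescer) Do(key string, fn func() (string, error)) (string, error) {
	c.mu.Lock()
	if c.calls == nil {
		c.calls = make(map[string]*call)
	}
	if inflight, ok := c.calls[key]; ok {
		c.mu.Unlock()
		<-inflight.done // join the existing call instead of starting a new one
		return inflight.val, inflight.err
	}
	cl := &call{done: make(chan struct{})}
	c.calls[key] = cl
	c.mu.Unlock()

	cl.val, cl.err = fn()

	c.mu.Lock()
	delete(c.calls, key) // later arrivals start a fresh call
	c.mu.Unlock()
	close(cl.done) // release every waiter
	return cl.val, cl.err
}

// countFetches sends n concurrent "messages" per account through a coalescer
// and returns how many times the fake API was actually called.
func countFetches(accounts []string, n int, settle time.Duration) int64 {
	var c coalescer
	var fetches int64
	var wg sync.WaitGroup
	for _, id := range accounts {
		for i := 0; i < n; i++ {
			wg.Add(1)
			go func(id string) {
				defer wg.Done()
				c.Do(id, func() (string, error) {
					time.Sleep(settle)           // stands in for the settle window
					atomic.AddInt64(&fetches, 1) // stands in for the internal API call
					return "state-of-" + id, nil
				})
			}(id)
		}
	}
	wg.Wait()
	return atomic.LoadInt64(&fetches)
}

func main() {
	got := countFetches([]string{"X", "Y"}, 3, 200*time.Millisecond)
	fmt.Println("API calls:", got)
}
```

With three messages each for accounts X and Y, the counter should normally end at 2: one fetch per account, because the later callers arrive inside the settle window and wait on the in-flight call. The two accounts never wait on each other, since each key has its own entry in the map.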
+
+**Coalesced messages commit to Kafka together.**
+When a coalesced call returns, all messages that waited on it have their Kafka
+offsets committed in a single batch. This is a natural consequence of the
+blocking behaviour: no message that joined the call is acknowledged until the
+call resolves.
+
+**Coalescing protects a shared rate limit, not just throughput.**
+The internal API enforces a 1500 TPS ceiling across the whole company. Each
+unnecessary duplicate call consumes capacity that other teams depend on.
+Singleflight makes the consumer a cooperative user of that shared resource
+rather than a potential source of contention. After deployment, API call volume
+dropped 36%, as measured by consumer metrics sent to Datadog.
+
+**Different `accountId`s run in parallel.**
+Singleflight calls are keyed by `accountId`. An in-flight call for account X
+does not block processing for account Y: each account gets its own in-flight
+call, its own 30-second window, and its own API request.
+
+## Code
+
+> Illustrative only; this is not the production implementation.
+
+```go
+import (
+	"context"
+	"time"
+
+	"golang.org/x/sync/singleflight"
+)
+
+var group singleflight.Group
+
+func processMessage(ctx context.Context, accountID string) error {
+	result, err, _ := group.Do(accountID, func() (any, error) {
+		// Only the first caller per accountID runs this; later callers block on it.
+		select {
+		case <-time.After(30 * time.Second): // let the upstream pipeline settle
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+		return fetchFromInternalAPI(ctx, accountID)
+	})
+	if err != nil {
+		return err
+	}
+	// Every waiter writes the shared result; its caller then commits the Kafka offset.
+	return writeToSpanner(ctx, result.(*AccountData))
+}
+```
+
+## Status
+
+Running in production. API call reduction is tracked as an ongoing metric in Datadog.
\ No newline at end of file