Apache Kafka

A distributed event streaming platform built for high-throughput, fault-tolerant, real-time data pipelines. This guide covers every concept from fundamentals to production failure scenarios.

Chapter 01

Event-Driven Architecture (EDA)

Services don't call each other directly. Instead, a service emits events and other services react to those events.

What is an Event?

An event is an immutable record of something that happened: a key, a value, a timestamp, and optional headers. "Order 123 was placed" is an event; it states a fact rather than issuing a command.

REST Microservices vs EDA Microservices

REST (Synchronous) Challenges
  • Availability — all services must be up. If any one service is down, the entire API call fails.
  • Latency — total latency = sum of all service latencies in the chain.
  • Cascading Failure — if Service 3 is slow or failing, Service 2 backs up, then Service 1 goes down too.
  • Tight Coupling — services know about each other, changes ripple across teams.
  • Scaling Issue — scaling Service 3 has no benefit if Service 1 and 2 can only handle the same request rate.
EDA (Asynchronous) Advantages
  • Loose Coupling — producer doesn't know or care about consumers.
  • Independent Scaling — each service scales on its own based on its throughput.
  • Resilience — temporary failures don't break the whole system. Events wait in the broker.
  • Replay — you can replay old events for recovery, debugging, or new consumer onboarding.
  • Latency Improvement — fire-and-forget pattern. Producer doesn't block waiting for downstream.

Two Types of EDA

Push-Based (Pub/Sub)
  • Broker pushes events to active consumers.
  • Broker only cares about delivery — once delivered (or expired), the event is gone.
  • You cannot replay old events.
  • Events published only to currently active subscribers.
  • Examples: RabbitMQ, AWS SNS, Google Pub/Sub.
Pull-Based (Streaming / Kafka)
  • Consumer pulls events at its own pace.
  • Events are appended to logs — persisted on disk.
  • You can replay events from any offset.
  • Retention-based: events stay forever or until configured TTL.
  • Examples: Apache Kafka, Amazon Kinesis, Redpanda.
Chapter 02

What is Apache Kafka?

A distributed event streaming platform that allows you to publish, store, and subscribe to streams of events in real-time.

In simple words: Kafka is like a super-fast, never-forgetting diary. Applications write events (things that happened) into this diary, and other applications can read those events whenever they want — even days later. The diary can handle millions of entries per second and never loses a page.

Publish Events

Producers write events (messages) to Kafka topics. Events are key-value pairs with optional headers and a timestamp. Producers choose which topic and optionally which partition.

Store Events

Events are durably stored on disk across multiple brokers with configurable replication. Kafka can retain events for hours, days, or forever. Storage is sequential and immutable.

Subscribe to Events

Consumers pull events from topics at their own pace. Multiple consumer groups can independently read the same data. Each group tracks its own position (offset) in the log.

High-Level Kafka Architecture
Producer 1 ──┐                                        ┌── Consumer Group 1
Producer 2 ──┤    ┌─────────────────────────────┐     ├── Consumer Group 2
Producer 3 ──┼───▶│        KAFKA CLUSTER        │────▶├── Consumer Group 3
Producer N ──┘    │ Broker 1  Broker 2  Broker 3│     └── Consumer Group N
                  │ ┌──────┐  ┌──────┐  ┌──────┐│
                  │ │Topic │  │Topic │  │Topic ││
                  │ │P0 P1 │  │P0 P1 │  │P0 P1 ││
                  │ └──────┘  └──────┘  └──────┘│
                  └─────────────────────────────┘
  • Millions of events/sec throughput
  • Millisecond end-to-end latency
  • Petabyte scale
  • 99.99% availability with replication
Chapter 03

Producer

An application that publishes events to Kafka. The producer does not care who will consume the event. It's fire-and-forget.

In simple words: A producer is like a news reporter. They write the news story and send it to the newspaper office (Kafka). They don't know or care who reads the newspaper — that's not their job. Their job is just to report what happened.

Producer Record Structure

// A Kafka Producer Record contains:
{
  "topic": "order-events",            // Required: which topic
  "key": "order-123",                 // Optional: determines partition
  "value": "{ orderJson }",           // Required: the actual event data
  "headers": { "source": "api-gw" },  // Optional: metadata key-value pairs
  "timestamp": 1709424000000          // Optional: event time or create time
}

Producer ACK Configurations

The producer can control how many acknowledgements it waits for before considering the write successful.

acks = 0 (Fire & Forget)

Producer sends the record and does NOT wait for any response. Fastest but data can be lost if broker crashes before writing. Use for metrics/logs where loss is acceptable.

acks = 1 (Leader Only)

Producer waits for the leader broker to write to its local log and acknowledge. Balanced between speed and safety. Data can be lost if leader crashes before followers replicate.

acks = all (All ISR)

Producer waits for the leader AND all in-sync replicas (ISR) to write the event. Slowest but safest. No data loss as long as at least one ISR is alive. Production recommended.
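In client code, the acks choice is just one producer setting among several that trade latency for durability. A sketch of a durability-first setup using standard Kafka producer properties (the values shown are illustrative, not prescriptive):

```properties
# Durability-first producer settings (illustrative values)
acks=all                    # wait for all in-sync replicas
retries=2147483647          # retry transient failures indefinitely
enable.idempotence=true     # prevent duplicates caused by retries
delivery.timeout.ms=120000  # give up after 2 minutes of retrying
```

With enable.idempotence=true, retried sends cannot create duplicate records in the partition, which makes acks=all safe to combine with aggressive retries.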

Chapter 04

Topic

A Topic is a logical category or folder to which events are published. Think of it as a named stream. Many producers can write to the same topic, and many consumer groups can read from the same topic independently.

In simple words: A topic is like a TV channel. Channel "order-events" shows all order-related news. Channel "payment-events" shows all payment-related news. Producers broadcast to a channel, and consumers tune into whichever channels they care about.

Key Points
  • A topic is a logical concept — it does NOT reside on a single machine. Its partitions are distributed across multiple brokers.
  • A topic never "fails" because it's not a physical entity. What fails are the partitions (and they have replicas for recovery).

Topic Properties

Property            | Description                                                | Default
num.partitions      | Number of partitions when topic is created                 | 1
replication.factor  | How many copies of each partition across brokers           | 1 (set to 3 in prod)
retention.ms        | How long to keep events before deletion                    | 604800000 (7 days)
retention.bytes     | Max size of partition log before old segments are deleted  | -1 (unlimited)
cleanup.policy      | Whether to delete or compact old events                    | delete
min.insync.replicas | Minimum ISR count for acks=all writes to succeed           | 1 (set to 2 in prod)
segment.bytes       | Size of each segment file in the partition log             | 1073741824 (1 GB)

Internal Topics (created by Kafka itself)

Topic Name          | Partitions   | Purpose
__consumer_offsets  | 50 (default) | Stores committed offsets for every consumer group. Keyed by (groupId, topic, partition).
__transaction_state | 50 (default) | Stores transaction metadata for exactly-once semantics.
__cluster_metadata  | 1            | KRaft cluster metadata log (replicated across controllers).
Chapter 05

Partitions

A partition is a physical, ordered, append-only log that is part of a topic. It's where events are actually stored on disk.

In simple words: If a topic is a book, then partitions are the chapters. The book "order-events" might have 3 chapters (partitions). Each chapter is stored on a different shelf (broker). Events are written at the end of a chapter — you never insert pages in the middle. This makes writing super fast.

Three Key Properties

Physical

Events are actually stored on disk inside the partition as log files. Each partition maps to a directory on the broker's filesystem containing segment files. Example: /data/order-events-0/

Ordered

Events are written sequentially and assigned an incrementing offset (starting from 0). Consumers read in offset order. Within a single partition, order is guaranteed. Kafka never re-orders events inside a partition.

Append-Only

Events are only appended to the end of the log. No insert in the middle. No updates. No deletes of individual records. The log only grows forward. This is what makes Kafka's sequential disk I/O so fast.

Topic: order-events — 3 Partitions
Partition 0 (order-events-0): [E0 off:0] [E3 off:1] [E6 off:2] [E9 off:3] → next
Partition 1 (order-events-1): [E1 off:0] [E4 off:1] [E7 off:2] → next
Partition 2 (order-events-2): [E2 off:0] [E5 off:1] [E8 off:2] → next
Ordering Guarantee: Order is guaranteed within a single partition. In P0, E3 definitely happened after E0. But you cannot compare across partitions — E4 in P1 may have happened before or after E3 in P0. Offsets are per-partition, not global.
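The per-partition offsets above can be modeled with a tiny in-memory sketch (plain Python, not Kafka client code): each partition hands out its own monotonically increasing offsets, independent of every other partition.

```python
# Minimal in-memory model of per-partition, append-only offsets.
# This illustrates the concept; it is not how brokers are implemented.

class Partition:
    def __init__(self):
        self.log = []              # append-only list of events

    def append(self, event):
        offset = len(self.log)     # next offset = current log length
        self.log.append(event)
        return offset

partitions = [Partition() for _ in range(3)]

# Round-robin E0..E9 across 3 partitions, like the diagram above.
assignments = {}
for i in range(10):
    p = i % 3
    assignments[f"E{i}"] = (p, partitions[p].append(f"E{i}"))

print(assignments["E0"])  # (0, 0) -> partition 0, offset 0
print(assignments["E3"])  # (0, 1) -> same partition, next offset
print(assignments["E9"])  # (0, 3)
```

Note that E3's offset (1 in P0) says nothing about its ordering relative to E4 (offset 1 in P1); offsets only order events within one partition.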
Chapter 06

Partitioning Strategies

Producer chooses which partition of a topic the event goes to. This decision impacts ordering guarantees and load distribution.

Key-Based Partitioning

partition = hash(key) % num_partitions

All events with the same key go to the same partition. This guarantees ordering for related events. Example: all events for orderId "123" always go to the same partition.

Pros: ordering guaranteed. Cons: possible hotspots.

Round Robin (No Key)

When key = null, events are distributed in round-robin across partitions.

E0 → P0, E1 → P1, E2 → P2, E3 → P0 ... Even distribution but ordering is NOT guaranteed. Related events may land in different partitions.

Pros: even distribution. Cons: no ordering.

Custom Partitioner

You implement your own Partitioner interface with business logic.

Example: country == "India" → P0, country == "US" → P1. Gives full control over data locality and processing affinity.

Pros: full control. Cons: requires custom logic.

Key-Based Partitioning Example
// Topic: order-events, 3 partitions
Publish: { topic: "order-events", key: "order-123", value: orderJson }
partition = hash("order-123") % 3 = 1  → always goes to Partition 1

E0: hash("123") % 3 = 1 → Partition 1
E1: hash("456") % 3 = 2 → Partition 2
E2: hash("789") % 3 = 0 → Partition 0
E3: hash("123") % 3 = 1 → Partition 1 (same key = same partition)
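A minimal sketch of key-based partitioning in Python. Kafka's default partitioner actually uses murmur2 over the key bytes; CRC32 stands in here because it is in the standard library and demonstrates the same property: the same key always maps to the same partition.

```python
import zlib

def partition_for(key, num_partitions):
    # Kafka's default partitioner computes murmur2(key_bytes) % num_partitions.
    # CRC32 is used as a stand-in with the same "same key -> same partition"
    # property (the actual partition numbers will differ from Kafka's).
    return zlib.crc32(key.encode("utf-8")) % num_partitions

p1 = partition_for("order-123", 3)
p2 = partition_for("order-123", 3)
p3 = partition_for("order-456", 3)

print(p1 == p2)          # True: same key always lands on the same partition
print(p3 in (0, 1, 2))   # True: every key maps to a valid partition
```

One consequence of `hash(key) % num_partitions`: adding partitions to a topic changes where keys land, which breaks per-key ordering for existing keys.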
Chapter 07

Segments & Indexing

Each partition log file is further divided into segments. Instead of one massive log file, Kafka splits it into smaller, manageable pieces for faster reads and efficient cleanup.

In simple words: Imagine writing a very long diary. Instead of one 10,000-page book, you split it into volumes: Volume 1 (pages 1-500), Volume 2 (pages 501-1000), etc. When someone asks for page 750, you grab Volume 2 directly instead of flipping through the entire diary. That's what segments do for Kafka.

Why Segments?

If a consumer says "read from offset 550 to 800 of Partition 0", Kafka can skip directly to the right segment file instead of scanning from the beginning. Also, retention and compaction operate at the segment level — Kafka deletes or compacts entire segment files.

Partition 0 → Segment Files
Topic: order-events
└── Partition 0: order-events-0/
    ├── 00000000000000000000.log        ← offsets 0-499
    ├── 00000000000000000000.index      ← sparse index for segment 1
    ├── 00000000000000000000.timeindex
    ├── 00000000000000000500.log        ← offsets 500-999
    ├── 00000000000000000500.index      ← sparse index for segment 2
    ├── 00000000000000000500.timeindex
    ├── 00000000000000001000.log        ← offsets 1000+ (active)
    ├── 00000000000000001000.index
    └── 00000000000000001000.timeindex

File name = first offset in that segment

Sparse Index (.index files)

Even with segments, a 1 GB segment file would still need sequential scanning. So Kafka maintains a sparse index per segment. Not every offset is indexed. Instead, it creates one index entry after every N bytes written (configured by log.index.interval.bytes, default 4096 bytes).

Index Lookup Example

Segment Log (00000000.log)

Offset | Event Size | File Position
0      | 300 bytes  | 0
1      | 500 bytes  | 300
2      | 1000 bytes | 800
3      | 2000 bytes | 1800
4      | 500 bytes  | 3800

Total bytes: 300+500+1000+2000+500 = 4300 bytes

Sparse Index (00000000.index)

Offset | Position (bytes)
4      | 3800

Index interval = 4096 bytes. After 4300 bytes written (> 4096), Kafka records offset 4 at position 3800. To find offset 3, Kafka looks at index, finds nearest lower entry, then scans sequentially from there.

How Offset Lookup Works
  1. Identify the correct segment file by comparing the target offset with segment file names.
  2. Binary search the .index file to find the nearest lower indexed offset.
  3. Sequential scan from that position in the .log file to the exact offset.
This gives O(1) segment lookup + O(log n) index lookup + a small sequential scan.
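The index step can be sketched as a binary search over the sparse index entries (a conceptual model, not the broker's implementation):

```python
import bisect

# Sparse index for one segment: (offset, byte_position) entries,
# recorded roughly every log.index.interval.bytes of log data.
# Entries taken from the example above.
sparse_index = [(0, 0), (4, 3800)]
index_offsets = [off for off, _ in sparse_index]

def locate(target_offset):
    """Return the byte position to start scanning forward from."""
    # Binary search for the nearest indexed offset <= target.
    i = bisect.bisect_right(index_offsets, target_offset) - 1
    return sparse_index[i][1]

print(locate(3))   # 0    -> nearest lower entry is offset 0; scan forward
print(locate(4))   # 3800 -> exact index hit, no scan needed
```

To find offset 3, the search lands on the entry for offset 0 (position 0), and the broker scans forward through offsets 0, 1, 2 until it reaches 3.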
Chapter 08

Broker

A Broker is a single Kafka server instance. It's the one that actually stores data on disk and serves client requests (both producers and consumers).

In simple words: A broker is like a warehouse worker. Each worker (broker) manages some shelves (partitions). Producers give packages (events) to the right worker, and consumers ask the right worker for their packages. Multiple workers together form a team (cluster).

Critical Understanding: Topics and partitions are distributed across multiple brokers. A single broker does NOT hold all topics. It also does NOT hold all partitions of the same topic. Each broker stores some partitions of some topics.

Broker Responsibilities

  • Store partition logs on local disk (segments and indexes).
  • Serve producer writes and consumer reads for partitions it leads.
  • Fetch and replicate data for partitions it follows.
  • Send heartbeats to the controller and apply metadata updates.
  • Answer metadata requests so clients can discover partition leaders.

Broker Networking

Kafka uses a custom binary protocol over TCP. Default port is 9092. Each broker has a unique broker.id. Clients connect to any broker (bootstrap) to discover the full cluster topology, then connect directly to the partition leaders they need.

Chapter 09

Kafka Cluster

A Kafka Cluster is a group of brokers working together to provide scalability, fault tolerance, and high availability.

Scalability

Distribute load across multiple servers. More brokers = more partitions = more throughput. Producers and consumers talk to different brokers in parallel.

Fault Tolerance

If a broker goes down, its partitions have replicas on other brokers. Committed data survives a broker failure (with replication factor >= 2). Automatic failover via leader election.

High Availability

No single point of failure. Controller has standby nodes. Every partition has replicas. Clients automatically reconnect to new leaders. The system continues even if a minority of brokers fail.

3-Broker Cluster with Replication Factor = 2
Broker 1: P0 Leader, P1 Follower
Broker 2: P1 Leader, P2 Follower
Broker 3: P2 Leader, P0 Follower
Chapter 10

Leader-Follower Partition Replication

For each partition, one broker is the Leader and the rest are Followers. This is the core of Kafka's fault tolerance.

In simple words: Imagine a group project. One person (leader) does the original work and everyone (producers/consumers) talks to them. The other members (followers) silently make copies of the leader's work. If the leader gets sick, one of the followers who has a complete copy can immediately take over. No work is lost.

Leader Responsibilities
  • Handle ALL producer writes for that partition.
  • Handle ALL consumer reads for that partition.
  • Maintain partition log (segments, indexes).
  • Coordinate with followers — track their replication progress.
  • Manage ISR list — add/remove followers based on sync status.
Follower Responsibilities
  • Continuously fetch (pull) data from the leader.
  • Replicate data to stay in sync.
  • Ready to become leader if current leader fails.
  • Do NOT serve any client (producer/consumer) requests directly.
  • Acknowledge successful replication back to leader (for acks=all).
Who Decides Leader/Follower? The Controller (a special broker) decides which broker hosts which partition replica, which replica is the leader, and which are followers. When a leader fails, the Controller elects a new leader from the ISR list.
Chapter 11

ISR (In-Sync Replicas)

ISR is the list of replicas (including the leader) that are fully caught up with the leader's log. This is cluster metadata maintained by the Controller.

In simple words: Imagine you're a teacher (leader) and you have 2 students (followers) copying your notes. ISR is the list of students who are up-to-date with your notes. If a student falls behind (didn't copy the last 5 pages), they're temporarily removed from the ISR until they catch up.

ISR Example — All In Sync
Partition 1 (Replication Factor = 3)

Broker1 (Leader):   [0][1][2][3][4][5][6][7][8][9] ← Latest offset: 9
Broker2 (Follower): [0][1][2][3][4][5][6][7][8][9] ← In-Sync ✓
Broker3 (Follower): [0][1][2][3][4][5][6][7][8][9] ← In-Sync ✓

ISR = {Broker1, Broker2, Broker3}
ISR Example — Broker 3 Lagging
Broker1 (Leader):   [0][1][2][3][4][5][6][7][8][9] ← Latest offset: 9
Broker2 (Follower): [0][1][2][3][4][5][6][7][8][9] ← In-Sync ✓
Broker3 (Follower): [0][1][2][3][4][5][6]          ← Out-of-Sync ✗ (lagging)

ISR = {Broker1, Broker2} ← Broker3 temporarily REMOVED from ISR

How Does the Leader Decide a Follower is Out-of-Sync?

A follower is dropped from the ISR if it has not caught up to the leader's log end offset within replica.lag.time.max.ms (default 30 seconds). Once the follower fully catches up again, it is added back to the ISR.

Why ISR Matters

When the producer uses acks=all, the leader waits for ALL replicas in the ISR (not all replicas, just the in-sync ones) to acknowledge the write. This means a committed event exists on every in-sync replica, so any ISR member can take over as leader with no data loss, and a lagging follower that has dropped out of the ISR cannot block writes.

min.insync.replicas: This config sets the minimum number of replicas that must be in the ISR for a write (with acks=all) to succeed. If ISR count drops below this, the partition becomes UNAVAILABLE for writes. Typical production setting: min.insync.replicas=2 with replication factor 3.
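The interaction between acks=all and min.insync.replicas reduces to a one-line predicate (a conceptual sketch, not broker code):

```python
def write_allowed(isr_size, min_insync_replicas, acks):
    # With acks=all, the leader rejects writes once the ISR has shrunk
    # below min.insync.replicas (clients see NotEnoughReplicasException).
    if acks != "all":
        return True                  # acks=0 and acks=1 don't check ISR size
    return isr_size >= min_insync_replicas

# RF=3, min.insync.replicas=2:
print(write_allowed(3, 2, "all"))    # True  - all replicas in sync
print(write_allowed(2, 2, "all"))    # True  - one lagging follower is fine
print(write_allowed(1, 2, "all"))    # False - partition unavailable for writes
```

This is why RF=3 with min.insync.replicas=2 tolerates exactly one replica failure without losing either durability or write availability.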
Chapter 12

Controller

The Controller is a special broker responsible for managing all cluster metadata. It's the brain of the Kafka cluster.

In simple words: The controller is like the manager of the warehouse. Workers (brokers) store packages, but the manager decides which worker handles which shelf, who takes over when a worker is sick, and keeps track of everything. There's only one active manager at a time, but backup managers are ready to step in.

Controller Responsibilities

Topic Management

Creates/deletes topics. Decides how partitions are distributed across brokers when a new topic is created.

Partition Leader Election

Elects leader and follower replicas for every partition. Re-elects leaders when a broker fails.

Failure Detection

Monitors broker heartbeats. Detects when brokers go down and triggers recovery actions.

Metadata Distribution

Notifies all brokers about changes — new topics, leader changes, ISR updates. Brokers cache this metadata to serve client requests.

Single Point of Failure? Only 1 controller is active (leader) at any time. Others are standby (followers). If the active controller fails, a new one is elected using consensus. This is where KRaft comes in.

Controller can have dual roles

In KRaft mode, process.roles decides what a node does: broker, controller, or both. A node with process.roles=broker,controller (combined mode) stores topic data and participates in the metadata quorum; this is convenient for development, while production clusters typically run dedicated controller nodes.

Chapter 13

KRaft (Kafka Raft Consensus)

KRaft is the built-in consensus mechanism that replaced ZooKeeper (ZooKeeper mode was deprecated in Kafka 3.5 and removed in Kafka 4.0). It uses the Raft consensus algorithm to coordinate multiple controllers.

In simple words: Imagine 3 managers in a company. Only 1 can be the CEO at a time. When the current CEO leaves, the remaining managers vote to pick a new CEO. That voting process is "consensus." KRaft is Kafka's built-in voting system. Before KRaft, Kafka used an external voting system called ZooKeeper (like hiring an outside agency to run the election).

Why Do We Need Consensus?

Cluster metadata (partition leaders, ISR lists, topic configs) must have exactly one consistent version. With multiple controllers, consensus guarantees that only one is active at a time and that every metadata change is agreed on by a majority, preventing split-brain where two controllers both act as leader.

KRaft vs ZooKeeper

ZooKeeper (Legacy)
  • External distributed system.
  • Deployed separately.
  • Monitored separately.
  • Maintained separately from Kafka.
  • Uses ZAB consensus protocol.
  • Additional operational overhead.
  • Deprecated in Kafka 3.5, removed in Kafka 4.0.
KRaft (Modern)
  • Built-in to Kafka.
  • No external dependency.
  • Single system to deploy and monitor.
  • Uses Raft consensus protocol.
  • Less operational overhead.
  • Faster failover.
  • Default since Kafka 3.3+.

Quorum

Quorum means majority of nodes must agree before any decision is finalized.

Controller Nodes | Quorum (N/2 + 1) | Can Tolerate Failures
3                | 2                | 1
5                | 3                | 2
7                | 4                | 3
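The table is just majority arithmetic, which can be stated in two lines:

```python
def quorum(n):
    # Majority of n controller nodes.
    return n // 2 + 1

def tolerable_failures(n):
    # Failures the quorum survives while still reaching a majority.
    return n - quorum(n)

for n in (3, 5, 7):
    print(n, quorum(n), tolerable_failures(n))
# 3 2 1
# 5 3 2
# 7 4 3
```

This is also why controller counts are odd: going from 3 to 4 nodes raises the quorum from 2 to 3 without increasing the number of tolerable failures.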

KRaft Metadata Commit Flow

1. Request arrives

Producer or Admin CLI sends a request (e.g., create topic) to any broker, which forwards it to the active controller.

2. Active Controller processes

Creates the topic/partitions, decides leader/followers, creates a metadata record with the next offset (e.g., offset 100). Writes it to its local __cluster_metadata log but marks it as NOT committed yet.

3. Initiates Quorum

Active controller sends the new metadata record to standby controllers via heartbeats.

4. Standby Controllers Write

Each standby appends the record to its local __cluster_metadata log (not committed yet) and sends an ACK back.

5. Majority ACK = Commit

Once a majority (quorum) of controllers have written successfully, the active controller marks the record as committed. Updates the last committed offset to 100.

6. Propagate to Brokers

Active controller sends heartbeats with the last committed offset to all brokers. Brokers fetch and apply the new metadata.

7. Standby Controllers Commit

When standby controllers receive the next heartbeat with the committed offset, they also mark the record as committed in their local log.

Chapter 14

Consumer & Consumer Groups

A Consumer is an application that reads (pulls) events from Kafka. Kafka uses a PULL model — the consumer asks for data, Kafka never pushes.

In simple words: A consumer is like a newspaper reader. Kafka doesn't throw the newspaper at your door — YOU go and pick it up when you're ready. A Consumer Group is a team of readers sharing the workload. If there are 3 newspaper sections, 3 readers can each take 1 section and read in parallel. But two readers from the same team never read the same section.

Pull Model: The consumer says "Give me data from topic X, partition Y, starting from offset Z, max N bytes." This gives consumers full control over their read rate and position.

Consumer Groups

Every consumer belongs to a group.id. Within a group, work is divided. Across different groups, the same data is read independently.

Golden Rule: Inside one consumer group, the same partition is NEVER read by multiple consumers. This guarantees that events in a partition are processed in order by exactly one consumer within the group.

Partition Assignment Scenarios

Consumers == Partitions (Ideal)

Topic: 3 partitions, Group: 3 consumers

C1 → P0
C2 → P1
C3 → P2

Perfect Balance

Consumers < Partitions

Topic: 6 partitions, Group: 3 consumers

C1 → P0, P3
C2 → P1, P4
C3 → P2, P5

Each consumer handles more load

Consumers > Partitions

Topic: 3 partitions, Group: 5 consumers

C1 → P0
C2 → P1
C3 → P2
C4 → IDLE
C5 → IDLE

Wasted consumers
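The three scenarios fall out of one assignment rule. A toy round-robin assignor illustrates it (a simplification; real Kafka uses pluggable assignors such as range, round-robin, or cooperative-sticky):

```python
def assign_round_robin(num_partitions, consumers):
    # Spread partitions across group members; extra consumers stay idle.
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment

print(assign_round_robin(3, ["C1", "C2", "C3"]))
# {'C1': [0], 'C2': [1], 'C3': [2]}            -> perfect balance
print(assign_round_robin(6, ["C1", "C2", "C3"]))
# {'C1': [0, 3], 'C2': [1, 4], 'C3': [2, 5]}   -> each consumer takes 2
print(assign_round_robin(3, ["C1", "C2", "C3", "C4", "C5"]))
# C4 and C5 get [] -> idle consumers
```

Because a partition can never be split between group members, partition count is the hard upper bound on a group's parallelism.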

Multiple Consumer Groups

The same partition CAN be read by multiple consumers that are in different groups. Each group independently tracks its own offset.

Same Partition, Multiple Groups
Topic: order-events, Partition 0

Consumer Group 1: notification-service
└── Consumer 1 → reads from Partition 0 (offset: 105)

Consumer Group 2: analytics-service
└── Consumer 2 → reads from Partition 0 (offset: 60)

Consumer Group 3: audit-service
└── Consumer 3 → reads from Partition 0 (offset: 200)

Each group has its own committed offset — completely independent.
Chapter 15

Offset Management

Each consumer group maintains its partition-wise committed offset independently. This tells Kafka "I've processed up to this point."

In simple words: An offset is like a bookmark in a book. It tells you "I've read up to page 105." If you put the book down and come back later, you start from page 106. Each reader (consumer group) has their own bookmark — so one reader might be on page 105 while another is on page 60.

Offset Tracking Structure

Consumer Group | Topic        | Partition | Committed Offset
notification   | order-events | 0         | 105
notification   | order-events | 1         | 98
notification   | order-events | 2         | 210
analytics      | order-events | 0         | 60
analytics      | order-events | 1         | 45

Where Are Offsets Stored?

Offsets are stored in the __consumer_offsets internal topic (50 partitions by default). The partition is determined by:

partition = hash(group_id) % 50

// Example: hash("notification-service") % 50 = 23
// All offset updates for "notification-service" go to Partition 23 of __consumer_offsets
// The broker that is LEADER of this partition becomes the Group Coordinator
Group Coordinator: The broker that is leader of the relevant __consumer_offsets partition becomes the Group Coordinator for that consumer group. It handles: group membership, partition assignment, and offset commits. This is NOT cluster metadata — it's stored like normal topic data, NOT on controller nodes.
Chapter 16

Offset Commit Strategies

How and when the consumer tells Kafka "I've successfully processed up to this offset" — this determines your delivery guarantee.

Strategy 1: Auto-Commit (Risky)
enable.auto.commit=true
auto.commit.interval.ms=5000
T=0s: Consumer polls, gets events 0-99
T=1s: Consumer processing events...
T=5s: Auto-commit triggers → commits offset 99
T=6s: Consumer CRASHES (only processed 50 events)
T=7s: Consumer restarts from offset 100

Events 50-99 are LOST. At-most-once delivery.

Strategy 2: Manual Commit (Safe)
enable.auto.commit=false
// Consumer commits explicitly after processing
Step 1: Poll events 0-99
Step 2: Process ALL events
Step 3: Commit offset 99 (wait for ACK)
Step 4: Kafka confirms commit
Step 5: Continue to next batch

If crash before commit: events reprocessed (duplicates). At-least-once delivery.
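The difference between the two strategies comes down to what was committed at crash time. A conceptual sketch, with offsets mirroring the timelines above (the consumer always resumes at committed offset + 1; "-1" stands in for "nothing committed yet"):

```python
def restart_offset(committed):
    # After a restart, the consumer resumes at the next offset
    # after the last committed one.
    return committed + 1

# Batch: offsets 0-99. Crash after processing only offsets 0-49.

# Auto-commit fired early and committed offset 99 before the crash:
at_most_once_restart = restart_offset(99)
print(at_most_once_restart)   # 100 -> offsets 50-99 never processed (LOST)

# Manual commit: nothing committed yet when the crash happens
# (no prior commit for this batch, modeled here as -1):
at_least_once_restart = restart_offset(-1)
print(at_least_once_restart)  # 0 -> offsets 0-49 reprocessed (DUPLICATES)
```

Neither guarantee is free: at-most-once trades data loss for no duplicates, at-least-once trades duplicates for no loss, which is why idempotent processing matters for the manual-commit path.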

Chapter 17

Complete Producer Write Flow

End-to-end flow from event creation to acknowledgement.

1. Producer creates event

{ topic: "order-events", key: "order-123", value: orderJson, acks: all }

2. Producer requests metadata (first time or periodically)

Connects to any broker (bootstrap server). Broker either has cached metadata or fetches from the active controller. Returns: which broker is leader for each partition of each topic.

3. Producer calculates partition

partition = hash("order-123") % 3 = 1

4. Looks up leader broker for Partition 1

From metadata: Partition 1 leader = Broker 2

5. Producer sends event directly to Broker 2

No intermediary. Direct TCP connection to the partition leader.

6. Broker 2 writes event to Partition 1 log

Appends to active segment file. Assigns next offset (e.g., offset 101). Writes to OS page cache (async flush to disk).

7. Followers replicate (since acks=all)

Follower brokers continuously poll the leader. They fetch the new event, write to their own partition logs, and ACK back to the leader.

8. All ISR acknowledged

Once all in-sync replicas confirm the write, the leader sends a success response back to the producer.

Chapter 18

Complete Consumer Read Flow

End-to-end flow from consumer startup to continuous event processing.

1. Consumer starts, wants to join group

group.id = "notification-service"

2. Finds the Group Coordinator partition

hash("notification-service") % 50 = 23 → Partition 23 of __consumer_offsets

3. Consumer requests metadata

Asks any broker: "Who is the leader of __consumer_offsets Partition 23?" Answer: Broker 3.

4. Invokes Broker 3 (Group Coordinator)

Sends JoinGroup request. Broker 3 manages membership, waits for all group members, then assigns partitions. Response: "You handle Partition 2 of order-events."

5. Group Coordinator replicates assignment

The assignment is written to __consumer_offsets Partition 23 and replicated to all followers. Internally acts like acks=all (no config option for this).

6. Consumer fetches last committed offset

Asks Group Coordinator: "What's my last offset for order-events Partition 2?" Coordinator looks up the key ("notification-service", "order-events", partition 2) and returns offset 100.

7. Consumer fetches events from partition leader

Checks metadata: leader of order-events Partition 2 = Broker 2. Sends FetchRequest: "Give me events from offset 101" (bounded by fetch.max.bytes). Broker 2 returns events 101-501.

8. Consumer processes events

Application logic runs on each event sequentially.

9. Consumer commits offset (manual batch)

Sends OffsetCommit to Group Coordinator: "I've processed up to offset 501 for order-events Partition 2." Coordinator writes to __consumer_offsets.

Continuous Polling Loop

Consumer repeats from step 7. This is the consumer's poll loop — continuously fetching, processing, and committing.

Chapter 19

Log Compaction & Retention

Kafka can't keep data forever (disk fills up). Two strategies control how old data is cleaned up.

In simple words: Think of your phone gallery. Delete policy = delete all photos older than 30 days. Compact policy = for each person, keep only the most recent photo and delete older ones. Both free up space, but in different ways.

Policy 1: Delete (Default)
cleanup.policy=delete
retention.ms=604800000      // 7 days
retention.bytes=1073741824  // 1 GB

Deletes entire old segment files based on:

  • Time-based: Segment created 8 days ago → DELETED. Segment created 6 days ago → KEPT.
  • Size-based: If total partition log exceeds retention.bytes, oldest segments are deleted until under the limit.

Deletion is async — does not block writes. Runs periodically in background.
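Time-based deletion can be sketched as a filter over whole segments (a conceptual model, not broker code; field names here are invented for illustration):

```python
# A segment is eligible for deletion when its newest record is older
# than retention.ms. Deletion always drops whole segment files.
def segments_to_delete(segments, retention_ms, now_ms):
    return [s["base_offset"] for s in segments
            if now_ms - s["last_timestamp_ms"] > retention_ms]

DAY = 24 * 60 * 60 * 1000
now = 10 * DAY
segments = [
    {"base_offset": 0,    "last_timestamp_ms": 2 * DAY},   # 8 days old
    {"base_offset": 500,  "last_timestamp_ms": 4 * DAY},   # 6 days old
    {"base_offset": 1000, "last_timestamp_ms": 10 * DAY},  # active segment
]

print(segments_to_delete(segments, 7 * DAY, now))  # [0] -> only the 8-day-old segment
```

Because the unit of deletion is a whole segment, some records can outlive retention.ms by up to one segment's worth of time.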

Policy 2: Compact
cleanup.policy=compact

Keeps only the latest value for each key. Old duplicates are removed.

Before             | After Compaction
off:100 user1 → v1 | removed
off:101 user2 → v1 | removed
off:102 user1 → v2 | KEPT
off:103 user3 → v1 | KEPT
off:104 user2 → v2 | KEPT

Used for: state stores, CDC, config topics. Compaction is async, per-segment, does not block writes.
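Compaction's keep-latest-per-key rule takes only a few lines of Python (a conceptual model; the real log cleaner works segment by segment in the background):

```python
def compact(log):
    # Keep only the newest record per key, preserving offsets and order.
    latest = {}                          # key -> offset of newest record
    for offset, key, value in log:
        latest[key] = offset
    kept = set(latest.values())
    return [rec for rec in log if rec[0] in kept]

log = [
    (100, "user1", "v1"),
    (101, "user2", "v1"),
    (102, "user1", "v2"),
    (103, "user3", "v1"),
    (104, "user2", "v2"),
]

print(compact(log))
# [(102, 'user1', 'v2'), (103, 'user3', 'v1'), (104, 'user2', 'v2')]
```

Note that offsets of surviving records never change, so consumers can still seek by offset after compaction; the log simply has gaps.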

Chapter 20

Why Kafka is Fast (Despite Using Disk)

Kafka reads and writes to DISK. Disk I/O is slow, right? Then how is Kafka so fast? Two fundamental OS-level optimizations.

Page Cache (Write Optimization)
Producer Event
      │
      ▼
Kafka Broker "writes to disk"
      │
      ▼
OS Page Cache (RAM)   ← instant return
      │
      ▼ (async)
Physical Disk         ← no blocking wait
  • Kafka trusts the OS to cache log files efficiently.
  • Write goes to RAM (page cache), returns immediately.
  • OS flushes to disk asynchronously in the background.
  • Because Kafka is append-only, writes are always sequential.
  • No insert in middle, no random access, no updates — disk head keeps moving forward.
  • Sequential disk writes avoid seeks entirely, so their throughput can approach memory speed and far exceeds random disk I/O.
Zero-Copy (Read Optimization)
Normal Read:
  Disk → Kernel → User Space → Kernel → Network
  (4 copies, 4 context switches)

Kafka Zero-Copy (sendfile syscall):
  Disk → Kernel Page Cache → Network
  (0 copies to user space, 2 context switches)
  • Kafka uses sendfile() system call.
  • Data goes directly from disk/page cache to the network socket.
  • No copy to Kafka JVM heap. No serialization/deserialization.
  • Saves CPU cycles, memory, and time.
  • This is why Kafka can serve consumers at network speed, not application speed.

Additional Performance Factors

Batching

Producers batch multiple events into a single network request. Configurable via batch.size and linger.ms. Reduces network overhead dramatically.

Compression

Entire batches can be compressed (gzip, snappy, lz4, zstd). Reduces network bandwidth and disk usage. Decompressed only at the consumer.

Partitioned Parallelism

Multiple partitions allow multiple consumers to read in parallel. More partitions = more parallelism = more throughput. Each partition is an independent log.
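Batching and compression are both driven by producer configuration. A sketch using standard producer properties (values are illustrative starting points, not tuned recommendations):

```properties
# Throughput-oriented producer settings (illustrative values)
batch.size=65536       # allow up to 64 KB per partition batch
linger.ms=10           # wait up to 10 ms to fill a batch before sending
compression.type=lz4   # compress whole batches on the producer side
```

The trade-off: a higher linger.ms adds a small fixed latency per record in exchange for fewer, larger, better-compressed network requests.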

Chapter 21

Edge Cases & Failure Scenarios

What actually happens when things go wrong in a Kafka cluster. These are common interview questions.

1. Active Controller Fails

Initial State: Controller1 = Leader, Controller2 = Follower, Controller3 = Follower
Controller1 crashes
Controller2 & 3: No heartbeat from leader detected.
Election starts: Controller2 requests votes from Controller3.
Controller3 votes for Controller2.
Controller2 wins with 2/3 majority (quorum met). Becomes new active controller.
Impact: New leader elected. Cluster continues. Brief metadata unavailability during election (typically milliseconds to seconds).

2. Leader Partition Broker Fails

Setup: Partition 0 (RF=3) — Leader: Broker1, Followers: Broker2, Broker3
Broker 1 (leader) crashes
Controller detects no heartbeat from Broker 1.
Controller triggers leader election. Candidates: Broker 2, Broker 3 (both in ISR).
New Leader: Broker 2 (selected from ISR). ISR = {Broker 2, Broker 3}.
Clients receive metadata update. Producers and consumers reconnect to Broker 2.
Impact: NO data loss (new leader has all committed data). Automatic recovery.

3. Follower Partition Broker Fails

Setup: Partition 0 (RF=3) — Leader: Broker1, Followers: Broker2, Broker3
Broker 3 crashes
Leader (Broker 1) detects: no timely fetch calls from Broker 3.
Controller removes Broker 3 from ISR. ISR: {B1, B2, B3} → {B1, B2}
System continues normally. Producers still write (acks=all waits for 2 replicas). Consumers still read. NO downtime.
Later: Broker 3 recovers, catches up with leader, added back to ISR. {B1, B2} → {B1, B2, B3}
Impact: NO data loss. NO downtime.

4. What if a "Topic Fails"?

Interview Answer: A topic never fails. A topic is a logical category/folder. It's the partitions that fail (and they have replicas). The topic remains as long as at least one replica of each partition is alive.

5. Consumer Fails Before Committing Offset

Consumer polls events (offsets 100-199)
Consumer processes events 100-150
Consumer CRASHES (before committing offset)
Consumer restarts. Asks Kafka: "What's my last committed offset?" Kafka: "99."
Consumer reprocesses events 100-150 (DUPLICATES)
Result: At-least-once delivery. No data loss but duplicates possible. Make processing idempotent to handle this.
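One common way to make processing idempotent is to track already-processed event IDs, so a redelivered batch is applied only once. A minimal sketch (assumption: in production this set would live in a database or cache, keyed by a unique event ID):

```typescript
// Sketch: idempotent consumer via a processed-ID set (hypothetical example).
const processed = new Set<string>();
let total = 0; // the side effect we must not apply twice

function handle(event: { id: string; amount: number }): void {
  if (processed.has(event.id)) return; // duplicate redelivery — skip
  processed.add(event.id);
  total += event.amount;               // applied exactly once per event ID
}
```

After a crash before the offset commit, the same events come back — but the set makes the second application a no-op.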

6. Consumer Processing Takes Too Long

Consumer polls events
Consumer starts processing (takes 6 minutes)
max.poll.interval.ms (default 5 min) exceeded. Even though background heartbeats keep flowing, the client sees that poll() wasn't called in time, assumes the consumer is stuck, and leaves the group — so the Group Coordinator treats it as dead.
Rebalance triggered. Partitions reassigned to other consumers.
Original consumer finishes processing, tries to commit offset.
Group Coordinator REJECTS commit — consumer is no longer in the group.
Result: Wasted processing.

Solutions

  • Increase max.poll.interval.ms if your processing is legitimately slow.
  • Reduce max.poll.records so each poll() batch finishes faster.
  • Offload heavy work to a separate thread and pause()/resume() the partitions while it runs.

7. Entire Broker Fails

Setup: 3 brokers, Topic: order-events (6 partitions, RF=3)
Broker 1 crashes
Controller detects failure (no heartbeat).
For each partition where Broker 1 was LEADER: new leader elected from ISR, metadata updated.
For each partition where Broker 1 was FOLLOWER: removed from ISR, partition continues with remaining replicas.
Clients receive metadata update. Reconnect to new leaders.
Impact: NO data loss. Automatic recovery.

8. What if ISR List Becomes Empty?

min.insync.replicas Kafka maintains min.insync.replicas as a safety net. If the ISR count drops below this threshold, the partition becomes UNAVAILABLE for writes with acks=all (producers get NotEnoughReplicasException). This prevents data loss by refusing to accept writes that can't be sufficiently replicated. Reads may still work from the remaining leader.
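The broker-side check is simple to state as code. A sketch of the acks=all gate (hypothetical function, not the broker's actual implementation):

```typescript
// Sketch: a write with acks=all is accepted only while the ISR is large enough.
function canAcceptWrite(isrSize: number, minInsyncReplicas: number): boolean {
  // false here corresponds to the producer receiving NotEnoughReplicasException
  return isrSize >= minInsyncReplicas;
}
```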
Chapter 22

EDA Challenges & When to Use Kafka

Challenges in Event-Driven Architecture

Eventually Consistent

Data is not instantly synchronized across services. After an event is published, there's a delay before all consumers process it. Not suitable when strict real-time consistency is required.

Duplicate Events

At-least-once delivery means events can be processed more than once. Consumers must be idempotent — processing the same event twice should produce the same result.

Ordering Problem

Across partitions, ordering is not guaranteed. The 2nd event can be processed before the 1st if they're in different partitions. Use key-based partitioning for related events.
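Key-based partitioning works because the same key always hashes to the same partition. A hypothetical partitioner sketch (Kafka actually uses murmur2, not this toy hash — the point is only the determinism):

```typescript
// Sketch: same key → same partition → per-key ordering preserved.
function partitionFor(key: string, numPartitions: number): number {
  let hash = 0;
  for (const ch of key) hash = (hash * 31 + ch.charCodeAt(0)) >>> 0; // toy hash
  return hash % numPartitions;
}
```

Every event keyed "order-123" lands in the same partition, so its events stay in order relative to each other.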

Schema Evolution

If the producer changes the event schema (adds/removes fields), all consumers may break. Use a Schema Registry (Avro, Protobuf) with backward/forward compatibility rules.

Debugging Complexity

No single request-response trace. Events flow through multiple services asynchronously. Requires distributed tracing (correlation IDs, OpenTelemetry) for end-to-end visibility.

Poison Messages

One bad message can block an entire partition's consumer. If processing throws an unhandled exception, the consumer retries infinitely. Solution: Dead Letter Queue (DLQ) for failed events.

Operational Overhead

Must monitor: consumer lag, throughput, partition balance, broker health, disk usage. More infrastructure to manage compared to simple REST calls.

Exactly-Once is Hard

Achieving exactly-once processing requires idempotent producers + transactions + consumer-side deduplication. Kafka supports it but adds complexity and latency overhead.

When to Use Kafka

Long-running workflows

Processes that take minutes/hours. Decouple the trigger from the execution.

Eventual consistency is acceptable

When millisecond consistency isn't required between services.

Real-time analytics & monitoring

Stream processing, dashboards, alerting, metrics aggregation.

Audit logs & event sourcing

Immutable log of everything that happened. Replay for debugging or rebuilding state.

Decoupled microservices

Services communicate without knowing about each other. Independent deployment and scaling.

High-throughput data pipelines

ETL, CDC (Change Data Capture), log aggregation, clickstream processing.

Chapter 23

Idempotent Producer

What happens when a producer sends an event but the network fails before it gets the ACK? The producer retries — and now Kafka has the same event twice. Idempotent producer solves this.

The Problem — In Simple Words

Imagine you're ordering food online. You click "Place Order" but the page freezes. You're not sure if the order went through, so you click again. Now the restaurant has 2 orders for the same meal. That's exactly what happens with Kafka producers without idempotency.

Duplicate Problem Without Idempotency
Producer                            Broker (Leader)
   │                                     │
   │── Send Event "order-123" ─────────▶ │  ← Broker writes it (offset 50)
   │                                     │
   │        ✗ ACK lost (network) ✗       │
   │                                     │
   │── RETRY same event ───────────────▶ │  ← Broker writes AGAIN (offset 51)
   │                                     │
   │◀──── ACK (offset 51) ───────────────│
   │                                     │
Result: Same event stored TWICE at offset 50 and 51!

The Solution — Idempotent Producer

Turn on one config and Kafka handles it for you. Each producer gets a unique ID, and every event gets a sequence number. If the broker sees the same sequence number twice, it ignores the duplicate.

// Just one config change:
enable.idempotence=true

// What happens behind the scenes:
// 1. Kafka assigns this producer a unique Producer ID (PID)
// 2. Each event gets a Sequence Number (per partition)
// 3. Broker tracks: PID + Partition + Sequence Number
// 4. If it sees the same combo again → silently ignores (no duplicate)
With Idempotent Producer
Producer (PID=5)                    Broker (Leader)
   │                                     │
   │── Event (PID=5, Seq=0) ───────────▶ │  ← Writes it (offset 50) ✓
   │                                     │
   │           ✗ ACK lost ✗              │
   │                                     │
   │── RETRY (PID=5, Seq=0) ───────────▶ │  ← Sees PID=5 Seq=0 already exists
   │                                     │    SKIPS the write, returns ACK
   │◀──── ACK (offset 50) ───────────────│
   │                                     │
Result: Event stored ONCE. No duplicates!
Simple Rule Idempotent producer = no duplicates within a single partition. It does NOT prevent duplicates across partitions or across different producer instances. For that, you need Transactions (next chapter).
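The broker-side dedup check above can be sketched as a lookup table keyed by (PID, partition) that remembers the last sequence number seen. A simplified model (the real broker keeps a window of recent sequence numbers, not just the last one):

```typescript
// Sketch: broker rejects a write whose sequence number was already seen
// for that (producerId, partition) pair.
const lastSeq = new Map<string, number>();
const log: string[] = [];

function append(pid: number, partition: number, seq: number, event: string): boolean {
  const k = `${pid}-${partition}`;
  if (seq <= (lastSeq.get(k) ?? -1)) return false; // duplicate retry — ignore, but still ACK
  lastSeq.set(k, seq);
  log.push(event); // written exactly once
  return true;
}
```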

What Configs Change Automatically?

Config | Value when idempotence = true | Why
acks | all | Must confirm write on all ISR replicas
retries | Integer.MAX_VALUE | Keep retrying until success
max.in.flight.requests.per.connection | 5 (max) | Allows up to 5 unacknowledged requests but maintains order using sequence numbers
Chapter 24

Transactions & Exactly-Once Semantics (EOS)

Idempotent producer prevents duplicates within one partition. But what if you need to write to multiple partitions or topics as ONE atomic operation? That's where Transactions come in.

In Simple Words

Think of a bank transfer: debit from Account A and credit to Account B. Both must happen together — either BOTH succeed or BOTH fail. You can't debit A and then crash before crediting B. Kafka Transactions work the same way — all writes in a transaction either ALL go through or NONE of them do.

When Do You Need Transactions?

1
Read-Process-Write Pattern

Read from Topic A, process the data, write to Topic B, and commit the offset for Topic A — all as ONE atomic operation. If any step fails, everything rolls back.

2
Multi-Partition Writes

Writing events to 3 different partitions at once. Either all 3 partitions get the event, or none of them do. No partial writes.

3
Multi-Topic Writes

Writing an order event to "orders" topic AND a payment event to "payments" topic. Both must succeed together.

How It Works (Step by Step)

// Producer config
enable.idempotence=true
transactional.id="order-processor-1"    // unique ID for this transactional producer

// Code flow:
producer.initTransactions();            // one-time setup
producer.beginTransaction();            // START transaction
producer.send(record1);                 // write to topic-A partition-0
producer.send(record2);                 // write to topic-B partition-1
producer.sendOffsetsToTransaction(...); // commit consumer offsets too
producer.commitTransaction();           // COMMIT — all or nothing

// If anything fails:
producer.abortTransaction();            // ROLLBACK everything
Transaction Flow
Producer              Transaction Coordinator            Brokers
   │                           │                            │
   │── initTransactions() ───▶ │                            │
   │◀── PID assigned ──────────│                            │
   │                           │                            │
   │── beginTransaction() ───▶ │                            │
   │                           │                            │
   │── send(record1) ─────────────────────────────────────▶ │ P0
   │── send(record2) ─────────────────────────────────────▶ │ P1
   │                           │                            │
   │── commitTransaction() ──▶ │── write COMMIT marker ───▶ │
   │◀── success ───────────────│                            │

Consumer (isolation.level=read_committed):
Only sees record1 and record2 AFTER the COMMIT marker is written.
If aborted, consumer never sees them.

Consumer Side — Isolation Levels

read_uncommitted (Default)
  • Consumer sees ALL events, even from incomplete transactions.
  • Faster but may read data that later gets rolled back.
  • Use when you don't care about transactional guarantees.
read_committed
  • Consumer only sees events from committed transactions.
  • Slightly slower (waits for commit markers).
  • Use when you need exactly-once guarantees.
Exactly-Once = Idempotent Producer + Transactions + read_committed Consumer All three pieces together give you exactly-once semantics. The producer won't create duplicates (idempotent), writes are atomic (transactions), and the consumer only reads committed data (read_committed). This is the strongest guarantee Kafka offers.
Chapter 25

Consumer Rebalancing

When consumers join or leave a group, Kafka redistributes partitions among the remaining consumers. This process is called rebalancing.

In Simple Words

Imagine 3 workers splitting 6 tasks equally (2 each). If one worker goes home sick, the remaining 2 workers must split all 6 tasks (3 each). If a new worker joins, tasks are redistributed again. That's rebalancing.

What Triggers a Rebalance?

+
New Consumer Joins

A new consumer starts with the same group.id. Partitions are redistributed to include the new member.

-
Consumer Leaves or Crashes

A consumer calls close() or stops sending heartbeats. Its partitions must be given to someone else.

T
Topic Changes

New partitions are added to a subscribed topic, or the consumer subscribes to a new topic.

Rebalancing Process (Step by Step)

1
Trigger detected

Group Coordinator detects a change (new member, missed heartbeat, etc.).

2
All consumers revoke current partitions

Every consumer in the group stops reading. They commit their current offsets and release their partitions.

3
JoinGroup phase

All consumers send a JoinGroup request to the Group Coordinator. The first consumer to join becomes the Group Leader (not the same as partition leader).

4
Group Leader assigns partitions

The Group Leader runs the partition assignment strategy (Range, RoundRobin, or Sticky) and sends the new assignment back to the Coordinator.

5
SyncGroup phase

Coordinator distributes the new partition assignment to all consumers. Each consumer starts reading from their newly assigned partitions.

Assignment Strategies

Strategy | How It Works | Best For
Range | Divides partitions of each topic into contiguous ranges per consumer. Consumer 1 gets P0-P1, Consumer 2 gets P2-P3. | When you need co-partitioning (same consumer reads same partition number from different topics).
RoundRobin | Distributes all partitions one-by-one across consumers. P0 → C1, P1 → C2, P2 → C3, P3 → C1... | When you want even distribution across consumers.
Sticky | Like RoundRobin but tries to keep previous assignments. Only moves partitions that must move. | Reducing rebalance disruption. Recommended for most cases.
CooperativeSticky | Incremental rebalancing — only revokes partitions that need to move, others keep reading. | Minimizing downtime during rebalancing. Best choice.
Rebalancing is Expensive! During a rebalance, all consumers in the group stop reading (with Eager protocol). This causes a processing pause. For a group with 100 consumers and 500 partitions, a rebalance can take seconds to minutes. Use CooperativeSticky assignor and tune session.timeout.ms and heartbeat.interval.ms to minimize unnecessary rebalances.
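The RoundRobin strategy from the table above is easy to sketch — deal partitions out one-by-one like playing cards (a hypothetical helper, not the actual assignor code):

```typescript
// Sketch: RoundRobin partition assignment across a consumer group.
function roundRobinAssign(partitions: number[], consumers: string[]): Map<string, number[]> {
  const out = new Map<string, number[]>();
  for (const c of consumers) out.set(c, []);
  partitions.forEach((p, i) => out.get(consumers[i % consumers.length])!.push(p));
  return out;
}
```

With 4 partitions and 2 consumers, each consumer ends up with 2 partitions — and when a consumer joins or leaves, rerunning the function gives the new assignment.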
Chapter 26

Dead Letter Queue (DLQ)

What happens when a consumer can't process an event? A bad event (poison message) can block the entire partition. DLQ is the solution.

In Simple Words

Imagine a post office. Most letters are delivered fine. But some letters have wrong addresses. Instead of throwing them away or blocking all other mail, the post office puts them in a separate "undeliverable" pile. Someone later checks that pile and fixes the addresses. A Dead Letter Queue is that "undeliverable" pile for Kafka events.

The Problem

Without DLQ — One Bad Event Blocks Everything
Partition 0: [E1] [E2] [E3-BAD] [E4] [E5] [E6]
                       ▲
Consumer reads E3 → Processing FAILS → Retry → FAILS → Retry...
E4, E5, E6 are STUCK waiting. Consumer is blocked forever.

The Solution — DLQ Pattern

With DLQ — Bad Events Move Aside
Main Topic: [E1] [E2] [E3-BAD] [E4] [E5] [E6]
                       │
         Consumer reads E3 → FAILS
         Retry 1 → FAILS
         Retry 2 → FAILS
         Max retries reached → Move E3
                       │
                       ▼
DLQ Topic: [E3-BAD]  ← stored with error details

Consumer moves on → E4 → E5 → E6  ← no longer blocked!

How to Implement DLQ

// Pseudo-code for DLQ pattern:
for each event in poll():
    retries = 0
    while retries < MAX_RETRIES:
        try:
            process(event)
            break                           // success, move to next event
        catch Exception:
            retries += 1
            if retries == MAX_RETRIES:
                // Send failed event to DLQ topic with error info
                producer.send(
                    topic: "order-events.DLQ",
                    key: event.key,
                    value: event.value,
                    headers: {
                        "error": exception.message,
                        "original-topic": "order-events",
                        "original-partition": event.partition,
                        "original-offset": event.offset,
                        "retry-count": MAX_RETRIES
                    }
                )
    commit_offset()
DLQ Best Practices
  1. Name your DLQ topic clearly: {original-topic}.DLQ.
  2. Include the original topic, partition, offset, and error reason in the DLQ event headers.
  3. Set up monitoring/alerts when events land in the DLQ.
  4. Build a tool or process to review DLQ events, fix them, and replay them back to the original topic.
Chapter 27

Schema Registry

Kafka stores events as raw bytes. It doesn't know or care what's inside the event. But producers and consumers need to agree on the format. Schema Registry solves this.

In Simple Words

Imagine two people writing letters to each other. If person A starts writing in English and person B expects Hindi, they can't understand each other. Schema Registry is like a shared dictionary that both people agree to use. If person A wants to add a new word, the dictionary checks if person B can still understand the letter.

The Problem Without Schema Registry

Schema Mismatch Breaks Consumers
Producer (v1): { "orderId": "123", "amount": 500 }
        │
   Kafka Topic
        │
Consumer expects: { "orderId": "123", "amount": 500 }  ← Works fine ✓

--- Later, producer changes schema ---

Producer (v2): { "order_id": "123", "total": 500, "currency": "INR" }
        │
   Kafka Topic
        │
Consumer expects: { "orderId": "123", "amount": 500 }  ← CRASHES! ✗
("orderId" is now "order_id", "amount" is now "total")

How Schema Registry Works

1
Register the schema

When a producer starts, it registers the event schema (Avro, Protobuf, or JSON Schema) with the Schema Registry. The registry assigns a Schema ID (e.g., ID=1).

2
Producer sends data with Schema ID

Instead of sending raw JSON, the producer sends: [Magic Byte][Schema ID=1][Serialized Data]. The data is compact (Avro binary, not JSON text).

3
Consumer reads and looks up schema

Consumer reads the Schema ID from the event, fetches the schema from the Registry, and deserializes the data correctly. Schema is cached locally after the first fetch.

4
Schema evolution with compatibility checks

When the producer wants to change the schema, the Registry checks if the new version is compatible with older versions before allowing it.
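The wire format from step 2 is easy to see in code. A sketch of encoding and decoding the [Magic Byte][Schema ID][Serialized Data] envelope (assumption: the 5-byte header layout used by Confluent's serializers — magic byte 0, then a 4-byte big-endian schema ID):

```typescript
// Sketch: wrap a serialized payload with the schema-registry envelope.
function encode(schemaId: number, payload: Buffer): Buffer {
  const header = Buffer.alloc(5);
  header.writeUInt8(0, 0);           // magic byte (always 0)
  header.writeUInt32BE(schemaId, 1); // 4-byte schema ID
  return Buffer.concat([header, payload]);
}

// The consumer reads the ID back out, fetches that schema, then deserializes.
function decode(msg: Buffer): { schemaId: number; payload: Buffer } {
  return { schemaId: msg.readUInt32BE(1), payload: msg.subarray(5) };
}
```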

Compatibility Modes

Mode | What You Can Do | Simple Explanation
BACKWARD | Delete fields, add fields with defaults | New consumers can read old events. "I can understand old letters."
FORWARD | Add fields, delete fields with defaults | Old consumers can read new events. "Old readers can understand new letters."
FULL | Add/delete fields only with defaults | Both old and new consumers can read both old and new events. Safest.
NONE | Anything goes | No checks. Dangerous. Can break consumers at any time.
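BACKWARD compatibility hinges on defaults: a new reader fills in any field missing from an old event. A toy sketch of that idea (hypothetical helper; Avro does this per-field from the schema, not via object spread):

```typescript
// Sketch: a new reader merges an old event over the new schema's defaults.
function readWithDefaults<T extends object>(oldEvent: Partial<T>, defaults: T): T {
  return { ...defaults, ...oldEvent };
}
```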
Serialization Formats Avro is the most popular choice with Kafka. It's compact (binary), has a built-in schema, and works great with Schema Registry. Protobuf is also good, especially if you already use gRPC. JSON Schema is human-readable but less efficient. Avoid plain JSON in production — it's wasteful (field names repeated in every event) and has no schema enforcement.
Chapter 28

Kafka Connect

A framework for moving data between Kafka and other systems without writing any code. Just configure and run.

In Simple Words

You want to move data from your MySQL database into Kafka. Or from Kafka into Elasticsearch. You could write a custom producer/consumer for each system. But that's a lot of repeated work. Kafka Connect gives you ready-made "connectors" — like USB adapters that plug different systems into Kafka.

Two Types of Connectors

Source Connector (Data INTO Kafka)
MySQL ──────┐
PostgreSQL ─┤
MongoDB ────┤── Source ─────▶ Kafka Topics
S3 Files ───┤   Connectors
REST API ───┘
  • Reads data from external systems.
  • Converts it into Kafka events.
  • Publishes to Kafka topics.
  • Example: Every new row in MySQL automatically becomes a Kafka event.
Sink Connector (Data OUT OF Kafka)
                            ┌── Elasticsearch
                            ├── HDFS / S3
Kafka Topics ──▶ Sink ──────├── PostgreSQL
                 Connectors ├── Redis
                            └── Snowflake
  • Reads events from Kafka topics.
  • Writes data to external systems.
  • Example: Every Kafka event automatically gets indexed in Elasticsearch.

Popular Connectors

Connector | Type | What It Does
JDBC Source | Source | Reads rows from any SQL database (MySQL, PostgreSQL, Oracle) into Kafka
Debezium | Source | CDC (Change Data Capture) — captures every INSERT/UPDATE/DELETE from databases in real-time
S3 Sink | Sink | Writes Kafka events to Amazon S3 as files (Parquet, JSON, Avro)
Elasticsearch Sink | Sink | Indexes Kafka events into Elasticsearch for search
HDFS Sink | Sink | Writes Kafka events to Hadoop HDFS for batch analytics
MongoDB Source/Sink | Both | Reads from and writes to MongoDB collections
Key Advantage Kafka Connect handles fault tolerance, offset tracking, scaling, and retries for you. You just write a JSON config file. No custom producer/consumer code needed. It also supports distributed mode where multiple Connect workers share the load.
Chapter 29

Kafka Streams

A Java library for processing data in real-time directly from Kafka topics. No separate cluster needed — it runs inside your application.

In Simple Words

Normally, you read from Kafka, process data in your app, and write results back to Kafka. Kafka Streams makes this easier by giving you a powerful API to filter, transform, join, and aggregate events — all in real-time, as events flow through. Think of it like Excel formulas but for streaming data.

Why Not Just Use a Consumer?

Plain Consumer
  • You write all processing logic yourself.
  • You handle state management yourself.
  • You handle fault tolerance yourself.
  • You handle time windowing yourself.
  • Joining two topics? Build it from scratch.
Kafka Streams
  • Built-in operators: filter, map, join, aggregate.
  • Automatic state management (RocksDB).
  • Automatic fault tolerance and recovery.
  • Built-in windowing (tumbling, hopping, sliding).
  • Joining topics is one line of code.

Common Operations

Filter

Keep only events that match a condition. Example: only keep orders above $100.

orders.filter((key, order) -> order.amount > 100)
Map / Transform

Change the event shape. Example: extract just the customer ID and amount.

orders.mapValues(order -> new Summary(order.customerId, order.amount))
Group & Aggregate

Group events by key and compute running totals. Example: total spending per customer.

orders.mapValues(order -> order.amount)
      .groupByKey()
      .reduce((total, amount) -> total + amount)
Windowed Aggregation

Aggregate events within a time window. Example: count orders per 5-minute window.

orders.groupByKey()
      .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
      .count()
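A tumbling window just buckets each event's timestamp into fixed, non-overlapping intervals. The counting logic can be sketched in TypeScript (illustrative only — Kafka Streams does this in Java with RocksDB-backed state):

```typescript
// Sketch: count events per (key, 5-minute tumbling window).
const WINDOW_MS = 5 * 60 * 1000;
const counts = new Map<string, number>();

function countInWindow(key: string, timestampMs: number): void {
  // Window start = timestamp rounded down to the nearest 5-minute boundary.
  const windowStart = Math.floor(timestampMs / WINDOW_MS) * WINDOW_MS;
  const k = `${key}@${windowStart}`;
  counts.set(k, (counts.get(k) ?? 0) + 1);
}
```

Two events at t=0s and t=60s share the 0–5min window; an event at t=300s starts the next one.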

Kafka Streams vs Other Stream Processors

Feature | Kafka Streams | Apache Flink | Apache Spark Streaming
Deployment | Library (runs in your app) | Separate cluster | Separate cluster
Complexity | Simple — just add a JAR | Medium — needs cluster setup | Medium — needs cluster setup
Scaling | Add more app instances | Add more TaskManagers | Add more executors
Best For | Kafka-to-Kafka processing | Complex event processing, multiple sources | Batch + streaming hybrid
Language | Java / Kotlin only | Java, Python, SQL | Java, Python, Scala, SQL
Chapter 30

Kafka Security

By default, Kafka has no security at all. Anyone who can reach the broker can read/write any topic. In production, you need authentication, authorization, and encryption.

In Simple Words

Think of Kafka as a building. Without security: no doors, no locks, no ID checks. Anyone can walk in, read any document, and write on any whiteboard. Security adds: a front door with a lock (encryption), ID cards for entry (authentication), and rules about who can access which room (authorization).

Three Layers of Security

E
Encryption (SSL/TLS)

What: Encrypts data in transit between clients and brokers, and between brokers.

Simple: Like sending a letter in a sealed envelope instead of a postcard. Nobody can read the data while it's moving over the network.

Port 9093 (default for SSL)

A
Authentication (SASL)

What: Verifies WHO is connecting. Clients must prove their identity.

Options:

  • SASL/PLAIN — Username + password (simple, use with SSL).
  • SASL/SCRAM — Salted password (more secure than PLAIN).
  • SASL/GSSAPI — Kerberos (enterprise, Active Directory).
  • SASL/OAUTHBEARER — OAuth 2.0 tokens.
Z
Authorization (ACLs)

What: Controls WHO can do WHAT on WHICH resource.

Simple: Even after proving your identity, you can only access what you're allowed to. The payment service can read "payments" topic but not "user-profiles" topic.

# Allow user "order-svc" to write to "orders" topic
kafka-acls --bootstrap-server localhost:9092 --add \
  --allow-principal User:order-svc \
  --operation Write --topic orders

# Allow user "analytics" to read from "orders" topic
kafka-acls --bootstrap-server localhost:9092 --add \
  --allow-principal User:analytics \
  --operation Read --topic orders --group analytics-group
Production Checklist
  1. Enable SSL/TLS for all client-broker and broker-broker communication.
  2. Enable SASL authentication (SCRAM-SHA-256 or SCRAM-SHA-512 recommended).
  3. Set up ACLs to restrict topic access per service.
  4. Disable anonymous access.
  5. Use separate credentials for each service (not one shared password for everything).
Chapter 31

Kafka vs Other Message Systems

Kafka is not the only messaging system. Here's how it compares to other popular choices and when to pick which one.

Comparison Table

Feature | Apache Kafka | RabbitMQ | AWS SQS | AWS SNS
Model | Pull-based log | Push-based queue | Pull-based queue | Push-based pub/sub
Message Retention | Configurable (days/forever) | Until consumed | Up to 14 days | No retention
Replay | Yes (any offset) | No | No | No
Ordering | Per partition | Per queue (with limits) | FIFO queues only | No ordering
Throughput | Millions/sec | Tens of thousands/sec | Nearly unlimited (managed) | Nearly unlimited (managed)
Consumer Groups | Built-in | Manual setup | One consumer per message | Fan-out to subscribers
Operational Cost | High (self-managed) / Medium (managed) | Medium | Low (fully managed) | Low (fully managed)
Best For | Event streaming, high throughput, replay needed | Task queues, routing logic, request-reply | Simple job queues, decoupling | Fan-out notifications

When to Pick What?

Pick Kafka When
  • You need to replay old events (debugging, new consumers).
  • You need very high throughput (millions of events/sec).
  • Multiple services need to read the same data independently.
  • You need event sourcing or an immutable audit log.
  • You need stream processing (real-time transformations).
Pick RabbitMQ When
  • You need complex routing (topic exchanges, header-based routing).
  • You need request-reply pattern (RPC over messages).
  • You need message priority (urgent messages first).
  • Your throughput is moderate (thousands/sec, not millions).
  • You want simpler setup for basic task queues.
Pick SQS/SNS When
  • You're on AWS and want fully managed (zero ops).
  • You need a simple job queue (no streaming features needed).
  • You want fan-out notifications (SNS → email, SMS, Lambda).
  • You don't need replay, ordering, or consumer groups.
  • You want pay-per-use pricing with no cluster management.
Chapter 32

Real-World Use Cases

How big companies actually use Apache Kafka in production. These examples help you understand where Kafka fits in real systems.

1
E-Commerce Order Flow

Problem: When a customer places an order, many things must happen: payment processing, inventory update, shipping notification, email confirmation, analytics tracking. These should not be tightly coupled.

User places order
        │
Order Service ──▶ "order-events" topic
        │
        ┌───────────────┼───────────────┐
        ▼               ▼               ▼
Payment Service   Inventory Service   Email Service
(Consumer Grp 1)  (Consumer Grp 2)    (Consumer Grp 3)

Each service processes the same order event independently. If the email service is slow, it doesn't affect payments.

2
Real-Time Analytics / Dashboards

Problem: You want a live dashboard showing website activity: page views, clicks, sign-ups — all updated in real-time.

Website events (clicks, views) → Kafka topic → Kafka Streams aggregates (counts per minute) → Results written to another Kafka topic → Dashboard reads from that topic.

Kafka Streams Real-time

3
Change Data Capture (CDC)

Problem: Your main database is MySQL. You want every change (INSERT/UPDATE/DELETE) to be available for search in Elasticsearch and analytics in Snowflake.

MySQL → Debezium (Kafka Connect Source) → Kafka topics → Elasticsearch Sink Connector → Elasticsearch. No custom code needed.

Kafka Connect Debezium

4
Log Aggregation

Problem: You have 500 servers generating logs. You need all logs in one place for monitoring and debugging.

Each server sends logs to a "logs" Kafka topic. A central service reads from the topic and stores them in Elasticsearch/Loki/S3. Kafka acts as a buffer so log spikes don't crash your storage system.

High Throughput Buffering

5
Fraud Detection

Problem: You need to detect suspicious transactions within seconds, not hours.

Transaction events → Kafka topic → Kafka Streams / Flink compares each transaction against rules (unusual amount, unknown location, rapid frequency) → Suspicious events written to "fraud-alerts" topic → Alert service blocks the transaction.

Low Latency Stream Processing

6
Microservices Communication

Problem: 20+ microservices need to share data without creating a tangled web of REST calls.

Each service publishes events about its domain (UserCreated, OrderPlaced, PaymentCompleted). Other services subscribe only to the events they care about. Services are fully decoupled — you can add, remove, or update services without affecting others.

Loose Coupling Scalable

Companies Using Kafka

Company | How They Use Kafka | Scale
LinkedIn | Created Kafka. Uses it for activity tracking, metrics, data pipelines. | 7+ trillion messages/day
Netflix | Real-time monitoring, event sourcing, data pipeline between microservices. | 700+ billion messages/day
Uber | Real-time pricing, trip matching, driver tracking, analytics. | Trillions of messages/day
Spotify | Event delivery, logging, real-time recommendations. | Hundreds of billions/day
Walmart | Inventory management, order processing, real-time supply chain. | Billions of messages/day
Key Takeaway Kafka is not just for "big companies." If your system produces events and multiple services need to react to them independently — Kafka is a great fit. Start small with 3 brokers and scale as needed. Managed services like Confluent Cloud, AWS MSK, and Redpanda Cloud make it even easier to get started.
Chapter 33

Setup: Running Kafka (Docker & Cloud)

Before writing code, you need a running Kafka cluster. You have two choices: run it locally with Docker, or use a managed cloud service.

Option 1: Cloud Kafka (Zero Setup)

If you don't want to install anything, use a managed Kafka service. They give you a Kafka cluster in the cloud — you just get a connection URL and start coding.

C
Confluent Cloud

Made by the creators of Kafka. Most popular managed Kafka. Free tier available (no credit card needed).

Free Tier Best Kafka Support

Link: confluent.cloud

A
AWS MSK (Managed Streaming for Kafka)

Amazon's managed Kafka. Good if you're already on AWS. Supports MSK Serverless (pay per use).

AWS Ecosystem Serverless Option

Link: aws.amazon.com/msk

U
Upstash Kafka

Serverless Kafka with a REST API. Generous free tier (10,000 messages/day). Perfect for learning and small projects.

Free Tier REST API

Link: upstash.com/kafka

R
Redpanda Cloud

Kafka-compatible but faster. Drop-in replacement — same Kafka client code works. Free tier available.

Free Tier Fastest

Link: redpanda.com/cloud

Option 2: Docker (Local Setup — Recommended for Learning)

This runs a full Kafka cluster on your machine. You need Docker Desktop installed.

Step 1: Create docker-compose.yml

Create a file called docker-compose.yml in your project root:

# docker-compose.yml
# This file tells Docker to start Kafka and its dependencies
version: "3.8"

services:
  # Kafka broker using KRaft mode (no ZooKeeper needed!)
  kafka:
    image: apache/kafka:3.7.0            # Official Apache Kafka Docker image
    container_name: kafka
    ports:
      - "9092:9092"                      # Port your app connects to
    environment:
      # ---------- KRaft Mode (no ZooKeeper) ----------
      KAFKA_NODE_ID: 1                               # Unique ID for this broker
      KAFKA_PROCESS_ROLES: broker,controller         # This node is BOTH broker + controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093   # List of controller voters
      # ---------- Listeners ----------
      # WHY: Kafka needs separate listeners for inside-Docker and outside-Docker traffic
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # ---------- Topic Defaults ----------
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1      # 1 because single broker
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"       # We'll create topics manually
      KAFKA_LOG_RETENTION_HOURS: 168                 # Keep data for 7 days

  # Kafka UI — visual dashboard to see topics, messages, consumers
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"                      # Open localhost:8080 to see the UI
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
    depends_on:
      - kafka
Why These Configs?
  • KAFKA_PROCESS_ROLES: broker,controller — In production, you'd separate these. For local dev, one node does both.
  • KAFKA_LISTENERS vs KAFKA_ADVERTISED_LISTENERS — Listeners define what Kafka listens on. Advertised listeners tell clients how to reach it. Since Docker networking is different from your host machine, we need both.
  • KAFKA_AUTO_CREATE_TOPICS_ENABLE: false — We disable auto-create so you learn to create topics explicitly. In production, you should always create topics manually with proper configs.

Step 2: Start Kafka

# Start everything in the background
docker compose up -d

# Check if containers are running
docker compose ps

# You should see:
#   kafka      running   0.0.0.0:9092->9092/tcp
#   kafka-ui   running   0.0.0.0:8080->8080/tcp

# Open Kafka UI in browser:
#   http://localhost:8080

Step 3: Verify Kafka is Working

# Create a test topic using the Kafka CLI inside the container
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic test-topic \
  --partitions 3 \
  --replication-factor 1

# List all topics
docker exec -it kafka /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --list

# Output: test-topic
You're Ready! Kafka is running on localhost:9092. Kafka UI is on localhost:8080. Now let's write some code.
Chapter 34

Project Setup (Node.js + TypeScript)

Setting up a TypeScript project with the KafkaJS library — the most popular Kafka client for Node.js.

Step 1: Initialize the Project

# Create project folder
mkdir kafka-nodejs-demo && cd kafka-nodejs-demo

# Initialize Node.js project
npm init -y

# Install dependencies
npm install kafkajs                        # Kafka client for Node.js
npm install -D typescript tsx @types/node  # TypeScript + runner

# Initialize TypeScript config
npx tsc --init
Why KafkaJS? kafkajs is a pure JavaScript implementation — no native C/C++ dependencies (unlike node-rdkafka). This means no build issues, works everywhere, and is easier to install. It supports all Kafka features: producers, consumers, transactions, admin API, and more.

Update tsconfig.json

{
  "compilerOptions": {
    "target": "ES2020",       // Modern JS features like async/await
    "module": "commonjs",     // Node.js module system
    "strict": true,           // Enable all strict type checks
    "esModuleInterop": true,  // Allows default imports from CommonJS
    "outDir": "./dist",       // Compiled JS output folder
    "rootDir": "./src"        // Source TypeScript files
  },
  "include": ["src/**/*"]
}

Step 2: Create the Kafka Client (Shared Config)

Create src/client.ts — This file creates a reusable Kafka connection that all other files will import.

// src/client.ts
// This is the "entry point" — it creates one Kafka instance that
// producer, consumer, and admin will all share.
import { Kafka, logLevel } from "kafkajs";

// ── Create the Kafka instance ────────────────────────────────────
const kafka = new Kafka({
  // clientId: A human-readable name for this application.
  // WHY: Kafka uses this to identify your app in logs and monitoring.
  // Shows up in broker logs as "order-service connected".
  // OTHER OPTIONS: Any string. Use your service name.
  clientId: "order-service",

  // brokers: List of Kafka broker addresses to connect to.
  // WHY: The client connects to ANY of these to discover the full cluster.
  // Called "bootstrap servers" — you don't need to list ALL brokers,
  // just enough to find at least one alive.
  // OTHER OPTIONS: ["broker1:9092", "broker2:9092", "broker3:9092"]
  brokers: ["localhost:9092"],

  // logLevel: Controls how much KafkaJS prints to console.
  // WHY: WARN is a good default — shows errors and warnings, not every request.
  // OTHER OPTIONS:
  //   logLevel.NOTHING — silence everything
  //   logLevel.ERROR   — only errors
  //   logLevel.WARN    — errors + warnings
  //   logLevel.INFO    — errors + warnings + info
  //   logLevel.DEBUG   — everything (very verbose)
  logLevel: logLevel.WARN,

  // ── Connection options (optional) ──
  // connectionTimeout: 3000,   // ms to wait for initial connection (default: 1000)
  // requestTimeout: 30000,     // ms to wait for a response (default: 30000)
  // retry: {
  //   retries: 5,              // how many times to retry failed requests
  //   initialRetryTime: 300,   // ms before first retry
  //   maxRetryTime: 30000,     // max ms between retries
  // },

  // ── SSL (for cloud Kafka like Confluent/AWS MSK) ──
  // ssl: true,
  // sasl: {
  //   mechanism: "plain",      // or "scram-sha-256", "scram-sha-512"
  //   username: "your-api-key",
  //   password: "your-api-secret",
  // },
});

export default kafka;

Project Structure (What We'll Build)

File Structure
kafka-nodejs-demo/
├── docker-compose.yml        ← Kafka + UI containers
├── package.json
├── tsconfig.json
└── src/
    ├── client.ts             ← Shared Kafka connection (Step 2 above)
    ├── admin.ts              ← Create/delete topics
    ├── producer.ts           ← Send events to Kafka
    ├── consumer.ts           ← Read events from Kafka
    └── advanced/
        ├── producer-batch.ts ← Batch sending + idempotent producer
        ├── consumer-dlq.ts   ← Consumer with Dead Letter Queue
        └── transaction.ts    ← Exactly-once transactions

Add Run Scripts to package.json

// Add to your package.json "scripts" section:
{
  "scripts": {
    "admin": "npx tsx src/admin.ts",
    "producer": "npx tsx src/producer.ts",
    "consumer": "npx tsx src/consumer.ts",
    "batch": "npx tsx src/advanced/producer-batch.ts",
    "dlq": "npx tsx src/advanced/consumer-dlq.ts",
    "txn": "npx tsx src/advanced/transaction.ts"
  }
}
Chapter 35

Admin: Create & Manage Topics

Before producing or consuming, you need to create topics. The Admin API lets you create, delete, and inspect topics programmatically.

// src/admin.ts
// This script creates the topics our application needs.
// Run once before starting producers/consumers.
import kafka from "./client";

async function main() {
  // ── Create an Admin instance ──────────────────────────────────
  // WHY: Admin API is separate from producer/consumer because
  // topic management is an administrative task, not a data task.
  const admin = kafka.admin();

  // ── Connect to the Kafka cluster ─────────────────────────────
  // WHY: Every Kafka operation requires an active connection first.
  // This establishes a TCP connection to one of the brokers.
  console.log("Connecting to Kafka...");
  await admin.connect();
  console.log("Connected!");

  // ── Create topics ────────────────────────────────────────────
  const topicsCreated = await admin.createTopics({
    // waitForLeaders: Wait until a leader is elected for each partition.
    // WHY: If you start producing immediately after creating a topic,
    // it might fail because partitions don't have leaders yet.
    // Setting this to true ensures the topic is fully ready.
    // OTHER OPTIONS: false (return immediately, don't wait)
    waitForLeaders: true,

    // topics: Array of topic configurations to create.
    topics: [
      {
        topic: "order-events",

        // numPartitions: How many partitions this topic will have.
        // WHY: More partitions = more parallelism = more consumers can
        // read simultaneously. But too many partitions adds overhead.
        // RULE OF THUMB: Start with partitions = expected consumer count.
        // OTHER OPTIONS: Any positive integer. Common: 3, 6, 12, 24.
        numPartitions: 3,

        // replicationFactor: How many copies of each partition.
        // WHY: 1 = no redundancy (fine for local dev).
        //      3 = production standard (survives 2 broker failures).
        // RULE: Must be <= number of brokers in cluster.
        // OTHER OPTIONS: 1 (dev), 2 (staging), 3 (production)
        replicationFactor: 1,

        // configEntries: Override default topic-level configs.
        // WHY: Each topic can have different retention, cleanup, etc.
        configEntries: [
          // How long to keep events before auto-deleting.
          // 604800000 ms = 7 days. Set to "-1" for infinite retention.
          { name: "retention.ms", value: "604800000" },

          // cleanup.policy: "delete" (remove old) or "compact" (keep latest per key)
          { name: "cleanup.policy", value: "delete" },

          // min.insync.replicas: Minimum replicas for acks=all to succeed.
          // Set to 2 in production with replication-factor=3.
          { name: "min.insync.replicas", value: "1" },
        ],
      },

      // DLQ topic for failed events
      {
        topic: "order-events.DLQ",
        numPartitions: 1, // DLQ doesn't need high parallelism
        replicationFactor: 1,
      },

      // Topic for processed results
      {
        topic: "order-confirmations",
        numPartitions: 3,
        replicationFactor: 1,
      },
    ],
  });

  // createTopics returns true if topics were created, false if they already exist
  console.log(`Topics created: ${topicsCreated}`);

  // ── List all topics ──────────────────────────────────────────
  const topics = await admin.listTopics();
  console.log("All topics:", topics);

  // ── Get detailed topic metadata ──────────────────────────────
  // WHY: Shows partition count, leader broker, replica brokers, ISR
  const metadata = await admin.fetchTopicMetadata({
    topics: ["order-events"],
  });
  console.log("Topic metadata:", JSON.stringify(metadata, null, 2));

  // ── Disconnect (always clean up!) ────────────────────────────
  await admin.disconnect();
  console.log("Disconnected.");
}

main().catch(console.error);

Run It

npm run admin

# Output:
# Connecting to Kafka...
# Connected!
# Topics created: true
# All topics: [ "order-events", "order-events.DLQ", "order-confirmations" ]
# Topic metadata: { topics: [{ name: "order-events", partitions: [...] }] }
# Disconnected.
Other Admin Operations You Can Do
- admin.deleteTopics({ topics: ["old-topic"] }) — Delete topics.
- admin.createPartitions({ topicPartitions: [{ topic: "x", count: 6 }] }) — Add partitions (the count can't be reduced).
- admin.describeGroups(["group-id"]) — See consumer group details.
- admin.listGroups() — List all consumer groups.
- admin.fetchTopicOffsets("topic") — See the current offsets for each partition.
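One practical use of these admin calls is lag monitoring: compare each partition's high watermark (from `fetchTopicOffsets`) with the group's committed offset (from `admin.fetchOffsets`). The sketch below is a pure helper over those results — the field shapes mirror the kafkajs docs, but treat them as assumptions to verify against your client version.

```typescript
// Shapes (assumed from the kafkajs docs — verify against your version):
// admin.fetchTopicOffsets(topic) returns one entry per partition.
interface PartitionOffsets {
  partition: number;
  offset: string; // next offset to be written
  high: string;   // high watermark
  low: string;    // earliest retained offset
}

// Committed offsets for one topic, flattened from admin.fetchOffsets({ groupId, topics }).
interface CommittedOffset {
  partition: number;
  offset: string; // "-1" means the group has never committed here
}

// Lag = how many events the group still has to process on each partition.
function computeLag(
  topicOffsets: PartitionOffsets[],
  committed: CommittedOffset[]
): Map<number, number> {
  const lag = new Map<number, number>();
  for (const p of topicOffsets) {
    const c = committed.find((x) => x.partition === p.partition);
    // No commit yet → everything still retained on the partition counts as lag.
    const from = c && c.offset !== "-1" ? Number(c.offset) : Number(p.low);
    lag.set(p.partition, Number(p.high) - from);
  }
  return lag;
}

// Example: partition 0 has 12 events, group committed up to 5 → lag 7.
const lag = computeLag(
  [{ partition: 0, offset: "12", high: "12", low: "0" }],
  [{ partition: 0, offset: "5" }]
);
console.log(lag.get(0)); // 7
```

In production you'd run this on a timer and alert when lag keeps growing, which usually means consumers are down or too slow.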
Chapter 36

Build a Producer

The producer sends events to Kafka topics. We'll build it step-by-step, explaining every line and every option.

// src/producer.ts
// A complete Kafka producer that sends order events.
// Run: npm run producer
import kafka from "./client";
import { Partitioners, CompressionTypes } from "kafkajs";

// ── Define the shape of our events (TypeScript interface) ────
// WHY: This ensures we always send events with the correct structure.
// TypeScript will catch mistakes at compile time.
interface OrderEvent {
  orderId: string;
  customerId: string;
  product: string;
  quantity: number;
  price: number;
  status: "PLACED" | "CONFIRMED" | "SHIPPED" | "DELIVERED";
  timestamp: string;
}

async function main() {
  // ── Create a Producer instance ────────────────────────────────
  const producer = kafka.producer({
    // idempotent: Prevents duplicate events on retry.
    // WHY: If the network drops AFTER Kafka writes but BEFORE you get the ACK,
    // the producer retries. Without idempotence, Kafka stores the event twice.
    // With idempotence, Kafka detects the duplicate and ignores it.
    // WHAT IT CHANGES: Requires acks=all; retries default to the maximum.
    // OTHER OPTIONS: false (default — duplicates possible on retry)
    idempotent: true,

    // maxInFlightRequests: How many unacknowledged requests can be "in flight".
    // WHY: Higher = faster (more parallel requests), but risks out-of-order delivery.
    // NOTE: KafkaJS requires 1 when idempotent=true (it throws otherwise).
    // The Java client allows up to 5 here because it tracks sequence numbers
    // to detect duplicates — KafkaJS does not.
    // OTHER OPTIONS: 5 (faster; for non-idempotent producers only)
    maxInFlightRequests: 1,

    // allowAutoTopicCreation: Should the producer auto-create topics if they don't exist?
    // WHY: Set to false in production — topics should be created deliberately with
    // proper partition count and replication. Auto-creation uses bad defaults.
    // OTHER OPTIONS: true (Kafka auto-creates with default configs)
    allowAutoTopicCreation: false,

    // createPartitioner: Strategy for choosing which partition an event goes to.
    // WHY: DefaultPartitioner uses the event key to determine partition.
    // Same key → same partition → ordering guarantee for that key.
    // OTHER OPTIONS:
    //   Partitioners.LegacyPartitioner — older algorithm (KafkaJS v1 behavior)
    //   Partitioners.JavaCompatiblePartitioner — matches the Java client's
    //     partitioning (in KafkaJS v2 this is a deprecated alias of DefaultPartitioner)
    //   Custom function — (topic, key, message) => partitionNumber
    createPartitioner: Partitioners.DefaultPartitioner,
  });

  // ── Connect ──────────────────────────────────────────────────
  console.log("Connecting producer...");
  await producer.connect();
  console.log("Producer connected!");

  // ── Send a Single Event ──────────────────────────────────────
  const order: OrderEvent = {
    orderId: "ORD-001",
    customerId: "CUST-42",
    product: "Mechanical Keyboard",
    quantity: 1,
    price: 2499,
    status: "PLACED",
    timestamp: new Date().toISOString(),
  };

  const result = await producer.send({
    // topic: Which topic to send to.
    topic: "order-events",

    // acks: How many brokers must confirm the write.
    // WHY: -1 (all) = safest. Leader + all ISR replicas must confirm.
    // OTHER OPTIONS:
    //   0  — fire-and-forget (fastest, data can be lost)
    //   1  — only leader confirms (balanced)
    //   -1 — all ISR confirm (safest, required for idempotent producer)
    acks: -1,

    // timeout: How long the broker waits for ISR replicas to acknowledge.
    // WHY: If ISR replicas are slow, the write times out instead of hanging forever.
    // OTHER OPTIONS: Any ms value. Default is 30000 (30 seconds).
    timeout: 30000,

    // compression: Compress the batch before sending over the network.
    // WHY: Reduces network bandwidth and disk usage. Especially good for large events.
    // OTHER OPTIONS:
    //   CompressionTypes.None   — no compression (default)
    //   CompressionTypes.GZIP   — best compression ratio, slower
    //   CompressionTypes.Snappy — fast compression, moderate ratio
    //   CompressionTypes.LZ4    — fastest compression
    //   CompressionTypes.ZSTD   — best balance of speed and ratio
    compression: CompressionTypes.GZIP,

    // messages: Array of events to send (even for one event, it's an array).
    messages: [
      {
        // key: Determines which partition this event goes to.
        // WHY: All events with key "ORD-001" go to the SAME partition.
        // This guarantees ordering for this order's lifecycle events.
        // (PLACED → CONFIRMED → SHIPPED → DELIVERED stay in order)
        // OTHER OPTIONS:
        //   null      — round-robin across partitions (no ordering guarantee)
        //   "CUST-42" — partition by customer (all customer events together)
        key: order.orderId,

        // value: The actual event data. Must be a string or Buffer.
        // WHY: Kafka stores raw bytes. We JSON.stringify our TypeScript object.
        // OTHER OPTIONS: Buffer.from(avroEncodedBytes) for Avro/Protobuf
        value: JSON.stringify(order),

        // headers: Optional metadata attached to the event.
        // WHY: Headers let you add metadata WITHOUT changing the event body.
        // Useful for routing, tracing, versioning, filtering.
        // OTHER OPTIONS: Any key-value pairs. Values must be strings or Buffers.
        headers: {
          "event-type": "ORDER_PLACED",    // what kind of event
          "source": "order-service",       // which service sent it
          "correlation-id": "req-abc-123", // for distributed tracing
          "schema-version": "1",           // event schema version
        },

        // timestamp: Event time in ms since epoch (optional).
        // WHY: Kafka records when the event HAPPENED (not when it was stored).
        // Useful for time-based queries and windowed aggregations.
        // OTHER OPTIONS: omit (Kafka uses broker's clock), or any epoch ms value
        timestamp: Date.now().toString(),

        // partition: Force a specific partition (optional).
        // WHY: Overrides the key-based partitioning. Rarely used.
        // OTHER OPTIONS: 0, 1, 2, ... (any valid partition index)
        // partition: 0,
      },
    ],
  });

  // ── Result ────────────────────────────────────────────────────
  // result is an array (one entry per topic-partition) with:
  // topicName, partition, errorCode, baseOffset, logAppendTime, logStartOffset
  console.log("Event sent!", result);
  // Output: [{ topicName: "order-events", partition: 1, errorCode: 0, baseOffset: "0" }]

  // ── Send Multiple Events (Simulating Order Lifecycle) ────────
  const statuses: OrderEvent["status"][] = ["CONFIRMED", "SHIPPED", "DELIVERED"];

  for (const status of statuses) {
    await producer.send({
      topic: "order-events",
      acks: -1,
      messages: [
        {
          key: "ORD-001", // Same key = same partition = ordered!
          value: JSON.stringify({ ...order, status, timestamp: new Date().toISOString() }),
          headers: { "event-type": `ORDER_${status}` },
        },
      ],
    });
    console.log(`Sent: ORDER_${status}`);
  }

  // ── Disconnect ────────────────────────────────────────────────
  await producer.disconnect();
  console.log("Producer disconnected.");
}

main().catch(console.error);

Run It

npm run producer

# Output:
# Connecting producer...
# Producer connected!
# Event sent! [{ topicName: "order-events", partition: 2, baseOffset: "0" }]
# Sent: ORDER_CONFIRMED
# Sent: ORDER_SHIPPED
# Sent: ORDER_DELIVERED
# Producer disconnected.
Check Kafka UI Open http://localhost:8080 → Click "order-events" topic → Click "Messages" tab. You'll see all 4 events. Notice they're all in the same partition (because same key "ORD-001").
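If you prefer the terminal to the UI, the console consumer bundled with the broker can dump the same events. This is a sketch assuming the container name and CLI path from the Docker setup chapter; `print.headers` is available in recent Kafka releases (including 3.7):

```shell
# Read order-events from the beginning, showing keys and headers
docker exec -it kafka /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic order-events \
  --from-beginning \
  --property print.key=true \
  --property print.headers=true

# Press Ctrl+C once all four ORD-001 events have printed
```

Because the console consumer takes a `--bootstrap-server`, the same command works against any listener you can reach (here, the broker's own localhost:9092).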
Chapter 37

Build a Consumer

The consumer reads events from Kafka. It joins a consumer group, gets partitions assigned, and processes events in a loop.

// src/consumer.ts
// A complete Kafka consumer that reads order events.
// Run: npm run consumer
import kafka from "./client";

// ── Define the event type (same as producer) ─────────────────
interface OrderEvent {
  orderId: string;
  customerId: string;
  product: string;
  quantity: number;
  price: number;
  status: string;
  timestamp: string;
}

async function main() {
  // ── Create a Consumer instance ────────────────────────────────
  const consumer = kafka.consumer({
    // groupId: The consumer group this consumer belongs to.
    // WHY: All consumers with the same groupId share the work.
    // Kafka assigns partitions among group members so each partition
    // is read by exactly one consumer in the group.
    // OTHER OPTIONS: Any string. Use your service name.
    //   "notification-service", "analytics-service", etc.
    //   Different groupId = independent reading (each group gets all events)
    groupId: "notification-service",

    // sessionTimeout: How long before Kafka considers this consumer dead.
    // WHY: If the consumer doesn't send a heartbeat within this time,
    // Kafka triggers a rebalance and gives its partitions to others.
    // OTHER OPTIONS: 6000-300000 ms. Default: 30000 (30s).
    //   Lower  = faster failure detection but more false alarms.
    //   Higher = fewer false alarms but slower failure detection.
    sessionTimeout: 30000,

    // heartbeatInterval: How often the consumer sends "I'm alive" signals.
    // WHY: Must be less than sessionTimeout (typically 1/3 of it).
    // Kafka uses heartbeats to detect dead consumers.
    // OTHER OPTIONS: Any ms value. Default: 3000 (3s).
    heartbeatInterval: 3000,

    // maxWaitTimeInMs: How long the broker waits before returning empty if no new events.
    // WHY: Longer = fewer network requests but higher latency for first event.
    // OTHER OPTIONS: Any ms value. Default: 5000 (5s). Set to 100 for low-latency.
    maxWaitTimeInMs: 5000,

    // maxBytesPerPartition: Max bytes to fetch per partition in one request.
    // WHY: Controls memory usage. Larger = fewer requests but more memory.
    // OTHER OPTIONS: Default: 1048576 (1MB). Increase for high-throughput.
    maxBytesPerPartition: 1048576,

    // retry: Retry configuration for consumer operations.
    retry: {
      retries: 10, // how many times to retry (default: 5)
    },
  });

  // ── Connect ──────────────────────────────────────────────────
  console.log("Connecting consumer...");
  await consumer.connect();
  console.log("Consumer connected!");

  // ── Subscribe to topics ──────────────────────────────────────
  await consumer.subscribe({
    // topic: Which topic to read from.
    // OTHER OPTIONS:
    //   topics: ["order-events", "payment-events"] — subscribe to multiple
    //   topic: /order-.*/ — regex pattern (all topics matching the pattern)
    topic: "order-events",

    // fromBeginning: Where to start reading if this consumer group has no committed offset.
    // WHY: true  = read ALL events from the very start (offset 0).
    //      false = only read NEW events arriving after this consumer starts.
    // NOTE: This ONLY matters the FIRST time this groupId reads this topic.
    //       After that, Kafka uses the committed offset.
    // OTHER OPTIONS: false (default — skip old events)
    fromBeginning: true,
  });

  // ── Process events ───────────────────────────────────────────
  // consumer.run() starts the poll loop: fetch → process → commit → repeat
  await consumer.run({
    // autoCommit: Should offsets be committed automatically?
    // WHY: true  = Kafka auto-commits every autoCommitInterval ms.
    //   Simpler but risky — events may be "committed" before fully processed.
    // false = YOU decide when to commit (safer, more control).
    // OTHER OPTIONS: false (for manual commit control)
    autoCommit: true,

    // autoCommitInterval: How often auto-commit runs (only if autoCommit=true).
    // OTHER OPTIONS: Default: 5000 (5s). Lower = less data loss risk on crash.
    autoCommitInterval: 5000,

    // eachMessage: Called ONCE for EACH event. This is where your business logic goes.
    // WHY: eachMessage gives you fine-grained control — process one event at a time.
    // OTHER OPTIONS:
    //   eachBatch: ({ batch, resolveOffset, heartbeat }) => {}
    //     — called once per BATCH of events (more efficient for high throughput)
    //     — you manually call resolveOffset() and heartbeat()
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      // ── Parameters explained ──
      // topic:     which topic this event came from
      // partition: which partition (number)
      // message:   the actual Kafka message object
      // heartbeat: call this during long processing to prevent session timeout
      // pause:     call this to temporarily stop reading from this topic-partition

      // ── Extract event data ────────────────────────────────────
      const offset = message.offset;
      const key = message.key?.toString();                             // Buffer → string
      const value: OrderEvent = JSON.parse(message.value!.toString()); // Buffer → JSON
      const eventType = message.headers?.["event-type"]?.toString();
      const timestamp = message.timestamp;                             // event creation time

      // ── Process the event ─────────────────────────────────────
      console.log(`
┌─────────────────────────────────────────────────
│ Topic: ${topic} | Partition: ${partition} | Offset: ${offset}
│ Key: ${key} | Event: ${eventType}
│ Order: ${value.orderId} — ${value.product}
│ Status: ${value.status} | Price: ₹${value.price}
│ Time: ${value.timestamp}
└─────────────────────────────────────────────────`);

      // ── If processing takes long, send heartbeat ─────────────
      // WHY: If your processing takes > heartbeatInterval (3s),
      // Kafka might think the consumer is dead and trigger a rebalance.
      // Calling heartbeat() tells Kafka "I'm still alive, just busy."
      await heartbeat();

      // ── Example: Pause reading if an external service is down ─
      // const resume = pause();
      // setTimeout(() => resume(), 30000); // resume after 30 seconds
    },
  });

  // ── Graceful Shutdown ────────────────────────────────────────
  // WHY: When your app is stopped (Ctrl+C), you should disconnect cleanly
  // so Kafka knows immediately (instead of waiting for session timeout).
  // This triggers a rebalance right away instead of after 30 seconds.
  const shutdown = async () => {
    console.log("\nShutting down consumer...");
    await consumer.disconnect();
    console.log("Consumer disconnected.");
    process.exit(0);
  };
  process.on("SIGINT", shutdown);  // Ctrl+C
  process.on("SIGTERM", shutdown); // Docker stop / kill
}

main().catch(console.error);

Run It

# Terminal 1: Start the consumer (it keeps running, waiting for events)
npm run consumer

# Terminal 2: Send events from the producer
npm run producer

# Terminal 1 output:
# ┌─────────────────────────────────────────────────
# │ Topic: order-events | Partition: 2 | Offset: 0
# │ Key: ORD-001 | Event: ORDER_PLACED
# │ Order: ORD-001 — Mechanical Keyboard
# │ Status: PLACED | Price: ₹2499
# └─────────────────────────────────────────────────
# ... (CONFIRMED, SHIPPED, DELIVERED follow in order)
Manual Commit Example For safer processing, disable autoCommit and commit manually:
// Manual commit — commit AFTER processing succeeds
await consumer.run({
  autoCommit: false, // disable auto-commit
  eachMessage: async ({ topic, partition, message }) => {
    // Process the event...
    const value = JSON.parse(message.value!.toString());
    console.log(`Processing: ${value.orderId}`);

    // Only AFTER successful processing, commit this offset.
    // WHY: If the consumer crashes before this line, Kafka will
    // re-deliver this event (at-least-once guarantee).
    await consumer.commitOffsets([
      {
        topic,
        partition,
        offset: (parseInt(message.offset) + 1).toString(),
        // WHY offset + 1? The committed offset means
        // "I've processed everything UP TO this offset."
        // So if you processed offset 5, commit 6
        // (meaning "next time, start from 6").
      },
    ]);
  },
});
Chapter 38

Advanced Patterns

Production-ready patterns: batch sending, Dead Letter Queue, and transactions.

1. Batch Producer (High Throughput)

src/advanced/producer-batch.ts — Send many events at once for better performance.

// src/advanced/producer-batch.ts
// Sends 100 events in batches — much faster than one-by-one.
import kafka from "../client";
import { CompressionTypes } from "kafkajs";

async function main() {
  // NOTE: KafkaJS allows only one in-flight request when idempotent=true.
  const producer = kafka.producer({ idempotent: true, maxInFlightRequests: 1 });
  await producer.connect();

  // ── Build a batch of 100 events ──────────────────────────────
  const messages = Array.from({ length: 100 }, (_, i) => ({
    key: `ORD-${String(i).padStart(4, "0")}`,
    value: JSON.stringify({
      orderId: `ORD-${String(i).padStart(4, "0")}`,
      customerId: `CUST-${i % 10}`, // 10 unique customers
      product: "Laptop",
      quantity: 1,
      price: 49999,
      status: "PLACED",
      timestamp: new Date().toISOString(),
    }),
    headers: { "event-type": "ORDER_PLACED" },
  }));

  // ── Send all 100 events in ONE network request ──────────────
  // WHY: Batching reduces network round-trips dramatically.
  //   100 individual sends = 100 network calls.
  //   1 batched send = 1 network call (100x fewer round-trips).
  console.time("batch-send");
  await producer.send({
    topic: "order-events",
    acks: -1,
    compression: CompressionTypes.GZIP, // Compress the entire batch
    messages, // all 100 events go in one request
  });
  console.timeEnd("batch-send"); // Typically a few tens of ms for 100 events

  // ── sendBatch: Send to MULTIPLE topics in one call ───────────
  // WHY: If you need to write to several topics efficiently,
  // sendBatch groups them into one network operation.
  // NOTE: This is batching, NOT atomicity — if the writes must succeed
  // or fail together, use transactions (covered next).
  await producer.sendBatch({
    topicMessages: [
      {
        topic: "order-events",
        messages: [{ key: "ORD-999", value: '{"orderId":"ORD-999","status":"PLACED"}' }],
      },
      {
        topic: "order-confirmations",
        messages: [{ key: "ORD-999", value: '{"orderId":"ORD-999","confirmed":true}' }],
      },
    ],
  });
  console.log("Batch + multi-topic send complete.");

  await producer.disconnect();
}

main().catch(console.error);

2. Consumer with Dead Letter Queue

src/advanced/consumer-dlq.ts — Retry failed events and move permanently failed ones to a DLQ topic.

// src/advanced/consumer-dlq.ts
// Consumer with retry logic and Dead Letter Queue.
import kafka from "../client";
import { KafkaMessage } from "kafkajs";

const MAX_RETRIES = 3;

// ── DLQ Producer (sends failed events to the DLQ topic) ──────
const dlqProducer = kafka.producer();

// ── Simulated processing that sometimes fails ────────────────
async function processEvent(value: any): Promise<void> {
  // Simulate: 30% chance of failure
  if (Math.random() < 0.3) {
    throw new Error(`Payment gateway timeout for ${value.orderId}`);
  }
  console.log(`  ✓ Processed: ${value.orderId} — ${value.status}`);
}

// ── Send failed event to DLQ topic ───────────────────────────
async function sendToDLQ(
  message: KafkaMessage,
  topic: string,
  partition: number,
  error: Error,
  retryCount: number
) {
  await dlqProducer.send({
    topic: "order-events.DLQ",
    messages: [
      {
        // Keep the same key so DLQ events can be correlated
        key: message.key,
        // Keep the original event data
        value: message.value,
        // Add metadata about the failure
        headers: {
          ...message.headers, // preserve original headers
          "dlq-original-topic": topic,
          "dlq-original-partition": partition.toString(),
          "dlq-original-offset": message.offset,
          "dlq-error-message": error.message,
          "dlq-retry-count": retryCount.toString(),
          "dlq-timestamp": Date.now().toString(),
        },
      },
    ],
  });
  console.log(`  ✗ Sent to DLQ: ${message.key?.toString()} — ${error.message}`);
}

async function main() {
  const consumer = kafka.consumer({ groupId: "notification-service-dlq" });

  await dlqProducer.connect();
  await consumer.connect();
  await consumer.subscribe({ topic: "order-events", fromBeginning: true });

  await consumer.run({
    autoCommit: false, // Manual commit for maximum safety
    eachMessage: async ({ topic, partition, message }) => {
      const value = JSON.parse(message.value!.toString());
      let retries = 0;

      // ── Retry loop ─────────────────────────────────────────
      while (retries < MAX_RETRIES) {
        try {
          await processEvent(value);
          break; // Success! Exit retry loop
        } catch (err) {
          retries++;
          console.log(`  ↻ Retry ${retries}/${MAX_RETRIES}: ${(err as Error).message}`);

          if (retries === MAX_RETRIES) {
            // All retries exhausted → send to DLQ
            await sendToDLQ(message, topic, partition, err as Error, retries);
          } else {
            // Wait before retrying (linearly growing backoff: 1s, 2s, ...)
            await new Promise((r) => setTimeout(r, 1000 * retries));
          }
        }
      }

      // ── Commit offset (whether processed successfully or sent to DLQ) ─
      // WHY: We commit after DLQ too, so the consumer moves forward.
      // The failed event is now in the DLQ — no need to block here.
      await consumer.commitOffsets([{
        topic,
        partition,
        offset: (parseInt(message.offset) + 1).toString(),
      }]);
    },
  });

  const shutdown = async () => {
    await consumer.disconnect();
    await dlqProducer.disconnect();
    process.exit(0);
  };
  process.on("SIGINT", shutdown);
  process.on("SIGTERM", shutdown);
}

main().catch(console.error);

3. Transactions (Exactly-Once Processing)

src/advanced/transaction.ts — Read from one topic, process, write to another topic, and commit offset — all as ONE atomic operation.

// src/advanced/transaction.ts
// Exactly-once: Read → Process → Write → Commit as ONE transaction.
// If any step fails, everything rolls back.
import kafka from "../client";

async function main() {
  // ── Transactional Producer ────────────────────────────────────
  // WHY: A transactional producer can group multiple writes + offset commits
  // into one atomic operation. Either ALL succeed or NONE do.
  const producer = kafka.producer({
    idempotent: true,        // required for transactions
    maxInFlightRequests: 1,  // required for transactions
    transactionalId: "order-processor-txn-1", // unique ID for this transactional producer
    // WHY transactionalId: Kafka uses this to track the transaction state.
    // If the producer crashes and restarts with the same ID,
    // Kafka can abort any incomplete transactions from the old instance.
    // RULE: Each producer instance must have a UNIQUE transactionalId.
  });

  const consumer = kafka.consumer({
    groupId: "order-processor",
    // WHY: With transactions, the consumer reads events and the transactional
    // producer commits the consumer's offsets as part of the transaction.
  });

  await producer.connect();
  await consumer.connect();
  await consumer.subscribe({ topic: "order-events", fromBeginning: true });

  await consumer.run({
    autoCommit: false, // MUST be false — transactions handle commits
    eachMessage: async ({ topic, partition, message }) => {
      const order = JSON.parse(message.value!.toString());

      // ── Start a transaction ────────────────────────────────
      const txn = await producer.transaction();
      // WHY: Everything between transaction() and commit()/abort()
      // is part of the same atomic operation.

      try {
        // Step 1: Process the order (your business logic)
        const confirmation = {
          orderId: order.orderId,
          status: "CONFIRMED",
          confirmedAt: new Date().toISOString(),
          estimatedDelivery: "3-5 business days",
        };

        // Step 2: Write the result to another topic (inside the transaction)
        await txn.send({
          topic: "order-confirmations",
          messages: [
            {
              key: order.orderId,
              value: JSON.stringify(confirmation),
            },
          ],
        });

        // Step 3: Commit the consumer offset (inside the transaction)
        // WHY: This ties the offset commit to the write above.
        // If the write to "order-confirmations" fails, the offset is NOT committed.
        // So the event will be re-read and re-processed.
        await txn.sendOffsets({
          consumerGroupId: "order-processor",
          topics: [
            {
              topic,
              partitions: [
                {
                  partition,
                  offset: (parseInt(message.offset) + 1).toString(),
                },
              ],
            },
          ],
        });

        // Step 4: COMMIT the transaction
        // WHY: This makes all the writes and offset commit visible atomically.
        // Consumers with isolation.level=read_committed will now see them.
        await txn.commit();
        console.log(`✓ Transaction committed: ${order.orderId}`);
      } catch (err) {
        // Something went wrong → ABORT the entire transaction
        // WHY: This undoes everything — writes and offset commits.
        // The event will be re-delivered on next poll.
        await txn.abort();
        console.error(`✗ Transaction aborted: ${order.orderId} — ${(err as Error).message}`);
      }
    },
  });

  const shutdown = async () => {
    await consumer.disconnect();
    await producer.disconnect();
    process.exit(0);
  };
  process.on("SIGINT", shutdown);
  process.on("SIGTERM", shutdown);
}

main().catch(console.error);
Complete Implementation Summary

You've now built a full Kafka application with Node.js + TypeScript covering:

  1. Admin API — create topics with custom configs
  2. Producer — send events with keys, headers, compression
  3. Consumer — read events, auto-commit and manual commit
  4. Batch Producer — high-throughput multi-event sending
  5. DLQ Consumer — retry + dead letter queue for failed events
  6. Transactions — exactly-once read-process-write pattern

Every line is explained. Every option has alternatives documented. You're production-ready!

Quick Reference: Run Commands

| Command | What It Does | Run Order |
|---|---|---|
| docker compose up -d | Start Kafka + UI containers | First (once) |
| npm run admin | Create topics (order-events, DLQ, confirmations) | Second (once) |
| npm run consumer | Start consumer (keeps running) | Third (Terminal 1) |
| npm run producer | Send order events | Fourth (Terminal 2) |
| npm run batch | Send 100 events at once | Anytime |
| npm run dlq | Consumer with retry + DLQ | Anytime |
| npm run txn | Exactly-once transaction consumer | Anytime |
Chapter 39

Multi-Broker Docker Setup (3 Brokers)

A single broker is fine for learning, but production needs at least 3 brokers for fault tolerance. If one broker dies, the other two keep serving data with zero downtime.

In Simple Words

One broker = one copy of your data. If that machine dies and its disk is lost, the data is gone. Three brokers = three copies of your data on three different machines. Even if one machine catches fire, the other two hold a full copy and keep serving. That's why production always uses 3+ brokers.
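The "odd number of brokers" rule comes from quorum math: a KRaft controller quorum needs a strict majority of voters alive. A minimal sketch of that arithmetic (the function names here are illustrative, not a Kafka API):

```typescript
// Majority quorum: how many voters must be alive, and how many can fail.
// This is why controller counts are odd — an even count adds a voter
// without adding any extra failure tolerance.

function quorumMajority(voters: number): number {
  return Math.floor(voters / 2) + 1;
}

function tolerableFailures(voters: number): number {
  return voters - quorumMajority(voters);
}

console.log(tolerableFailures(3)); // 1 — a 3-node quorum survives one failure
console.log(tolerableFailures(5)); // 2
console.log(tolerableFailures(4)); // 1 — same as 3 nodes, so the 4th buys nothing
```

The same majority logic applies to the KAFKA_CONTROLLER_QUORUM_VOTERS list in the docker-compose file below: 3 voters means a quorum of 2.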

What We're Building
┌──────────────────────────────────────────────────────────────┐
│                        DOCKER NETWORK                        │
│                                                              │
│   ┌──────────┐      ┌──────────┐      ┌──────────┐           │
│   │ Broker 1 │      │ Broker 2 │      │ Broker 3 │           │
│   │  :9092   │      │  :9094   │      │  :9096   │           │
│   │          │      │          │      │          │           │
│   │ P0-Leader│◄──▶ │ P0-Follow│      │ P0-Follow│           │
│   │ P1-Follow│      │ P1-Leader│◄──▶ │ P1-Follow│           │
│   │ P2-Follow│      │ P2-Follow│      │ P2-Leader│           │
│   └──────────┘      └──────────┘      └──────────┘           │
│                                                              │
│   ┌──────────┐   Controller = Broker 1 (elected via KRaft)   │
│   │ Kafka UI │                                               │
│   │  :8080   │   Replication Factor = 3 (all brokers have    │
│   └──────────┘   a copy of every partition)                  │
└──────────────────────────────────────────────────────────────┘

Your App connects to: localhost:9092, localhost:9094, localhost:9096

docker-compose.yml (3 Brokers + KRaft + Kafka UI)

# docker-compose.yml
# Production-like setup: 3 Kafka brokers using KRaft (no ZooKeeper)
# Each broker is both a broker AND a controller (for simplicity)
# In real production, you'd separate controller nodes from broker nodes
version: "3.8"

services:
  # ════════════════════════════════════════════════════════════════
  # BROKER 1 — Port 9092
  # ════════════════════════════════════════════════════════════════
  kafka-1:
    image: apache/kafka:3.7.0
    container_name: kafka-1
    ports:
      - "9092:9092"   # Your app connects here for Broker 1
    environment:
      # ── Identity ──
      KAFKA_NODE_ID: 1
      # Unique ID. Must be different for each broker.
      # WHY: Kafka uses this to track which broker owns which partitions.

      # ── Roles ──
      KAFKA_PROCESS_ROLES: broker,controller
      # WHY: This node acts as both a data broker AND a controller voter.
      # In large production clusters (50+ brokers), you'd run dedicated
      # controller-only nodes (KAFKA_PROCESS_ROLES: controller)
      # and separate broker-only nodes (KAFKA_PROCESS_ROLES: broker).

      # ── Controller Quorum ──
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      # WHY: Lists ALL controllers that participate in leader election.
      # Format: nodeId@host:controllerPort
      # With 3 voters: quorum = 2 (majority). Can tolerate 1 controller failure.

      # ── Listeners ──
      # WHY we need 3 different listeners:
      #   INTERNAL   = broker-to-broker traffic (replication, metadata)
      #   EXTERNAL   = your app connects from outside Docker
      #   CONTROLLER = KRaft controller communication
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      # WHY advertised != listeners?
      # "listeners"  = what Kafka binds to inside the container (0.0.0.0 = all interfaces)
      # "advertised" = what Kafka tells clients to connect to
      #   INTERNAL://kafka-1:29092  → other brokers inside Docker use this
      #   EXTERNAL://localhost:9092 → your app outside Docker uses this
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      # WHY: Maps each listener name to a security protocol.
      # PLAINTEXT = no encryption. In production, use SSL or SASL_SSL.
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      # WHY: Tells Kafka which listener to use for broker-to-broker communication.
      # Replication traffic uses this. Must be one of the defined listeners.
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER

      # ── Topic Defaults (production-like) ──
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      # WHY: Every new topic will have 3 copies by default.
      # With 3 brokers, RF=3 means every broker has a copy of every partition.
      KAFKA_MIN_INSYNC_REPLICAS: 2
      # WHY: For acks=all, at least 2 of 3 replicas must confirm the write.
      # This means you can lose 1 broker and still accept writes.
      # If 2 brokers die, writes are rejected (data safety over availability).
      KAFKA_NUM_PARTITIONS: 3
      # WHY: Default partition count for auto-created topics.
      # 3 partitions = 3 consumers can read in parallel.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      # WHY: Internal topics (__consumer_offsets, __transaction_state) also need
      # replication. If these lose data, consumer offsets and transactions break.
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168

      # ── KRaft Cluster ID (must be SAME for all brokers) ──
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"
      # WHY: All brokers in the same cluster must share this ID.
      # Generate your own with: kafka-storage.sh random-uuid

  # ════════════════════════════════════════════════════════════════
  # BROKER 2 — Port 9094
  # ════════════════════════════════════════════════════════════════
  kafka-2:
    image: apache/kafka:3.7.0
    container_name: kafka-2
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 2   # Different ID!
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9094,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://localhost:9094
      # NOTE: EXTERNAL is localhost:9094 (different port than Broker 1)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"   # SAME cluster ID

  # ════════════════════════════════════════════════════════════════
  # BROKER 3 — Port 9096
  # ════════════════════════════════════════════════════════════════
  kafka-3:
    image: apache/kafka:3.7.0
    container_name: kafka-3
    ports:
      - "9096:9096"
    environment:
      KAFKA_NODE_ID: 3   # Different ID!
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9096,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"   # SAME cluster ID

  # ════════════════════════════════════════════════════════════════
  # KAFKA UI — Visual Dashboard
  # ════════════════════════════════════════════════════════════════
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: production-local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3

Start the 3-Broker Cluster

# Start all 3 brokers + UI
docker compose up -d

# Verify all containers are running
docker compose ps
# Expected output:
#   kafka-1    running   0.0.0.0:9092->9092/tcp
#   kafka-2    running   0.0.0.0:9094->9094/tcp
#   kafka-3    running   0.0.0.0:9096->9096/tcp
#   kafka-ui   running   0.0.0.0:8080->8080/tcp

# Create a topic with replication factor 3
docker exec -it kafka-1 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic order-events \
  --partitions 3 \
  --replication-factor 3

# Verify partition distribution across brokers
docker exec -it kafka-1 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic order-events

# Output shows:
#   Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
#   Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 2,3,1
#   Partition: 2  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2
# ↑ Each partition has a DIFFERENT leader = load is distributed!
# ↑ Replicas: 1,2,3 means ALL 3 brokers have a copy
# ↑ Isr: 1,2,3 means ALL copies are in-sync

Update client.ts for Multi-Broker

The only change in your code — list all 3 broker addresses:

// src/client.ts — Updated for 3-broker cluster
import { Kafka, logLevel } from "kafkajs";

const kafka = new Kafka({
  clientId: "order-service",

  // List ALL broker addresses.
  // WHY: The client tries each address until it finds a live broker.
  // If Broker 1 is down, it connects to Broker 2 or 3 automatically.
  // Once connected, Kafka tells the client about the full cluster topology.
  // NOTE: You don't NEED all 3 — even 1 is enough to discover the cluster.
  // But listing all 3 gives better availability during startup.
  brokers: [
    "localhost:9092", // Broker 1
    "localhost:9094", // Broker 2
    "localhost:9096", // Broker 3
  ],

  logLevel: logLevel.WARN,

  // retry: Controls what happens when a broker is temporarily unavailable.
  // WHY: In a multi-broker setup, one broker might restart while others are fine.
  // The client should retry connecting to another broker automatically.
  retry: {
    retries: 10,           // try up to 10 times
    initialRetryTime: 300, // first retry after 300ms
    maxRetryTime: 30000,   // max wait between retries: 30 seconds
  },
});

export default kafka;

Test Fault Tolerance — Kill a Broker

# Terminal 1: Start the consumer
npm run consumer

# Terminal 2: Start producing events continuously
npm run producer

# Terminal 3: Kill Broker 2 while events are flowing
docker stop kafka-2

# What happens:
# 1. Controller (Broker 1) detects kafka-2 is dead (no heartbeat)
# 2. For partitions where kafka-2 was leader → new leader elected from ISR
# 3. For partitions where kafka-2 was follower → removed from ISR
# 4. Producer/Consumer get metadata update → reconnect to new leaders
# 5. Events keep flowing! Zero data loss!

# Check the topic — notice ISR shrunk from 3 to 2:
docker exec -it kafka-1 /opt/kafka/bin/kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic order-events

#   Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,3  ← Broker 2 gone from ISR
#   Partition: 1  Leader: 3  Replicas: 2,3,1  Isr: 3,1  ← New leader! Was 2, now 3
#   Partition: 2  Leader: 3  Replicas: 3,1,2  Isr: 3,1  ← Broker 2 gone from ISR

# Bring Broker 2 back
docker start kafka-2

# Broker 2 catches up with the leaders, re-joins ISR:
#   Partition: 0  Replicas: 1,2,3  Isr: 1,3,2  ← Broker 2 is back!
This is Real Fault Tolerance You just killed a broker and nothing broke. No events were lost. Producers and consumers reconnected automatically. This is why production uses 3 brokers with replication.factor=3 and min.insync.replicas=2.
Chapter 40

Production-Grade Producer

The basic producer works, but production needs retry handling, error callbacks, graceful shutdown, and proper batching configs. Here's a production-ready producer.

// src/production/producer.ts
// Production-grade Kafka producer with all the bells and whistles.

import kafka from "../client";
import { CompressionTypes, Partitioners } from "kafkajs";
import { randomUUID } from "crypto";

const producer = kafka.producer({
  idempotent: true,
  maxInFlightRequests: 1,
  // NOTE: kafkajs allows at most 1 in-flight request when idempotent: true.
  // (The Java client allows up to 5; sequence numbers preserve ordering there.)
  allowAutoTopicCreation: false,
  createPartitioner: Partitioners.DefaultPartitioner,
});

// ═══════════════════════════════════════════════════════════════
// EVENT LISTENERS — Know what's happening inside the producer
// ═══════════════════════════════════════════════════════════════
// WHY: In production, you need visibility into producer behavior.
// These events help you debug issues, monitor health, and alert on failures.

producer.on("producer.connect", () => {
  console.log("[PRODUCER] Connected to Kafka cluster");
  // PRODUCTION: Emit a metric to your monitoring system (Prometheus, Datadog, etc.)
});

producer.on("producer.disconnect", () => {
  console.log("[PRODUCER] Disconnected from Kafka cluster");
  // PRODUCTION: This could mean a crash. Trigger an alert.
});

producer.on("producer.network.request", (event) => {
  // Fires on EVERY request to Kafka (very noisy, use for debugging only)
  // event.payload: { apiKey, apiName, broker, clientId, correlationId, duration, size }
  // PRODUCTION: Track request latency → producer.network.request.duration
});

producer.on("producer.network.request_timeout", (event) => {
  console.error("[PRODUCER] Request timeout:", event.payload);
  // PRODUCTION: Alert! Kafka might be overloaded, or there are network issues.
});

// ═══════════════════════════════════════════════════════════════
// SEND FUNCTION — Wrapped with error handling
// ═══════════════════════════════════════════════════════════════

interface OrderEvent {
  orderId: string;
  customerId: string;
  product: string;
  quantity: number;
  price: number;
  status: string;
}

async function sendOrderEvent(order: OrderEvent): Promise<void> {
  try {
    const result = await producer.send({
      topic: "order-events",
      acks: -1,
      timeout: 30000,
      compression: CompressionTypes.GZIP,
      messages: [
        {
          key: order.orderId,
          value: JSON.stringify({
            ...order,
            timestamp: new Date().toISOString(),
            version: 1,
          }),
          headers: {
            "event-type": `ORDER_${order.status}`,
            "source": "order-service",
            "correlation-id": randomUUID(),
            // WHY correlation-id: When this event flows through 5 services,
            // you can trace the entire journey using this single ID.
            // Essential for debugging in distributed systems.
          },
        },
      ],
    });

    const meta = result[0];
    console.log(
      `[SENT] ${order.orderId} → partition:${meta.partition} offset:${meta.baseOffset}`
    );
  } catch (error: any) {
    // ── Handle specific Kafka errors ──
    if (error.type === "TOPIC_AUTHORIZATION_FAILED") {
      console.error("[ERROR] No permission to write to this topic. Check ACLs.");
    } else if (error.type === "REQUEST_TIMED_OUT") {
      console.error("[ERROR] Kafka is not responding. Check broker health.");
    } else {
      console.error("[ERROR] Failed to send event:", error.message);
    }
    // PRODUCTION: Send failed event to a retry queue or DLQ
    throw error;
  }
}

// ═══════════════════════════════════════════════════════════════
// MAIN — Connect, send events, handle shutdown
// ═══════════════════════════════════════════════════════════════

async function main() {
  await producer.connect();

  // Simulate continuous order production
  let orderNum = 1;
  const interval = setInterval(async () => {
    const id = String(orderNum++).padStart(4, "0");
    await sendOrderEvent({
      orderId: `ORD-${id}`,
      customerId: `CUST-${orderNum % 10}`,
      product: "Laptop",
      quantity: 1,
      price: 49999,
      status: "PLACED",
    });
  }, 1000); // Send one event per second

  // ── Graceful Shutdown ──
  // WHY: When deploying new code (rolling restart), your app gets SIGTERM.
  // You must:
  //   1. Stop sending new events
  //   2. Wait for in-flight events to complete (acks received)
  //   3. Disconnect cleanly
  // If you don't, in-flight events may be lost or duplicated.
  const shutdown = async (signal: string) => {
    console.log(`\n[SHUTDOWN] Received ${signal}. Cleaning up...`);
    clearInterval(interval);     // 1. Stop sending new events
    await producer.disconnect(); // 2. Flush pending + disconnect
    console.log("[SHUTDOWN] Producer stopped cleanly.");
    process.exit(0);
  };
  process.on("SIGINT", () => shutdown("SIGINT"));   // Ctrl+C
  process.on("SIGTERM", () => shutdown("SIGTERM")); // Docker stop / K8s pod kill
  process.on("uncaughtException", async (err) => {
    console.error("[FATAL] Uncaught exception:", err);
    await shutdown("uncaughtException");
  });
}

main().catch(console.error);
Chapter 41

Production-Grade Consumer

A production consumer needs manual commits, rebalance listeners, error handling, DLQ integration, and graceful shutdown.

// src/production/consumer.ts
// Production-grade consumer: manual commit, DLQ, rebalance handling, graceful shutdown.

import kafka from "../client";

const consumer = kafka.consumer({
  groupId: "notification-service",

  // ── Session & Heartbeat ──
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  // RULE: heartbeatInterval should be 1/3 of sessionTimeout.
  // If heartbeats are missed for 30s, Kafka thinks the consumer is dead.

  // ── Rebalance Timeout ──
  rebalanceTimeout: 60000,
  // WHY: During rebalance, consumers must join within this time.
  // If your consumer takes too long to commit offsets during rebalance,
  // it gets kicked out of the group.

  // ── Poll Interval ──
  maxWaitTimeInMs: 5000,
  // WHY: Broker waits up to 5 seconds for new data before returning empty.
  // Lower = lower latency but more network traffic.
  // Higher = higher latency but fewer requests.

  // ── Fetch Controls ──
  maxBytesPerPartition: 1048576, // 1MB per partition per fetch
  maxBytes: 10485760,            // 10MB total per fetch across all partitions
  // WHY: Controls memory usage. If you have 100 partitions at 1MB each,
  // one fetch could use 100MB of memory. maxBytes caps the total.

  // ── Read Isolation (for transactions) ──
  // readUncommitted: false, // default — only committed transactional writes are delivered
  // WHY: Keep this at false if your producers use transactions, so events
  // from aborted transactions never reach this consumer.
});

// DLQ producer for failed events
const dlqProducer = kafka.producer();

// ═══════════════════════════════════════════════════════════════
// EVENT LISTENERS — Monitor consumer lifecycle
// ═══════════════════════════════════════════════════════════════

consumer.on("consumer.group_join", (event) => {
  console.log(`[CONSUMER] Joined group. Member: ${event.payload.memberId}`);
  console.log(`[CONSUMER] Assigned partitions: ${JSON.stringify(event.payload.memberAssignment)}`);
  // WHY: Know which partitions this consumer instance is responsible for.
  // PRODUCTION: Log this so you can debug partition assignment issues.
});

consumer.on("consumer.rebalancing", () => {
  console.warn("[CONSUMER] Rebalancing started! Partitions being reassigned...");
  // WHY: During rebalance, this consumer STOPS reading.
  // PRODUCTION: Track rebalance frequency. Too many rebalances = problem.
  // Possible causes: consumers crashing, long processing, network issues.
});

consumer.on("consumer.stop", () => {
  console.log("[CONSUMER] Stopped.");
});

consumer.on("consumer.crash", (event) => {
  console.error("[CONSUMER] CRASHED!", event.payload.error);
  // PRODUCTION: Alert immediately! Consumer is dead.
  // event.payload.restart indicates if KafkaJS will auto-restart.
  console.error(`[CONSUMER] Will auto-restart: ${event.payload.restart}`);
});

consumer.on("consumer.fetch", () => {
  // Fires every time the consumer fetches a batch from Kafka.
  // PRODUCTION: Use to track fetch rate and throughput metrics.
});

// ═══════════════════════════════════════════════════════════════
// PROCESSING LOGIC
// ═══════════════════════════════════════════════════════════════

const MAX_RETRIES = 3;

async function processOrder(order: any): Promise<void> {
  // Your actual business logic goes here.
  // Example: Send notification email, update database, call external API.
  console.log(`  Processing: ${order.orderId} — ${order.status}`);

  // Simulate occasional failure
  if (Math.random() < 0.1) {
    throw new Error("Email service timeout");
  }
}

// ═══════════════════════════════════════════════════════════════
// MAIN
// ═══════════════════════════════════════════════════════════════

async function main() {
  await dlqProducer.connect();
  await consumer.connect();

  await consumer.subscribe({
    topic: "order-events",
    fromBeginning: false,
    // WHY false in production: You usually don't want to reprocess
    // millions of old events when deploying a new consumer.
    // Only new events from now on. Set true only for backfill jobs.
  });

  await consumer.run({
    autoCommit: false, // Manual commit for safety
    eachMessage: async ({ topic, partition, message, heartbeat }) => {
      const value = JSON.parse(message.value!.toString());
      let retries = 0;
      let success = false;

      // ── Retry loop with exponential backoff ──
      while (retries < MAX_RETRIES && !success) {
        try {
          await processOrder(value);
          success = true;
        } catch (err) {
          retries++;
          if (retries < MAX_RETRIES) {
            const backoff = 1000 * Math.pow(2, retries); // 2s, 4s, 8s...
            console.warn(`  Retry ${retries}/${MAX_RETRIES} in ${backoff}ms...`);
            await new Promise((r) => setTimeout(r, backoff));
            await heartbeat(); // Keep alive during the wait!
          }
        }
      }

      // ── Send to DLQ if all retries failed ──
      if (!success) {
        await dlqProducer.send({
          topic: "order-events.DLQ",
          messages: [{
            key: message.key,
            value: message.value,
            headers: {
              ...message.headers,
              "dlq-error": "Max retries exhausted",
              "dlq-original-topic": topic,
              "dlq-original-partition": String(partition),
              "dlq-original-offset": message.offset,
              "dlq-retry-count": String(MAX_RETRIES),
              "dlq-timestamp": Date.now().toString(),
            },
          }],
        });
        console.error(`  ✗ DLQ: ${value.orderId}`);
      }

      // ── Commit offset (whether processed or sent to DLQ) ──
      await consumer.commitOffsets([{
        topic,
        partition,
        offset: (parseInt(message.offset) + 1).toString(),
      }]);
    },
  });

  // ── Graceful Shutdown ──
  // WHY in production: When Kubernetes rolls out a new deployment,
  // it sends SIGTERM to old pods. You must:
  //   1. Stop fetching new events
  //   2. Finish processing current events
  //   3. Commit final offsets
  //   4. Leave the consumer group cleanly (triggers immediate rebalance)
  // If you skip this, the group waits for sessionTimeout (30s) before rebalancing.
  const shutdown = async (signal: string) => {
    console.log(`\n[SHUTDOWN] ${signal} received. Draining...`);
    await consumer.stop();       // stop fetching, finish current batch
    await consumer.disconnect(); // commit offsets + leave group
    await dlqProducer.disconnect();
    console.log("[SHUTDOWN] Consumer stopped cleanly.");
    process.exit(0);
  };
  process.on("SIGINT", () => shutdown("SIGINT"));
  process.on("SIGTERM", () => shutdown("SIGTERM"));
}

main().catch(console.error);
Running Multiple Consumer Instances In production, you run 3 instances of this consumer (e.g., 3 Kubernetes pods). Since they share the same groupId: "notification-service", Kafka automatically assigns 1 partition to each consumer. If one pod crashes, its partition is reassigned to a surviving pod within sessionTimeout (30 seconds).
3 Consumer Instances Sharing 3 Partitions
Topic: order-events (3 partitions)
Consumer Group: notification-service

  Pod 1 (consumer instance) ──── reads from ──── Partition 0
  Pod 2 (consumer instance) ──── reads from ──── Partition 1
  Pod 3 (consumer instance) ──── reads from ──── Partition 2

If Pod 2 crashes:
  Pod 1 ──── Partition 0 + Partition 1   (takes over Pod 2's work)
  Pod 3 ──── Partition 2

When Pod 2 recovers:
  Pod 1 ──── Partition 0   (rebalance gives the partition back)
  Pod 2 ──── Partition 1   (resumes from last committed offset)
  Pod 3 ──── Partition 2
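The reassignment above can be sketched as plain round-robin distribution over the group's live members. This is a toy model, not Kafka's actual assignor (the real range, round-robin, and cooperative-sticky assignors are more sophisticated), but it shows the core invariant: every partition always belongs to exactly one live member.

```typescript
// Toy sketch: spread partitions round-robin across live consumer-group members.
// Re-running with a smaller member list models what a rebalance produces.

function assignPartitions(
  partitions: number[],
  members: string[]
): Map<string, number[]> {
  const assignment = new Map<string, number[]>(
    members.map((m) => [m, [] as number[]])
  );
  partitions.forEach((p, i) => {
    // i-th partition goes to the (i mod members.length)-th member
    assignment.get(members[i % members.length])!.push(p);
  });
  return assignment;
}

// 3 pods, 3 partitions → one partition each
console.log(assignPartitions([0, 1, 2], ["pod-1", "pod-2", "pod-3"]));

// Pod 2 crashes → rebalance over the survivors:
// pod-1 takes partitions 0 and 2, pod-3 takes partition 1
console.log(assignPartitions([0, 1, 2], ["pod-1", "pod-3"]));
```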
Chapter 42

Monitoring & Health Checks

In production, you need to monitor consumer lag, broker health, and throughput. Here's how to build monitoring into your Node.js app.

In Simple Words

Imagine a factory assembly line. "Consumer lag" is how many boxes are piled up on the belt, waiting to be picked up. If the pile grows, your workers (consumers) are too slow. You need to either speed them up or add more workers.
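The "pile of boxes" is simple arithmetic: per partition, lag = newest offset minus the offset the group has committed. A minimal sketch (the type and function names here are illustrative, not a kafkajs API):

```typescript
// Lag per partition = latest offset on the log - offset the group committed.
// Total lag is the sum across partitions.

interface PartitionLag {
  partition: number;
  latestOffset: number;    // "end" of the log
  committedOffset: number; // where the consumer group has read up to
}

function totalLag(partitions: PartitionLag[]): number {
  return partitions.reduce(
    (sum, p) => sum + (p.latestOffset - p.committedOffset),
    0
  );
}

const lag = totalLag([
  { partition: 0, latestOffset: 500, committedOffset: 480 },
  { partition: 1, latestOffset: 450, committedOffset: 450 },
]);
console.log(lag); // 20 — consumers are 20 events behind
```

The monitor below does exactly this, but fetches the real offsets from the cluster via the admin API.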

Check Consumer Lag Programmatically

// src/production/monitor.ts
// Checks consumer lag — how far behind each consumer group is.

import kafka from "../client";

async function checkConsumerLag() {
  const admin = kafka.admin();
  await admin.connect();

  const groupId = "notification-service";
  const topic = "order-events";

  // ── Step 1: Get the latest offset for each partition ──
  // WHY: This is the "end" of the log — where the newest event is.
  const topicOffsets = await admin.fetchTopicOffsets(topic);
  // Returns: [{ partition: 0, offset: "500" }, { partition: 1, offset: "450" }, ...]

  // ── Step 2: Get the committed offset for this consumer group ──
  // WHY: This is where the consumer has read up to.
  const groupOffsets = await admin.fetchOffsets({ groupId, topics: [topic] });
  // Returns: [{ topic: "order-events", partitions: [{ partition: 0, offset: "480" }, ...] }]

  // ── Step 3: Calculate lag = latest offset - committed offset ──
  console.log(`\n[LAG REPORT] Consumer Group: ${groupId}`);
  console.log(`${"─".repeat(60)}`);

  let totalLag = 0;
  const groupPartitions = groupOffsets[0].partitions;

  for (const tp of topicOffsets) {
    const latestOffset = parseInt(tp.offset);
    const committed = groupPartitions.find((p) => p.partition === tp.partition);
    const committedOffset = committed ? parseInt(committed.offset) : 0;
    const lag = latestOffset - committedOffset;
    totalLag += lag;

    const status =
      lag === 0 ? "CAUGHT UP" : lag < 100 ? "OK" : lag < 1000 ? "WARNING" : "CRITICAL";
    console.log(
      `  Partition ${tp.partition}: latest=${latestOffset} committed=${committedOffset} lag=${lag} [${status}]`
    );
  }

  console.log(`${"─".repeat(60)}`);
  console.log(`  TOTAL LAG: ${totalLag} events`);

  if (totalLag > 10000) {
    console.error("  ⚠ ALERT: Consumer lag is too high! Consider adding more consumers.");
    // PRODUCTION: Send alert to Slack/PagerDuty/email
  }

  // ── Step 4: Describe the consumer group ──
  // WHY: See which consumers are active, which partitions they own.
  const groupDesc = await admin.describeGroups([groupId]);
  const group = groupDesc.groups[0];
  console.log(`\n[GROUP INFO]`);
  console.log(`  State: ${group.state}`); // Stable, Rebalancing, Dead, Empty
  console.log(`  Members: ${group.members.length}`);
  group.members.forEach((member, i) => {
    console.log(`  Member ${i + 1}: ${member.clientId} (${member.clientHost})`);
  });

  await admin.disconnect();
}

// ── Run monitoring every 30 seconds ──
async function main() {
  console.log("Starting Kafka monitor...");
  while (true) {
    await checkConsumerLag();
    await new Promise((r) => setTimeout(r, 30000));
  }
}

main().catch(console.error);

Health Check Endpoint

Add a simple HTTP health check so Kubernetes knows your consumer is alive:

// Add this to your consumer file
// WHY: Kubernetes uses HTTP health checks to decide if a pod is healthy.
// If the health check fails, K8s restarts the pod automatically.

import http from "http";

let isHealthy = true;
let lastMessageTime = Date.now(); // Track when the last fetch happened

consumer.on("consumer.fetch", () => {
  lastMessageTime = Date.now();
});
consumer.on("consumer.crash", () => {
  isHealthy = false;
});

// Simple HTTP server for health checks
const server = http.createServer((req, res) => {
  if (req.url === "/health") {
    // Liveness: Is the process alive?
    res.writeHead(isHealthy ? 200 : 503);
    res.end(isHealthy ? "OK" : "UNHEALTHY");
  } else if (req.url === "/ready") {
    // Readiness: Is the consumer actively processing?
    // If no fetch in 60 seconds, something might be wrong.
    const stale = Date.now() - lastMessageTime > 60000;
    res.writeHead(stale ? 503 : 200);
    res.end(stale ? "STALE" : "READY");
  } else {
    res.writeHead(404);
    res.end();
  }
});

server.listen(3000, () => {
  console.log("Health check server on :3000 (/health, /ready)");
});

Key Metrics to Monitor

| Metric | What It Means | Alert When |
|---|---|---|
| Consumer Lag | Events waiting to be processed | Lag > 10,000 or growing continuously |
| Rebalance Rate | How often partitions are reassigned | More than 5 rebalances / hour |
| Fetch Latency | Time to fetch a batch from Kafka | > 5 seconds consistently |
| Commit Latency | Time to commit an offset | > 3 seconds consistently |
| Under-Replicated Partitions | Partitions where ISR < replication factor | Any value > 0 |
| Offline Partitions | Partitions with no active leader | Any value > 0 (critical!) |
| DLQ Message Count | Failed events in the Dead Letter Queue | Any increase (investigate immediately) |
| Consumer Group State | Stable, Rebalancing, Empty, Dead | Anything other than "Stable" |
Chapter 43

Production Deployment Checklist

Everything you need to verify before going to production. This is the summary of everything we've learned, as a checklist.

Cluster Configuration

| Config | Dev Value | Production Value | Why |
|---|---|---|---|
| Broker Count | 1 | 3+ (odd number) | Fault tolerance. Odd count gives a clean quorum majority. |
| replication.factor | 1 | 3 | Three copies: a full copy of the data survives even 2 broker failures. |
| min.insync.replicas | 1 | 2 | With RF=3, at least 2 replicas must confirm writes; writes continue after 1 broker failure. |
| auto.create.topics.enable | true | false | Topics must be created deliberately with correct configs. |
| Controller Nodes | 1 (combined) | 3 dedicated | Dedicated controllers = faster metadata operations. |
| log.retention.hours | 168 (7 days) | Based on need | Balance between replay ability and disk cost. |
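The interplay of the replication.factor and min.insync.replicas rows above boils down to one check on the broker: with acks=all, a write is accepted only while the in-sync replica set is at least min.insync.replicas. A minimal sketch of that decision (illustrative names, not a Kafka API):

```typescript
// With acks=all, the leader accepts a write only if the current ISR size
// meets min.insync.replicas; otherwise the producer gets NOT_ENOUGH_REPLICAS.

function writeAccepted(isrSize: number, minInsyncReplicas: number): boolean {
  return isrSize >= minInsyncReplicas;
}

// RF=3, min.insync.replicas=2:
console.log(writeAccepted(3, 2)); // true  — all replicas healthy
console.log(writeAccepted(2, 2)); // true  — one broker down, writes continue
console.log(writeAccepted(1, 2)); // false — writes rejected (safety over availability)
```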

Producer Configuration

| Config | Dev Value | Production Value | Why |
|---|---|---|---|
| acks | 1 | -1 (all) | All ISR replicas must confirm. No data loss. |
| idempotent | false | true | Prevents duplicate events on retry. |
| compression | none | GZIP or ZSTD | Reduces network bandwidth and disk usage 60-80%. |
| retries | 0 | MAX_INT | Auto-set by idempotent=true. Never give up. |
| maxInFlightRequests | 5 | 1 (kafkajs) | kafkajs allows only 1 in flight with idempotence; the Java client allows up to 5, with ordering kept by sequence numbers. |
| Message Key | null | Business key | orderId, userId, etc. Guarantees ordering per key. |
| Graceful Shutdown | None | SIGTERM handler | Flush pending events before exit. |

Consumer Configuration

| Config | Dev Value | Production Value | Why |
|---|---|---|---|
| autoCommit | true | false | Manual commit after processing. Prevents data loss. |
| fromBeginning | true | false | Only read new events. Don't reprocess millions of old events. |
| sessionTimeout | 30s | 30s | Balance between detection speed and false alarms. |
| heartbeatInterval | 3s | 3s (1/3 of session) | Regular heartbeats prevent false death detection. |
| Instances Count | 1 | = partition count | 1 consumer per partition = max parallelism. |
| DLQ | None | topic.DLQ | Failed events go to the DLQ instead of blocking the queue. |
| Health Check | None | /health + /ready | Kubernetes needs HTTP endpoints to manage pods. |
| Graceful Shutdown | None | SIGTERM handler | Commit offsets and leave group cleanly on deploy. |

Operational Checklist

Before Launch
  • All topics created manually with correct partition count and RF=3.
  • SSL/TLS enabled for all connections.
  • SASL authentication configured per service.
  • ACLs restrict topic access (service A can't read service B's topics).
  • Schema Registry configured with BACKWARD compatibility.
  • DLQ topics created for every consumer topic.
  • Monitoring dashboards set up (consumer lag, broker health).
  • Alerts configured (lag > threshold, offline partitions, DLQ growth).
During Operation
  • Monitor consumer lag trends (growing = problem).
  • Watch rebalance frequency (too many = consumers are unstable).
  • Review DLQ events daily (fix root causes, replay fixed events).
  • Check under-replicated partitions (ISR < RF = broker may be failing).
  • Track disk usage on brokers (set up alerts at 80% capacity).
  • Test disaster recovery: periodically kill a broker, verify auto-recovery.
Scaling Decisions
  • Consumer lag growing? Add more consumer instances (up to partition count).
  • Need more parallelism? Add more partitions (can't reduce later!).
  • Disk full? Add more brokers or reduce retention period.
  • High latency? Check compression, batch size, fetch size configs.
  • Too many rebalances? Use CooperativeSticky assignor, tune timeouts.
  • Network bottleneck? Enable compression (ZSTD is fastest with good ratio).
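The first two scaling rules share one constraint: a partition is read by at most one consumer in a group, so instances beyond the partition count sit idle. A tiny sketch of that ceiling (illustrative helper, not a Kafka API):

```typescript
// Effective consumer-group parallelism is capped by the partition count:
// extra instances beyond it receive no partitions and sit idle.

function effectiveParallelism(consumerInstances: number, partitions: number): number {
  return Math.min(consumerInstances, partitions);
}

console.log(effectiveParallelism(3, 3)); // 3 — one partition each
console.log(effectiveParallelism(5, 3)); // 3 — two consumers idle; add partitions first
console.log(effectiveParallelism(2, 3)); // 2 — one consumer reads two partitions
```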
You're Production-Ready! You've learned Kafka from the very basics (what is an event?) all the way to production-grade distributed systems with 3-broker clusters, fault tolerance testing, monitoring, DLQ patterns, transactions, and deployment checklists. The theory chapters (1-32) explain WHY things work. The implementation chapters (33-43) show HOW to build them. Go ship something amazing.