Ajithkumar Sekar
Senior Software Developer at Weave Communications
Apache Kafka is a distributed event streaming platform
Producers → publish messages/events to topics
Topics → categorized streams of data stored in clusters
Consumers → subscribe to topics and process messages
High throughput, scalable, and fault-tolerant messaging system
graph LR
P1[Producer 1] -->|publish| KC
P2[Producer 2] -->|publish| KC
subgraph KC[Kafka Cluster]
T1[Topic: orders]
end
KC -->|subscribe| C1[Consumer 1]
KC -->|subscribe| C2[Consumer 2]
KC -->|subscribe| C3[Consumer 3]
style P1 fill:#d63031,stroke:#333,color:#fff
style P2 fill:#d63031,stroke:#333,color:#fff
style T1 fill:#0984e3,stroke:#333,color:#fff
style KC fill:#fdcb6e,stroke:#333,color:#000
style C1 fill:#00b894,stroke:#333,color:#fff
style C2 fill:#00b894,stroke:#333,color:#fff
style C3 fill:#00b894,stroke:#333,color:#fff
from kafka import KafkaProducer
import json
# Initialize producer
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Send message
message = {'order_id': 12345, 'product': 'Laptop', 'quantity': 2}
producer.send('orders', value=message)
producer.flush()
from kafka import KafkaConsumer
import json
# Initialize consumer
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8')),
auto_offset_reset='earliest',
group_id='order-processor'
)
# Process messages
for message in consumer:
order = message.value
print(f"Received order {order['order_id']}: {order['quantity']} x {order['product']}")
What happens once an order message is consumed?
Do you see any inefficiencies in the events being sent?
Centralized service for managing and validating schemas
Benefits:
Enforces a shared contract between producers and consumers
Supports safe schema evolution via compatibility checks
Keeps messages compact: a schema ID travels instead of repeated field names
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "int"},
{"name": "product", "type": "string"},
{"name": "quantity", "type": "int"}
]
}
message Order {
int32 order_id = 1;
string product = 2;
int32 quantity = 3;
}
{"order_id": 12345,
"product": "Laptop",
"quantity": 2}
{"order_id": 12346,
"product": "Mouse",
"quantity": 5}
{"order_id": 12347,
"product": "Keyboard",
"quantity": 1}
❌ Repetitive keys
❌ Wasteful storage
[ID:001][0x3039
"Laptop" 0x02]
[ID:001][0x303A
"Mouse" 0x05]
[ID:001][0x303B
"Keyboard" 0x01]
✓ Schema stored once, separately
✓ Roughly 50% smaller payloads
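The claimed savings can be sanity-checked in plain Python: JSON repeats every key in every message, while a schema-based layout carries only a schema ID plus the values. The fixed layout below is an illustrative sketch, not the real Avro or Protobuf wire format.

```python
import json
import struct

order = {"order_id": 12345, "product": "Laptop", "quantity": 2}

# JSON encoding: the key names travel with every single message
json_bytes = json.dumps(order, separators=(",", ":")).encode("utf-8")

# Schema-based encoding (illustrative layout only):
# 4-byte schema ID + 4-byte order_id + 1-byte length + product bytes + 1-byte quantity
product = order["product"].encode("utf-8")
schema_id = 1
binary = (
    struct.pack("<iiB", schema_id, order["order_id"], len(product))
    + product
    + struct.pack("<B", order["quantity"])
)

print(len(json_bytes), len(binary))  # 50 vs 16 bytes
```

Across millions of small order events, dropping the repeated keys is where most of the reduction comes from.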
graph TB
P[Producer] -->|1. Get Schema| SR[Schema Registry]
SR -->|2. Return Schema ID| P
P -->|3. Serialize with Schema ID| KC[Kafka Cluster]
KC -->|4. Read Message| C[Consumer]
C -->|5. Get Schema by ID| SR
SR -->|6. Return Schema| C
style P fill:#d63031,stroke:#333,color:#fff
style SR fill:#6c5ce7,stroke:#333,color:#fff
style KC fill:#fdcb6e,stroke:#333,color:#000
style C fill:#00b894,stroke:#333,color:#fff
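The six-step flow above can be mimicked with a toy in-memory registry (purely illustrative; a real deployment talks to Confluent Schema Registry over HTTP): the producer registers a schema once and prepends the returned ID to each message, and the consumer uses that ID to look the schema back up.

```python
import json
import struct

class ToySchemaRegistry:
    """In-memory stand-in for a schema registry (illustration only)."""
    def __init__(self):
        self._ids = {}       # schema text -> schema ID
        self._schemas = {}   # schema ID -> schema text

    def register(self, schema: str) -> int:
        # Steps 1-2: producer submits the schema, gets back a stable ID
        if schema not in self._ids:
            sid = len(self._schemas) + 1
            self._ids[schema] = sid
            self._schemas[sid] = schema
        return self._ids[schema]

    def fetch(self, sid: int) -> str:
        # Steps 5-6: consumer resolves the ID back to the schema
        return self._schemas[sid]

registry = ToySchemaRegistry()
order_schema = (
    '{"type": "record", "name": "Order", "fields": ['
    '{"name": "order_id", "type": "int"}, '
    '{"name": "product", "type": "string"}, '
    '{"name": "quantity", "type": "int"}]}'
)

# Step 3: producer serializes schema ID + payload
sid = registry.register(order_schema)
payload = json.dumps({"order_id": 12345, "product": "Laptop", "quantity": 2}).encode("utf-8")
message = struct.pack("<i", sid) + payload

# Step 4-6: consumer splits the ID off and resolves the schema
recv_sid = struct.unpack("<i", message[:4])[0]
schema = registry.fetch(recv_sid)
order = json.loads(message[4:])
print(recv_sid, order["product"])  # 1 Laptop
```

The key design point: the schema is registered once per version, so each message pays only a few bytes for the ID rather than shipping the full schema.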
Framework for integrating Kafka with external systems
Types of Connectors:
Source connectors: pull data from external systems into Kafka topics
Sink connectors: push data from Kafka topics out to external systems
graph LR
DB[(Database)] -->|Source Connector| KC[Kafka Connect]
API[REST API] -->|Source Connector| KC
KC -->|Publish| KT[Kafka Topics]
KT -->|Consume| KC2[Kafka Connect]
KC2 -->|Sink Connector| ES[(Elasticsearch)]
KC2 -->|Sink Connector| S3[(S3 Storage)]
KC2 -->|Sink Connector| HDFS[(HDFS)]
style DB fill:#0984e3,stroke:#333,color:#fff
style API fill:#0984e3,stroke:#333,color:#fff
style KC fill:#00b894,stroke:#333,color:#fff
style KT fill:#fdcb6e,stroke:#333,color:#000
style KC2 fill:#00b894,stroke:#333,color:#fff
style ES fill:#6c5ce7,stroke:#333,color:#fff
style S3 fill:#6c5ce7,stroke:#333,color:#fff
style HDFS fill:#6c5ce7,stroke:#333,color:#fff
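As a concrete sketch, a JDBC source connector for the database-to-topics path above might be configured like this (property names follow the Confluent JDBC connector; the connection details and names are placeholders):

```json
{
  "name": "orders-db-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/shop",
    "table.whitelist": "orders",
    "mode": "incrementing",
    "incrementing.column.name": "order_id",
    "topic.prefix": "db-"
  }
}
```

Posting this JSON to the Kafka Connect REST API starts a connector that streams new `orders` rows into the `db-orders` topic without any custom producer code.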
Stream processing using SQL-like queries
Features:
graph TB
KT[Kafka Topics] --> KSQL[ksqlDB Engine]
KSQL -->|SQL Queries| ST[Streams & Tables]
ST -->|Process| KSQL
KSQL -->|Write Results| KT2[Output Topics]
KSQL -->|Materialized Views| MV[State Stores]
style KT fill:#fdcb6e,stroke:#333,color:#000
style KSQL fill:#fd79a8,stroke:#333,color:#fff
style ST fill:#6c5ce7,stroke:#333,color:#fff
style KT2 fill:#fdcb6e,stroke:#333,color:#000
style MV fill:#00b894,stroke:#333,color:#fff
Real-time data filtering and transformation
COUNT, SUM, AVG with GROUP BY
Stream-stream and stream-table joins
Tumbling, hopping, and session windows
-- Create a stream from Kafka topic
CREATE STREAM logs_stream (
timestamp VARCHAR,
app VARCHAR,
level VARCHAR,
msg VARCHAR
) WITH (
KAFKA_TOPIC='logs',
VALUE_FORMAT='JSON'
);
-- Create order service error stream
CREATE STREAM order_errors AS
SELECT timestamp, app, msg
FROM logs_stream
WHERE level = 'ERROR'
AND app = 'order-service';
-- Aggregate error counts per app over 1-hour tumbling windows
CREATE TABLE error_counts AS
SELECT app,
COUNT(*) AS error_count,
COLLECT_LIST(msg) AS recent_errors
FROM logs_stream
WINDOW TUMBLING (SIZE 1 HOUR)
WHERE level = 'ERROR'
GROUP BY app;
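Once the table exists, a push query (ksqlDB syntax, run from the ksqlDB CLI) streams each windowed count as it updates, which is a natural feed for alerting:

```sql
-- Emit every change to the hourly error counts as it happens
SELECT app, error_count
FROM error_counts
EMIT CHANGES;
```

Unlike a pull query, `EMIT CHANGES` keeps the query running and pushes a new row whenever a window's count changes.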