Beyond Kafka

Schema, Streams & Connectors

Ajithkumar Sekar
Senior Software Developer at Weave Communications

Agenda 📋

  • 🚀 Kafka Intro
  • 📝 Schemas
  • 🔌 Kafka Connectors
  • 💾 ksqlDB

🚀 Kafka Introduction

Apache Kafka is a distributed event streaming platform

Producers → publish messages/events to topics

Topics → named, partitioned streams of events stored on the cluster

Consumers → subscribe to topics and process messages

High-throughput, scalable, and fault-tolerant messaging system

Kafka Architecture (simplified)

graph LR
    P1[Producer 1] -->|publish| KC
    P2[Producer 2] -->|publish| KC
    
    subgraph KC[Kafka Cluster]
        T1[Topic: orders]
    end
    
    KC -->|subscribe| C1[Consumer 1]
    KC -->|subscribe| C2[Consumer 2]
    KC -->|subscribe| C3[Consumer 3]
    
    style P1 fill:#d63031,stroke:#333,color:#fff
    style P2 fill:#d63031,stroke:#333,color:#fff
    style T1 fill:#0984e3,stroke:#333,color:#fff
    style KC fill:#fdcb6e,stroke:#333,color:#000
    style C1 fill:#00b894,stroke:#333,color:#fff
    style C2 fill:#00b894,stroke:#333,color:#fff
    style C3 fill:#00b894,stroke:#333,color:#fff
            

Python Producer 📤


from kafka import KafkaProducer
import json

# Initialize producer
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send message
message = {'order_id': 12345, 'product': 'Laptop', 'quantity': 2}

producer.send('orders', value=message)
producer.flush()
				

Python Consumer 📥


from kafka import KafkaConsumer
import json

# Initialize consumer
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    group_id='order-processor'
)

# Process messages
for message in consumer:
    order = message.value
    print(f"Received order: {order['quantity']} x {order['product']}")
				

Questions to Consider

What happens once an order message is consumed?

Do you see any inefficiencies in the events being sent?

Challenges with Raw JSON

  • Kafka stores events on disk until the retention period expires
  • Repeating the JSON key names in every event wastes storage
  • No enforced contract between producer and consumer
  • No validation, so malformed events surface as runtime errors in consumers
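The last two points are easy to demonstrate without a broker: nothing stops a producer from renaming a field, and the consumer only finds out at read time. A minimal sketch in plain Python:

```python
import json

# A consumer that, like the one on the earlier slide, assumes every
# event carries 'product' and 'quantity'. Nothing enforces that contract.
def handle(raw: bytes):
    order = json.loads(raw.decode("utf-8"))
    return order["product"], order["quantity"]

good = json.dumps({"order_id": 1, "product": "Laptop", "quantity": 2}).encode("utf-8")
bad = json.dumps({"order_id": 2, "item": "Mouse"}).encode("utf-8")  # field renamed upstream

print(handle(good))  # ('Laptop', 2)

try:
    handle(bad)
except KeyError as exc:
    # The producer's bug only surfaces here, at consume time.
    print(f"runtime failure: missing key {exc}")
```

With a schema registry, the `bad` event would be rejected at produce time instead.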

Schema Registry

Centralized service for managing and validating schemas

Benefits:

  • Data contract enforcement
  • Message validation
  • Reduced storage (Avro, Protobuf)
  • Schema evolution support

Serialization Formats

Avro


{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "int"},
    {"name": "product", "type": "string"},
    {"name": "quantity", "type": "int"}
  ]
}
						
  • Compact binary format
  • Schema in JSON
  • Dynamic typing

Protobuf


message Order {
  int32 order_id = 1;
  string product = 2;
  int32 quantity = 3;
}
						
  • Highly efficient
  • Strongly typed
  • Language-agnostic

Storage Comparison: JSON vs Avro

JSON (≈166 bytes for 3 events)


{"order_id": 12345, 
 "product": "Laptop", 
 "quantity": 2}
{"order_id": 12346, 
 "product": "Mouse", 
 "quantity": 5}
{"order_id": 12347, 
 "product": "Keyboard", 
 "quantity": 1}


❌ Repetitive keys

❌ Wasteful storage

Avro (≈40 bytes for 3 events)


[ID:001][0x3039 
        "Laptop" 0x02]
[ID:001][0x303A 
        "Mouse" 0x05]
[ID:001][0x303B 
        "Keyboard" 0x01]


✓ Schema stored once, referenced by ID

✓ ~75% smaller
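These sizes are easy to verify. The sketch below measures real JSON encoding against a hand-rolled fixed-layout binary packing that stands in for Avro (it is not real Avro, but it shows why dropping the key names shrinks each event):

```python
import json
import struct

orders = [
    {"order_id": 12345, "product": "Laptop", "quantity": 2},
    {"order_id": 12346, "product": "Mouse", "quantity": 5},
    {"order_id": 12347, "product": "Keyboard", "quantity": 1},
]

# JSON repeats every key name in every event.
json_bytes = sum(len(json.dumps(o).encode("utf-8")) for o in orders)

# A schema-based binary encoding ships only the values: here a 4-byte
# order_id, a length-prefixed product name, and a 4-byte quantity.
def pack(o):
    name = o["product"].encode("utf-8")
    return struct.pack(f">iB{len(name)}si", o["order_id"], len(name), name, o["quantity"])

binary_bytes = sum(len(pack(o)) for o in orders)

print(f"JSON: {json_bytes} bytes, binary: {binary_bytes} bytes")
```

Real Avro uses variable-length integers, so it is more compact still for small values.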

Schema Registry Architecture

graph TB
    P[Producer] -->|1. Get Schema| SR[Schema Registry]
    SR -->|2. Return Schema ID| P
    P -->|3. Serialize with Schema ID| KC[Kafka Cluster]
    KC -->|4. Read Message| C[Consumer]
    C -->|5. Get Schema by ID| SR
    SR -->|6. Return Schema| C
    
    style P fill:#d63031,stroke:#333,color:#fff
    style SR fill:#6c5ce7,stroke:#333,color:#fff
    style KC fill:#fdcb6e,stroke:#333,color:#000
    style C fill:#00b894,stroke:#333,color:#fff
				
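Steps 3 and 5 in the diagram rely on Confluent's wire format: each message starts with a magic byte (0) and a big-endian 4-byte schema ID, followed by the encoded payload. A sketch of that framing (the payload bytes here are placeholders, not real Avro):

```python
import struct

MAGIC_BYTE = 0  # Confluent's wire format always starts with a zero byte

def frame(schema_id: int, payload: bytes) -> bytes:
    # Header: magic byte + big-endian 4-byte schema ID, then the payload.
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload

def unframe(message: bytes):
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Schema Registry framed message")
    return schema_id, message[5:]

avro_payload = b"\x02\x0cLaptop\x04"        # stand-in for Avro-encoded bytes
msg = frame(1, avro_payload)                # step 3: producer attaches schema ID 1
schema_id, payload = unframe(msg)           # step 5: consumer extracts the ID
print(schema_id)  # the consumer asks the registry for this schema
```

The schema travels once (to the registry); every message carries only its 5-byte reference.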

Live Demo

Kafka Connectors

Framework for integrating Kafka with external systems

Types of Connectors:

  • Source Connectors (External System → Kafka)
  • Sink Connectors (Kafka → External System)
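Connectors are configured declaratively. As a sketch (the connector class is Confluent's JDBC source connector, which must be installed separately; the connection details are placeholders), a source connector streaming new rows from an orders table might look like:

```json
{
  "name": "orders-db-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/shop",
    "connection.user": "kafka_connect",
    "connection.password": "secret",
    "mode": "incrementing",
    "incrementing.column.name": "order_id",
    "table.whitelist": "orders",
    "topic.prefix": "db-"
  }
}
```

POSTing this to the Kafka Connect REST API (port 8083 by default) starts the connector; new rows land in the `db-orders` topic.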

Kafka Connect Overview

graph LR
    DB[(Database)] -->|Source Connector| KC[Kafka Connect]
    API[REST API] -->|Source Connector| KC
    KC -->|Publish| KT[Kafka Topics]
    KT -->|Consume| KC2[Kafka Connect]
    KC2 -->|Sink Connector| ES[(Elasticsearch)]
    KC2 -->|Sink Connector| S3[(S3 Storage)]
    KC2 -->|Sink Connector| HDFS[(HDFS)]
    
    style DB fill:#0984e3,stroke:#333,color:#fff
    style API fill:#0984e3,stroke:#333,color:#fff
    style KC fill:#00b894,stroke:#333,color:#fff
    style KT fill:#fdcb6e,stroke:#333,color:#000
    style KC2 fill:#00b894,stroke:#333,color:#fff
    style ES fill:#6c5ce7,stroke:#333,color:#fff
    style S3 fill:#6c5ce7,stroke:#333,color:#fff
    style HDFS fill:#6c5ce7,stroke:#333,color:#fff
				

Live Demo

ksqlDB

Stream processing using SQL-like queries

Features:

  • Real-time data transformation
  • Filtering and aggregations
  • Joining streams and tables
  • Materialized views

ksqlDB Architecture

graph TB
    KT[Kafka Topics] --> KSQL[ksqlDB Engine]
    KSQL -->|SQL Queries| ST[Streams & Tables]
    ST -->|Process| KSQL
    KSQL -->|Write Results| KT2[Output Topics]
    KSQL -->|Materialized Views| MV[State Stores]
    
    style KT fill:#fdcb6e,stroke:#333,color:#000
    style KSQL fill:#fd79a8,stroke:#333,color:#fff
    style ST fill:#6c5ce7,stroke:#333,color:#fff
    style KT2 fill:#fdcb6e,stroke:#333,color:#000
    style MV fill:#00b894,stroke:#333,color:#fff

ksqlDB Capabilities

Filter & Transform

Real-time data filtering and transformation

Aggregations

COUNT, SUM, AVG with GROUP BY

Joins

Stream-stream and stream-table joins

Windowing

Tumbling, hopping, and session windows

ksqlDB Example: Create Stream


-- Create a stream over the 'logs' Kafka topic
CREATE STREAM logs_stream (
  timestamp VARCHAR,
  app VARCHAR,
  level VARCHAR,
  msg VARCHAR
) WITH (
  KAFKA_TOPIC='logs',
  VALUE_FORMAT='JSON'
);
				

ksqlDB Example: Filter Stream


-- Create order service error stream
CREATE STREAM order_errors AS
  SELECT timestamp, app, msg
  FROM logs_stream
  WHERE level = 'ERROR'
    AND app = 'order-service';
				

ksqlDB Example: Aggregation


-- Count errors per app in 1-hour tumbling windows
CREATE TABLE error_counts AS
  SELECT app,
         COUNT(*) AS error_count,
         COLLECT_LIST(msg) AS recent_errors
  FROM logs_stream
  WINDOW TUMBLING (SIZE 1 HOUR)
  WHERE level = 'ERROR'
  GROUP BY app;
				

Live Demo

Questions?

Thank You!