This example uses the Kafka Connect JDBC sink connector to consume events from the product_reviews Kafka topic and insert them into a PostgreSQL table.

First, place the kafka-connect-jdbc-10.8.4.jar and postgresql-42.4.4.jar JARs inside the jdbc directory under the plugin.path specified in connect-standalone.properties, then start the Connect worker in standalone mode:

bin/connect-standalone.sh config/connect-standalone.properties

Next, create the target table in PostgreSQL:
CREATE TABLE IF NOT EXISTS product_reviews (
    review_id   text PRIMARY KEY,
    product_id  text,
    user_id     text,
    rating      int,
    review_text text,
    created_at  timestamptz
);
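For reference, the worker settings that matter here might look like the fragment below (the broker address and paths are illustrative assumptions; adjust them to your installation):

```properties
# connect-standalone.properties (relevant settings only)
bootstrap.servers=localhost:9092

# The worker scans this directory for connector plugins; the JDBC connector
# and the PostgreSQL driver JARs live in a jdbc/ subdirectory beneath it.
plugin.path=/opt/kafka/plugins

# Standalone mode stores source/sink offsets in a local file.
offset.storage.file.filename=/tmp/connect.offsets
```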
Produce a batch of product_reviews events by running:
uv run connector/produce.py 30
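Because the sink config below sets value.converter.schemas.enable to true, each message value must carry an inline schema alongside the payload. Here is a minimal sketch of the envelope a producer like connector/produce.py would need to emit; the field names mirror the table above, but the helper itself is illustrative, not the actual script:

```python
import json

def review_envelope(review: dict) -> str:
    """Wrap one review record in the schema+payload envelope that
    org.apache.kafka.connect.json.JsonConverter expects when
    schemas.enable=true."""
    schema = {
        "type": "struct",
        "name": "product_reviews",
        "fields": [
            {"field": "review_id", "type": "string", "optional": False},
            {"field": "product_id", "type": "string", "optional": True},
            {"field": "user_id", "type": "string", "optional": True},
            {"field": "rating", "type": "int32", "optional": True},
            {"field": "review_text", "type": "string", "optional": True},
            # Connect's Timestamp logical type: epoch milliseconds as int64,
            # which the JDBC sink writes into the timestamptz column.
            {"field": "created_at", "type": "int64",
             "name": "org.apache.kafka.connect.data.Timestamp",
             "optional": True},
        ],
    }
    return json.dumps({"schema": schema, "payload": review})

message = review_envelope({
    "review_id": "r-001",
    "product_id": "p-42",
    "user_id": "u-7",
    "rating": 5,
    "review_text": "Great product",
    "created_at": 1704067200000,  # epoch millis
})
```

The string returned by review_envelope is what gets sent as the Kafka message value; without the schema half of the envelope, the JsonConverter on the sink side rejects the record.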
Then register the product-reviews-pg-sink connector through the Connect REST API:
curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
    "name": "product-reviews-pg-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "product_reviews",
        "connection.url": "jdbc:postgresql://localhost:5432/ajith",
        "connection.user": "postgres",
        "connection.password": "ajith",
        "insert.mode": "insert",
        "pk.mode": "none",
        "auto.create": "false",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "delete.enabled": "false"
    }
}'
Now all events, both those already in the topic and those produced from here on, are automatically inserted into the PostgreSQL table without writing a single line of sink code!
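To confirm rows are landing, you can check the connector's state via the Connect REST API (GET http://localhost:8083/connectors/product-reviews-pg-sink/status) and run a quick aggregate in PostgreSQL:

```sql
-- Verify that the sink is writing rows.
SELECT rating, count(*) AS reviews
FROM product_reviews
GROUP BY rating
ORDER BY rating;
```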