
Overview

This tutorial will demonstrate how to integrate ksqlDB with an external data source to power a simple ride sharing app. Our external source will be a PostgreSQL database containing relatively static data describing each driver’s vehicle. By combining this human-friendly static data with a continuous stream of computer-friendly driver and rider location events, we derive an enriched output stream that the ride sharing app may use to facilitate a rendezvous in real time.

When to use embedded Connect

ksqlDB natively integrates with Connect by either communicating with an external Connect cluster or by running Connect embedded within the ksqlDB server process. Each mode is best suited to the following environments:

  • Embedded - Suitable for development, testing, and simpler production workloads at lower throughputs when there is no need to scale ksqlDB independently of Connect.
  • External - Suitable for all production workloads.

Note

The Connect integration mode is a deployment configuration option. The Connect integration interface is identical for both modes, so your CREATE SOURCE and CREATE SINK statements are independent of the integration mode.

1. Get ksqlDB

Since ksqlDB runs natively on Apache Kafka®, you need a running Kafka installation that ksqlDB is configured to use. The following docker-compose file runs everything for you via Docker, including ksqlDB running Kafka Connect in embedded mode. Embedded Connect enables you to leverage the power of Connect without having to manage a separate Connect cluster, because ksqlDB manages one for you. This tutorial also uses PostgreSQL as an external datastore to integrate with ksqlDB.

In an empty local working directory, copy and paste the following docker-compose content into a file named docker-compose.yml. You will create and add a number of other files to this directory during this tutorial.

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:latest
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.8.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_CONNECT_WORKER_CONFIG: "/connect/connect.properties"
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
    volumes:
      - ./confluent-hub-components/confluentinc-kafka-connect-jdbc:/usr/share/kafka/plugins/jdbc

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.8.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  postgres:
    image: postgres:12
    hostname: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      POSTGRES_PASSWORD: password

2. Get the JDBC connector

The easiest way to download connectors for use in ksqlDB with embedded Connect is via the Confluent Hub Client included in the ksqlDB-server Docker image.

To download the JDBC connector, use the following command:

docker run -v $PWD/confluent-hub-components:/share/confluent-hub-components confluentinc/ksqldb-server:0.8.1 confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.5.0

This command downloads the JDBC connector into the directory ./confluent-hub-components.

3. Start ksqlDB and PostgreSQL

In the directory containing the docker-compose.yml file you created in the first step, run the following command to start all services in the correct order.

docker-compose up

4. Connect to PostgreSQL

Run the following command to establish an interactive session with PostgreSQL.

docker exec -it postgres psql -U postgres

5. Populate PostgreSQL with vehicle/driver data

In the PostgreSQL session, run the following SQL statements to set up the driver data. You will join this PostgreSQL data with event streams in ksqlDB.

CREATE TABLE driver_profiles (
  driver_id integer PRIMARY KEY,
  make text,
  model text,
  year integer,
  license_plate text,
  rating float
);

INSERT INTO driver_profiles (driver_id, make, model, year, license_plate, rating) VALUES
  (0, 'Toyota', 'Prius',   2019, 'KAFKA-1', 5.00),
  (1, 'Kia',    'Sorento', 2017, 'STREAMS', 4.89),
  (2, 'Tesla',  'Model S', 2019, 'CNFLNT',  4.92),
  (3, 'Toyota', 'Camry',   2018, 'ILVKSQL', 4.85);

6. Start ksqlDB's interactive CLI

ksqlDB runs as a server which clients connect to in order to issue queries.

Run the following command to connect to the ksqlDB server and start an interactive command-line interface (CLI) session.

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

7. Create source connector

Make your PostgreSQL data accessible to ksqlDB by creating a source connector. In the ksqlDB CLI, run the following command.

CREATE SOURCE CONNECTOR jdbc_source WITH (
  'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
  'connection.url'           = 'jdbc:postgresql://postgres:5432/postgres',
  'connection.user'          = 'postgres',
  'connection.password'      = 'password',
  'topic.prefix'             = 'jdbc_',
  'table.whitelist'          = 'driver_profiles',
  'mode'                     = 'incrementing',
  'numeric.mapping'          = 'best_fit',
  'incrementing.column.name' = 'driver_id',
  'key'                      = 'driver_id',
  'key.converter'            = 'org.apache.kafka.connect.converters.IntegerConverter');

When the source connector is created, it imports any PostgreSQL tables matching the specified table.whitelist. Tables are imported via Kafka topics, with one topic per imported table. Once these topics are created, you can interact with them just like any other Kafka topic used by ksqlDB.
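
To make the incrementing mode configured above more concrete, the following Python sketch models the general polling idea, with an in-memory SQLite table standing in for PostgreSQL: remember the largest driver_id seen so far and fetch only rows above it. This illustrates the concept and is not the connector's actual implementation; the poll_new_rows function and the SQLite stand-in are hypothetical names chosen for this example.

```python
import sqlite3


def poll_new_rows(conn, last_seen_id):
    """Return rows whose driver_id exceeds last_seen_id, plus the updated offset."""
    rows = conn.execute(
        "SELECT driver_id, make, model FROM driver_profiles "
        "WHERE driver_id > ? ORDER BY driver_id",
        (last_seen_id,),
    ).fetchall()
    # Advance the offset only if something new was found.
    new_offset = rows[-1][0] if rows else last_seen_id
    return rows, new_offset


# In-memory stand-in for the PostgreSQL table (assumption for this sketch).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE driver_profiles (driver_id INTEGER PRIMARY KEY, make TEXT, model TEXT)"
)
conn.executemany(
    "INSERT INTO driver_profiles VALUES (?, ?, ?)",
    [(0, "Toyota", "Prius"), (1, "Kia", "Sorento")],
)

# First poll: start below the smallest id so every existing row is picked up.
first_batch, offset = poll_new_rows(conn, -1)

# A newly inserted row is picked up by the next poll.
conn.execute("INSERT INTO driver_profiles VALUES (2, 'Tesla', 'Model S')")
second_batch, offset = poll_new_rows(conn, offset)

# An update to an existing row is not picked up, because its id did not grow.
conn.execute("UPDATE driver_profiles SET model = 'Camry' WHERE driver_id = 0")
third_batch, offset = poll_new_rows(conn, offset)
```

In this model, the third poll returns nothing even though a row changed. That matches the usual caveat for incrementing-column polling: it detects new rows but not modifications to existing ones, which is acceptable here because the driver data is described as relatively static.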

8. View imported topic

In the ksqlDB CLI session, run the following command to verify that the driver_profiles table has been imported as a Kafka topic. Because you specified jdbc_ as the topic prefix, you should see a jdbc_driver_profiles topic in the output.

SHOW TOPICS;

9. Create drivers table in ksqlDB

The driver data is now integrated as a Kafka topic, but you need to create a ksqlDB table over this topic to begin referencing it from ksqlDB queries. Streams and tables in ksqlDB essentially associate a schema with a Kafka topic, breaking each message in the topic into strongly typed columns.

CREATE TABLE driverProfiles (
  rowkey INTEGER KEY,
  driver_id INTEGER,
  make STRING,
  model STRING,
  year INTEGER,
  license_plate STRING,
  rating DOUBLE
)
WITH (kafka_topic='jdbc_driver_profiles', value_format='json', key='driver_id');

Tables in ksqlDB support update semantics, where each message in the underlying topic represents a row in the table. For messages in the topic with the same key, the latest message associated with a given key represents the latest value for the corresponding row in the table.
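
As a rough mental model of these update semantics, the Python sketch below folds a sequence of keyed messages into a dictionary, so that the last message for each key wins. It is a conceptual illustration only, not ksqlDB code; the materialize function and the sample messages (including the changed rating for driver 1) are made up for this example.

```python
def materialize(messages):
    """Fold (key, value) messages into a table holding the latest value per key."""
    table = {}
    for key, value in messages:
        # A later message with the same key overwrites the earlier one.
        table[key] = value
    return table


# Hypothetical topic contents: driver 1 appears twice with different ratings.
topic = [
    (0, {"make": "Toyota", "rating": 5.00}),
    (1, {"make": "Kia", "rating": 4.89}),
    (1, {"make": "Kia", "rating": 4.95}),
]

drivers = materialize(topic)
print(drivers[1]["rating"])
```

Three messages produce a two-row table, and the row for driver 1 reflects only its most recent message.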

Note

When the data is ingested from the database, it's written to the Kafka topic using JSON serialization. Since JSON itself doesn't declare a schema, you need to declare it again when you run CREATE TABLE. In practice, you would normally use Avro or Protobuf, since these formats support the retention of schemas, ensuring compatibility between producers and consumers. This means that you don't have to re-enter the schema each time you want to use the data in ksqlDB.
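
The Python sketch below illustrates why a declared schema is needed on top of JSON: the parsed message carries no column types of its own, so the reader has to supply them. This is a simplified model, not how ksqlDB deserializes records; the SCHEMA mapping, the apply_schema function, and the sample message are assumptions made for this example, and treating a missing field as None is a choice of the sketch.

```python
import json

# Column names and types mirroring the CREATE TABLE statement above.
SCHEMA = {
    "driver_id": int,
    "make": str,
    "model": str,
    "year": int,
    "license_plate": str,
    "rating": float,
}


def apply_schema(raw_message, schema=SCHEMA):
    """Parse a JSON message and coerce each declared column to its declared type."""
    record = json.loads(raw_message)
    typed = {}
    for column, column_type in schema.items():
        value = record.get(column)
        # Missing fields become None in this sketch.
        typed[column] = None if value is None else column_type(value)
    return typed


# Hypothetical message: the rating is serialized as the JSON number 5.
raw = (
    '{"driver_id": 0, "make": "Toyota", "model": "Prius", '
    '"year": 2019, "license_plate": "KAFKA-1", "rating": 5}'
)
row = apply_schema(raw)
```

Without the schema, Python would parse the rating as an integer; with it, the column is consistently a float. A format that carries its schema with the data removes the need for this manual declaration.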

10. Create streams for driver locations and rider locations

In this step, you create streams over new topics to encapsulate location pings that are sent every few seconds by drivers’ and riders’ phones. In contrast to tables, ksqlDB streams are append-only collections of events, so they're suitable for a continuous stream of location updates.

CREATE STREAM driverLocations (
  rowkey INTEGER KEY,
  driver_id INTEGER,
  latitude DOUBLE,
  longitude DOUBLE,
  speed DOUBLE
)
WITH (kafka_topic='driver_locations', value_format='json', partitions=1, key='driver_id');

CREATE STREAM riderLocations (
  rowkey INTEGER KEY,
  driver_id INTEGER,
  latitude DOUBLE,
  longitude DOUBLE
)
WITH (kafka_topic='rider_locations', value_format='json', partitions=1, key='driver_id');

11. Enrich driverLocations stream by joining with PostgreSQL data

The driverLocations stream has a relatively compact schema, and it doesn’t contain much data that a human would find particularly useful. You can enrich the stream of driver location events by joining them with the human-friendly vehicle information stored in the PostgreSQL database. This enriched data can be presented by the rider’s mobile application, ultimately helping the rider to safely identify the driver’s vehicle.

You can achieve this result easily by joining the driverLocations stream with the driverProfiles table, which is backed by the driver_profiles data imported from PostgreSQL.

CREATE STREAM enrichedDriverLocations AS
  SELECT
    dl.driver_id       AS driver_id,
    dl.latitude        AS latitude,
    dl.longitude       AS longitude,
    dl.speed           AS speed,
    jdbc.make          AS make,
    jdbc.model         AS model,
    jdbc.year          AS year,
    jdbc.license_plate AS license_plate,
    jdbc.rating        AS rating
  FROM driverLocations dl JOIN driverProfiles jdbc
    ON dl.driver_id = jdbc.driver_id
  EMIT CHANGES;
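
Conceptually, this stream-table join looks up each incoming location event in the current state of the table and emits the combined row. The Python sketch below models that lookup as a simplified illustration, not as ksqlDB's implementation; the enrich function is a hypothetical name, and the event for driver 9 is made up to show what happens when there is no matching profile.

```python
def enrich(location_events, profiles):
    """Yield each location event merged with its matching profile row (inner join)."""
    for event in location_events:
        profile = profiles.get(event["driver_id"])
        if profile is None:
            # Inner join: events without a matching profile are dropped.
            continue
        yield {**event, **profile}


# Table side: latest profile per driver_id (subset of the tutorial data).
profiles = {
    0: {"make": "Toyota", "model": "Prius", "license_plate": "KAFKA-1"},
    1: {"make": "Kia", "model": "Sorento", "license_plate": "STREAMS"},
}

# Stream side: one known driver and one hypothetical unknown driver.
events = [
    {"driver_id": 0, "latitude": 37.3965, "longitude": -122.0818, "speed": 23.2},
    {"driver_id": 9, "latitude": 0.0, "longitude": 0.0, "speed": 1.0},
]

enriched = list(enrich(events, profiles))
```

Only the event for driver 0 appears in the output, now carrying the vehicle details alongside the location fields.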

12. Create the rendezvous stream

To put all of this together, create a final stream that the ridesharing app can use to facilitate a driver-rider rendezvous in real time. This stream is defined by a query that joins together rider and driver location updates, resulting in a contextualized output that the app can use to show the rider their driver’s position as the rider waits to be picked up.

The rendezvous stream includes human-friendly information describing the driver’s vehicle for the rider. Also, the rendezvous stream computes (albeit naively) the driver’s estimated time of arrival (ETA) at the rider’s location.

CREATE STREAM rendezvous AS
  SELECT
    e.license_plate AS license_plate,
    e.make          AS make,
    e.model         AS model,
    e.year          AS year,
    e.latitude      AS vehicle_lat,
    e.longitude     AS vehicle_long,
    GEO_DISTANCE(e.latitude, e.longitude, r.latitude, r.longitude) / e.speed AS eta
  FROM enrichedDriverLocations e JOIN riderLocations r WITHIN 1 MINUTE
    ON e.driver_id = r.driver_id
  EMIT CHANGES;
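
The ETA expression in this query divides a great-circle distance by the driver's speed. The Python sketch below reproduces that arithmetic with the haversine formula. It assumes a mean Earth radius of 6371 km and a distance in kilometers, so its results may differ slightly from GEO_DISTANCE, and the ETA is in hours only if speed is in kilometers per hour (the tutorial does not state the unit). The function names are hypothetical.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; an assumption of this sketch


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two points in decimal degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = (
        math.sin(dphi / 2) ** 2
        + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    )
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def naive_eta(vehicle_lat, vehicle_long, rider_lat, rider_long, speed):
    """Distance divided by speed; returns None when speed is not positive."""
    if speed <= 0:
        return None
    return haversine_km(vehicle_lat, vehicle_long, rider_lat, rider_long) / speed


# Driver 0 and rider 0 from the sample data used later in this tutorial.
distance = haversine_km(37.3965, -122.0818, 37.3952, -122.0813)
eta = naive_eta(37.3965, -122.0818, 37.3952, -122.0813, 23.2)
```

The estimate is naive because it uses straight-line distance and the driver's instantaneous speed, ignoring roads and traffic. The guard for a non-positive speed is a choice made in this sketch, not something the query above does.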

13. Start two ksqlDB CLI sessions

Run the following command in two separate terminal windows to open two ksqlDB CLI sessions. If you still have a CLI session open from a previous step, you can reuse it as one of the two.

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

14. Run a continuous query

In this step, you run a continuous query over the rendezvous stream.

This may feel a bit unfamiliar, because the query never returns until you terminate it. The query perpetually pushes output rows to the client as events are written to the rendezvous stream. Leave the query running in your CLI session for now. It will begin producing output as soon as events are written into ksqlDB.

SELECT * FROM rendezvous EMIT CHANGES;

15. Write data to input streams

Your continuous query reads from the rendezvous stream, which takes its input from the enrichedDriverLocations and riderLocations streams. Because enrichedDriverLocations in turn takes its input from the driverLocations stream, you need to write data into driverLocations and riderLocations before rendezvous produces the joined output that the continuous query reads. Run the following statements in your second CLI session.

INSERT INTO driverLocations (driver_id, latitude, longitude, speed) VALUES (0, 37.3965, -122.0818, 23.2);
INSERT INTO riderLocations (driver_id, latitude, longitude) VALUES (0, 37.3952, -122.0813);

INSERT INTO driverLocations (driver_id, latitude, longitude, speed) VALUES (1, 37.7850, -122.40270, 12.0);
INSERT INTO riderLocations (driver_id, latitude, longitude) VALUES (1, 37.7887, -122.4074);

INSERT INTO driverLocations (driver_id, latitude, longitude, speed) VALUES (2, 37.7925, -122.4148, 11.2);
INSERT INTO riderLocations (driver_id, latitude, longitude) VALUES (2, 37.7876, -122.4235);

INSERT INTO driverLocations (driver_id, latitude, longitude, speed) VALUES (3, 37.4471, -122.1625, 14.7);
INSERT INTO riderLocations (driver_id, latitude, longitude) VALUES (3, 37.4442, -122.1658);

As soon as you start writing rows to the input streams, your continuous query from the previous step starts producing joined output. The rider's location pings are joined with their inbound driver's location pings in real time, providing the rider with the driver's ETA and information describing the driver's vehicle.

Next steps

This tutorial shows how to run ksqlDB in embedded Connect mode using Docker. It uses the JDBC connector to integrate ksqlDB with PostgreSQL data, but this is just one of many connectors that are available to help you integrate ksqlDB with external systems. Check out Confluent Hub to learn more about all of the various connectors that enable integration with a wide variety of external systems.

You may also want to take a look at our examples to better understand how you can use ksqlDB for your specific workload.


Last update: 2020-04-21