Integrate with PostgreSQL
This tutorial demonstrates a simple workflow to integrate ksqlDB with an instance of PostgreSQL.
- Confluent Platform is installed and running. This installation includes a Kafka broker, ksqlDB, ZooKeeper, Schema Registry, and Connect.
- If you installed Confluent Platform via TAR or ZIP, navigate into the installation directory. The paths and commands used throughout this tutorial assume that you are in this installation directory.
- Consider installing the Confluent CLI to start a local installation of Confluent Platform.
- Java: minimum version 1.8. Install Oracle Java JRE or JDK 1.8 or later on your local machine.
Installing JDBC Source Connector Plugin
If you installed Kafka Connect via Confluent Platform, then it comes with an installation of the JDBC source connector. Otherwise, install it from Confluent Hub.
Installing Postgres via Docker
If you are experimenting with the ksqlDB-Connect integration and don't have a local PostgreSQL instance, you can install one by using Docker and populate it with some data:
Install PostgreSQL by using the docker pull postgres command.
Start the database and expose the JDBC port:
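A minimal sketch of this step, assuming the official postgres image and its documented `POSTGRES_USER`, `POSTGRES_DB`, and `POSTGRES_PASSWORD` variables. The container name `some-postgres` and the password are placeholders rather than values from this tutorial. The database is named after your user, in line with the later statement that Postgres runs "with a database for your user".

```shell
# Placeholder values (assumptions, not taken from the tutorial).
PG_CONTAINER="some-postgres"
PG_USER="${USER:-postgres}"       # database owner; falls back to "postgres"
PG_PASSWORD="example-password"    # recent images need a password or an explicit trust setting
PG_PORT=5432                      # host port that JDBC clients connect to

start_postgres() {
  # -p publishes the container's PostgreSQL port (5432) on the host.
  # -d runs the container in the background.
  docker run --name "$PG_CONTAINER" \
    -e POSTGRES_USER="$PG_USER" \
    -e POSTGRES_DB="$PG_USER" \
    -e POSTGRES_PASSWORD="$PG_PASSWORD" \
    -p "$PG_PORT:5432" \
    -d postgres
}

# Start the container only when Docker is available on this machine.
if command -v docker >/dev/null 2>&1; then
  start_postgres
else
  echo "docker not found; skipping start of $PG_CONTAINER" >&2
fi
```

Once the container is up, a JDBC client on the same host should be able to reach the database at localhost:5432.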
Run PSQL to generate some data:
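A sketch of this step, assuming the running container is named `some-postgres` (a placeholder) and that the data is a small `users` table. The table name follows the later references to users in this tutorial; the column names and rows are hypothetical.

```shell
# Placeholder values (assumptions, not taken from the tutorial).
PG_CONTAINER="some-postgres"
PG_USER="${USER:-postgres}"

# Hypothetical schema and rows, written to a file so they can be reviewed first.
cat > seed-users.sql <<'EOF'
CREATE TABLE users (username VARCHAR, popularity INT);
INSERT INTO users (username, popularity) VALUES ('user1', 100);
INSERT INTO users (username, popularity) VALUES ('user2', 5);
INSERT INTO users (username, popularity) VALUES ('user3', 75);
EOF

# Feed the statements to psql inside the container (-i keeps stdin open).
if command -v docker >/dev/null 2>&1; then
  docker exec -i "$PG_CONTAINER" psql -U "$PG_USER" < seed-users.sql
else
  echo "docker not found; statements left in seed-users.sql" >&2
fi
```

When no database is given, psql connects to the database that has the same name as the user, which is the database created for your user at container start.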
When you're done, clear your local state by using the docker kill command.
Create a JDBC Source Connector
Now that Postgres is up and running with a database for your user, you can connect to it via ksqlDB. If you're using the default configurations, ksqlDB connects automatically to your Connect cluster. Otherwise, you must change the ksql.connect.url property to point to your Connect deployment.
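A connector definition for this setup might look like the following sketch. The connector name `jdbc-connector`, the `jdbc-` topic prefix, the `users` table with a `username` column, and the credentials are all assumptions. The key is set with standard Kafka Connect transforms (`ValueToKey` and `ExtractField$Key`) so that each record is keyed by `username`. The sketch also assumes a ksqlDB server at `http://localhost:8088` and a `ksql` CLI that accepts statements on standard input.

```shell
# Assumed endpoint and credentials; adjust them to your environment.
KSQLDB_URL="http://localhost:8088"
PG_USER="${USER:-postgres}"
PG_PASSWORD="example-password"   # placeholder

# Unquoted heredoc: ${...} is expanded, and \$ keeps a literal dollar sign.
cat > create-connector.sql <<EOF
CREATE SOURCE CONNECTOR "jdbc-connector" WITH (
  "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
  "connection.url"='jdbc:postgresql://localhost:5432/${PG_USER}',
  "connection.user"='${PG_USER}',
  "connection.password"='${PG_PASSWORD}',
  "mode"='bulk',
  "topic.prefix"='jdbc-',
  "table.whitelist"='users',
  "transforms"='copyKey,extractKey',
  "transforms.copyKey.type"='org.apache.kafka.connect.transforms.ValueToKey',
  "transforms.copyKey.fields"='username',
  "transforms.extractKey.type"='org.apache.kafka.connect.transforms.ExtractField\$Key',
  "transforms.extractKey.field"='username',
  "key.converter"='org.apache.kafka.connect.storage.StringConverter'
);
EOF

# Submit the statement when the CLI is installed; otherwise print it for pasting.
if command -v ksql >/dev/null 2>&1; then
  ksql "$KSQLDB_URL" < create-connector.sql
else
  cat create-connector.sql
fi
```

With this prefix and whitelist, the connector would write rows from `users` to a topic named `jdbc-users`.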
At this point, data should automatically start flowing in from Postgres to ksqlDB. Confirm this by running the following statement.
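One statement that serves this purpose is `DESCRIBE CONNECTOR`, sketched below. The connector name `jdbc-connector` and the server address are assumptions; substitute the name you gave your connector.

```shell
KSQLDB_URL="http://localhost:8088"   # assumed ksqlDB server address

# Hypothetical connector name; use the one from your CREATE SOURCE CONNECTOR statement.
STATEMENT='DESCRIBE CONNECTOR "jdbc-connector";'

if command -v ksql >/dev/null 2>&1; then
  # Pipe the statement into the CLI for a non-interactive run.
  printf '%s\n' "$STATEMENT" | ksql "$KSQLDB_URL"
else
  # Print it so it can be pasted into a ksqlDB prompt.
  printf '%s\n' "$STATEMENT"
fi
```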
Your output should show that the connector is running.
Import this topic as a table to ksqlDB by using the following command.
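A sketch of such a command, under several assumptions: the topic is named `jdbc-users`, the Connect worker writes Avro values registered in Schema Registry, and the record key is a string. Recent ksqlDB releases require a table to declare a `PRIMARY KEY` column, so the sketch declares one under the hypothetical name `user_key` and leaves the value columns to be inferred from the registered schema.

```shell
KSQLDB_URL="http://localhost:8088"   # assumed ksqlDB server address

# user_key is a hypothetical key column name; value columns come from the Avro schema.
STATEMENT="CREATE TABLE jdbc_users (user_key VARCHAR PRIMARY KEY)
  WITH (kafka_topic='jdbc-users', value_format='AVRO');"

if command -v ksql >/dev/null 2>&1; then
  printf '%s\n' "$STATEMENT" | ksql "$KSQLDB_URL"
else
  # Print it so it can be pasted into a ksqlDB prompt.
  printf '%s\n' "$STATEMENT"
fi
```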
Select everything from the topic to see how it gets auto populated:
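A sketch of the query, assuming the table was registered under the hypothetical name `jdbc_users`. `EMIT CHANGES` makes this a push query that keeps streaming new rows; the `LIMIT` clause is an addition here so that a non-interactive run ends after ten rows.

```shell
KSQLDB_URL="http://localhost:8088"   # assumed ksqlDB server address

# LIMIT ends the push query after ten rows; drop it to keep watching.
STATEMENT="SELECT * FROM jdbc_users EMIT CHANGES LIMIT 10;"

if command -v ksql >/dev/null 2>&1; then
  printf '%s\n' "$STATEMENT" | ksql "$KSQLDB_URL"
else
  # Print it so it can be pasted into a ksqlDB prompt.
  printf '%s\n' "$STATEMENT"
fi
```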
In the output, each user appears multiple times. This happens because the connector runs in bulk mode, which re-imports the entire table on every poll. This isn't appropriate for production. For more information on changelog capture, see Incremental Query Modes.
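As a hedged illustration of one alternative to bulk mode, the sketch below uses the JDBC source connector's `incrementing` mode, which fetches only rows whose incrementing column is larger than the last value seen. It assumes the `users` table has a strictly increasing `id` column, which this tutorial does not define. The connector name, topic prefix, and credentials are hypothetical.

```shell
# Assumed endpoint and credentials; adjust them to your environment.
KSQLDB_URL="http://localhost:8088"
PG_USER="${USER:-postgres}"
PG_PASSWORD="example-password"   # placeholder

# Unquoted heredoc so that ${...} is expanded into the statement.
cat > create-incrementing-connector.sql <<EOF
CREATE SOURCE CONNECTOR "jdbc-connector-incrementing" WITH (
  "connector.class"='io.confluent.connect.jdbc.JdbcSourceConnector',
  "connection.url"='jdbc:postgresql://localhost:5432/${PG_USER}',
  "connection.user"='${PG_USER}',
  "connection.password"='${PG_PASSWORD}',
  "mode"='incrementing',
  "incrementing.column.name"='id',
  "topic.prefix"='jdbc-incrementing-',
  "table.whitelist"='users'
);
EOF

# Submit the statement when the CLI is installed; otherwise print it for pasting.
if command -v ksql >/dev/null 2>&1; then
  ksql "$KSQLDB_URL" < create-incrementing-connector.sql
else
  cat create-incrementing-connector.sql
fi
```

Incrementing mode picks up new rows only. Detecting updates to existing rows needs one of the connector's timestamp-based modes instead.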
Page last revised on: 2020-04-29