How to use connector management¶
You have external data stores that you want to read from and write to with ksqlDB, but you don’t want to write custom glue code to do it. ksqlDB is capable of using the vast ecosystem of Kafka Connect connectors through its SQL syntax. This functionality is called connector management.
1 2 3 4 5 6 7 8 9
Before you can use connector management, you need to decide what mode you want to run connectors in. ksqlDB can run connectors in two different modes: embedded or external. This controls how and where the connectors are executed. The way in which you configure ksqlDB's server determines which mode it will use. All nodes in a single ksqlDB cluster must use the same mode.
Regardless of which mode you use, the syntax to create and use connectors is the same.
In embedded mode, ksqlDB runs connectors directly on its servers. This is convenient because it reduces the number of moving parts that you need to manage in your infrastructure. Embedded mode is highly useful for development, testing, and production workloads that have light/moderate data volumes. Use this mode when you don't need to scale your ingest/egress capacity independently from your processing capacity. When you use embedded mode, ksqlDB server is actually running a Kafka Connect server in distributed mode.
Before you can use an embedded connector, you need to download it prior to starting ksqlDB. A downloaded connector package is essentially a set of jars that contain the code for interacting with the target data store.
The easiest way to download a connector is to use
confluent-hub, a utility program distributed by Confluent.
Create a directory for your connectors:
Run the following command to get the Voluble data generator connector:
After running this command,
confluent-hub-components should contain the Voluble jars. If you are running in clustered mode, you must install the connector on every server.
When you have all the connectors that you need, configure ksqlDB to find them.
You must restart all of the ksqlDB servers to finish installing the new connectors.
You control whether ksqlDB uses embedded mode by supplying the server configuration property
ksql.connect.worker.config with the path to a Kafka Connect configuration file. Although embedded mode eases the operational burden of running a full Kafka Connect cluster, it doesn't dilute Kafka Connect's power. Any property that you can configure a regular Kafka Connect cluster with can also be applied to embedded mode.
There are a number of properties that you must set to have a valid Connect setup. Refer to the Kafka Connect documentation to learn about the right properties to set. One critical property is
ksql.connect.plugin.path, which specifies the path to find the connector jars. If you're using Docker, use a volume to mount your connector jars from your host into the container.
If you're deploying with Docker, you can skip setting
ksql.connect.worker.config. ksqlDB will look for environment variables prefixed with
KSQL_CONNECT_. If it finds any, it will remove the
KSQL_ prefix and place them into a Connect configuration file. Embedded mode will use that configuration file. This is a convenience to avoid creating and mounting a separate configuration file.
To get started, here is a Docker Compose example with a server configured for embedded mode. All
KSQL_ environment variables are converted automatically to server configuration properties. Any connectors installed on your host at
confluent-hub-components are loaded. Save this in a file named
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
Bring up the stack with:
Launching a connector¶
Now that ksqlDB has a connector and is configured to run it in embedded mode, you can launch it. Start by running ksqlDB's CLI:
Starting a connector is as simple as giving it a name and properties. In this example, you launch the Voluble connector to source random events into a Kafka topic. Run the following SQL statement:
1 2 3 4 5 6 7 8 9
Here is what this ksqlDB statement does:
- ksqlDB interacts with Kafka Connect to create a new source connector named
- Kafka Connect infers that
sis a Voluble connector because of the value of
connector.class. Kafka Connect searches its plugin path to find a connector that matches the specified class.
- ksqlDB passes the remaining properties directly to the Voluble connector so that it can configure itself.
- Voluble publishes a new event to topic
500milliseconds with a UUID key and a map value of two keys,
The properties are the same that you would pass to a connector if it was running in a dedicated Connect cluster. You can pass any properties that the connector or Kafka Connect respects, like
max.tasks to scale the number of instances of the connector.
Check that the connector is working by printing the contents of the
people topic, which connector
Because the data is random, your output should look roughly like the following:
1 2 3 4 5 6 7
When you're done, you can drop the connector by running:
You can confirm that the connector is no longer running by looking at the output of
Embedded Kafka Connect logs messages inline with ksqlDB's server's log messages. View them by running the following command:
Introspecting embedded mode¶
Sometimes you might need a little more power to introspect how your connectors are behaving by interacting directly with the embedded Kafka Connect server. First, notice that ksqlDB is really just wrapping a regular Kafka Connect server. You can curl it and interact with its REST API just like any other Connect server.
Your output should resemble:
This can be really useful if you're having trouble getting a connector to load or need more insight into how connector tasks are behaving.
In external mode, ksqlDB communicates with an external Kafka Connect cluster. It's able to create and destroy connectors as needed. Use external mode when you have high volumes of input and output, or need to scale your ingest/egress capacity independently from your processing capacity.
External mode essentially works the same way as embedded mode, except connectors run outside of ksqlDB's servers. All that is needed is to configure ksqlDB server with the
ksql.connect.url property, indicating the address of the Connect server. Beyond that, you can manage connectors exactly as you would in embedded mode. No other configuration is needed.
This guide omits an example of setting up an external Kafka Connect cluster. Many great examples, like Apache Kafka's, have already been published.
Tear down the stack¶
When you're done, tear down the stack by running: