Create a Table
In ksqlDB, you create tables from existing Apache Kafka® topics, create tables that will create new Kafka topics, or create tables of query results from other tables or streams.
- Use the CREATE TABLE statement to create a table from an existing Kafka topic, or a new Kafka topic.
- Use the CREATE TABLE AS SELECT statement to create a table with query results from an existing table or stream.
Note
Creating streams is similar to creating tables. For more information, see Create a ksqlDB Stream.
ksqlDB can't infer the topic value's data format, so you must provide the
format of the values that are stored in the topic. In this example, the
data format is JSON
. For all supported formats, see
Serialization Formats.
ksqlDB requires keys to be serialized using Kafka's own serializers or
compatible serializers. For supported data types, see the KAFKA
format.
If the data in your Kafka topics doesn't have a suitable key format,
see Key Requirements.
Create a Table from an existing Kafka Topic¶
Use the CREATE TABLE statement to create a table from an existing underlying Kafka topic.
The following examples show how to create tables from a Kafka topic
named users
.
Create a Table with Selected Columns¶
The following example creates a table that has four columns from the
users
topic: registertime
, userid
, gender
, and regionid
.
The userid
column is the primary key of the table. This means that it's loaded from the Kafka message
key. Primary key columns can't be NULL
.
In the ksqlDB CLI, paste the following CREATE TABLE statement:
1 2 3 4 5 6 7 8 9 |
|
Your output should resemble:
1 2 3 4 |
|
Inspect the table by using the SHOW TABLES and DESCRIBE statements:
1 |
|
Your output should resemble:
1 2 3 4 |
|
Get the schema for the table:
1 |
|
Your output should resemble:
1 2 3 4 5 6 7 8 9 |
|
Create a continuous streaming query on the users
table by using the
SELECT statement:
1 |
|
Assuming the table has content, your output should resemble:
1 2 3 4 5 6 7 8 |
|
Press Ctrl+C to stop printing the query results.
The table values update continuously with the most recent records,
because the underlying users
topic receives new messages continuously.
Creating a Table using Schema Inference¶
For supported serialization formats,
ksqlDB can integrate with Confluent Schema Registry.
ksqlDB can use Schema Inference to
spare you from defining columns manually in your CREATE TABLE
statements.
The following example creates a table over an existing topic, loading the value column definitions from Schema Registry.
1 2 3 4 5 6 |
|
For more information, see Schema Inference.
Create a Table backed by a new Kafka Topic¶
Use the CREATE TABLE statement to create a table without a preexisting topic by providing the PARTITIONS count, and optionally the REPLICA count, in the WITH clause.
Taking the example of the users table from above, but where the underlying Kafka topic does not already exist, you can create the table by pasting the following CREATE TABLE statement into the CLI:
1 2 3 4 5 6 7 8 9 10 11 |
|
This will create the users topics for you with the supplied partition and replica count.
Create a ksqlDB Table with Streaming Query Results¶
Use the CREATE TABLE AS SELECT statement to create a ksqlDB table view that contains the results of a SELECT query from another table or stream.
CREATE TABLE AS SELECT creates a new ksqlDB table with a corresponding Kafka topic and streams the result of the SELECT query as a changelog into the topic. ksqlDB creates a persistent query that runs continuously until you terminate it explicitly.
The following SQL statement creates a users_female
table that
contains results from a persistent query for users that have gender
set to FEMALE
:
1 2 3 4 5 |
|
Your output should resemble:
1 2 3 4 |
|
Inspect the table by using the SHOW TABLES and PRINT statements:
1 |
|
Your output should resemble:
1 2 3 4 5 |
|
Print some rows in the table:
1 |
|
Your output should resemble:
1 2 3 4 5 6 |
|
Press Ctrl+C to stop printing the table.
Note
The query continues to run after you stop printing the table.
Use the SHOW QUERIES statement to view the query that ksqlDB created for
the users_female
table:
1 |
|
Your output should resemble:
1 2 3 4 5 |
|
A persistent query that's created by the CREATE TABLE AS SELECT
statement has the string CTAS
in its ID, for example,
CTAS_USERS_FEMALE_0
.
Create a ksqlDB Table from a ksqlDB Stream¶
Use the CREATE TABLE AS SELECT statement to create a table from a stream. Creating a table from a stream requires aggregation, so you need to include a function like COUNT(*) in the SELECT clause.
1 2 3 4 5 |
|
Your output should resemble:
1 2 3 4 |
|
Observe the changes happening to the table by using a streaming SELECT statement.
1 |
|
Your output should resemble:
1 2 3 4 5 6 7 8 9 10 11 12 |
|
Note
It is possible for the same key to be output multiple time when emitting changes to the table. This is because each time the row in the table changes it will be emitted.
Look up the value for a specific key within the table by using a SELECT statement.
1 |
|
Your output should resemble:
1 2 3 4 5 |
|
Delete a ksqlDB Table¶
Use the DROP TABLE statement to delete a table. If you created the table by using CREATE TABLE AS SELECT, you must first terminate the corresponding persistent query.
Use the TERMINATE statement to stop the CTAS_USERS_FEMALE_0
query:
1 |
|
Your output should resemble:
1 2 3 4 |
|
Use the DROP TABLE statement to delete the users_female
table:
1 |
|
Your output should resemble:
1 2 3 4 |
|