Create a Table
In ksqlDB, you create tables from existing Apache Kafka® topics, create tables that will create new Kafka topics, or create tables of query results from other tables or streams.
- Use the CREATE TABLE statement to create a table from an existing Kafka topic, or a new Kafka topic.
- Use the CREATE TABLE AS SELECT statement to create a table with query results from an existing table or stream.
Note
Creating streams is similar to creating tables. For more information, see Create a ksqlDB Stream.
Create a Table from an existing Kafka Topic¶
Use the CREATE TABLE statement to create a table from an existing underlying Kafka topic. The Kafka topic must exist already in your Kafka cluster.
The following examples show how to create tables from a Kafka topic,
named users
. To see these examples in action, create the users
topic
by following the procedure in
Write Streaming Queries Against Apache Kafka® Using ksqlDB.
Create a Table with Selected Columns¶
The following example creates a table that has four columns from the
users
topic: registertime
, userid
, gender
, and regionid
. Also,
the userid
field is assigned as the table's KEY property.
Note
The KEY field is optional. For more information, see Key Requirements.
ksqlDB can't infer the topic value's data format, so you must provide the
format of the values that are stored in the topic. In this example, the
data format is JSON
. Other options are Avro
, DELIMITED
, JSON_SR
, PROTOBUF
, and KAFKA
. For
more information, see
Serialization Formats.
ksqlDB requires keys to have been serialized using Kafka's own serializers or compatible
serializers. ksqlDB supports INT
, BIGINT
, DOUBLE
, and STRING
key types.
In the ksqlDB CLI, paste the following CREATE TABLE statement:
1 2 3 4 5 6 7 8 |
|
Your output should resemble:
1 2 3 4 |
|
Inspect the table by using the SHOW TABLES and DESCRIBE statements:
1 |
|
Your output should resemble:
1 2 3 4 |
|
Get the schema for the table:
1 |
|
Your output should resemble:
1 2 3 4 5 6 7 8 9 10 11 |
|
Create a continuous streaming query on the users
table by using the
SELECT statement:
1 |
|
Assuming the table has content, your output should resemble:
1 2 3 4 5 6 7 8 |
|
Press Ctrl+C to stop printing the query results.
The table values update continuously with the most recent records,
because the underlying users
topic receives new messages continuously.
Create a Table backed by a new Kafka Topic¶
Use the CREATE TABLE statement to create a table without a preexisting topic by providing the PARTITIONS count, and optionally the REPLICA count, in the WITH clause.
Taking the example of the users table from above, but where the underlying Kafka topic does not already exist, you can create the table by pasting the following CREATE TABLE statement into the CLI:
1 2 3 4 5 6 7 8 9 10 |
|
This will create the users topics for you with the supplied partition and replica count.
Create a ksqlDB Table with Streaming Query Results¶
Use the CREATE TABLE AS SELECT statement to create a ksqlDB table that contains the results of a SELECT query from another table or stream.
CREATE TABLE AS SELECT creates a new ksqlDB table with a corresponding Kafka topic and streams the result of the SELECT query as a changelog into the topic. ksqlDB creates a persistent query that runs continuously until you terminate it explicitly.
To stream the result of a SELECT query into an existing table and its underlying topic, use the INSERT INTO statement.
The following SQL statement creates a users_female
table that
contains results from a persistent query for users that have gender
set to FEMALE
:
1 2 3 4 |
|
Your output should resemble:
1 2 3 4 |
|
Inspect the table by using the SHOW TABLES and PRINT statements:
1 |
|
Your output should resemble:
1 2 3 4 5 |
|
Print some rows in the table:
1 |
|
Your output should resemble:
1 2 3 4 5 6 |
|
Press Ctrl+C to stop printing the table.
Note
The query continues to run after you stop printing the table.
Use the SHOW QUERIES statement to view the query that ksqlDB created for
the users_female
table:
1 |
|
Your output should resemble:
1 2 3 4 5 |
|
A persistent query that's created by the CREATE TABLE AS SELECT
statement has the string CTAS
in its ID, for example,
CTAS_USERS_FEMALE_0
.
Create a ksqlDB Table from a ksqlDB Stream¶
Use the CREATE TABLE AS SELECT statement to create a table from a stream. Creating a table from a stream requires aggregation, so you need to include a function like COUNT(*) in the SELECT clause.
1 2 3 4 5 |
|
Your output should resemble:
1 2 3 4 |
|
Observe the changes happening to the table by using a streaming SELECT statement.
1 |
|
Your output should resemble:
1 2 3 4 5 6 7 8 9 10 11 12 |
|
Note
It is possible for the same key to be output multiple time when emitting changes to the table. This is because each time the row in the table changes it will be emitted.
Look up the value for a specific key within the table by using a SELECT statement.
1 |
|
Your output should resemble:
1 2 3 4 5 |
|
Delete a ksqlDB Table¶
Use the DROP TABLE statement to delete a table. If you created the table by using CREATE TABLE AS SELECT, you must first terminate the corresponding persistent query.
Use the TERMINATE statement to stop the CTAS_USERS_FEMALE_0
query:
1 |
|
Your output should resemble:
1 2 3 4 |
|
Use the DROP TABLE statement to delete the users_female
table:
1 |
|
Your output should resemble:
1 2 3 4 |
|
Next Steps¶
Page last revised on: 2020-04-29