Create a Stream
In ksqlDB, you can create a stream from an existing Apache Kafka® topic, create a stream that is backed by a new Kafka topic, or create a stream of query results from another stream.
- Use the CREATE STREAM statement to create a stream from an existing Kafka topic, or a new Kafka topic.
- Use the CREATE STREAM AS SELECT statement to create a query stream from an existing stream.
Note
Creating tables is similar to creating streams. For more information, see Create a ksqlDB Table.
Create a Stream from an existing Kafka topic
Use the CREATE STREAM statement to create a stream from an existing underlying Kafka topic. The Kafka topic must exist already in your Kafka cluster.
The following examples show how to create streams from a Kafka topic named pageviews. To see these examples in action, create the pageviews topic by following the procedure in Write Streaming Queries Against Apache Kafka® Using ksqlDB.
Create a Stream with Selected Columns
The following example creates a stream that has three columns from the pageviews topic: viewtime, userid, and pageid.
ksqlDB can't infer the topic value's data format, so you must provide the format of the values that are stored in the topic. In this example, the data format is DELIMITED. Other options are AVRO, JSON, JSON_SR, PROTOBUF, and KAFKA. See Serialization Formats for more details.
ksqlDB requires keys to have been serialized using Kafka's own serializers or compatible serializers. ksqlDB supports INT, BIGINT, DOUBLE, and STRING key types.
In the ksqlDB CLI, paste the following CREATE STREAM statement:
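A minimal sketch of such a statement follows. The column types are assumptions (viewtime as BIGINT, userid and pageid as VARCHAR), so adjust them to match the data in your topic:

```sql
-- Sketch only: column types are assumed, not taken from the topic itself.
CREATE STREAM pageviews
  (viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',       -- existing topic to read from
        VALUE_FORMAT='DELIMITED');     -- format of the message values
```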
If the statement succeeds, ksqlDB responds with a message confirming that the stream was created.
Inspect the stream by using the SHOW STREAMS and DESCRIBE statements:
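As a sketch, the first of these statements takes no arguments and lists every stream registered with the ksqlDB server:

```sql
-- List all streams known to the ksqlDB server.
SHOW STREAMS;
```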
The output should list the registered streams along with their backing Kafka topics and formats, including the pageviews stream.
Get the schema for the stream:
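Assuming the stream is named pageviews, as in this section, the schema can be displayed with:

```sql
-- Show the column names and types of the pageviews stream.
DESCRIBE pageviews;
```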
The output should list each column of the stream with its type, including the viewtime, userid, and pageid columns.
Create a Stream with a Specified Key
The previous SQL statement makes no assumptions about the Kafka message key in the underlying Kafka topic. If the value of the message key in the topic is the same as one of the columns defined in the stream, you can specify the key in the WITH clause of the CREATE STREAM statement. If you use this column name later to perform a join or a repartition, ksqlDB knows that no repartition is needed. In effect, the named column becomes an alias for ROWKEY.
For example, if the Kafka message key has the same value as the pageid column, you can write the CREATE STREAM statement like this:
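A sketch of such a statement, assuming the same column types as before and a hypothetical stream name, pageviews_withkey. The KEY property belongs to the ksqlDB releases this page covers, so check the reference for your version before relying on it:

```sql
-- Sketch only: pageviews_withkey is an illustrative name and the
-- column types are assumed.
CREATE STREAM pageviews_withkey
  (viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
        VALUE_FORMAT='DELIMITED',
        KEY='pageid');                 -- column that mirrors the message key
```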
Confirm that the KEY field in the new stream is pageid by using the DESCRIBE EXTENDED statement:
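For instance, if the stream were named pageviews_withkey (a hypothetical name used here for illustration), the statement would be:

```sql
-- Show extended metadata, including the key field, for the stream.
DESCRIBE EXTENDED pageviews_withkey;
```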
In the output, the key field entry should name the pageid column.
Create a Stream with Timestamps
In ksqlDB, message timestamps are used for window-based operations, like windowed aggregations, and to support event-time processing.
If you want to use the value of one of the topic's columns as the Kafka message timestamp, set the TIMESTAMP property in the WITH clause.
For example, if you want to use the value of the viewtime column as the message timestamp, you can rewrite the previous CREATE STREAM statement like this:
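A sketch of the rewritten statement, assuming the same column types as before and a hypothetical stream name, pageviews_timestamped:

```sql
-- Sketch only: pageviews_timestamped is an illustrative name and the
-- column types are assumed.
CREATE STREAM pageviews_timestamped
  (viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
        VALUE_FORMAT='DELIMITED',
        KEY='pageid',
        TIMESTAMP='viewtime');         -- column used as the message timestamp
```

Because viewtime is declared as BIGINT here, ksqlDB can use it directly as the timestamp without a separate format property.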
Confirm that the TIMESTAMP field is viewtime by using the DESCRIBE EXTENDED statement:
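For instance, if the stream were named pageviews_timestamped (a hypothetical name used here for illustration), the statement would be:

```sql
-- Show extended metadata, including the timestamp field, for the stream.
DESCRIBE EXTENDED pageviews_timestamped;
```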
In the output, the timestamp field entry should name the viewtime column.
Create a Stream backed by a new Kafka Topic
Use the CREATE STREAM statement to create a stream without a preexisting topic by providing the PARTITIONS property, and optionally the REPLICAS property, in the WITH clause.
Taking the pageviews stream from the earlier example, but assuming that the underlying Kafka topic does not already exist, you can create the stream by pasting the following CREATE STREAM statement into the CLI:
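A sketch of such a statement follows. The partition and replica counts are placeholder values, and the column types are assumptions. Choose a replica count that your cluster can satisfy:

```sql
-- Sketch only: PARTITIONS and REPLICAS values are placeholders.
CREATE STREAM pageviews
  (viewtime BIGINT,
   userid VARCHAR,
   pageid VARCHAR)
  WITH (KAFKA_TOPIC='pageviews',
        PARTITIONS=4,                  -- required when the topic doesn't exist
        REPLICAS=3,                    -- optional
        VALUE_FORMAT='DELIMITED');
```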
This creates the pageviews topic for you with the supplied partition and replica counts.
Create a Persistent Streaming Query from a Stream
Use the CREATE STREAM AS SELECT statement to create a persistent query stream from an existing stream.
CREATE STREAM AS SELECT creates a stream that contains the results from a SELECT query. ksqlDB persists the SELECT query results into a corresponding new topic. A stream created this way represents a persistent, continuous, streaming query, which means that it runs until you stop it explicitly.
Note
A SELECT statement by itself is a non-persistent continuous query. The result of a SELECT statement isn't persisted in a Kafka topic and is only printed in the ksqlDB console. Don't confuse persistent queries created by CREATE STREAM AS SELECT with the streaming query result from a SELECT statement.
Use the SHOW QUERIES statement to list the persistent queries that are running currently.
Use the PRINT statement to view the results of a persistent query in the ksqlDB CLI. Press CTRL+C to stop printing records. When you stop printing, the query continues to run.
Use the TERMINATE statement to stop a persistent query. Exiting the ksqlDB CLI does not stop persistent queries. Your ksqlDB servers continue to process the queries, and queries run continuously until you terminate them explicitly.
To stream the result of a SELECT query into an existing stream and its underlying topic, use the INSERT INTO statement.
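As a rough illustration of INSERT INTO, the following sketch writes rows from pageviews into a hypothetical, previously created stream named pageviews_archive. The target stream name is an assumption, and its schema must match the selected columns:

```sql
-- Sketch only: pageviews_archive is a hypothetical existing stream
-- with viewtime, userid, and pageid columns.
INSERT INTO pageviews_archive
  SELECT viewtime, userid, pageid
  FROM pageviews
  EMIT CHANGES;
```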
Note
The CREATE STREAM AS SELECT statement doesn't support the KEY property. To specify a KEY field, use the PARTITION BY clause. For more information, see Partition Data to Enable Joins.
The following SQL statement creates a pageviews_intro stream that contains results from a persistent query that matches "introductory" pages that have a pageid value that's less than Page_20:
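A sketch of such a statement, assuming a pageviews stream with a string pageid column already exists:

```sql
-- Persistent query: keeps running and writes matches to a new topic.
CREATE STREAM pageviews_intro AS
  SELECT * FROM pageviews
  WHERE pageid < 'Page_20'             -- string comparison on pageid
  EMIT CHANGES;
```

The comparison is lexicographic because pageid is a string, so which pages count as "introductory" depends on how the page identifiers are formatted.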
If the statement succeeds, ksqlDB responds with a message confirming that the stream was created and that its query is running.
To confirm that the pageviews_intro query is running continuously as a stream, run the PRINT statement:
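A sketch of the statement, assuming the backing topic carries the same name as the stream. Depending on your ksqlDB version, you may need to quote the topic name or match its case exactly:

```sql
-- Print records from the topic behind the pageviews_intro stream.
PRINT pageviews_intro;
```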
The output should show records from the topic as they arrive.
Press CTRL+C to stop printing the stream.
Note
The query continues to run after you stop printing the stream.
Use the SHOW QUERIES statement to view the query that ksqlDB created for the pageviews_intro stream:
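The statement takes no arguments. As a sketch:

```sql
-- List the persistent queries that are currently running.
SHOW QUERIES;
```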
The output should list the running persistent queries, including the one that populates the pageviews_intro stream.
A persistent query that's created by the CREATE STREAM AS SELECT statement has the string CSAS in its ID, for example, CSAS_PAGEVIEWS_INTRO_0.
Delete a ksqlDB Stream
Use the DROP STREAM statement to delete a stream. If you created the stream by using CREATE STREAM AS SELECT, you must first terminate the corresponding persistent query.
Use the TERMINATE statement to stop the CSAS_PAGEVIEWS_INTRO_0 query:
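As a sketch, using the query ID mentioned above. Substitute the ID that SHOW QUERIES reports on your server if it differs:

```sql
-- Stop the persistent query that feeds pageviews_intro.
TERMINATE CSAS_PAGEVIEWS_INTRO_0;
```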
ksqlDB should respond with a message confirming that the query was terminated.
After the query is terminated, use the DROP STREAM statement to delete the pageviews_intro stream:
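A sketch of the statement for the pageviews_intro stream, assuming its persistent query has already been terminated:

```sql
-- Remove the stream definition from ksqlDB.
DROP STREAM pageviews_intro;
```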
ksqlDB should respond with a message confirming that the stream was dropped.
Next Steps
Page last revised on: 2020-04-29