CREATE STREAM

Synopsis

CREATE [OR REPLACE] [SOURCE] STREAM [IF NOT EXISTS] stream_name ( { column_name data_type [KEY] } [, ...] )
  WITH ( property_name = expression [, ...] );

Description

Create a new stream with the specified columns and properties.

If the IF NOT EXISTS clause is present, the statement won't fail if a stream with the same name already exists.
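As a minimal sketch, the following statement succeeds even if a stream named s1 already exists; the stream name, columns, and topic are illustrative:

```sql
-- Succeeds whether or not a stream named s1 already exists.
CREATE STREAM IF NOT EXISTS s1 (
    id BIGINT KEY,
    v VARCHAR
  ) WITH (
    KAFKA_TOPIC = 's1-topic',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1
  );
```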

Source streams

A stream can be created as read-only by providing the SOURCE clause. On a source stream, INSERT statements and the DELETE TOPIC clause of the DROP STREAM statement aren't permitted. Source streams don't support pull queries; only source tables support running pull queries.

To disable the SOURCE stream feature, set ksql.source.table.materialization.enabled to false in your ksqlDB server properties file.
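A source stream is declared by adding the SOURCE keyword before STREAM; the stream, column, and topic names below are illustrative:

```sql
-- Read-only stream over an existing topic.
CREATE SOURCE STREAM clicks (
    user_id VARCHAR,
    url VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'clicks-topic',
    VALUE_FORMAT = 'JSON'
  );
```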

Partitioning

Assign the PARTITIONS property in the WITH clause to specify the number of partitions in the stream's backing topic. Partitioning streams and tables is especially important for stateful or otherwise intensive queries. For more information, see Parallelization.
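For example, the following sketch creates a stream whose backing topic has six partitions; the names and partition count are placeholders:

```sql
-- Creates the backing topic with 6 partitions if it doesn't exist.
CREATE STREAM orders (
    order_id BIGINT KEY,
    total DOUBLE
  ) WITH (
    KAFKA_TOPIC = 'orders-topic',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 6
  );
```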

KEY and VALUE columns

A ksqlDB STREAM is a stream of facts. Each fact is immutable and unique. A stream can store its data in either KEY or VALUE columns. Both KEY and VALUE columns can be NULL. No special processing is done if two rows have the same key. This situation is handled differently by ksqlDB TABLEs, as shown in the following table.

|                         | STREAM | TABLE |
|-------------------------|--------|-------|
| Key column type         | KEY    | PRIMARY KEY |
| NON NULL key constraint | No     | Yes. Records in the Kafka topic with a NULL PRIMARY KEY are ignored. |
| Unique key constraint   | No. Messages with the same key as another have no special meaning. | Yes. Later messages with the same key replace earlier ones. |
| Tombstones              | No. Messages with NULL values are ignored. | Yes. NULL message values are treated as a tombstone. Any existing row with a matching key is deleted. |

Each column is defined by:

  • column_name: the name of the column. If unquoted, the name must be a valid SQL identifier, and ksqlDB converts it to uppercase. The name can be quoted if case needs to be preserved or if the name is not a valid SQL identifier, for example `mixedCaseId` or `$with@invalid!chars`.
  • data_type: the SQL type of the column. Columns can be any of the data types supported by ksqlDB.
  • KEY: columns that are stored in the Kafka message's key should be marked as KEY columns. If a column is not marked as a KEY column, ksqlDB loads it from the Kafka message's value. Unlike a table's PRIMARY KEY, a stream's keys can be NULL.
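As a sketch of the quoting rules above, the names here are illustrative; the quoted column keeps its case, while the unquoted one is stored uppercase:

```sql
CREATE STREAM sensor_readings (
    `sensorId` VARCHAR KEY,  -- quoted: case is preserved as sensorId
    reading DOUBLE           -- unquoted: stored as READING
  ) WITH (
    KAFKA_TOPIC = 'readings-topic',
    VALUE_FORMAT = 'JSON'
  );
```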

For supported serialization formats, ksqlDB can integrate with Confluent Schema Registry. ksqlDB can use Schema Inference to spare you from defining columns manually in your CREATE STREAM statements.

ROWTIME

Each row within the stream has a ROWTIME pseudo column, which represents the event time of the row. The timestamp has millisecond accuracy. ksqlDB uses the timestamp during windowing operations and joins, where data from each side of a join is generally processed in time order.

By default, ROWTIME is populated from the corresponding Kafka message timestamp. Set TIMESTAMP in the WITH clause to populate ROWTIME from a column in the Kafka message key or value.
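For example, the following sketch populates ROWTIME from an event_ts value column instead of the Kafka message timestamp; the stream, column, and topic names are placeholders:

```sql
-- ROWTIME is taken from the event_ts column (epoch milliseconds).
CREATE STREAM events (
    id BIGINT KEY,
    event_ts BIGINT
  ) WITH (
    KAFKA_TOPIC = 'events-topic',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP = 'event_ts'
  );
```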

ROWPARTITION and ROWOFFSET

Like ROWTIME, ROWPARTITION and ROWOFFSET are pseudo columns. They represent the partition and offset of the source topic. For example, if issuing a push query with ROWPARTITION or ROWOFFSET in the SELECT clause on a stream or table backed by topic x, the push query's projection will contain the partition and offset information of the underlying records in topic x.
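For instance, a push query can project these pseudo columns directly; this sketch assumes a pageviews stream with a user_id column, as in the Example section below:

```sql
-- Each emitted row carries the partition and offset of its source record.
SELECT ROWPARTITION, ROWOFFSET, user_id
  FROM pageviews
  EMIT CHANGES;
```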

Stream properties

Specify details about your stream by using the WITH clause, which supports the following properties:

  • KAFKA_TOPIC (required): The name of the Kafka topic that backs this source. The topic must either already exist in Kafka, or PARTITIONS must be specified to create the topic. The command fails if the topic exists with different partition or replica counts.
  • KEY_FORMAT: The serialization format of the message key in the topic. For supported formats, see Serialization Formats. If not supplied, the system default, defined by ksql.persistence.default.format.key, is used. If the default is also not set, the statement is rejected as invalid.
  • VALUE_FORMAT: The serialization format of the message value in the topic. For supported formats, see Serialization Formats. If not supplied, the system default, defined by ksql.persistence.default.format.value, is used. If the default is also not set, the statement is rejected as invalid.
  • FORMAT: The serialization format of both the message key and value in the topic. It's not valid to supply this property alongside either KEY_FORMAT or VALUE_FORMAT. For supported formats, see Serialization Formats.
  • PARTITIONS: The number of partitions in the backing topic. This property must be set if creating a STREAM without an existing topic; the command fails if the topic doesn't exist. You can't change the number of partitions on a stream. To change the partition count, you must drop the stream and create it again.
  • REPLICAS: The number of replicas in the backing topic. If this property isn't set but PARTITIONS is set, the default Kafka cluster configuration for replicas is used when creating the new topic.
  • VALUE_DELIMITER: Used when VALUE_FORMAT = 'DELIMITED'. Supports a single character as the delimiter; defaults to ','. For space- and tab-delimited values, you must use the special values 'SPACE' or 'TAB', not an actual space or tab character.
  • TIMESTAMP: By default, the ROWTIME pseudo column is the timestamp of the record in the Kafka topic. The TIMESTAMP property overrides ROWTIME with the contents of the specified column within the Kafka message, similar to timestamp extractors in the Kafka Streams API. Timestamps have millisecond accuracy. Time-based operations, such as windowing, process a record according to the timestamp in ROWTIME.
  • TIMESTAMP_FORMAT: Used in conjunction with TIMESTAMP. If not set, the TIMESTAMP column must be of type bigint or timestamp. If set, the TIMESTAMP column must be of type varchar and have a format that can be parsed with the Java DateTimeFormatter. If your timestamp format has characters that require single quotes, escape them with successive single quotes, '', for example: 'yyyy-MM-dd''T''HH:mm:ssX'. For more information on timestamp formats, see DateTimeFormatter.
  • WRAP_SINGLE_VALUE: Controls how ksqlDB deserializes the value of records in the supplied KAFKA_TOPIC when the value schema contains only a single column. If set to true, ksqlDB expects the column to have been serialized as a named column within a record. If set to false, ksqlDB expects the column to have been serialized as an anonymous value. If not supplied, the system default, defined by ksql.persistence.wrap.single.values and defaulting to true, is used. Note: NULL values have special meaning in ksqlDB, so take care with single-column schemas where the value can be NULL. For more information, see Single column (un)wrapping. Note: supplying this property for formats that don't support wrapping, for example DELIMITED, or when the value schema has multiple columns, results in an error.
  • WINDOW_TYPE: By default, the topic is assumed to contain non-windowed data. If the data is windowed, that is, it was created by a ksqlDB query that contains a WINDOW clause, use the WINDOW_TYPE property to provide the window type. Valid values are SESSION, HOPPING, and TUMBLING.
  • WINDOW_SIZE: If the data is windowed and the WINDOW_TYPE property is TUMBLING or HOPPING, set the WINDOW_SIZE property. The property is a string with two literals: the window size (a number) and the window size unit (a time unit). For example: 10 SECONDS.
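As a sketch of the windowing properties above, this statement declares a stream over a topic of tumbling-windowed data produced by another ksqlDB query; the stream, column, and topic names are illustrative:

```sql
-- The topic holds 10-second tumbling-windowed data from an upstream query.
CREATE STREAM windowed_counts (
    page_id BIGINT KEY,
    view_count BIGINT
  ) WITH (
    KAFKA_TOPIC = 'pageview-counts-topic',
    VALUE_FORMAT = 'JSON',
    WINDOW_TYPE = 'TUMBLING',
    WINDOW_SIZE = '10 SECONDS'
  );
```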


Note

  • To use Avro or Protobuf, you must have Schema Registry enabled and ksql.schema.registry.url must be set in the ksqlDB Server configuration file. See Configure ksqlDB for Avro, Protobuf, and JSON schemas.
  • Avro and Protobuf field names are not case sensitive in ksqlDB. This matches the ksqlDB column name behavior.

Example

-- stream with a page_id column loaded from the Kafka message value:
CREATE STREAM pageviews (
    page_id BIGINT,
    viewtime BIGINT,
    user_id VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'keyless-pageviews-topic',
    VALUE_FORMAT = 'JSON'
  );

-- stream with a page_id column loaded from the Kafka message key:
CREATE STREAM pageviews (
    page_id BIGINT KEY,
    viewtime BIGINT,
    user_id VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'keyed-pageviews-topic',
    VALUE_FORMAT = 'JSON'
  );

-- keyless stream, with value columns loaded from Schema Registry:
CREATE STREAM pageviews WITH (
    KAFKA_TOPIC = 'keyless-pageviews-topic',
    VALUE_FORMAT = 'JSON_SR'
  );

-- keyed stream, with value columns loaded from Schema Registry:
CREATE STREAM pageviews (
    page_id BIGINT KEY
  ) WITH (
    KAFKA_TOPIC = 'keyed-pageviews-topic',
    VALUE_FORMAT = 'JSON_SR'
  );

Last update: 2022-01-19