CREATE STREAM AS SELECT
CREATE STREAM stream_name
  [WITH ( property_name = expression [, ...] )]
  AS SELECT select_expr [, ...]
  FROM from_stream
  [ LEFT | FULL | INNER ] JOIN [join_table | join_stream]
    [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ]
    ON join_criteria
  [ WHERE condition ]
  [PARTITION BY column_name]
  EMIT CHANGES;
Create a new stream along with the corresponding Kafka topic, and continuously write the result of the SELECT query into the stream and its corresponding topic.
If the PARTITION BY clause is present, the resulting stream has the specified column as its key. The column_name must be present in from_stream. For more information, see Partition Data to Enable Joins.
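As a minimal sketch of PARTITION BY, the statement below assumes a hypothetical source stream named pageviews with userid, pageid, and viewtime columns; none of these names come from this page. The derived stream is keyed by userid.

```sql
-- Hypothetical names: pageviews (source stream) and its columns
-- userid, pageid, viewtime. Adjust to your own schema.
-- Re-key the derived stream by userid so it can later be joined on that column.
CREATE STREAM pageviews_by_user AS
  SELECT userid, pageid, viewtime
  FROM pageviews
  PARTITION BY userid
  EMIT CHANGES;
```

The partitioning column appears both in the source stream and in the select list here, which keeps the example valid under either reading of the column_name requirement.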
For joins, the key of the resulting stream is the value of the left stream's column used in the join criteria. This column is registered as the key of the resulting stream if it is included in the selected columns.
For stream-table joins, the column used in the join criteria for the table must be the table key.
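A stream-table join might look like the following sketch. It assumes a hypothetical pageviews stream and a hypothetical users table whose key is userid; these names are illustrative only.

```sql
-- Hypothetical names: pageviews (stream), users (table keyed by userid).
-- The table side of the join criteria uses the table key, as required.
-- pv.userid is in the select list, so it can be registered as the key
-- of the resulting stream.
CREATE STREAM pageviews_enriched AS
  SELECT pv.userid, pv.pageid, u.regionid
  FROM pageviews pv
  LEFT JOIN users u ON pv.userid = u.userid
  EMIT CHANGES;
```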
For stream-stream joins, you must specify a WITHIN clause, so that records match only when both occur within the specified time interval. For valid time units, see ksqlDB Time Units.
For more information, see Join Event Streams with ksqlDB.
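For a stream-stream join, a sketch with the single-interval form of WITHIN could look like this. The orders and shipments streams and their columns are hypothetical names chosen for illustration.

```sql
-- Hypothetical names: orders and shipments (both streams), joined on orderid.
-- WITHIN 1 HOURS: an order and a shipment match only if their timestamps
-- are at most one hour apart.
CREATE STREAM shipped_orders AS
  SELECT o.orderid, o.itemid, s.shipmentid
  FROM orders o
  INNER JOIN shipments s WITHIN 1 HOURS ON o.orderid = s.orderid
  EMIT CHANGES;
```

The two-sided form, WITHIN (before TIMEUNIT, after TIMEUNIT), takes the same position in the statement when the allowed interval differs before and after the left-side record.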
The WITH clause for the result supports the following properties:
|KAFKA_TOPIC||The name of the Kafka topic that backs this stream. If this property is not set, the name of the stream in upper case is used as the default.|
|VALUE_FORMAT||Specifies the serialization format of the message value in the topic. Supported formats:
|VALUE_DELIMITER||Used when VALUE_FORMAT='DELIMITED'. Accepts a single character as the delimiter; the default is ','. For space-delimited and tab-delimited values, you must use the special values 'SPACE' or 'TAB', not an actual space or tab character.|
|PARTITIONS||The number of partitions in the backing topic. If this property is not set, then the number of partitions of the input stream/table will be used. In join queries, the property values are taken from the left-side stream or table. The
|REPLICAS||The replication factor for the topic. If this property is not set, then the number of replicas of the input stream or table will be used. In join queries, the property values are taken from the left-side stream or table. The
|TIMESTAMP||Sets a field within this stream's schema to be used as the default source of
Note: This doesn't affect the processing of the query that populates this stream. For example, given the following statement:
CREATE STREAM foo WITH (TIMESTAMP='t2') AS
The window into which each row of
|TIMESTAMP_FORMAT||Used in conjunction with TIMESTAMP. If not set, ksqlDB assumes that the timestamp field is a
|WRAP_SINGLE_VALUE||Controls how the query serializes values when the value schema contains only a single field.
If set to
If set to
If not supplied, the system default, defined by ksql.persistence.wrap.single.values and defaulting to
Note: Supplying this property for formats that do not support wrapping, for example
- To use Avro, you must have Schema Registry enabled, and ksql.schema.registry.url must be set in the ksqlDB server configuration file. See Configure Avro and Schema Registry for ksqlDB.
- Avro field names are not case sensitive in ksqlDB. This matches the ksqlDB column name behavior.
The KEY property is not supported. Use PARTITION BY instead.
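To illustrate how the WITH properties combine, the following sketch writes a tab-delimited copy of a hypothetical pageviews stream to an explicitly named topic. The stream, topic, and column names and the partition and replica counts are assumptions chosen for the example, not recommended values.

```sql
-- Hypothetical names and values throughout; adjust to your cluster.
-- VALUE_DELIMITER applies because VALUE_FORMAT is 'DELIMITED', and the
-- special value 'TAB' is used instead of a literal tab character.
CREATE STREAM pageviews_tsv
  WITH (KAFKA_TOPIC='pageviews_tsv',
        VALUE_FORMAT='DELIMITED',
        VALUE_DELIMITER='TAB',
        PARTITIONS=3,
        REPLICAS=1) AS
  SELECT userid, pageid, viewtime
  FROM pageviews
  EMIT CHANGES;
```

If PARTITIONS and REPLICAS were omitted, the values would be taken from the input stream, as described in the property table.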
Page last revised on: 2019-12-12