CREATE STREAM AS SELECT
CREATE STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
[ LEFT | FULL | INNER ] JOIN [join_table | join_stream] [ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria
[ WHERE condition ]
[PARTITION BY column_name]
Create a new stream along with the corresponding Kafka topic, and continuously write the result of the SELECT query into the stream and its corresponding topic.
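As a minimal sketch, a statement like the following creates a derived stream and its backing topic (the `orders` stream and its columns are hypothetical, not from this reference):

```sql
-- Hypothetical example: derive a stream of large orders from an existing
-- 'orders' stream; ksqlDB also creates the backing Kafka topic.
CREATE STREAM big_orders
  WITH (KAFKA_TOPIC='big_orders', VALUE_FORMAT='JSON')
  AS SELECT orderid, customerid, total
     FROM orders
     WHERE total > 100;
```

The query runs continuously: every new record arriving on `orders` that satisfies the WHERE clause is written to `big_orders`.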
If the PARTITION BY clause is present, the resulting stream has the specified column as its key. The column_name must be present in the from_stream. For more information, see Partition Data to Enable Joins.
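A sketch of re-keying with PARTITION BY, again using hypothetical stream and column names:

```sql
-- Re-key the output on customerid so downstream joins can use it as the key.
CREATE STREAM orders_by_customer
  AS SELECT customerid, orderid, total
     FROM orders
     PARTITION BY customerid;
```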
For joins, the key of the resulting stream is the value of the column from the left stream that was used in the join criteria. This column is registered as the key of the resulting stream only if it is included in the selected columns.
For stream-table joins, the column used in the join criteria for the table must be the table key.
For stream-stream joins, you must specify a WITHIN clause, which matches records that occur within a specified time interval of each other. For valid time units, see ksqlDB Time Units.
For more information, see Join Event Streams with ksqlDB.
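A hedged sketch of a stream-stream join with a WITHIN clause (the `orders` and `shipments` streams are hypothetical):

```sql
-- Match orders to shipments that occur within one hour of each other.
-- The result is keyed on o.orderid, the left-stream join column.
CREATE STREAM shipped_orders
  AS SELECT o.orderid, o.total, s.warehouse
     FROM orders o
     INNER JOIN shipments s
       WITHIN 1 HOUR
       ON o.orderid = s.orderid;
```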
The WITH clause for the result supports the following properties:
KAFKA_TOPIC: The name of the Kafka topic that backs this stream. If this property is not set, the name of the stream, in upper case, is used as the default.
VALUE_FORMAT: Specifies the serialization format of the message value in the topic. Supported formats: JSON, DELIMITED (comma-separated value), AVRO, KAFKA, and PROTOBUF. If this property is not set, the format of the input stream/table is used. For more information, see Serialization Formats.
VALUE_DELIMITER: Used when VALUE_FORMAT='DELIMITED'. Supports a single character as the delimiter; defaults to ','. For space- and tab-delimited values, you must use the special values 'SPACE' or 'TAB', not an actual space or tab character.
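For example, a sketch of writing tab-delimited output (the `orders` stream is hypothetical):

```sql
-- Write tab-delimited values; note the special value 'TAB', not a literal tab.
CREATE STREAM orders_tsv
  WITH (VALUE_FORMAT='DELIMITED', VALUE_DELIMITER='TAB')
  AS SELECT * FROM orders;
```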
PARTITIONS: The number of partitions in the backing topic. If this property is not set, the number of partitions of the input stream/table is used. In join queries, the property value is taken from the left-side stream or table. The ksql.sink.partitions property can be set in the properties file the ksqlDB Server is started with, or by using the SET statement.
REPLICAS: The replication factor for the topic. If this property is not set, the number of replicas of the input stream or table is used. In join queries, the property value is taken from the left-side stream or table. The ksql.sink.replicas property can be set in the properties file the ksqlDB Server is started with, or by using the SET statement.
TIMESTAMP: Sets a field within this stream's schema to be used as the default source of ROWTIME for any downstream queries. Downstream queries that use time-based operations, such as windowing, process records in this stream based on the timestamp in this field. By default, such queries also use this field to set the timestamp on any records emitted to Kafka. Timestamps have millisecond accuracy. If not supplied, the ROWTIME of the source stream is used.
Note: This doesn't affect the processing of the query that populates this stream. For example, given the following statement:
CREATE STREAM foo WITH (TIMESTAMP='t2') AS
    SELECT * FROM bar
    WINDOW TUMBLING (SIZE 10 SECONDS);
The window into which each row of bar is placed is determined by bar's ROWTIME, not by t2.
TIMESTAMP_FORMAT: Used in conjunction with TIMESTAMP. If not set, ksqlDB assumes that the timestamp field is a bigint. When set, the TIMESTAMP field must be of type varchar and have a format that can be parsed with the Java DateTimeFormatter. If your timestamp format has characters that require single quotes, escape them with two successive single quotes, '', for example: 'yyyy-MM-dd''T''HH:mm:ssX'. For more information on timestamp formats, see DateTimeFormatter.
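Combining the two properties, a sketch using a hypothetical varchar column `event_time` as the event timestamp:

```sql
-- Parse the varchar 'event_time' column with the given DateTimeFormatter
-- pattern and use it as ROWTIME for downstream queries.
CREATE STREAM orders_ts
  WITH (TIMESTAMP='event_time',
        TIMESTAMP_FORMAT='yyyy-MM-dd''T''HH:mm:ssX')
  AS SELECT * FROM orders;
```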
WRAP_SINGLE_VALUE: Controls how values are serialized when the value schema contains only a single field. If set to true, ksqlDB serializes the field as a named field within a record. If set to false, ksqlDB serializes the field as an anonymous value. If not supplied, the system default, defined by ksql.persistence.wrap.single.values and defaulting to true, is used. null values have special meaning in ksqlDB, so take care when dealing with single-field schemas where the value can be null. For more information, see Single field (un)wrapping.
Note: Supplying this property for formats that do not support wrapping, for example DELIMITED, or when the value schema has multiple fields, results in an error.
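For example, a sketch that emits an unwrapped single-field value (the `orders` stream and `total` column are hypothetical):

```sql
-- Serialize the single 'total' field as an anonymous value instead of
-- wrapping it in a named record.
CREATE STREAM order_totals
  WITH (WRAP_SINGLE_VALUE=false)
  AS SELECT total FROM orders;
```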
- To use Avro, you must have Schema Registry enabled, and ksql.schema.registry.url must be set in the ksqlDB Server configuration file. See Configure ksqlDB for Avro or Protobuf.
- Avro field names are not case sensitive in ksqlDB. This matches the ksqlDB column name behavior.
The KEY property is not supported. Use PARTITION BY instead.