CREATE STREAM AS SELECT
Create a new materialized stream view, along with the corresponding Kafka topic, and stream the result of the query into the topic.
The PARTITION BY clause, if supplied, is applied to the source after any JOIN or WHERE clauses, and before the SELECT clause, in much the same way as GROUP BY.
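The ordering described above can be illustrated with a minimal sketch. The stream, topic, and column names (`orders`, `large_orders_by_customer`, `customer_id`, and so on) are hypothetical and chosen only for illustration; the filter runs first, and the surviving rows are then re-keyed before the projection is applied.

```sql
-- Hypothetical source stream; all names here are illustrative assumptions.
CREATE STREAM orders (
    order_id    BIGINT KEY,
    customer_id VARCHAR,
    amount      DOUBLE
  ) WITH (
    KAFKA_TOPIC  = 'orders',
    VALUE_FORMAT = 'JSON',
    PARTITIONS   = 4
  );

-- WHERE is evaluated against the source rows; PARTITION BY then
-- re-keys the rows that pass the filter, and SELECT shapes the output.
CREATE STREAM large_orders_by_customer AS
  SELECT customer_id, order_id, amount
  FROM orders
  WHERE amount > 100
  PARTITION BY customer_id
  EMIT CHANGES;
```

Because the new key is `customer_id`, it appears in the projection alongside the value columns.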
Joins to streams can use any stream column. If the join criterion is not the key column of the stream, ksqlDB internally repartitions the data.
Kafka guarantees the relative order of any two messages from one source partition only if they are also both in the same partition after the repartition. Otherwise, Kafka is likely to interleave messages. The use case will determine if these ordering guarantees are acceptable.
See Partition Data to Enable Joins for more information about how to correctly partition your data for joins.
For stream-stream joins, you must specify a WITHIN clause, so that records match only when both occur within the specified time interval. For valid time units, see Time Units.
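As a rough sketch of a stream-stream join with a WITHIN clause, the statement below matches each order with shipments that occur within one hour of it. Both source streams and all column names are hypothetical; they are declared with the same partition count on the assumption that the two inputs need to be co-partitioned.

```sql
-- Hypothetical input streams; names and schemas are assumptions.
CREATE STREAM orders (
    order_id    BIGINT KEY,
    customer_id VARCHAR
  ) WITH (KAFKA_TOPIC = 'orders', VALUE_FORMAT = 'JSON', PARTITIONS = 4);

CREATE STREAM shipments (
    shipment_id BIGINT KEY,
    order_id    BIGINT,
    warehouse   VARCHAR
  ) WITH (KAFKA_TOPIC = 'shipments', VALUE_FORMAT = 'JSON', PARTITIONS = 4);

-- shipments is joined on order_id, which is not its key column,
-- so ksqlDB repartitions that side internally.
CREATE STREAM shipped_orders AS
  SELECT o.order_id AS order_id, o.customer_id, s.warehouse
  FROM orders o
    INNER JOIN shipments s WITHIN 1 HOUR
    ON o.order_id = s.order_id
  EMIT CHANGES;
```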
The key of the resulting stream is determined by the following rules, in order of priority:
1. If the query has a PARTITION BY:
    1. If the PARTITION BY is on a single source column reference, the key will match the
       name, type and contents of the source column.
    1. Otherwise, the key will have a system-generated name, unless you provide an alias in the
       projection, and will match the type and contents of the result of the expression.
1. If the query has a join, see Join Synthetic Key Columns for more info.
1. Otherwise, the key will match the name (unless you provide an alias in the projection)
   and type of the source stream's key.
The projection must include all columns required in the result, including any key columns.
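The key rules and the projection requirement above might look like this in practice. This is a sketch under assumptions: the `orders` stream and its columns are hypothetical, and the second statement assumes that a projection entry matching the PARTITION BY expression is what supplies the key column and its alias.

```sql
-- Hypothetical source stream; names are illustrative assumptions.
CREATE STREAM orders (
    order_id    BIGINT KEY,
    customer_id VARCHAR,
    amount      DOUBLE
  ) WITH (KAFKA_TOPIC = 'orders', VALUE_FORMAT = 'JSON', PARTITIONS = 4);

-- No PARTITION BY and no join: the key keeps the name and type of the
-- source key (order_id), and it must still be listed in the projection.
CREATE STREAM order_amounts AS
  SELECT order_id, amount
  FROM orders
  EMIT CHANGES;

-- PARTITION BY on an expression: the alias (region) names the key;
-- without an alias the key would get a system-generated name.
CREATE STREAM orders_by_region AS
  SELECT SUBSTRING(customer_id, 1, 2) AS region, order_id, amount
  FROM orders
  PARTITION BY SUBSTRING(customer_id, 1, 2)
  EMIT CHANGES;
```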
For supported serialization formats,
ksqlDB can integrate with Confluent Schema Registry.
ksqlDB registers the value schema of the new stream with Schema Registry automatically.
The schema is registered under the subject
The WITH clause for the result supports the following properties:
|KAFKA_TOPIC||The name of the Kafka topic that backs this stream. If this property is not set, then the name of the stream in upper case will be used as default.|
|KEY_FORMAT||Specifies the serialization format of the message key in the topic. For supported formats, see Serialization Formats. If this property is not set, the format from the left-most input stream/table is used.|
|VALUE_FORMAT||Specifies the serialization format of the message value in the topic. For supported formats, see Serialization Formats. If this property is not set, the format from the left-most input stream/table is used.|
|FORMAT||Specifies the serialization format of both the message key and value in the topic. It is not valid to supply this property alongside either
|VALUE_DELIMITER||Used when VALUE_FORMAT='DELIMITED'. Accepts a single character as the delimiter; defaults to ','. For space-delimited and tab-delimited values, you must use the special values 'SPACE' or 'TAB', not an actual space or tab character.|
|PARTITIONS||The number of partitions in the backing topic. If this property is not set, then the number of partitions of the input stream/table will be used. In join queries, the property values are taken from the left-most stream or table.|
|REPLICAS||The replication factor for the topic. If this property is not set, then the number of replicas of the input stream or table will be used. In join queries, the property values are taken from the left-most stream or table.|
|TIMESTAMP||Sets a column within this stream's schema to be used as the default source of
Note: This doesn't affect the processing of the query that populates this stream. For example, given the following statement:
CREATE STREAM foo WITH (TIMESTAMP='t2') AS
The window into which each row of
|TIMESTAMP_FORMAT||Used in conjunction with TIMESTAMP. If not set, ksqlDB timestamp column must be of type
|WRAP_SINGLE_VALUE||Controls how the query serializes values whose schema contains only a single column.
If set to
If set to
If not supplied, the system default, defined by ksql.persistence.wrap.single.values, is used. If the system default is not set, then the format's default is used.
Note: Supplying this property for formats that do not support wrapping, for example
- To use Avro, you must have Schema Registry enabled and
ksql.schema.registry.url must be set in the ksqlDB server configuration
file. See Configure ksqlDB for Avro, Protobuf, and JSON schemas.
- Avro field names are not case sensitive in ksqlDB. This matches the ksqlDB
column name behavior.
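As a sketch of how the WITH clause properties described above can be combined, the statements below set the topic name, formats, and topic sizing explicitly rather than inheriting them from the source. All stream, topic, and column names are hypothetical, the property values are arbitrary, and the Avro statement assumes Schema Registry is configured as noted.

```sql
-- Hypothetical source stream; names are illustrative assumptions.
CREATE STREAM orders (
    order_id    BIGINT KEY,
    customer_id VARCHAR,
    amount      DOUBLE
  ) WITH (KAFKA_TOPIC = 'orders', VALUE_FORMAT = 'JSON', PARTITIONS = 4);

-- Override the defaults that would otherwise come from the source stream.
CREATE STREAM orders_avro
  WITH (
    KAFKA_TOPIC  = 'orders_avro',
    KEY_FORMAT   = 'KAFKA',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS   = 6,
    REPLICAS     = 1
  ) AS
  SELECT order_id, customer_id, amount
  FROM orders
  EMIT CHANGES;

-- Tab-delimited output: note the special value 'TAB' rather than a
-- literal tab character.
CREATE STREAM orders_tsv
  WITH (
    VALUE_FORMAT    = 'DELIMITED',
    VALUE_DELIMITER = 'TAB'
  ) AS
  SELECT order_id, customer_id, amount
  FROM orders
  EMIT CHANGES;
```

In the second statement, KAFKA_TOPIC, PARTITIONS, and REPLICAS are omitted, so the defaults described in the table apply.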