Create a new table with the specified columns and properties.
If the IF NOT EXISTS clause is present, the statement won't fail if a table with the same name already exists.
Assign the PARTITIONS property in the WITH clause to specify the number of partitions in the table's backing topic. Partitioning streams and tables is especially important for stateful or otherwise intensive queries. For more information, see Parallelization.
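As a minimal sketch of the points above, the following statement creates a table only if one with the same name does not already exist, and sets PARTITIONS so that ksqlDB can create the backing topic. The table name, column names, and topic name are hypothetical and chosen only for illustration.

```sql
-- Hypothetical names: users, my-users-topic, and all column names.
-- IF NOT EXISTS keeps the statement from failing when the table already exists.
CREATE TABLE IF NOT EXISTS users (
    id BIGINT PRIMARY KEY,        -- read from the Kafka message key
    registered_at BIGINT,         -- read from the Kafka message value
    gender VARCHAR,
    region_id VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'my-users-topic',
    KEY_FORMAT = 'KAFKA',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 3                -- used when ksqlDB has to create the topic
  );
```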
A ksqlDB table works much like tables in other SQL systems. A table has zero or more rows. Each row is identified by its PRIMARY KEY. A row's PRIMARY KEY can't be NULL.
If an incoming message in the underlying Kafka topic has the same key as an existing row, it replaces the existing row in the table. But if the message's value is null, it deletes the row. A message that has a null value is known as a tombstone, because it causes the existing row to be deleted.
This situation is handled differently by a ksqlDB STREAM, as shown in the following table.
| ||STREAM||TABLE|
|Key column type||KEY||PRIMARY KEY|
|NON NULL key constraint||No||Yes. Messages in the Kafka topic with a NULL PRIMARY KEY are ignored.|
|Unique key constraint||No. Messages with the same key as another have no special meaning.||Yes. Later messages with the same key replace earlier ones.|
|Tombstones||No. Messages with NULL values are ignored.||Yes. NULL message values are treated as a tombstone. Any existing row with a matching key is deleted.|
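The replacement behavior described above can be sketched with two inserts that share a key. This is an illustrative example under stated assumptions: the table, topic, and column names are hypothetical, and the tombstone case is not shown because it requires a message with a null value in the underlying topic.

```sql
-- Hypothetical table backed by a hypothetical topic.
CREATE TABLE user_regions (
    id BIGINT PRIMARY KEY,
    region_id VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'user-regions',
    KEY_FORMAT = 'KAFKA',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1
  );

-- Both messages carry the key 1, so the second replaces the first in the table.
INSERT INTO user_regions (id, region_id) VALUES (1, 'region_1');
INSERT INTO user_regions (id, region_id) VALUES (1, 'region_7');

-- Read the topic from the beginning, then stream the table's changes.
-- The most recent change emitted for id 1 carries region_7.
SET 'auto.offset.reset' = 'earliest';
SELECT id, region_id FROM user_regions EMIT CHANGES;
```

Had the same two messages been read as a STREAM, both would be retained as independent events, because a stream has no unique key constraint.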
Each column is defined by:
- column_name: the name of the column. If unquoted, the name must be a valid SQL identifier, and ksqlDB converts it to uppercase. Quote the name if its case must be preserved or if it is not a valid SQL identifier.
- data_type: the SQL type of the column. Columns can be any of the data types supported by ksqlDB.
- PRIMARY KEY: columns that are stored in the Kafka message's key should be marked as PRIMARY KEY columns. If a column is not marked as a PRIMARY KEY column, ksqlDB loads it from the Kafka message's value. Unlike a stream's KEY column, a table's PRIMARY KEY column(s) are NON NULL. Any records in the Kafka topic with NULL key columns are dropped.
- To use Avro, Protobuf, or JSON_SR, you must have Schema Registry enabled, and ksql.schema.registry.url must be set in the ksqlDB server configuration file. For more information, see Configure ksqlDB for Avro, Protobuf, and JSON schemas.
- Avro and Protobuf field names are not case sensitive in ksqlDB. This matches the ksqlDB column name behavior.
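The column rules above can be illustrated with a short sketch. All names here are hypothetical; backticks are used to quote identifiers whose case must be preserved or that are not valid SQL identifiers.

```sql
-- Hypothetical table, topic, and column names.
CREATE TABLE products (
    `productId` VARCHAR PRIMARY KEY,  -- quoted: case is preserved; stored in the message key
    category VARCHAR,                 -- unquoted: ksqlDB converts the name to CATEGORY
    `unit-price` DOUBLE               -- quoted: the hyphen makes it an invalid unquoted identifier
  ) WITH (
    KAFKA_TOPIC = 'products',
    KEY_FORMAT = 'KAFKA',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1
  );

-- Quoted columns must be quoted the same way when they are referenced.
SELECT `productId`, category, `unit-price` FROM products EMIT CHANGES;
```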
Each row within the table has a ROWTIME pseudo column, which represents the last modified time of the row. The timestamp has millisecond accuracy. The timestamp is used by ksqlDB when updating a row during any windowing operations and during joins, where data from each side of a join is generally processed in time order.
ROWTIME is populated from the corresponding Kafka message timestamp. Set TIMESTAMP in the WITH clause to populate ROWTIME from a column in the Kafka message key or value.
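As a minimal sketch of the TIMESTAMP setting described above, the following statement populates ROWTIME from a value column instead of the Kafka message timestamp. The table, topic, and column names are hypothetical, and the example assumes that the updated_at column holds epoch milliseconds.

```sql
-- Hypothetical names throughout.
CREATE TABLE sensors (
    sensor_id VARCHAR PRIMARY KEY,
    reading DOUBLE,
    updated_at BIGINT               -- assumed to hold epoch milliseconds
  ) WITH (
    KAFKA_TOPIC = 'sensors',
    KEY_FORMAT = 'KAFKA',
    VALUE_FORMAT = 'JSON',
    PARTITIONS = 1,
    TIMESTAMP = 'updated_at'        -- ROWTIME is taken from this column
  );

-- ROWTIME can be selected like any other column.
SELECT ROWTIME, sensor_id, reading FROM sensors EMIT CHANGES;
```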
Specify details about your table by using the WITH clause, which supports the following properties:
|KAFKA_TOPIC (required)||The name of the Kafka topic that backs this source. The topic must either already exist in Kafka, or PARTITIONS must be specified to create the topic. The command fails if the topic exists with different partition or replica counts.|
|KEY_FORMAT||Specifies the serialization format of the message key in the topic. For supported formats, see Serialization Formats. If not supplied, the system default, defined by ksql.persistence.default.format.key, is used. If the default is also not set, the statement is rejected as invalid.|
|VALUE_FORMAT||Specifies the serialization format of the message value in the topic. For supported formats, see Serialization Formats. If not supplied, the system default, defined by ksql.persistence.default.format.value, is used. If the default is also not set, the statement is rejected as invalid.|
|FORMAT||Specifies the serialization format of both the message key and value in the topic. It is not valid to supply this property alongside either KEY_FORMAT or VALUE_FORMAT.|
|PARTITIONS||The number of partitions in the backing topic. This property must be set if creating a TABLE without an existing topic (the command will fail if the topic does not exist). You can't change the number of partitions on a table. To change the partition count, you must drop the table and create it again.|
|REPLICAS||The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic.|
|VALUE_DELIMITER||Used when VALUE_FORMAT='DELIMITED'. Specifies the single character to use as the delimiter; defaults to ','. For space-delimited and tab-delimited values, you must use the special values 'SPACE' or 'TAB', not an actual space or tab character.|
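|TIMESTAMP||By default, the pseudo ROWTIME column is the timestamp of the message in the Kafka topic. Set TIMESTAMP to override ROWTIME with the contents of the specified column in the Kafka message key or value.|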
|TIMESTAMP_FORMAT||Used in conjunction with TIMESTAMP. If not set the timestamp column must be of type
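|WRAP_SINGLE_VALUE||Controls how values are deserialized when the value schema contains only a single column. The setting controls how ksqlDB deserializes the value of the records in the supplied KAFKA_TOPIC that contain only a single column.
If set to true, ksqlDB expects the column to have been serialized as a named column within a record.
If set to false, ksqlDB expects the column to have been serialized as an anonymous value.
If not supplied, the system default, defined by ksql.persistence.wrap.single.values, is used. If that is also not set, the format's default is used.
Note: Supplying this property for formats that do not support wrapping, for example DELIMITED, or when the value schema has multiple columns, results in an error.|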
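|WINDOW_TYPE||By default, the topic is assumed to contain non-windowed data. If the data is windowed, that is, it was created by ksqlDB using a query that contains a WINDOW clause, use the WINDOW_TYPE property to provide the window type. Valid values are SESSION, HOPPING, and TUMBLING.|
|WINDOW_SIZE||By default, the topic is assumed to contain non-windowed data. If the data is windowed, that is, it was created by ksqlDB using a query that contains a WINDOW clause, and the window is a HOPPING or TUMBLING window, set the WINDOW_SIZE property. The value is a string made up of a number and a time unit, for example, '10 SECONDS'.|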
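The following sketch shows how several of these properties might be combined. Every table, topic, and column name is hypothetical. PARTITIONS is omitted in both statements, so the example assumes that both topics already exist; the second statement further assumes that its topic was written by a ksqlDB query with a 10-second tumbling window.

```sql
-- Tab-delimited values, with ROWTIME parsed from a string column.
-- The doubled single quotes escape the literal T inside the format string.
CREATE TABLE orders_by_id (
    order_id VARCHAR PRIMARY KEY,
    status VARCHAR,
    updated_at VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'orders',
    KEY_FORMAT = 'KAFKA',
    VALUE_FORMAT = 'DELIMITED',
    VALUE_DELIMITER = 'TAB',
    TIMESTAMP = 'updated_at',
    TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ssX'
  );

-- A table over a topic that holds windowed data.
CREATE TABLE views_per_region (
    region_id VARCHAR PRIMARY KEY,
    view_count BIGINT
  ) WITH (
    KAFKA_TOPIC = 'views-per-region',
    KEY_FORMAT = 'KAFKA',
    VALUE_FORMAT = 'JSON',
    WINDOW_TYPE = 'TUMBLING',
    WINDOW_SIZE = '10 SECONDS'
  );
```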