Create a new table with the specified columns and properties.
A ksqlDB TABLE works much like tables in other SQL systems. A table has zero or more rows. Each
row is identified by its
PRIMARY KEY values can not be NULL. A message in the
underlying Kafka topic that has the same key as an existing row will replace the earlier row in the
table, or delete the row if the message's value is NULL, i.e. a tombstone, as long as the
previously received row does not have a later timestamp /
ROWTIME. This situation is handled
differently by ksqlDB STREAM, as shown in the following table.
|Key column type||
|NON NULL key constraint||No||Yes
Messages in the Kafka topic with a NULL
|Unique key constraint||No
Messages with the same key as another have no special meaning.
Later messages with the same key replace earlier.
Messages with NULL values are ignored.
NULL message values are treated as a tombstone
Any existing row with a matching key is deleted.
Each column is defined by:
column_name: the name of the column. If unquoted, the name must be a valid
SQL identifier and ksqlDB converts it to uppercase.
The name can be quoted if case needs to be preserved or if the name is not a valid SQL
identifier, for example
data_type: the SQL type of the column. Columns can be any of the
data types supported by ksqlDB.
PRIMARY KEY: columns that are stored in the Kafka message's key should be marked as
PRIMARY KEY columns. If a column is not marked as a
PRIMARY KEY column ksqlDB loads it
from the Kafka message's value. Unlike a stream's
KEY column, a table's
PRIMARY KEY column(s)
are NON NULL. Any records in the Kafka topic with NULL key columns are dropped.
ksqlDB adds an implicit
ROWKEY system column to every stream and table, which represents the
corresponding Kafka message key. An implicit
ROWTIME pseudo column is also available on every
stream and table, which represents the corresponding Kafka message timestamp. The timestamp has
milliseconds accuracy, and generally represents the event time of a stream row and the
last modified time of a table row.
The WITH clause supports the following properties:
|KAFKA_TOPIC (required)||The name of the Kafka topic that backs this source. The topic must either already exist in Kafka, or PARTITIONS must be specified to create the topic. Command will fail if the topic exists with different partition/replica counts.|
|VALUE_FORMAT (required)||Specifies the serialization format of message values in the topic. Supported formats:
|PARTITIONS||The number of partitions in the backing topic. This property must be set if creating a TABLE without an existing topic (the command will fail if the topic does not exist).|
|REPLICAS||The number of replicas in the backing topic. If this property is not set but PARTITIONS is set, then the default Kafka cluster configuration for replicas will be used for creating a new topic.|
|VALUE_DELIMITER||Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character.|
|KEY||Optimization hint: If the Kafka message key is also present as a field/column in the Kafka message value, you may set this property to associate the corresponding field/column with the implicit
|TIMESTAMP||By default, the implicit
|TIMESTAMP_FORMAT||Used in conjunction with TIMESTAMP. If not set will assume that the timestamp field is a
|WRAP_SINGLE_VALUE||Controls how values are deserialized where the values schema contains only a single field. The setting controls how ksqlDB will deserialize the value of the records in the supplied
If set to
If set to
If not supplied, the system default, defined by ksql.persistence.wrap.single.values and defaulting to
Note: Supplying this property for formats that do not support wrapping, for example
|WINDOW_TYPE||By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e. was created using ksqlDB using a query that contains a
|WINDOW_SIZE||By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e., was created using ksqlDB using a query that contains a
- To use Avro or Protobuf, you must have Schema Registry enabled and
ksql.schema.registry.urlmust be set in the ksqlDB server configuration file. See Configure ksqlDB for Avro or Protobuf.
- Avro and Protobuf field names are not case sensitive in ksqlDB. This matches the ksqlDB column name behavior.
1 2 3 4 5 6 7 8 9