Skip to content

Schemas

Data sources like streams and tables have an associated schema. This schema defines the columns available in the data, just like a the columns in a traditional SQL database table.

Key vs Value columns

ksqlDB supports both key and value columns. These map to the data held in the key and value of the underlying Kafka topic message.

A column is defined by a combination of its name, its SQL data type, and possibly a namespace.

Key columns have a KEY or PRIMARY KEY suffix for streams and tables, respectively. ksqlDB supports a single key column only.

Value columns have no namespace suffix. There can be one or more value columns.

For example, the following example statement declares a stream with a single key column and several value columns:

1
2
3
4
5
6
7
CREATE STREAM USER_UPDATES (
   ID BIGINT KEY, 
   STRING NAME, 
   ADDRESS ADDRESS_TYPE
 ) WITH (
   ...
 );

This statement declares a table with a primary key and value columns:

1
2
3
4
5
6
7
CREATE TABLE USERS (
   ID BIGINT PRIMARY KEY, 
   STRING NAME, 
   ADDRESS ADDRESS_TYPE
 ) WITH (
   ...
 );

Tables require a primary key, but the key column of a stream is optional. For example, the following statement defines a stream with no key column:

1
2
3
4
5
6
7
CREATE STREAM APP_LOG (
   LOG_LEVEL STRING,
   APP_NAME STRING,
   MESSAGE STRING
 ) WITH (
   ...
 );

Tip

How Keys Work in ksqlDB has more details on key columns and provides guidance for when you may want to use streams with and without key columns.

Schema Inference

For supported serialization formats, ksqlDB can integrate with Confluent Schema Registry. ksqlDB automatically retrieves (reads) and registers (writes) schemas as needed, which spares you from defining columns and data types manually in CREATE statements and from manual interaction with Schema Registry. Before using schema inference in ksqlDB, make sure that the Schema Registry is up and running and ksqlDB is configured to use it.

Here's what you can do with schema inference in ksqlDB:

  • Declare streams and tables on Kafka topics with supported value formats by using CREATE STREAM and CREATE TABLE statements, without needing to declare the value columns.
  • Declare derived views with CREATE STREAM AS SELECT and CREATE TABLE AS SELECT statements. The schema of the view is registered in Schema Registry automatically.
  • Convert data to different formats with CREATE STREAM AS SELECT and CREATE TABLE AS SELECT statements, by declaring the required output format in the WITH clause. For example, you can convert a stream from Avro to JSON.

Only the schema of the message value can be retrieved from Schema Registry. Message keys must be compatible with the KAFKA format to be accessible within ksqlDB. ksqlDB ignores schemas that have been registered for message keys.

Note

Message keys in Avro and Protobuf are not supported. If your message keys are in an unsupported format, see What to do if your key is not set or is in a different format. JSON message keys can be accessed by defining the key as a single STRING value, which will contain the JSON document.

Although ksqlDB doesn't support loading the message key's schema from Schema Registry, you can provide the key column definition within the CREATE TABLE or CREATE STREAM statement, if the data records are compatible with ksqlDB. This is known as partial schema inference, because the key schema is provided explicitly.

Tables require a PRIMARY KEY, so you must supply one explicitly in your CREATE TABLE statement. KEY columns are optional for streams, so if you don't supply one the stream is created without a key column.

The following example statements show how to create streams and tables that have Avro-formatted data. If you want to use Protobuf- or JSON-formatted data, substitute PROTOBUF, JSON or JSON_SR for AVRO in each statement.

Note

ksqlDB handles the JSON and JSON_SR formats differently. While the JSON format is capable of reading the schema from Schema Registry, JSON_SR both reads and registers new schemas, as necessary.

Create a new stream

Without a key column

The following statement shows how to create a new pageviews stream by reading from a Kafka topic that has Avro-formatted message values.

1
2
3
4
5
CREATE STREAM pageviews
  WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    VALUE_FORMAT='AVRO'
  );

In this example, you don't need to define any columns in the CREATE statement. ksqlDB infers this information automatically from the latest registered schema for the pageviews-avro-topic topic. ksqlDB uses the most recent schema at the time the statement is first executed.

Important

The schema must be registered in Schema Registry under the subject pageviews-avro-topic-value.

With a key column

The following statement shows how to create a new pageviews stream by reading from a Kafka topic that has Avro-formatted message values and a KAFKA-formatted INT message key.

1
2
3
4
5
6
CREATE STREAM pageviews (
    pageId INT KEY
  ) WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    VALUE_FORMAT='AVRO'
  );

In the previous example, you need only supply the key column in the CREATE statement. ksqlDB infers the value columns automatically from the latest registered schema for the pageviews-avro-topic topic. ksqlDB uses the most recent schema at the time the statement is first executed.

Note

The schema must be registered in Schema Registry under the subject pageviews-avro-topic-value.

Create a new table

The following statement shows how to create a new users table by reading from a Kafka topic that has Avro-formatted message values and a KAFKA-formatted BIGINT message key.

1
2
3
4
5
6
CREATE TABLE users (
    userId BIGINT PRIMARY KEY
  ) WITH (
    KAFKA_TOPIC='users-avro-topic',
    VALUE_FORMAT='AVRO'
  );

In the previous example, you need only supply the key column in the CREATE statement. ksqlDB infers the value columns automatically from the latest registered schema for the users-avro-topic topic. ksqlDB uses the most recent schema at the time the statement is first executed.

Note

The schema must be registered in Schema Registry under the subject users-avro-topic-value.

Create a new source with selected columns

If you want to create a STREAM or TABLE that has only a subset of the available fields in the Avro schema, you must explicitly define the columns.

The following statement shows how to create a new pageviews_reduced stream, which is similar to the previous example, but with only a few of the available fields in the Avro data. In this example, only the viewtime and url value columns are picked.

1
2
3
4
5
6
7
CREATE STREAM pageviews_reduced (
    viewtime BIGINT,
    url VARCHAR
  ) WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    VALUE_FORMAT='AVRO'
  );

Declaring a derived view

The following statement shows how to create a materialized view derived from an existing source. The Kafka topic that the view is materialized to inherits the value format of the source, unless it's overridden explicitly in the WITH clause, as shown. The value schema is registered with Schema Registry if the value format supports the integration, with the exception of the JSON format, which only reads from Schema Registry.

1
2
3
4
5
6
7
8
9
CREATE TABLE pageviews_by_url 
  WITH (
    VALUE_FORMAT='AVRO'
  ) AS 
  SELECT
    url,
    COUNT(*) AS VIEW_COUNT
  FROM pageviews
  GROUP BY url;

Note

The schema will be registered in Schema Registry under the subject PAGEVIEWS_BY_URL-value.

Converting formats

ksqlDB enables you to change the underlying value format of streams and tables. This means that you can easily mix and match streams and tables with different data formats and also convert between value formats. For example, you can join a stream backed by Avro data with a table backed by JSON data.

The example below converts a JSON-formatted topic into Avro. Only the VALUE_FORMAT is required to achieve the data conversion. ksqlDB generates an appropriate Avro schema for the new PAGEVIEWS_AVRO stream automatically and registers the schema with Schema Registry.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
CREATE STREAM pageviews_json (
    pageid VARCHAR KEY, 
    viewtime BIGINT, 
    userid VARCHAR
  ) WITH (
    KAFKA_TOPIC='pageviews_kafka_topic_json', 
    VALUE_FORMAT='JSON'
  );

CREATE STREAM pageviews_avro
  WITH (VALUE_FORMAT = 'AVRO') AS
  SELECT * FROM pageviews_json;

Note

The schema will be registered in Schema Registry under the subject PAGEVIEWS_AVRO-value.

For more information, see How to convert a stream's serialization format in Kafka Tutorials.

Valid Identifiers

Column and field names must be valid identifiers.

Unquoted identifiers will be treated as upper-case, for example col0 is equivalent to COL0, and must contain only alpha-numeric and underscore characters.

Identifiers containing invalid character, or where case needs to be preserved, can be quoted using back-tick quotes, for example `col0`.

SQL data types

The following SQL types are supported by ksqlDB:

Primitive types

Supported primitive types are:

  • BOOLEAN: a binary value
  • INT: 32-bit signed integer
  • BIGINT: 64-bit signed integer
  • DOUBLE: double precision (64-bit) IEEE 754 floating-point number
  • STRING: a unicode character sequence (UTF8)

Decimal type

The DECIMAL type can store numbers with a very large number of digits and perform calculations exactly. It is recommended for storing monetary amounts and other quantities where exactness is required. However, arithmetic on decimals is slow compared to integer and floating point types.

DECIMAL types have a precision and scale. The scale is the number of digits in the fractional part, to the right of the decimal point. The precision is the total number of significant digits in the whole number, that is, the number of digits on both sides of the decimal point. For example, the number 765.937500 has a precision of 9 and a scale of 6.

To declare a column of type DECIMAL use the syntax:

1
DECIMAL(precision, scale)

The precision must be positive, the scale zero or positive.

Array type

The ARRAY type defines a variable-length array of elements. All elements in the array must be of the same type.

To declare an ARRAY use the syntax:

1
ARRAY<element-type>

The element-type of an another SQL data type.

For example, the following creates an array of STRINGs:

1
ARRAY<STRING>

Instances of an array can be created using the syntax:

1
ARRAY[value [, value]*]

For example, the following creates an array with three INT elements:

1
ARRAY[2, 4, 6]

Map type

The MAP type defines a variable-length collection of key-value pairs. All keys in the map must be of the same type. All values in the map must be of the same type.

To declare a MAP use the syntax:

1
MAP<key-type, element-type>

The key-type must currently be STRING while the value-type can an any other SQL data type.

For example, the following creates a map with STRING keys and values:

1
MAP<STRING, STRING>

Instances of a map can be created using the syntax:

1
MAP(key := value [, key := value]*)

For example, the following creates a map with three key-value pairs:

1
MAP('a' := 1, 'b' := 2, 'c' := 3)

Struct type

The STRUCT type defines a list of named fields, where each field can have any SQL data type.

To declare a STRUCT use the syntax:

1
STRUCT<field-name field-type [, field-name field-type]*>

The field-name can be any valid identifier. The field-type can be any valid SQL data type.

For example, the following creates a struct with an INT field called FOO and a BOOLEAN field call BAR:

1
STRUCT<FOO INT, BAR BOOLEAN>

Instances of a struct can be created using the syntax:

1
STRUCT(field-name := field-value [, field-name := field-value]*)

For example, the following creates a struct with fields called FOO and BAR and sets their values to 10 and true, respectively:

1
STRUCT('FOO' := 10, 'BAR' := true)

Custom types

KsqlDB supports custom types using the CREATE TYPE statements. See the CREATE TYPE docs for more information.


Last update: 2020-12-15