Schemas
Data sources like streams and tables have an associated schema. This schema defines the columns available in the data, just like the columns in a traditional SQL database table.
Key vs Value columns

ksqlDB supports both key and value columns. These map to the data held in the key and value of the underlying Kafka topic message.

A column is defined by a combination of its name, its SQL data type, and possibly a namespace. Key columns have a KEY or PRIMARY KEY suffix for streams and tables, respectively, and ksqlDB supports only a single key column. Value columns have no namespace suffix, and there can be one or more of them.
For example, the following statement declares a stream with a single key column and several value columns:
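```sql
-- The stream and column names here are illustrative.
CREATE STREAM users (
    ID BIGINT KEY,
    NAME STRING,
    EMAIL STRING
  ) WITH (
    kafka_topic='users',
    value_format='json'
  );
```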
This statement declares a table with a primary key and value columns:
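```sql
-- The table and column names here are illustrative.
CREATE TABLE users (
    ID BIGINT PRIMARY KEY,
    NAME STRING,
    EMAIL STRING
  ) WITH (
    kafka_topic='users',
    value_format='json'
  );
```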
Tables require a primary key, but the key column of a stream is optional. For example, the following statement defines a stream with no key column:
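```sql
-- The stream and column names here are illustrative.
CREATE STREAM app_log (
    LOG_LEVEL STRING,
    MESSAGE STRING
  ) WITH (
    kafka_topic='app_log',
    value_format='json'
  );
```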
Tip
How Keys Work in ksqlDB has more details on key columns and provides guidance for when you may want to use streams with and without key columns.
Schema Inference

For supported serialization formats, ksqlDB can integrate with Confluent Schema Registry. ksqlDB automatically retrieves (reads) and registers (writes) schemas as needed, which spares you from defining columns and data types manually in CREATE statements and from manual interaction with Schema Registry. Before using schema inference in ksqlDB, make sure that the Schema Registry is up and running and ksqlDB is configured to use it.
Here's what you can do with schema inference in ksqlDB:
- Declare streams and tables on Kafka topics with supported value formats by using CREATE STREAM and CREATE TABLE statements, without needing to declare the value columns.
- Declare derived views with CREATE STREAM AS SELECT and CREATE TABLE AS SELECT statements. The schema of the view is registered in Schema Registry automatically.
- Convert data to different formats with CREATE STREAM AS SELECT and CREATE TABLE AS SELECT statements, by declaring the required output format in the WITH clause. For example, you can convert a stream from Avro to JSON.
Only the schema of the message value can be retrieved from Schema Registry. Message keys must be compatible with the KAFKA format to be accessible within ksqlDB. ksqlDB ignores schemas that have been registered for message keys.
Note

Message keys in Avro and Protobuf are not supported. If your message keys are in an unsupported format, see What to do if your key is not set or is in a different format. JSON message keys can be accessed by defining the key as a single STRING value, which will contain the JSON document.
Although ksqlDB doesn't support loading the message key's schema from Schema Registry, you can provide the key column definition within the CREATE TABLE or CREATE STREAM statement, if the data records are compatible with ksqlDB. This is known as partial schema inference, because the key schema is provided explicitly.

Tables require a PRIMARY KEY, so you must supply one explicitly in your CREATE TABLE statement. KEY columns are optional for streams, so if you don't supply one, the stream is created without a key column.
The following example statements show how to create streams and tables that have Avro-formatted data. If you want to use Protobuf- or JSON-formatted data, substitute PROTOBUF, JSON, or JSON_SR for AVRO in each statement.
Note

ksqlDB handles the JSON and JSON_SR formats differently. While the JSON format is capable of reading the schema from Schema Registry, JSON_SR both reads and registers new schemas, as necessary.
Create a new stream

Without a key column

The following statement shows how to create a new pageviews stream by reading from a Kafka topic that has Avro-formatted message values.
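```sql
CREATE STREAM pageviews
  WITH (
    kafka_topic='pageviews-avro-topic',
    value_format='AVRO'
  );
```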
In this example, you don't need to define any columns in the CREATE statement. ksqlDB infers this information automatically from the latest registered schema for the pageviews-avro-topic topic. ksqlDB uses the most recent schema at the time the statement is first executed.
Important

The schema must be registered in Schema Registry under the subject pageviews-avro-topic-value.
With a key column

The following statement shows how to create a new pageviews stream by reading from a Kafka topic that has Avro-formatted message values and a KAFKA-formatted INT message key.
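```sql
-- The key column name is illustrative; its INT type matches the
-- KAFKA-formatted message key.
CREATE STREAM pageviews (
    pageid INT KEY
  ) WITH (
    kafka_topic='pageviews-avro-topic',
    value_format='AVRO'
  );
```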
In the previous example, you need only supply the key column in the CREATE statement. ksqlDB infers the value columns automatically from the latest registered schema for the pageviews-avro-topic topic. ksqlDB uses the most recent schema at the time the statement is first executed.
Note

The schema must be registered in Schema Registry under the subject pageviews-avro-topic-value.
Create a new table

The following statement shows how to create a new users table by reading from a Kafka topic that has Avro-formatted message values and a KAFKA-formatted BIGINT message key.
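```sql
-- The key column name is illustrative; its BIGINT type matches the
-- KAFKA-formatted message key.
CREATE TABLE users (
    userId BIGINT PRIMARY KEY
  ) WITH (
    kafka_topic='users-avro-topic',
    value_format='AVRO'
  );
```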
In the previous example, you need only supply the key column in the CREATE statement. ksqlDB infers the value columns automatically from the latest registered schema for the users-avro-topic topic. ksqlDB uses the most recent schema at the time the statement is first executed.
Note

The schema must be registered in Schema Registry under the subject users-avro-topic-value.
Create a new source with selected columns

If you want to create a STREAM or TABLE that has only a subset of the available fields in the Avro schema, you must explicitly define the columns.

The following statement shows how to create a new pageviews_reduced stream, which is similar to the previous example, but with only a few of the available fields in the Avro data. In this example, only the viewtime and url value columns are picked.
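```sql
-- The column types are illustrative; they must match the corresponding
-- fields in the registered Avro schema.
CREATE STREAM pageviews_reduced (
    viewtime BIGINT,
    url STRING
  ) WITH (
    kafka_topic='pageviews-avro-topic',
    value_format='AVRO'
  );
```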
Declaring a derived view

The following statement shows how to create a materialized view derived from an existing source. The Kafka topic that the view is materialized to inherits the value format of the source, unless it's overridden explicitly in the WITH clause, as shown. The value schema is registered with Schema Registry if the value format supports the integration, with the exception of the JSON format, which only reads from Schema Registry.
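```sql
-- The SELECT here is illustrative; any query over an existing source works.
CREATE TABLE pageviews_by_url
  WITH (
    value_format='AVRO'
  ) AS
  SELECT url, COUNT(*) AS view_count
  FROM pageviews
  GROUP BY url
  EMIT CHANGES;
```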
Note

The schema will be registered in Schema Registry under the subject PAGEVIEWS_BY_URL-value.
Converting formats
ksqlDB enables you to change the underlying value format of streams and tables. This means that you can easily mix and match streams and tables with different data formats and also convert between value formats. For example, you can join a stream backed by Avro data with a table backed by JSON data.
The example below converts a JSON-formatted topic into Avro. Only the VALUE_FORMAT is required to achieve the data conversion. ksqlDB generates an appropriate Avro schema for the new PAGEVIEWS_AVRO stream automatically and registers the schema with Schema Registry.
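```sql
-- The source stream's columns and topic name are illustrative.
CREATE STREAM pageviews_json (
    pageid STRING KEY,
    viewtime BIGINT,
    userid STRING
  ) WITH (
    kafka_topic='pageviews_json_topic',
    value_format='JSON'
  );

CREATE STREAM pageviews_avro
  WITH (value_format='AVRO') AS
  SELECT * FROM pageviews_json;
```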
Note

The schema will be registered in Schema Registry under the subject PAGEVIEWS_AVRO-value.
For more information, see How to convert a stream's serialization format in Kafka Tutorials.
Valid Identifiers

Column and field names must be valid identifiers.

Unquoted identifiers are treated as upper case, for example col0 is equivalent to COL0, and must contain only alphanumeric and underscore characters.

Identifiers containing invalid characters, or where case needs to be preserved, can be quoted using back-tick quotes, for example `col0`.
SQL data types

The following SQL types are supported by ksqlDB:

Primitive types

Supported primitive types are:

- BOOLEAN: a binary value
- INT: 32-bit signed integer
- BIGINT: 64-bit signed integer
- DOUBLE: double precision (64-bit) IEEE 754 floating-point number
- STRING: a unicode character sequence (UTF8)
Decimal type

The DECIMAL type can store numbers with a very large number of digits and perform calculations exactly. It is recommended for storing monetary amounts and other quantities where exactness is required. However, arithmetic on decimals is slow compared to integer and floating-point types.

DECIMAL types have a precision and scale. The scale is the number of digits in the fractional part, to the right of the decimal point. The precision is the total number of significant digits in the whole number, that is, the number of digits on both sides of the decimal point. For example, the number 765.937500 has a precision of 9 and a scale of 6.
To declare a column of type DECIMAL, use the syntax:
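```sql
DECIMAL(precision, scale)
```

For example, the number 765.937500 above fits in a column declared as DECIMAL(9, 6).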
The precision must be positive; the scale must be zero or positive.
Array type

The ARRAY type defines a variable-length array of elements. All elements in the array must be of the same type.

To declare an ARRAY, use the syntax:
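```sql
ARRAY<element-type>
```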
The element-type can be any other SQL data type. For example, the following creates an array of STRINGs:
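```sql
ARRAY<STRING>
```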
Instances of an array can be created using the syntax:
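```sql
ARRAY[value [, value]*]
```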
For example, the following creates an array with three INT elements:
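```sql
ARRAY[2, 4, 6]
```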
Map type

The MAP type defines a variable-length collection of key-value pairs. All keys in the map must be of the same type, and all values in the map must be of the same type.

To declare a MAP, use the syntax:
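```sql
MAP<key-type, value-type>
```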
The key-type must currently be STRING, while the value-type can be any other SQL data type.
For example, the following creates a map with STRING keys and values:
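```sql
MAP<STRING, STRING>
```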
Instances of a map can be created using the syntax:
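```sql
MAP(key := value [, key := value]*)
```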
For example, the following creates a map with three key-value pairs:
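```sql
-- The keys and values here are illustrative.
MAP('k1' := 'v1', 'k2' := 'v2', 'k3' := 'v3')
```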
Struct type

The STRUCT type defines a list of named fields, where each field can have any SQL data type.

To declare a STRUCT, use the syntax:
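```sql
STRUCT<field-name field-type [, field-name field-type]*>
```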
The field-name can be any valid identifier. The field-type can be any valid SQL data type.
For example, the following creates a struct with an INT field called FOO and a BOOLEAN field called BAR:
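```sql
STRUCT<FOO INT, BAR BOOLEAN>
```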
Instances of a struct can be created using the syntax:
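```sql
STRUCT(field-name := field-value [, field-name := field-value]*)
```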
For example, the following creates a struct with fields called FOO and BAR and sets their values to 10 and true, respectively:
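```sql
STRUCT(FOO := 10, BAR := true)
```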
Custom types

ksqlDB supports custom types using the CREATE TYPE statement. See the CREATE TYPE docs for more information.
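For example, a hypothetical ADDRESS type could be registered once and then used like any built-in type:

```sql
-- ADDRESS and the stream that uses it are illustrative names.
CREATE TYPE ADDRESS AS STRUCT<street STRING, city STRING, zip STRING>;

CREATE STREAM deliveries (
    order_id BIGINT KEY,
    destination ADDRESS
  ) WITH (
    kafka_topic='deliveries',
    value_format='json'
  );
```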