Schema Inference With ID
For supported serialization formats, ksqlDB can use schema inference
to retrieve (read) and register (write) schemas as needed. If you specify a
KEY_SCHEMA_ID or VALUE_SCHEMA_ID explicitly in the CREATE statements,
ksqlDB retrieves and registers the schema specified by the ID from Schema Registry,
and it also serializes data using exactly the same schema referred to by the ID.
This can spare you from defining columns and data types manually and also make
sure the data are serialized by the specified physical schema, which can be
consumed in downstream systems. Before using schema inference with explicit IDs
in ksqlDB, make sure that the Schema Registry is up and running and ksqlDB is
configured to use it.
Here's what you can do with schema inference with IDs in ksqlDB:
- Declare streams and tables on Kafka topics with supported key and
  value formats by using CREATE STREAM and CREATE TABLE statements with KEY_SCHEMA_ID or VALUE_SCHEMA_ID properties, without the need to declare the key and value columns.
- Declare derived views with CREATE STREAM AS SELECT and CREATE TABLE AS SELECT statements with KEY_SCHEMA_ID or VALUE_SCHEMA_ID properties. The schema of the view is registered in Schema Registry automatically.
- Serialize output data using the schema referred to by KEY_SCHEMA_ID or VALUE_SCHEMA_ID, instead of the logical data source schema stored in ksqlDB.
If you're declaring a stream or table with a key format that's different from
its value format, and only one of the two formats supports schema inference,
you can explicitly provide the columns for the format that doesn't support
schema inference while still having ksqlDB load columns for the format that
does support schema inference from Schema Registry. This is known as
partial schema inference. To infer value columns for a keyless stream, set
the key format to NONE.
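As a rough sketch of the keyless case, the statement below sets KEY_FORMAT to NONE so that only the value columns are inferred. The stream name, topic name, and schema ID are hypothetical, and a matching Avro schema is assumed to be registered already.

```sql
-- Keyless stream: no key columns are declared or inferred.
-- 'clicks-topic' and schema ID 1 are placeholder values.
CREATE STREAM clicks
  WITH (
    KAFKA_TOPIC='clicks-topic',
    KEY_FORMAT='NONE',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1
  );
```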
Tables require a PRIMARY KEY, so you must supply one explicitly in your
CREATE TABLE statement. KEY columns are optional for streams, so if you
don't supply one, the stream is created without a key column.
Note
The following example statements show how to create streams and tables that
have Avro-formatted data. If you want to use Protobuf or JSON-formatted data,
substitute PROTOBUF or JSON_SR for AVRO in each statement.
Create a new stream or table
When KEY_SCHEMA_ID or VALUE_SCHEMA_ID is used in statements to create
a stream or table, the schema fetched from Schema Registry is used to infer the
data source's columns and to serialize output data. See
Schema inference and data serialization for details about how columns are
inferred and data are serialized.
Important
- The schemas referred to by KEY_SCHEMA_ID and VALUE_SCHEMA_ID must be registered in Schema Registry. They can be under any subject but must match the formats defined by KEY_FORMAT and VALUE_FORMAT, respectively.
- You can't define key or value columns in a statement if a corresponding
  KEY_SCHEMA_ID or VALUE_SCHEMA_ID is supplied.
Without a key column
The following statement shows how to create a new pageviews stream by
reading from a Kafka topic that has Avro-formatted message values.
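A minimal sketch of such a statement, assuming a hypothetical topic named pageviews-avro-topic and a value schema already registered with ID 1:

```sql
-- Value columns are inferred from the schema registered with ID 1.
-- The topic name and the schema ID are assumed values.
CREATE STREAM pageviews
  WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1
  );
```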
In this example, you don't need to define any columns in the CREATE statement.
ksqlDB infers this information automatically from Schema Registry using the
provided VALUE_SCHEMA_ID.
With a key column
The following statement shows how to create a new pageviews stream by reading
from a Kafka topic that has Avro-formatted key and message values.
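A minimal sketch of such a statement, assuming a hypothetical topic name and that the key and value schemas are registered with IDs 1 and 2:

```sql
-- Key and value columns are both inferred from Schema Registry.
-- Schema IDs 1 and 2 and the topic name are assumed values.
CREATE STREAM pageviews
  WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    KEY_FORMAT='AVRO',
    VALUE_FORMAT='AVRO',
    KEY_SCHEMA_ID=1,
    VALUE_SCHEMA_ID=2
  );
```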
In this example, ksqlDB infers the key and value columns automatically from
Schema Registry using the provided KEY_SCHEMA_ID and VALUE_SCHEMA_ID.
With partial schema inference
The following statement shows how to create a new pageviews table by reading
from a Kafka topic that has Avro-formatted message values and a
KAFKA-formatted INT primary key.
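A minimal sketch of such a statement. The key column name pageId, the topic name, and the schema ID are assumptions made for illustration:

```sql
-- The KAFKA key format doesn't support schema inference, so the
-- primary key column is declared explicitly. Value columns are
-- inferred from the schema registered with ID 1 (assumed).
CREATE TABLE pageviews (
    pageId INT PRIMARY KEY
  ) WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    KEY_FORMAT='KAFKA',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1
  );
```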
In this example, only the key column is supplied in the CREATE statement.
ksqlDB infers the value columns automatically from Schema Registry using the
provided VALUE_SCHEMA_ID.
Declare a derived view with a schema ID
The following statement shows how to create a materialized view derived from
an existing source with the VALUE_SCHEMA_ID property. The schema referred to
by VALUE_SCHEMA_ID is used to check column compatibility with output columns
and serialize output data. For more information, see
Schema inference and data serialization.
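One possible shape for such a statement is sketched below as a table that counts page views. The table name pageview_totals, the source column pageId, and schema ID 3 are hypothetical, and the registered schema is assumed to be an Avro record whose fields line up with the value columns that the query produces.

```sql
-- Derived table whose output values are serialized with the
-- physical schema registered under ID 3 (assumed).
CREATE TABLE pageview_totals
  WITH (
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=3
  ) AS
  SELECT pageId, COUNT(*) AS total
  FROM pageviews
  GROUP BY pageId
  EMIT CHANGES;
```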
Important
The schema referred to by VALUE_SCHEMA_ID must be compatible with the
logical schema defined by the SELECT clause. For more information, see
Schema inference and data serialization.
Schema inference and data serialization
The schema in Schema Registry is a "physical schema", and the schema in ksqlDB is
a "logical schema". The physical schema, not the logical schema, is registered
under the subject <topic-name>-key or <topic-name>-value if corresponding
KEY_SCHEMA_ID or VALUE_SCHEMA_ID values are provided.
Schema inference schema requirements
If WRAP_SINGLE_VALUE is set to true in the SQL statement, the physical
schema is expected to be a struct type, and the field names are used as data
source column names. Column data types are inferred from the corresponding
field types.
- In AVRO, the struct type corresponds with the record type.
- In PROTOBUF, the struct type corresponds with the message type.
- In JSON_SR, the struct type corresponds with the object type.
Note
In the following examples, the AVRO schema string in Schema Registry
is a single-line raw string without newline characters (\n). The 
strings are shown as human-readable text for convenience.
For example, suppose a physical schema in AVRO format, defined as a record
type, is registered in Schema Registry with ID 1.
The following CREATE statement defines a stream on the pageviews topic
and specifies the physical schema that has an ID of 1.
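A minimal sketch of such a statement, followed by a DESCRIBE to inspect the result. WRAP_SINGLE_VALUE is left unset here on the assumption that wrapping is the default behavior in your configuration.

```sql
-- Value columns come from the fields of the record registered
-- with ID 1.
CREATE STREAM pageviews
  WITH (
    KAFKA_TOPIC='pageviews',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1
  );

-- Inspect the columns that ksqlDB inferred from the record's fields.
DESCRIBE pageviews;
```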
The output of the DESCRIBE pageviews command shows the inferred logical
schema for the pageviews stream, with one column for each field of the
physical schema.
If WRAP_SINGLE_VALUE is false in the statement, and if KEY_SCHEMA_ID is
set, ROWKEY is used as the key's column name.
If VALUE_SCHEMA_ID is set, ROWVAL is used as the value's column name. The physical
schema is used as the column data type.
For example, suppose an unwrapped AVRO physical schema, that is, one that
isn't a record type, is defined in Schema Registry with ID 2.
The following CREATE statement defines a table on the pageview-count topic
and specifies the physical schema that has ID 2.
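A minimal sketch of such a statement. The primary key column pageId, its BIGINT type, and the KAFKA key format are assumptions made for illustration; only the topic name, the schema ID, and the unwrapped setting come from the surrounding text.

```sql
-- The value is a single unwrapped field described by schema ID 2,
-- so ksqlDB exposes it as one value column.
CREATE TABLE pageview_count (
    pageId BIGINT PRIMARY KEY
  ) WITH (
    KAFKA_TOPIC='pageview-count',
    KEY_FORMAT='KAFKA',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=2,
    WRAP_SINGLE_VALUE=false
  );
```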
In the inferred logical schema for the pageview_count table, the value is
exposed as a single ROWVAL column whose data type comes from the physical
schema.
For more information about WRAP_SINGLE_VALUE, see Single Field (un)wrapping.
Schema Inference Type Handling
ksqlDB supports the null value for KEY and VALUE columns. If a field
in the physical schema is a required type, it's translated to an optional type
in the logical schema. This has subtle implications for data serialization
which are explained in the following section.
Important
- ksqlDB ignores unsupported types in the physical schema and continues translating supported types to the logical schema. You should verify that the logical schema is translated as expected.
- During schema translation from a physical schema to a logical schema,
  struct type field names are used as column names in the logical schema. Field names are not translated to uppercase, in contrast with schema inference without a schema ID, which does translate field names to uppercase.
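Because ksqlDB treats unquoted identifiers as uppercase, columns that keep their original lowercase or mixed-case field names generally need to be quoted with backticks in queries. A short sketch, assuming a pageviews stream whose inferred columns are named page_name and ts:

```sql
-- Unquoted, page_name would be read as PAGE_NAME.
-- Backticks keep the case of the inferred column names.
SELECT `page_name`, `ts`
FROM pageviews
EMIT CHANGES;
```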
Schema Compatibility Check for Derived View
You can use schema IDs when creating a materialized view. Instead of inferring the logical schema for the view, ksqlDB uses the schema to check compatibility against the query's projection and to serialize output data. For the compatibility check to pass, the inferred logical schema must be a superset of the query's projection schema, which means that corresponding column names, types, and order must match. The inferred logical schema may have extra columns.
The following example creates the pageviews_new topic as the result of a
SELECT query:
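A minimal sketch of such a statement, using the projection discussed in this section. The schema ID is an assumed value, and KAFKA_TOPIC is set explicitly so that the topic name is pageviews_new.

```sql
-- The projection (pageId, ts) defines the logical schema; the
-- physical schema with ID 1 (assumed) is checked against it and
-- then used to serialize the output.
CREATE STREAM pageviews_new
  WITH (
    KAFKA_TOPIC='pageviews_new',
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1
  ) AS
  SELECT pageId, ts
  FROM pageviews
  EMIT CHANGES;
```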
The logical schema of pageviews_new is determined by the projection in the
query, SELECT pageId, ts FROM pageviews. If the ts value column in pageviews
has the type INT, then when VALUE_SCHEMA_ID is used, the inferred logical
schema is checked against ts INT for compatibility.
A compatible physical schema is, for example, an AVRO record that declares
pageId and ts fields with matching types, plus an extra title field that has
a default value. Because the physical schema is used for data serialization,
the title field with its default value appears in serialized data, even
though an inserted value can never set the title field, because it's not in
the logical schema defined by the SELECT clause of the query.
An incompatible physical schema is, for example, one that declares pageId
with a type that doesn't match the type of the pageId column in the query's
projection.
Data Serialization
When a schema ID is provided and schema inference is successful, ksqlDB can
create the data source. When writing to the data source, ksqlDB serializes
data with the physical schema referred to by the schema ID, instead of the
logical schema that's used in other cases. Because ksqlDB's logical schema
accepts null values but the physical schema may not, serialization can fail
even if the inserted value is valid for the logical schema.
For example, suppose a physical schema is defined in Schema Registry with
ID 1, and no default values are specified for its page_name and ts fields.
If you create a pageviews stream with VALUE_SCHEMA_ID set to 1, ksqlDB infers
a logical schema in which the page_name and ts columns accept null values.
If you insert a row that contains null values into pageviews, ksqlDB returns
an error.
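An insert of the following shape is the kind of statement that triggers the failure. It assumes that pageviews was created with VALUE_SCHEMA_ID=1 and that the inferred columns keep the lowercase field names, which is why they are quoted with backticks. The value 'home' is a made-up sample.

```sql
-- The logical schema accepts the NULL, but serialization with the
-- physical schema is expected to fail because ts is a required
-- field without a default value.
INSERT INTO pageviews (`page_name`, `ts`) VALUES ('home', NULL);
```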
This error occurs because page_name and ts are required fields without
default values in the specified physical schema.
Important
ksqlDB doesn't check that null can be serialized in a physical schema
that contains required fields. You must ensure that null can be handled
properly, either by making physical schema fields optional or by using the
IFNULL function to ensure that null is never inserted.
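As a sketch of the IFNULL approach, the derived stream below replaces nulls with fallback values before the rows are serialized with the physical schema. The source stream raw_pageviews, the target name pageviews_clean, and the fallback values are hypothetical. The aliases are backtick-quoted on the assumption that output column names must match the schema's field names exactly.

```sql
-- Substitute defaults so that required fields never receive null.
CREATE STREAM pageviews_clean
  WITH (
    VALUE_FORMAT='AVRO',
    VALUE_SCHEMA_ID=1
  ) AS
  SELECT
    IFNULL(page_name, 'unknown') AS `page_name`,
    IFNULL(ts, 0) AS `ts`
  FROM raw_pageviews
  EMIT CHANGES;
```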
Important
If KEY_SCHEMA_ID is used to create a table with a windowed aggregation, the
serialized key value also contains window information in addition to the
original key value.