Serialization
The term serialization refers to the manner in which an event's raw bytes are translated to and from information structures that ksqlDB can understand at runtime. ksqlDB offers several mechanisms for controlling serialization and deserialization.
The primary mechanism is by choosing the serialization format when you
create a stream or table and specify the VALUE_FORMAT
in the WITH
clause.
While ksqlDB supports different value formats, it requires keys to be KAFKA
format.
1 |
|
Serialization Formats¶
ksqlDB supports these serialization formats:
DELIMITED
supports comma separated values. See DELIMITED below.JSON
supports JSON values. See JSON below.AVRO
supports AVRO serialized values. See AVRO below.KAFKA
supports primitives serialized using the standard Kafka serializers. See KAFKA below.PROTOBUF
supports Protocol Buffers. See Protobuf below.
DELIMITED¶
The DELIMITED
format supports comma separated values.
The serialized object should be a Kafka-serialized string, which will be split into columns.
For example, given a SQL statement such as:
1 |
|
ksqlDB splits a value of 120, bob, 49
into the three fields with ID
of
120
, NAME
of bob
and AGE
of 49
.
This data format supports all SQL
data types except ARRAY
, MAP
and
STRUCT
.
JSON¶
The JSON
format supports JSON values.
The JSON format supports all SQL
data types.
As JSON doesn't itself support a map type, ksqlDB serializes MAP
types as
JSON objects. Because of this the JSON format can only support MAP
objects
that have STRING
keys.
The serialized object should be a Kafka-serialized string containing a valid JSON value. The format supports JSON objects and top-level primitives, arrays and maps. See below for more info.
Note
If you want to use a JSON-based schema with Schema Registry, specify the
JSON_SR
format.
JSON Objects¶
Values that are JSON objects are probably the most common.
For example, given a SQL statement such as:
1 |
|
And a JSON value of:
1 2 3 4 5 |
|
ksqlDB deserializes the JSON object's fields into the corresponding fields of the stream.
Top-level primitives, arrays and maps¶
The JSON format supports reading and writing top-level primitives, arrays and maps.
For example, given a SQL statement with only a single field in the
value schema and the WRAP_SINGLE_VALUE
property set to false
:
1 |
|
And a JSON value of:
1 |
|
ksqlDB can deserialize the values into the ID
field of the stream.
When serializing data with a single field, ksqlDB can serialize the field
as an anonymous value if the WRAP_SINGLE_VALUE
is set to false
, for
example:
1 |
|
For more information, see Single field (un)wrapping.
Field Name Case Sensitivity¶
The format is case-insensitive when matching a SQL field name with a JSON document's property name. The first case-insensitive match is used.
Avro¶
The AVRO
format supports Avro binary serialization of all SQL
data types, including records and
top-level primitives, arrays, and maps.
The format requires ksqlDB to be configured to store and retrieve the Avro schemas from the Confluent Schema Registry. For more information, see Configure ksqlDB for Avro or Protobuf.
Avro Records¶
Avro records can be deserialized into matching ksqlDB schemas.
For example, given a SQL statement such as:
1 |
|
And an Avro record serialized with the schema:
1 2 3 4 5 6 7 8 9 10 |
|
ksqlDB deserializes the Avro record's fields into the corresponding fields of the stream.
Top-level primitives, arrays and maps¶
The Avro format supports reading and writing top-level primitives, arrays and maps.
For example, given a SQL statement with only a single field in the
value schema and the WRAP_SINGLE_VALUE
property set to false
:
1 |
|
And an Avro value serialized with the schema:
1 2 3 |
|
ksqlDB can deserialize the values into the ID
field of the stream.
When serializing data with a single field, ksqlDB can serialize the field
as an anonymous value if the WRAP_SINGLE_VALUE
is set to false
, for
example:
1 |
|
For more information, see Single field (un)wrapping.
Field Name Case Sensitivity¶
The format is case-insensitive when matching a SQL field name with an Avro record's field name. The first case-insensitive match is used.
KAFKA¶
The KAFKA
format supportsINT
, BIGINT
, DOUBLE
and STRING
primitives that have been serialized using Kafka's standard set of
serializers.
The format is designed primarily to support primitive message keys. It can be used as a value format, though certain operations aren't supported when this is the case.
Unlike some other formats, the KAFKA
format does not perform any type
coercion, so it's important to correctly match the field type to the
underlying serialized form to avoid deserialization errors.
The table below details the SQL types the format supports, including
details of the associated Kafka Java Serializer, Deserializer and
Connect Converter classes you would need to use to write the key to
Kafka, read the key from Kafka, or use to configure Apache Connect to
work with the KAFKA
format, respectively.
SQL Field Type | Kafka Type | Kafka Serializer | Kafka Deserializer | Connect Converter |
---|---|---|---|---|
INT / INTEGER | A 32-bit signed integer | org.apache.kafka.common.serialization.IntegerSerializer |
org.apache.kafka.common.serialization.IntegerDeserializer |
org.apache.kafka.connect.storage.IntegerConverter |
BIGINT | A 64-bit signed integer | org.apache.kafka.common.serialization.LongSerializer |
org.apache.kafka.common.serialization.LongDeserializer |
org.apache.kafka.connect.storage.LongConverter |
DOUBLE | A 64-bit floating point number | org.apache.kafka.common.serialization.DoubleSerializer |
org.apache.kafka.common.serialization.DoubleDeserializer |
org.apache.kafka.connect.storage.DoubleConverter |
STRING / VARCHAR | A UTF-8 encoded text string | org.apache.kafka.common.serialization.StringSerializer |
org.apache.kafka.common.serialization.StringDeserializer |
org.apache.kafka.connect.storage.StringConverter |
Because the format supports only primitive types, you can only use it when the schema contains a single field.
For example, if your Kafka messages have a long
key, you can make
them available to ksqlDB by using a statement like:
1 |
|
If you integrate ksqlDB with Confluent Schema Registry, and your ksqlDB application uses a compatible value format (Avro, JSON_SR, or Protobuf), you can just supply the key column, and ksqlDB loads the value columns from Schema Registry:
1 |
|
The key column must be supplied, because ksqlDB supports only keys in KAFKA
format.
Protobuf¶
Protobuf handles null
values differently than AVRO and JSON. Protobuf doesn't
have the concept of a null
value, so the conversion between PROTOBUF and Java
(Kafka Connect) objects is undefined. Usually, Protobuf resolves a
"missing field" to the default value of its type.
- String: the default value is the empty string.
- Byte: the default value is empty bytes.
- Bool: the default value is
false
. - Numeric type: the default value is zero.
- Enum: the default value is the first defined enum value, which must be zero.
- Message field: the field is not set. Its exact value is language-dependent. See the generated code guide for details.
Decimal Serialization¶
ksqlDB accepts Decimals that are serialized either as numbers, or the text representation of the base 10 equivalent. For example, ksqlDB can read data from both formats below:
1 2 3 4 |
|
Decimals with specified precision and scale are serialized as JSON floating point numbers. For example:
1 2 3 |
|
Single field (un)wrapping¶
Note
The DELIMITED
and KAFKA
formats don't support single-field
unwrapping.
Controlling deserializing of single fields¶
When ksqlDB deserializes a Kafka message into a row, the key is deserialized into the key field, and the message's value is deserialized into the value fields.
By default, ksqlDB expects any value with a single-field schema to have been serialized as a named field within a record. However, this is not always the case. ksqlDB also supports reading data that has been serialized as an anonymous value.
For example, a value with multiple fields might look like the following in JSON:
1 2 3 4 |
|
If the value only had the id
field, ksqlDB would still expect the value
to be serialized as a named field, for example:
1 2 3 |
|
If your data contains only a single field, and that field is not wrapped
within a JSON object, or an Avro record is using the AVRO
format, then
you can use the WRAP_SINGLE_VALUE
property in the WITH
clause of
your CREATE TABLE or
CREATE STREAM statements. Setting the
property to false
tells ksqlDB that the value isn't wrapped, so the
example above would be a JSON number:
1 |
|
For example, the following creates a table where the values in the underlying topic have been serialized as an anonymous JSON number:
1 |
|
If a statement doesn't set the value wrapping explicitly, ksqlDB uses the
system default, which is defined by ksql.persistence.wrap.single.values
.
You can change the system default. For more information, see
ksql.persistence.wrap.single.values.
Important
ksqlDB treats null
keys and values as a special case. We recommend
avoiding unwrapped single-field schemas if the field can have a null
value.
A null
value in a table's topic is treated as a tombstone, which
indicates that a row has been removed. If a table's source topic has an
unwrapped single-field key schema and the value is null
, it's treated
as a tombstone, resulting in any previous value for the key being
removed from the table.
A null
key or value in a stream's topic is ignored when the stream is
part of a join. A null
value in a table's topic is treated as a
tombstone, and a null
key is ignored when the table is part of a join.
When you have an unwrapped single-field schema, ensure that any null
key or value has the desired result.
Controlling serialization of single fields¶
When ksqlDB serializes a row into a Kafka message, the key field is serialized into the message's key, and any value fields are serialized into the message's value.
By default, if the value has only a single field, ksqlDB serializes the single field as a named field within a record. However, this doesn't always match the requirements of downstream consumers, so ksqlDB allows the value to be serialized as an anonymous value.
For example, consider the statements:
1 2 |
|
The second statement defines a stream with only a single field in the
value, named f0
.
By default, when ksqlDB writes out the result to Kafka, it persists the
single field as a named field within a JSON object, or an Avro record if
using the AVRO
format:
1 2 3 |
|
If you require the value to be serialized as an anonymous value, for example:
1 |
|
Then you can use the WRAP_SINGLE_VALUE
property in your statement.
For example,
1 |
|
If a statement doesn't set the value wrapping explicitly, ksqlDB uses the
system default, defined by ksql.persistence.wrap.single.values
. You
can change the system default. For more information, see
ksql.persistence.wrap.single.values.
Important
ksqlDB treats null
keys and values as a special case. We recommended
avoiding unwrapped single-field schemas if the field can have a null
value.
A null
value in a table's topic is treated as a tombstone, which
indicates that a row has been removed. If a table's source topic has an
unwrapped single-field key schema and the value is null
, it's treated
as a tombstone, resulting in any previous value for the key being
removed from the table.
A null
key or value in a stream's topic is ignored when the stream is
part of a join. A null
value in a table's topic is treated as a
tombstone, and a null
key is ignored when the table is part of a join.
When you have an unwrapped single-field schema, ensure that any null
key or value has the desired result.
Single-field serialization examples¶
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
|
Page last revised on: 2020-05-05