
ksqlDB can read and write messages that have Avro, Protobuf, or JSON schemas by integrating with Confluent Schema Registry. ksqlDB automatically retrieves (reads) and registers (writes) schemas as needed, which saves you from having to manually define columns and data types in SQL and from manual interaction with Schema Registry.

Supported functionality

ksqlDB supports Avro, Protobuf, and JSON data in the Apache Kafka® message values.

Schemas with nested fields are supported. You can read nested data, in Avro, Protobuf, and JSON formats, by using the STRUCT type. Also, you can create new nested STRUCT data as the result of a query. For more info, see STRUCT.

The following functionality is not supported:

  • Message keys in Avro, Protobuf, or JSON formats are not supported. Message keys in ksqlDB are always interpreted as KAFKA format, which means ksqlDB ignores schemas that have been registered for message keys.

Although ksqlDB doesn't support loading the message key's schema from Schema Registry, you can provide the key column definition within the CREATE TABLE or CREATE STREAM statement.

Tables require a PRIMARY KEY, so you must supply one in your CREATE TABLE statement. KEY columns are optional for streams; if you don't supply one, the stream is created without any key columns.
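As a sketch, a stream and a table over the same topic might declare their key columns as follows (the topic and column names are illustrative):

```sql
-- Stream: the KEY column is optional.
CREATE STREAM orders_stream (
    orderId BIGINT KEY
  ) WITH (
    KAFKA_TOPIC='orders-avro-topic',
    VALUE_FORMAT='AVRO'
  );

-- Table: a PRIMARY KEY column is required.
CREATE TABLE orders_table (
    orderId BIGINT PRIMARY KEY
  ) WITH (
    KAFKA_TOPIC='orders-avro-topic',
    VALUE_FORMAT='AVRO'
  );
```

In both cases the key is deserialized with the KAFKA format, while the value columns are inferred from the Avro schema registered for the topic.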

Configure ksqlDB for Avro, Protobuf, and JSON

You must configure the REST endpoint of Schema Registry by setting ksql.schema.registry.url (default: http://localhost:8081) in the ksqlDB Server configuration file (<path-to-confluent>/etc/ksqldb/ksql-server.properties). For more information, see Installation Instructions.
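For example, the setting in ksql-server.properties looks like this (the URL shown is the default; adjust it for your deployment):

```properties
# ksql-server.properties: point ksqlDB at your Schema Registry instance
ksql.schema.registry.url=http://localhost:8081
```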

Important

Don't use the SET statement in the ksqlDB CLI to configure the registry endpoint.

Use Avro, Protobuf, or JSON in ksqlDB

Before using Avro, Protobuf, or JSON in ksqlDB, make sure that Schema Registry is up and running and that ksql.schema.registry.url is set correctly in the ksqlDB properties file (defaults to http://localhost:8081). Schema Registry is included by default with Confluent Platform.

Important

By default, ksqlDB-registered schemas have the same name (KsqlDataSourceSchema) and the same namespace (io.confluent.ksql.avro_schemas). You can override this behavior by providing a VALUE_AVRO_SCHEMA_FULL_NAME property in the WITH clause when you set VALUE_FORMAT to 'AVRO'. As the name suggests, this property overrides the default name/namespace with the provided one. For example, com.mycompany.MySchema registers a schema with the MySchema name and the com.mycompany namespace.
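As a sketch, assuming a pageviews stream has already been declared, the following statement registers the value schema of the derived stream under the fully-qualified name com.mycompany.MySchema instead of the default:

```sql
CREATE STREAM pageviews_named
  WITH (
    VALUE_FORMAT='AVRO',
    VALUE_AVRO_SCHEMA_FULL_NAME='com.mycompany.MySchema'
  ) AS
  SELECT * FROM pageviews
  EMIT CHANGES;
```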

Here's what you can do with Avro, Protobuf, and JSON in ksqlDB:

  • Declare streams and tables on Kafka topics that have Avro-, Protobuf-, or JSON-formatted data by using CREATE STREAM and CREATE TABLE statements.
  • Read from and write to Avro-, Protobuf-, or JSON-formatted data by using CREATE STREAM AS SELECT and CREATE TABLE AS SELECT statements.
  • Create derived streams and tables from existing streams and tables by using CREATE STREAM AS SELECT and CREATE TABLE AS SELECT statements.
  • Convert data to different formats by using CREATE STREAM AS SELECT and CREATE TABLE AS SELECT statements. For example, you can convert a stream from Avro to JSON.

Example SQL statements with Avro

The following statements show how to create streams and tables that have Avro-formatted data. If you want to use Protobuf- or JSON-formatted data, substitute PROTOBUF or JSON_SR for AVRO in each statement.

Note

ksqlDB handles the JSON and JSON_SR formats differently. Use JSON_SR when you need schema validation by Schema Registry. If you don't need schema validation, you can use JSON.
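For instance, the same topic could be declared either way, depending on whether Schema Registry should be involved (the topic and column names are illustrative):

```sql
-- JSON_SR: the value schema is read from Schema Registry,
-- so no value columns need to be declared.
CREATE STREAM pageviews_sr
  WITH (
    KAFKA_TOPIC='pageviews-json-topic',
    VALUE_FORMAT='JSON_SR'
  );

-- Plain JSON: no Schema Registry lookup, so the value columns
-- must be declared explicitly.
CREATE STREAM pageviews_plain (
    viewtime BIGINT,
    pageid VARCHAR
  ) WITH (
    KAFKA_TOPIC='pageviews-json-topic',
    VALUE_FORMAT='JSON'
  );
```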

Create a new stream by reading Avro-formatted data

Without a key column

The following statement shows how to create a new pageviews stream by reading from a Kafka topic that has Avro-formatted message values.

CREATE STREAM pageviews
  WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    VALUE_FORMAT='AVRO'
  );

With a key column

The following statement shows how to create a new pageviews stream by reading from a Kafka topic that has Avro-formatted message values and a KAFKA-formatted INT message key.

CREATE STREAM pageviews (
    pageId INT KEY
  ) WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    VALUE_FORMAT='AVRO'
  );

Create a new table by reading Avro-formatted data

The following statement shows how to create a new users table by reading from a Kafka topic that has Avro-formatted message values and a KAFKA-formatted BIGINT message key.

CREATE TABLE users (
    userId BIGINT PRIMARY KEY
  ) WITH (
    KAFKA_TOPIC='users-avro-topic',
    VALUE_FORMAT='AVRO'
  );

In this example, you don't need to define any value columns or data types in the CREATE statement. ksqlDB infers this information automatically from the latest registered schema for the users-avro-topic topic. ksqlDB uses the most recent schema at the time the statement is first executed.

Note

The schema must be registered in Schema Registry under the subject users-avro-topic-value.

Create a new stream with selected fields of Avro-formatted data

If you want to create a STREAM or TABLE with only a subset of all the available fields in the Avro schema, you must explicitly define the columns and data types.

The following statement shows how to create a new pageviews_reduced stream, which is similar to the previous example, but with only a few of the available fields in the Avro data. In this example, only the viewtime and pageid columns are picked.

CREATE STREAM pageviews_reduced (
    pageid VARCHAR KEY, 
    viewtime BIGINT
  ) WITH (
    KAFKA_TOPIC='pageviews-avro-topic',
    VALUE_FORMAT='AVRO'
  );

Convert a JSON stream to an Avro stream

ksqlDB enables you to work with streams and tables regardless of their underlying data format. This means that you can easily mix and match streams and tables with different data formats and also convert between data formats. For example, you can join a stream backed by Avro data with a table backed by JSON data.
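As a sketch, a stream with Avro-formatted values could be joined to a table with JSON-formatted values (both sources are hypothetical and assumed to be declared already):

```sql
-- Join an Avro-backed stream to a JSON-backed table; the value
-- formats of the sources don't matter to the join itself.
CREATE STREAM enriched_pageviews AS
  SELECT p.pageid, p.viewtime, u.gender
  FROM pageviews_avro p
  JOIN users_json u ON p.userid = u.userid
  EMIT CHANGES;
```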

In this example, only the VALUE_FORMAT property is required to convert the data to Avro. ksqlDB automatically generates an appropriate Avro schema for the new pageviews_avro stream and registers that schema with Schema Registry.

CREATE STREAM pageviews_json (
    pageid VARCHAR KEY, 
    viewtime BIGINT, 
    userid VARCHAR
  ) WITH (
    KAFKA_TOPIC='pageviews_kafka_topic_json', 
    VALUE_FORMAT='JSON'
  );

CREATE STREAM pageviews_avro
  WITH (VALUE_FORMAT = 'AVRO') AS
  SELECT * FROM pageviews_json
  EMIT CHANGES;

For more information, see Changing Data Serialization Format from JSON to Avro in the Stream Processing Cookbook.


Last update: 2020-06-23