Query With Structured Data
ksqlDB enables querying structured, or nested, data by using the STRUCT data type. You use familiar syntax to declare and access structured data, like mystruct STRUCT<fieldName1 type1, fieldName2 type2> and mystruct->fieldName1.
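For example, a minimal declaration and query might look like the following sketch. The stream name s1 and the topic name example-topic are placeholders for illustration, not objects used later in this tutorial:
| CREATE STREAM s1 (mystruct STRUCT<fieldName1 VARCHAR, fieldName2 INT>)
    WITH (KAFKA_TOPIC='example-topic', VALUE_FORMAT='JSON');

  -- Access a nested field with the -> operator.
  SELECT mystruct->fieldName1 FROM s1 EMIT CHANGES;
|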
The following example shows how to create a ksqlDB stream from an
Apache Kafka® topic that has structured data. Also, it shows how to run
queries to access the structured data.
- Set up the ksqlDB environment.
- Use the kafkacat utility to create and populate a new topic named raw-topic.
- Create a stream on the topic that models the topic's data.
- Inspect the stream to ensure that the data model matches the topic.
- Query the stream to access the structured data.
Set up the ksqlDB Environment
To set up ksqlDB, follow the first three steps in
Write Streaming Queries Against Apache Kafka® Using ksqlDB,
or if you have git and Docker installed already, run the following commands:
| # Step 1
git clone https://github.com/confluentinc/ksql.git
cd ksql
# Step 2
git checkout 0.7.1-post
# Step 3
cd docs/tutorials/
docker-compose up -d
|
After all of the Docker images are pulled, confirm that the ksqlDB and
Kafka containers are running:
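| docker-compose ps
|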
Your output should resemble:
| Name Command State Ports
----------------------------------------------------------------------------------------------------
tutorials_kafka_1 /etc/confluent/docker/run Up 0.0.0.0:39092->39092/tcp, 9092/tcp
tutorials_ksql-server_1 /etc/confluent/docker/run Up 8088/tcp
tutorials_schema-registry_1 /etc/confluent/docker/run Up 8081/tcp
tutorials_zookeeper_1 /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
|
The ksqlDB environment is ready for you to develop event streaming
applications.
Create and Populate a New Topic With Structured Data
Use the kafkacat utility to create and populate a new topic named raw-topic with some records that have nested data. The record keys are strings, and the record values are JSON objects. For more information, see kafkacat Utility.
| docker run --interactive --rm --network tutorials_default \
confluentinc/cp-kafkacat \
kafkacat -b kafka:39092 \
-t raw-topic \
-K: \
-P <<EOF
1:{"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.1","field-a":1,"field-b":"first-value-for-key1"}}
2:{"type":"key2","data":{"timestamp":"2018-12-21 23:58:42.2","field-a":1,"field-c":11,"field-d":"first-value-for-key2"}}
3:{"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.3","field-a":2,"field-b":"updated-value-for-key1"}}
4:{"type":"key2","data":{"timestamp":"2018-12-21 23:58:42.4","field-a":3,"field-c":22,"field-d":"updated-value-for-key2"}}
EOF
|
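If you want to double-check the writes before starting the ksqlDB CLI, you can also read the topic back with kafkacat in consumer mode. This is an optional sketch; the PRINT statement later in this tutorial accomplishes the same from within ksqlDB:
| docker run --interactive --rm --network tutorials_default \
    confluentinc/cp-kafkacat \
    kafkacat -b kafka:39092 \
    -t raw-topic \
    -C \
    -K: \
    -e
|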
The nested structure is named data and has five fields:
- timestamp, a string
- field-a, an integer
- field-b, a string
- field-c, an integer
- field-d, a string
In the following ksqlDB queries, the data structure is modeled by using the STRUCT type:
| DATA STRUCT<timestamp VARCHAR, "field-a" INT, "field-b" VARCHAR, "field-c" INT, "field-d" VARCHAR>
|
Double-quotes are necessary for the field names that contain the - character.
Note
Properties is not a valid field name.
Create a Stream With Structured Data
Start the ksqlDB CLI:
| docker run --network tutorials_default --rm --interactive --tty \
confluentinc/ksqldb-cli:0.7.1 ksql \
http://ksql-server:8088
|
In the ksqlDB CLI, ensure that raw-topic is available:
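| SHOW TOPICS;
|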
Your output should resemble:
| Kafka Topic | Partitions | Partition Replicas
------------------------------------------------------
_confluent-metrics | 12 | 1
_schemas | 1 | 1
raw-topic | 1 | 1
------------------------------------------------------
|
Inspect raw-topic to ensure that kafkacat populated it:
| PRINT 'raw-topic' FROM BEGINNING;
|
Your output should resemble:
| Key format: KAFKA (STRING)
Value format: JSON
rowtime: 12/21/18 11:58:42 PM PST, key: 1, value: {"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.1","field-a":1,"field-b":"first-value-for-key1"}}
rowtime: 12/21/18 11:58:42 PM PST, key: 2, value: {"type":"key2","data":{"timestamp":"2018-12-21 23:58:42.2","field-a":1,"field-c":11,"field-d":"first-value-for-key2"}}
rowtime: 12/21/18 11:58:42 PM PST, key: 3, value: {"type":"key1","data":{"timestamp":"2018-12-21 23:58:42.3","field-a":2,"field-b":"updated-value-for-key1"}}
rowtime: 12/21/18 11:58:42 PM PST, key: 4, value: {"type":"key2","data":{"timestamp":"2018-12-21 23:58:42.4","field-a":3,"field-c":22,"field-d":"updated-value-for-key2"}}
^CTopic printing ceased
|
Press Ctrl+C to stop printing the topic.
Run the following CREATE STREAM statement to register the topic with
ksqlDB:
| CREATE STREAM T (TYPE VARCHAR,
DATA STRUCT<
timestamp VARCHAR,
"field-a" INT,
"field-b" VARCHAR,
"field-c" INT,
"field-d" VARCHAR>)
WITH (KAFKA_TOPIC='raw-topic',
VALUE_FORMAT='JSON');
|
Your output should resemble:
| Message
----------------
Stream created
----------------
|
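If you want to confirm the schema that ksqlDB registered, you can describe the new stream:
| DESCRIBE T;
|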
Run SQL Queries to Access the Structured Data
Run the following command to tell ksqlDB to read from the beginning of the
topic:
| SET 'auto.offset.reset' = 'earliest';
|
Run a SELECT query to inspect the T stream:
| SELECT * FROM T EMIT CHANGES;
|
Your output should resemble:
| 1544042630406 | 1 | key1 | {TIMESTAMP=2018-12-21 23:58:42.1, field-a=1, field-b=first-value-for-key1, field-c=null, field-d=null}
1544042630406 | 2 | key2 | {TIMESTAMP=2018-12-21 23:58:42.2, field-a=1, field-b=null, field-c=11, field-d=first-value-for-key2}
1544042630406 | 3 | key1 | {TIMESTAMP=2018-12-21 23:58:42.3, field-a=2, field-b=updated-value-for-key1, field-c=null, field-d=null}
1544042630406 | 4 | key2 | {TIMESTAMP=2018-12-21 23:58:42.4, field-a=3, field-b=null, field-c=22, field-d=updated-value-for-key2}
^CQuery terminated
|
Press Ctrl+C to cancel the SELECT query.
Note
ksqlDB assigns null to the fields that were omitted when kafkacat populated raw-topic, like field-c and field-d in the key1 records.
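Because omitted fields come back as null, you can filter them out with an IS NOT NULL predicate when they matter. A sketch:
| SELECT DATA->"field-d" FROM T WHERE DATA->"field-d" IS NOT NULL EMIT CHANGES;
|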
Query field-a and field-b by using the -> operator to access the nested elements:
| SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' EMIT CHANGES LIMIT 2;
|
Your output should resemble:
| 1 | first-value-for-key1
2 | updated-value-for-key1
Limit Reached
Query terminated
|
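The generated column names for nested fields can be awkward, as the schema inspection later in this tutorial shows. If you prefer explicit names, you can alias the fields with AS, as in this sketch:
| SELECT DATA->"field-a" AS FIELD_A, DATA->"field-b" AS FIELD_B FROM T WHERE TYPE='key1' EMIT CHANGES LIMIT 2;
|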
Query the other nested elements:
| SELECT DATA->"field-a", DATA->"field-c", DATA->"field-d" FROM T WHERE TYPE='key2' EMIT CHANGES LIMIT 2;
|
Your output should resemble:
| 1 | 11 | first-value-for-key2
3 | 22 | updated-value-for-key2
Limit Reached
Query terminated
|
Create persistent queries based on the previous SELECT statements. In
this example, two different queries are used to separate the input data
into two new streams.
| CREATE STREAM TYPE_1 AS SELECT DATA->"field-a", DATA->"field-b" FROM T WHERE TYPE='key1' EMIT CHANGES;
|
| CREATE STREAM TYPE_2 AS SELECT DATA->"field-a", DATA->"field-c", DATA->"field-d" FROM T WHERE TYPE='key2' EMIT CHANGES;
|
For both statements, your output should resemble:
| Message
----------------------------
Stream created and running
----------------------------
|
Inspect the schema of the TYPE_1 stream:
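| DESCRIBE TYPE_1;
|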
Your output should resemble:
| Name : TYPE_1
Field | Type
-------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__field-a | INTEGER
DATA__field-b | VARCHAR(STRING)
-------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
|
Inspect the schema of the TYPE_2 stream:
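| DESCRIBE TYPE_2;
|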
Your output should resemble:
| Name : TYPE_2
Field | Type
-------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
DATA__field-a | INTEGER
DATA__field-c | INTEGER
DATA__field-d | VARCHAR(STRING)
-------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
|
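The new streams are backed by their own Kafka topics and behave like any other ksqlDB stream, so you can query them directly. For example, this sketch reads the first two rows of TYPE_1:
| SELECT * FROM TYPE_1 EMIT CHANGES LIMIT 2;
|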
Page last revised on: 2020-04-29