How to convert a changelog to a table
Context
You have a stream of events that represent a series of changes, known as a changelog. You want a view of the data that reflects only the last change for each key. Because ksqlDB represents change over time using tables, you need a way to convert your changelog stream into a table. This is broadly called materializing a changelog stream into a table.
In action
| CREATE TABLE t1 AS
SELECT k,
LATEST_BY_OFFSET(v1) AS v1,
LATEST_BY_OFFSET(v2) AS v2,
LATEST_BY_OFFSET(v3) AS v3
FROM s1
GROUP BY k
EMIT CHANGES;
|
Materializing a changelog stream
In ksqlDB, you derive new tables by aggregating other streams and tables. To create a table that reflects the latest values for each key, use the LATEST_BY_OFFSET
aggregation.
Begin by telling ksqlDB to start all queries from the earliest point in each topic.
| SET 'auto.offset.reset' = 'earliest';
|
Make a stream s1
with four columns. k
represents the key of the table. Rows with the same key represent information about the same entity. v1
, v2
, and v3
are various value columns.
| CREATE STREAM s1 (
k VARCHAR KEY,
v1 INT,
v2 VARCHAR,
v3 BOOLEAN
) WITH (
kafka_topic = 's1',
partitions = 1,
value_format = 'avro'
);
|
Insert some rows into s1
, repeating entries for some of the keys.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 | INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k1', 0, 'a', true
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k2', 1, 'b', false
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k1', 2, 'c', false
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k3', 3, 'd', true
);
INSERT INTO s1 (
k, v1, v2, v3
) VALUES (
'k2', 4, 'e', true
);
|
Derive a table, t1
, from stream s1
. When you create a table, the columns in the SELECT
clause must either be the column that you're grouping by or columns with an aggregation function applied. The LATEST_BY_OFFSET
aggregation allows you to select any column and retains only the last value it receives, where "last" is in terms of offsets.
| CREATE TABLE t1 AS
SELECT k,
LATEST_BY_OFFSET(v1) AS v1,
LATEST_BY_OFFSET(v2) AS v2,
LATEST_BY_OFFSET(v3) AS v3
FROM s1
GROUP BY k
EMIT CHANGES;
|
Run a pull query against each key. Notice how for each key, the columns reflect the last set of values that were inserted.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | SELECT k, v1, v2, v3 FROM t1 WHERE k='k1';
+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+
|K |V1 |V2 |V3 |
+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+
|k1 |2 |c |false |
SELECT k, v1, v2, v3 FROM t1 WHERE k='k2';
+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+
|K |V1 |V2 |V3 |
+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+
|k2 |4 |e |true |
SELECT k, v1, v2, v3 FROM t1 WHERE k='k3';
+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+
|K |V1 |V2 |V3 |
+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+-------------------------------------------------------+
|k3 |3 |d |true |
|
Last update:
2021-12-05