How to convert a changelog to a table
Context
You have a topic or a stream of events that represent a series of changes, known as a changelog. You want a view of the data that reflects only the last change for each key. Because ksqlDB represents change over time using tables, you need a way to convert your changelog into a table. This is broadly called materializing a changelog into a table.
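Conceptually, materializing a changelog is a fold over its records in offset order: each record overwrites the previous value for its key. The plain-Python model below only illustrates that idea and is not ksqlDB code. The records are hypothetical. Treating a None value as a delete is an assumption that follows Kafka's tombstone convention for keyed topics.

```python
from typing import Dict, Iterable, Optional, Tuple


def materialize(changelog: Iterable[Tuple[str, Optional[object]]]) -> Dict[str, object]:
    """Fold changelog records, ordered by offset, into a key -> latest-value view."""
    table: Dict[str, object] = {}
    for key, value in changelog:
        if value is None:
            # A tombstone (null value) removes the key from the view.
            table.pop(key, None)
        else:
            # Last write wins: a later record replaces the earlier value.
            table[key] = value
    return table


if __name__ == "__main__":
    # Hypothetical records, earliest offset first.
    changes = [("k1", 1), ("k2", 10), ("k1", 2), ("k3", 7), ("k3", None)]
    print(materialize(changes))  # {'k1': 2, 'k2': 10}
```

However many records arrive, the view holds at most one entry per key.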
Materializing a changelog topic
If you have a changelog topic and want a view of the data that reflects the latest value for each key, create a table over the changelog topic by using the CREATE SOURCE TABLE statement.
Let's say that you have the following data in your changelog topic, where the first row is the record with the earliest offset and the last row is the record with the latest offset:
The changelog topic contains records for two distinct keys, k1 and k2.
Create a table named latest_view with four columns. k is the key of the table, so rows with the same key represent information about the same entity. v1, v2, and v3 are value columns.
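A few lines of Python can model how one changelog record fills a row of latest_view. This is a sketch and not ksqlDB code. It assumes that the record value is JSON and that its field names match the column names. The Python types for v1, v2, and v3 are hypothetical, because the column types are not given here. The record key becomes k, and each value column is read from the matching JSON field.

```python
import json
from typing import NamedTuple, Optional


class LatestViewRow(NamedTuple):
    k: str              # primary key, taken from the record key
    v1: Optional[int]   # value columns, read from the JSON value (types assumed)
    v2: Optional[str]
    v3: Optional[bool]


def to_row(record_key: str, record_value: str) -> LatestViewRow:
    """Map one changelog record onto the hypothetical latest_view schema."""
    fields = json.loads(record_value)
    # In this model, a field missing from the JSON value becomes None (SQL NULL).
    return LatestViewRow(
        k=record_key,
        v1=fields.get("v1"),
        v2=fields.get("v2"),
        v3=fields.get("v3"),
    )


if __name__ == "__main__":
    print(to_row("k1", '{"v1": 1, "v2": "a"}'))
    # LatestViewRow(k='k1', v1=1, v2='a', v3=None)
```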
Now you can view the latest values for each key in your changelog topic by issuing a pull query against the latest_view table that you created above:
Notice how for each key, the columns reflect the latest set of values.
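A pull query reads the table's current state once and then returns; it does not keep running. The sketch below uses a Python dict as a hypothetical stand-in for latest_view, which is not a ksqlDB API. It shows that the result contains one row per key and is a point-in-time snapshot. The model sorts rows only to make its output deterministic, so don't rely on any particular row order from ksqlDB.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def pull_all(table: Dict[str, Row]) -> List[Tuple[str, Row]]:
    """Model of SELECT * FROM latest_view: one row per key, as of query time."""
    # Copy each row so that later updates to the table leave this result alone.
    return [(k, dict(row)) for k, row in sorted(table.items(), key=lambda item: item[0])]


if __name__ == "__main__":
    latest_view = {"k2": {"v1": 7}, "k1": {"v1": 1}}  # hypothetical state
    snapshot = pull_all(latest_view)
    latest_view["k1"]["v1"] = 2  # a change that arrives after the query ran
    print(snapshot)  # [('k1', {'v1': 1}), ('k2', {'v1': 7})]
```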
To look up the latest value for a particular key, for example k2, issue a pull query for that key:
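Because k is the table's primary key, filtering on it selects at most one row, so the lookup behaves like a dictionary access. The sketch below models the query's result using hypothetical data. It does not model how ksqlDB executes the query.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def pull_by_key(table: Dict[str, Row], key: str) -> List[Tuple[str, Row]]:
    """Model of SELECT * FROM latest_view WHERE k = key."""
    # k is the primary key, so at most one row can match.
    row = table.get(key)
    return [] if row is None else [(key, dict(row))]


if __name__ == "__main__":
    latest_view = {"k1": {"v1": 3}, "k2": {"v1": 20}}  # hypothetical state
    print(pull_by_key(latest_view, "k2"))  # [('k2', {'v1': 20})]
    print(pull_by_key(latest_view, "k9"))  # []
```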
Materializing a changelog STREAM
The best way to materialize an input topic as a table is the CREATE SOURCE TABLE statement. If you already have a STREAM, you can also materialize it as a table with the CREATE TABLE ... AS ... statement. This is useful if you have a STREAM of events in ksqlDB, called changelog_stream, that represents a series of changes, and you want a view of the data that reflects the latest values for each key.
Begin by telling ksqlDB to start all queries from the earliest point in each topic:
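In ksqlDB this is typically done with the statement SET 'auto.offset.reset' = 'earliest'; because the default value, latest, makes a new query skip records that are already in the topic. The plain-Python model below shows what the setting changes: the offset at which a new query with no committed position begins reading a single-partition log. The function names are hypothetical.

```python
from typing import List, Sequence, Tuple

Record = Tuple[str, int]


def start_offset(log_length: int, reset: str) -> int:
    """Offset at which a new query with no committed position starts reading."""
    if reset == "earliest":
        return 0  # replay everything already in the topic
    if reset == "latest":
        return log_length  # skip existing records; read only new ones
    raise ValueError(f"unsupported reset policy: {reset!r}")


def records_read(existing: Sequence[Record], arriving: Sequence[Record], reset: str) -> List[Record]:
    """Records a query sees if it starts after `existing` and before `arriving`."""
    log = list(existing) + list(arriving)
    return log[start_offset(len(existing), reset):]


if __name__ == "__main__":
    existing = [("k1", 1), ("k2", 2)]  # hypothetical records already in the topic
    arriving = [("k1", 3)]             # hypothetical records written later
    print(records_read(existing, arriving, "earliest"))  # [('k1', 1), ('k2', 2), ('k1', 3)]
    print(records_read(existing, arriving, "latest"))    # [('k1', 3)]
```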
Let's mimic adding a few records to changelog_stream by using the INSERT INTO statement:
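Each row written with INSERT INTO is appended to the stream as a new event. Nothing is overwritten, even when a key repeats. The Stream class below is a hypothetical single-partition model of this append-only behavior, in which the list index stands in for the Kafka offset. The column values are made up.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Stream:
    """Toy single-partition stream; a list index stands in for the offset."""

    events: List[Tuple[str, Dict[str, object]]] = field(default_factory=list)

    def insert_into(self, k: str, **values: object) -> int:
        # Appending never modifies an earlier event, even when the key repeats.
        self.events.append((k, dict(values)))
        return len(self.events) - 1  # offset assigned to the new event


if __name__ == "__main__":
    changelog_stream = Stream()
    print(changelog_stream.insert_into("k1", v1=0, v2="a", v3=True))   # 0
    print(changelog_stream.insert_into("k1", v1=1, v2="b", v3=False))  # 1
    print(changelog_stream.events[0])  # ('k1', {'v1': 0, 'v2': 'a', 'v3': True})
```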
You can view the data in changelog_stream by issuing a pull query:
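A stream keeps every change, unlike a table, so a pull query over changelog_stream can return several events for the same key. The sketch below models this with a list of hypothetical events. The optional key filter is an illustrative parameter and is not a ksqlDB option.

```python
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, Dict[str, object]]


def pull_stream(events: List[Event], key: Optional[str] = None) -> List[Event]:
    """Model of a pull query over a stream: every stored event, none collapsed."""
    # In this single-partition model the result stays in offset order.
    return [(k, v) for k, v in events if key is None or k == key]


if __name__ == "__main__":
    changelog_stream = [  # hypothetical events in offset order
        ("k1", {"v1": 0}),
        ("k2", {"v1": 5}),
        ("k1", {"v1": 1}),
    ]
    print(len(pull_stream(changelog_stream)))   # 3
    print(pull_stream(changelog_stream, "k1"))  # [('k1', {'v1': 0}), ('k1', {'v1': 1})]
```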
To view only the last change for each key in changelog_stream, derive a table called latest_events with a CREATE TABLE AS SELECT statement that uses the LATEST_BY_OFFSET aggregation:
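Conceptually, GROUP BY k creates one row per key. LATEST_BY_OFFSET then keeps, for each column separately, the value from the event with the highest offset. The model below is a plain-Python sketch of that result and not ksqlDB's implementation, and its data is hypothetical. The ignore_nulls=True default is meant to mirror LATEST_BY_OFFSET's documented default of skipping null values; confirm this in the function reference for your ksqlDB version.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Event = Tuple[str, Dict[str, Optional[object]]]


def latest_by_offset(
    events: Iterable[Event], columns: List[str], ignore_nulls: bool = True
) -> Dict[str, Dict[str, Optional[object]]]:
    """Model of SELECT k, LATEST_BY_OFFSET(v1) AS v1, ... GROUP BY k."""
    table: Dict[str, Dict[str, Optional[object]]] = {}
    for k, values in events:  # events must be in offset order
        # GROUP BY k: one output row per distinct key, created on first sight.
        row = table.setdefault(k, {c: None for c in columns})
        for c in columns:
            value = values.get(c)
            if value is None and ignore_nulls:
                continue  # keep the previous non-null value for this column
            row[c] = value  # each column is tracked independently
    return table


if __name__ == "__main__":
    events = [  # hypothetical changelog_stream contents, earliest first
        ("k1", {"v1": 0, "v2": "a", "v3": True}),
        ("k2", {"v1": 5, "v2": "x", "v3": False}),
        ("k1", {"v1": 1, "v2": None, "v3": False}),
    ]
    print(latest_by_offset(events, ["v1", "v2", "v3"]))
    # {'k1': {'v1': 1, 'v2': 'a', 'v3': False}, 'k2': {'v1': 5, 'v2': 'x', 'v3': False}}
```

Because each column is aggregated independently, the later null v2 for k1 leaves the earlier value in place in this model. A table created with CREATE SOURCE TABLE would instead replace the key's whole row with the new record's value.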
Now you can view the latest values for each key in changelog_stream by issuing a pull query against the latest_events table that you created above:
Notice how for each key, the columns reflect the latest set of values.