How to convert a changelog to a table
Context
You have a topic or a stream of events that represent a series of changes, known as a changelog. You want a view of the data that reflects only the last change for each key. Because ksqlDB represents change over time using tables, you need a way to convert your changelog into a table. This is broadly called materializing a changelog into a table.
Materializing a changelog topic
If you have a changelog topic and want a view of the data that reflects the latest values for each key, create a table over the changelog topic using the CREATE SOURCE TABLE statement.
Let's say that you have the following data in your changelog topic where the first row is the record that has the earliest offset and the last row is the record that has the latest offset:
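To inspect the records of a topic in offset order from the ksqlDB CLI, the PRINT statement can be used. This is a minimal sketch, assuming the changelog topic is named changelog_topic (a hypothetical name that the text does not specify):

```sql
-- 'changelog_topic' is an assumed topic name; substitute your own.
-- FROM BEGINNING reads from the earliest offset, so records appear oldest first.
PRINT 'changelog_topic' FROM BEGINNING;
```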
The keys k1 and k2 both appear in the changelog topic.
Make a table latest_view with four columns. k represents the key of the table. Rows with the same key represent information about the same entity. v1, v2, and v3 are various value columns.
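A table definition along these lines would match the description above. It is a sketch under stated assumptions: the topic name changelog_topic, the JSON value format, and the column types are all illustrative and are not given in the text.

```sql
-- Assumptions: topic name, value format, and column types are hypothetical.
-- Adjust them to match the actual changelog topic.
CREATE SOURCE TABLE latest_view (
    k  VARCHAR PRIMARY KEY,  -- key of the table; rows with the same key describe the same entity
    v1 INT,
    v2 VARCHAR,
    v3 BOOLEAN
) WITH (
    KAFKA_TOPIC  = 'changelog_topic',
    VALUE_FORMAT = 'JSON'
);
```

A source table is read-only from ksqlDB's side: it is populated from the underlying topic, so rows are not added to it with INSERT INTO.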
Now, you can view the latest values for each key in your changelog topic by issuing a pull query against the latest_view table that you created above:
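Such a pull query might look like the following sketch. Because it has no WHERE clause, it scans the whole table; depending on the ksqlDB version and server configuration, table scans may need to be enabled for it to run.

```sql
-- Returns the current row for every key, then terminates.
SELECT * FROM latest_view;
```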
Notice how for each key, the columns reflect the latest set of values.
If you want to look up the latest value for a particular key, for example, k2, issue a pull query for that key:
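A key lookup of this kind can be sketched as follows, assuming the key column k is a string type as in the hypothetical schema used for illustration:

```sql
-- Looks up the current row for the single key 'k2'.
SELECT * FROM latest_view WHERE k = 'k2';
```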
Materializing a changelog STREAM
The best way to materialize an input topic as a table is the CREATE SOURCE TABLE statement. If you already have a STREAM, you can also materialize it as a table with the CREATE TABLE ... AS ... statement. This is useful if you have a STREAM in ksqlDB called changelog_stream whose events represent a series of changes, and you want a view of the data that reflects the latest values for each key.
Begin by telling ksqlDB to start all queries from the earliest point in each topic:
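In the ksqlDB CLI, this is typically done by setting the auto.offset.reset property for the session:

```sql
-- Queries started after this setting read each topic from its earliest offset.
SET 'auto.offset.reset' = 'earliest';
```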
Let's mimic adding a few records to the changelog_stream by using the INSERT INTO statement:
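The sketch below creates the stream first, in case it does not exist yet, and then inserts a few records. The topic name, value format, column types, and all inserted values are illustrative assumptions rather than data from this guide.

```sql
-- Assumptions: topic name, format, and column types are hypothetical.
CREATE STREAM changelog_stream (
    k  VARCHAR KEY,
    v1 INT,
    v2 VARCHAR,
    v3 BOOLEAN
) WITH (
    KAFKA_TOPIC  = 'changelog_stream_topic',
    PARTITIONS   = 1,
    VALUE_FORMAT = 'JSON'
);

-- Illustrative records: each key is written twice,
-- so the later insert for a key supersedes the earlier one.
INSERT INTO changelog_stream (k, v1, v2, v3) VALUES ('k1', 0, 'a', true);
INSERT INTO changelog_stream (k, v1, v2, v3) VALUES ('k2', 1, 'b', false);
INSERT INTO changelog_stream (k, v1, v2, v3) VALUES ('k1', 2, 'c', false);
INSERT INTO changelog_stream (k, v1, v2, v3) VALUES ('k2', 3, 'd', true);
```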
You can view the data in the changelog_stream by issuing a pull query:
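A query of this form lists every event in the stream, including the superseded ones. Pull queries on streams are available only in more recent ksqlDB releases; on older versions, a push query (the same statement with EMIT CHANGES appended) is the usual alternative.

```sql
-- Returns all events currently in the stream, then terminates.
SELECT * FROM changelog_stream;
```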
To view the data that reflects only the last change for each key in changelog_stream, you can derive a table called latest_events with a CREATE TABLE AS SELECT statement in conjunction with the LATEST_BY_OFFSET aggregation:
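One way to write this statement is sketched below, using the column names k, v1, v2, and v3 from the earlier section (an assumption about the stream's schema):

```sql
-- Groups events by key and keeps, per column, the value with the latest offset.
CREATE TABLE latest_events AS
    SELECT k,
           LATEST_BY_OFFSET(v1) AS v1,
           LATEST_BY_OFFSET(v2) AS v2,
           LATEST_BY_OFFSET(v3) AS v3
    FROM changelog_stream
    GROUP BY k;
```

Each value column is aggregated independently, so a row in latest_events holds the most recent value seen for each column of that key.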
Now, you can view the latest values for each key in your changelog_stream by issuing a pull query against the latest_events table that you created above:
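As a sketch, the pull query against the derived table has the same shape as one against any other materialized table:

```sql
-- Returns the current row for every key in latest_events, then terminates.
SELECT * FROM latest_events;
```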
Notice how for each key, the columns reflect the latest set of values.