Stream Processing
Creating a collection with an enforced schema over a new or existing Apache Kafka® topic is useful, but on its own it is not enough to build an application. A declared collection only lets you work with events in their current form, whereas a critical part of building streaming applications is transforming, filtering, joining, and aggregating events.
In ksqlDB, you manipulate events by deriving new collections from existing ones and describing the changes between them. When a collection is updated with a new event, ksqlDB updates the collections derived from it in real time. This form of computing is known formally as stream processing, because it creates programs that operate continually over unbounded streams of events. These programs stop only when you explicitly terminate them.
The general pattern for stream processing in ksqlDB is to create a new collection by using the SELECT statement on an existing collection. The result of the inner SELECT feeds into the outer declared collection. You don't need to declare a schema when deriving a new collection, because ksqlDB infers the column names and types from the inner SELECT statement.
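As a minimal sketch of this pattern, the statement below derives one stream from another. The names source_events, derived_events, col_a, and col_b are hypothetical placeholders, not objects defined on this page.

```sql
-- Hypothetical names: source_events is an existing stream,
-- derived_events is the new collection being declared.
-- No column list is given for derived_events; its schema is
-- inferred from the inner SELECT.
CREATE STREAM derived_events AS
    SELECT col_a, col_b
    FROM source_events
    EMIT CHANGES;
```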
The value of the ROWTIME pseudo column defines the timestamp of the record written to Kafka, and the values of the ROWPARTITION and ROWOFFSET pseudo columns define the partition and offset of the source record, respectively. The values of these system columns cannot be set in the SELECT statement.
Here are a few examples of deriving between the different collection types.
Deriving a new stream from an existing stream
Given the following stream:
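A minimal sketch of such a stream declaration follows. Only the rock_songs name and the existence of a song title come from the text; the artist column, the partition count, and the Avro value format are assumptions made for illustration.

```sql
-- Assumed schema: artist and title columns (only the title is implied by the text).
-- Assumed settings: 2 partitions, Avro values.
CREATE STREAM rock_songs (artist VARCHAR, title VARCHAR)
    WITH (kafka_topic='rock_songs', partitions=2, value_format='avro');
```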
You can derive a new stream with all of the song titles transformed to uppercase:
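One way to express this derivation is sketched below, assuming rock_songs has artist and title columns. The capitalized_song alias is a hypothetical name; UCASE is the ksqlDB scalar function that converts a string to uppercase.

```sql
-- Derive title_cased_songs from rock_songs.
-- Assumes rock_songs exposes artist and title columns.
CREATE STREAM title_cased_songs AS
    SELECT artist, UCASE(title) AS capitalized_song
    FROM rock_songs
    EMIT CHANGES;
```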
Each time a new song is inserted into the rock_songs topic, the uppercase version of the title is appended to the title_cased_songs stream.
Deriving a new table from an existing stream
Given the following table and stream:
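A minimal sketch of a products table and an orders stream might look like the following. The column names (product_name, cost, quantity), the partition counts, and the JSON value format are assumptions chosen so that order counts and revenue can be computed later.

```sql
-- Assumed schema: each product has a name (the table's primary key) and a unit cost.
CREATE TABLE products (product_name VARCHAR PRIMARY KEY, cost DOUBLE)
    WITH (kafka_topic='products', partitions=1, value_format='json');

-- Assumed schema: each order is keyed by product name and carries a quantity.
CREATE STREAM orders (product_name VARCHAR KEY, quantity INT)
    WITH (kafka_topic='orders', partitions=1, value_format='json');
```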
You can create a table that aggregates rows from the orders stream, while also joining the stream on the products table to enrich the orders data:
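The aggregation and join could be sketched as follows, assuming orders carries product_name and quantity and products carries product_name (its primary key) and cost. The order_metrics, order_count, and revenue names are hypothetical.

```sql
-- Join each order to its product, then aggregate per product.
-- Assumes the orders and products schemas described in the lead-in.
CREATE TABLE order_metrics AS
    SELECT p.product_name,
           COUNT(*) AS order_count,
           SUM(o.quantity * p.cost) AS revenue
    FROM orders o
    JOIN products p ON p.product_name = o.product_name
    GROUP BY p.product_name
    EMIT CHANGES;
```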
This aggregate table keeps track of the total number of orders of each product, along with the total amount of revenue generated by each product.
Deriving a new table from an existing table
Given the following aggregate table:
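A minimal sketch of an aggregate table named page_view_metrics is shown below. The source stream page_views and the columns url, location_id, and view_count are assumptions; only the table name comes from the text.

```sql
-- Count page views per URL and location.
-- Assumes a page_views stream with url and location_id columns already exists.
CREATE TABLE page_view_metrics AS
    SELECT url, location_id, COUNT(*) AS view_count
    FROM page_views
    GROUP BY url, location_id
    EMIT CHANGES;
```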
You can derive another table that filters out rows from the page_view_metrics table:
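Such a filter could be sketched as follows, assuming page_view_metrics is keyed by url and location_id and has a view_count column. The derived table name and the location ID value 42 are hypothetical.

```sql
-- Keep only the rows for one location.
-- The key columns (url, location_id) stay in the projection so the
-- derived table retains the same key as its source.
CREATE TABLE page_view_metrics_mountain_view AS
    SELECT url, location_id, view_count
    FROM page_view_metrics
    WHERE location_id = 42
    EMIT CHANGES;
```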
Deriving a new stream from multiple streams
Given the following two streams:
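A minimal sketch of the two streams follows. The user key column is taken from the text; the impression_id and url columns, the partition counts, and the JSON value format are assumptions.

```sql
-- Assumed schema: ad impressions keyed by user.
CREATE STREAM impressions (user VARCHAR KEY, impression_id BIGINT, url VARCHAR)
    WITH (kafka_topic='impressions', partitions=1, value_format='json');

-- Assumed schema: ad clicks keyed by user.
CREATE STREAM clicks (user VARCHAR KEY, url VARCHAR)
    WITH (kafka_topic='clicks', partitions=1, value_format='json');
```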
You can create a derived stream that joins the impressions and clicks streams to output rows indicating that a given impression has been clicked within one minute of the initial ad impression:
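The windowed join could be sketched as follows, assuming both streams have a user column to join on. The aliases i and c are arbitrary; the WITHIN clause bounds how far apart in time the two matching rows may be.

```sql
-- Stream-stream join: pair impressions and clicks for the same user
-- when their timestamps fall within one minute of each other.
CREATE STREAM clicked_impressions AS
    SELECT *
    FROM impressions i
    JOIN clicks c WITHIN 1 MINUTE
    ON i.user = c.user
    EMIT CHANGES;
```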
Any time an impressions row is received, followed within one minute by a clicks row having the same user, a row is emitted into the clicked_impressions stream.