
How to test an application

Context

You have an application built from ksqlDB statements, and you want to automatically test whether the statements behave correctly when given a set of inputs. ksqlDB exposes a test runner command line tool to do just that. It runs quickly and doesn't require a running Apache Kafka® or ksqlDB cluster.

In action

ksql-test-runner -s statements.sql -i input.json -o output.json

Usage

To test a set of SQL statements, you provide three files: one file containing the SQL statements, and two JSON files containing the input records and the expected output records.

NAME
        ksql-test-runner - The KSQL testing tool

SYNOPSIS
        ksql-test-runner [ {--extension-dir | -e} <extensionDir> ]
                [ {--input-file | -i} <inputFile> ]
                {--output-file | -o} <outputFile>
                {--sql-file | -s} <statementsFile>

OPTIONS
        --extension-dir <extensionDir>, -e <extensionDir>
            A directory containing extensions.

            This option may occur a maximum of 1 times


        --input-file <inputFile>, -i <inputFile>
            A JSON file containing the input records.

            This option may occur a maximum of 1 times


        --output-file <outputFile>, -o <outputFile>
            A JSON file containing the expected output records.

            This option may occur a maximum of 1 times


        --sql-file <statementsFile>, -s <statementsFile>
            A SQL file containing KSQL statements to be tested.

            This option may occur a maximum of 1 times

Test file structure

Statements File

The statements file contains the SQL statements to test. The following are the supported statements in the testing tool:

  • CREATE STREAM
  • CREATE TABLE
  • CREATE STREAM AS SELECT
  • CREATE TABLE AS SELECT
  • INSERT INTO

Here is a sample statements file for the testing tool:

CREATE STREAM orders (ORDERID INT KEY, ORDERUNITS double) WITH (kafka_topic='test_topic', value_format='JSON');
CREATE STREAM S1 AS SELECT ORDERID, ORDERUNITS, CASE WHEN orderunits < 2.0 THEN 'small' WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_result FROM orders EMIT CHANGES;

Input File

The input file is a JSON file with one array field named inputs. Each element in the array represents a single input record.

The input records array can't be empty. A record should have a topic, a key, a value, and a timestamp. The following is a sample input file for the previous test:

{
  "inputs": [
    {"topic": "test_topic", "timestamp": 0, "key": 0, "value": {"ORDERUNITS": 2.0}},
    {"topic": "test_topic", "timestamp": 0, "key": 100, "value": {"ORDERUNITS": 4.0}},
    {"topic": "test_topic", "timestamp": 0, "key": 101, "value": {"ORDERUNITS": 6.0 }},
    {"topic": "test_topic", "timestamp": 0, "key": 101, "value": {"ORDERUNITS": 3.0}},
    {"topic": "test_topic", "timestamp": 0, "key": 101, "value": {"ORDERUNITS": 1.0}}
  ]
}

Output File

The output file is a JSON file with an array field named outputs. Similar to the input file, each element in the array represents an expected output record.

The output records array can't be empty. An expected output record should have a topic, a key, a value, and a timestamp. The following is a sample expected output file for the previous test:

{
  "outputs": [
    {"topic": "S1", "timestamp": 0, "key": 0, "value": {"ORDERUNITS": 2.0, "CASE_RESULT": "medium"}},
    {"topic": "S1", "timestamp": 0, "key": 100, "value": {"ORDERUNITS": 4.0, "CASE_RESULT": "large"}},
    {"topic": "S1", "timestamp": 0, "key": 101, "value": {"ORDERUNITS": 6.0, "CASE_RESULT": "large"}},
    {"topic": "S1", "timestamp": 0, "key": 101, "value": {"ORDERUNITS": 3.0, "CASE_RESULT": "medium"}},
    {"topic": "S1", "timestamp": 0, "key": 101, "value": {"ORDERUNITS": 1.0, "CASE_RESULT": "small"}}
  ]
}

In the input and output files, you can have records with windowed keys. Such records can be generated by windowed aggregations in ksqlDB. To specify a window for a record, add a window field to the record. A window field has three fields:

  • start: the start time for the window.
  • end: the end time for the window.
  • type: the type of the window. A window type can be time or session.

The following is an example expected output file with records that have a window field:

{
   "outputs": [
     {"topic": "S2", "timestamp": 0, "key": 0, "window": {"start": 0, "end": 30000, "type": "time"}, "value": "0,0"},
     {"topic": "S2", "timestamp": 10000, "key": 0, "window": {"start": 0, "end": 30000, "type": "time"}, "value": "0,5"},
     {"topic": "S2", "timestamp": 30000, "key": 100, "window": {"start": 30000, "end": 60000, "type": "time"}, "value": "100,100"},
     {"topic": "S2", "timestamp": 45000, "key": 100, "window": {"start": 30000, "end": 60000, "type": "time"}, "value": "100,100"}
   ]
}

Currently, input files can contain only records with session window types.
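
The following is a minimal sketch of what such an input record might look like inside the inputs array; the topic name, key, value, and window bounds here are illustrative only:

{"topic": "windowed_topic", "timestamp": 10000, "key": 0, "window": {"start": 10000, "end": 15000, "type": "session"}, "value": "0,5"}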

Running the testing tool

The testing tool indicates the success or failure of a test by printing a corresponding message. The following is the result of a successful test:

ksql-test-runner -s statements.sql -i input.json -o output.json

Your output should resemble:

>>> Test passed!

Note that the tool may also write verbose log output to the terminal, in which case you may need to page through it to locate the test status message.
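
Because the status lines begin with >>>, one way to surface them in verbose output is to filter with grep, assuming the status message is written to standard output:

ksql-test-runner -s statements.sql -i input.json -o output.json | grep '>>>'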

If a test fails, the testing tool will indicate the failure along with the cause. Here is an example of the output for a failing test:

  >>>>> Test failed: Topic 'S1', message 4: Expected <101, {"ORDERUNITS":18.0,"CASE_RESULT":"small"}> with timestamp=0 but was <101, {ORDERUNITS=1.0, CASE_RESULT=small}> with timestamp=0

Query execution

To use the ksqlDB testing tool effectively, you need to understand its query execution logic. Although the final results should be deterministic, the intermediate results of SQL queries (which run as Kafka Streams applications) may vary based on several factors, such as the order in which the input is read or configuration properties like the producer buffer size. To make composing the expected output for your test cases simpler, the ksqlDB testing tool executes queries in a predictable way. Consider the following guidance when you prepare the output for your tests.

Input consumption

The testing tool processes input records for each query one by one, writing the generated record(s) for each input record into the result topic before processing the next input record. This means that queries running in the testing tool behave the same as when cache.max.bytes.buffering = 0. This is especially important in aggregate queries: in a real deployment you may not see some of the intermediate results because of buffering, but the testing tool creates every possible intermediate result.
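
For example, consider a hypothetical test of a COUNT aggregation; the stream, topic, and column names below are illustrative and not part of this guide, and the sink topic name assumes the default of the uppercased table name:

CREATE STREAM pageviews (USERID VARCHAR KEY, PAGE VARCHAR) WITH (kafka_topic='pageviews_topic', value_format='JSON');
CREATE TABLE views_per_user AS SELECT USERID, COUNT(*) AS VIEWS FROM pageviews GROUP BY USERID EMIT CHANGES;

Assuming three input records for the key "alice" at timestamps 0, 1, and 2, the expected output file must list every intermediate count, not just the final one:

{
  "outputs": [
    {"topic": "VIEWS_PER_USER", "timestamp": 0, "key": "alice", "value": {"VIEWS": 1}},
    {"topic": "VIEWS_PER_USER", "timestamp": 1, "key": "alice", "value": {"VIEWS": 2}},
    {"topic": "VIEWS_PER_USER", "timestamp": 2, "key": "alice", "value": {"VIEWS": 3}}
  ]
}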

Kafka cluster

The ksqlDB testing tool doesn't use a real Kafka cluster. Instead, it simulates the behavior of a cluster with a single broker for the SQL queries. This means that the testing tool ignores configuration settings for the input and output topics, like the number of partitions or replicas.
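
For example, in the following statement (illustrative only), the partitions and replicas settings have no effect on the test outcome, because the simulated cluster has a single broker:

CREATE STREAM orders (ORDERID INT KEY, ORDERUNITS DOUBLE) WITH (kafka_topic='test_topic', value_format='JSON', partitions=6, replicas=3);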

Processing order

The testing tool processes the statements in the order that you provide them. So, for a given statement, only the statements before it can potentially affect its results. This behavior differs from a ksqlDB cluster, where statements that are submitted later can affect the output of a query. For example, consider the following set of statements:

CREATE STREAM orders (ORDERUNITS double) WITH (kafka_topic='test_topic', value_format='JSON');
INSERT INTO orders VALUES(10.0);
INSERT INTO orders VALUES(15.0);
INSERT INTO orders VALUES(20.0);
CREATE STREAM S1 AS SELECT ORDERUNITS, CASE WHEN orderunits < 2.0 THEN 'small' WHEN orderunits < 4.0 THEN 'medium' ELSE 'large' END AS case_result FROM orders EMIT CHANGES;
INSERT INTO orders VALUES(25.0);
INSERT INTO orders VALUES(30.0);

If you run these statements in a real ksqlDB cluster, one result is generated for each INSERT INTO statement, so you have five records in the output. On the other hand, if you run the previous statements in the testing tool, only the INSERT INTO statements before the CSAS query generate results; the testing tool doesn't run the query for the records generated by the INSERT INTO statements after the CSAS statement.

Note: Be aware of the order in which the input data for a query is processed. For a given query, the testing tool first processes the input records provided in the input file. After fully processing these messages, the testing tool inspects the source topics for the query in the simulated Kafka cluster and processes any messages in those topics. For JOIN queries that have more than one source topic, the testing tool processes the left-side topic first and then the right-side topic.

Generate an input file from an existing topic

You can use kafkacat and jq in combination to create an input file based on data already in a Kafka topic:

kafkacat -b broker:29092 -t my_topic -C -e -J | \
  jq --slurp '{inputs:[.[]|{topic:.topic,timestamp: .ts, key: .key, value: .payload|fromjson}]}' \
  > input.json

Replace broker:29092 with your broker host and port, and my_topic with the name of your topic. You can limit how many messages are written to the file by adding a -c flag to the kafkacat command. For example, -c42 writes the first 42 messages from the topic.

