Generate test data

Use the ksql-datagen command-line tool to generate test data that complies with a custom schema that you define.

To generate test data, create an Apache Avro schema and pass it to ksql-datagen. This generates random data according to the schema you provide.

You can also generate data from a few simple predefined schemas.

Prerequisites:

  • Confluent Platform is installed and running. This installation includes an Apache Kafka® broker, ksqlDB, Control Center, ZooKeeper, Schema Registry, REST Proxy, and Connect.
  • If you installed Confluent Platform via TAR or ZIP, navigate to the installation directory. The paths and commands used throughout this tutorial assume that you're in this installation directory.
  • Java 1.8 or later (Oracle JRE or JDK) installed on your local machine.

The ksql-datagen tool is installed with Confluent Platform by default.

Note

ksqlDB Server doesn't need to be running for ksql-datagen to generate records to a topic. The ksql-datagen tool isn't just for ksqlDB. You can use it to produce data to any Kafka topic that you have write access to.

Usage

Use the following command to generate records from an Avro schema:

<path-to-confluent>/bin/ksql-datagen schema=<path-to-avro-file> key-format=<key format> value-format=<value format> topic=<kafka topic name> key=<name of key column> [options ...]

Required Arguments

  • schema=<avro schema file>: Path to an Avro schema file. Requires the key-format, value-format, topic, and key options.
  • key-format=<key format>: Format of generated record keys: one of avro, json, delimited, or kafka. Case-insensitive. Default: kafka.
  • value-format=<value format>: Format of generated record values: one of avro, json, or delimited. Case-insensitive. Default: json.
  • topic=<kafka topic name>: Name of the topic that receives the generated records.
  • key=<name of key column>: Field to use as the key for generated records.
  • quickstart=<quickstart preset>: Generate records from a preset schema: orders, users, or pageviews. Case-insensitive. If topic isn't specified, a topic named <preset>_kafka_topic_json is created, for example, users_kafka_topic_json.

Use the following command to generate records from one of the predefined schemas:

<path-to-confluent>/bin/ksql-datagen quickstart=<quickstart preset> [options ...]

Optional Arguments

The following options apply to both the schema and quickstart options.

  • bootstrap-server=<kafka-server>:<port>: IP address and port of the Kafka broker to connect to. Default: localhost:9092.
  • key-format=<key format>: Format of generated record keys: avro, json, delimited, or kafka. Case-insensitive. Required by the schema option. Default: kafka.
  • value-format=<value format>: Format of generated record values: avro, json, or delimited. Case-insensitive. Required by the schema option. Default: json.
  • topic=<kafka topic name>: Name of the topic that receives the generated records. Required by the schema option.
  • key=<name of key column>: Field to use as the key for generated records. Required by the schema option.
  • iterations=<number of records>: The maximum number of records to generate. Default: 1,000,000.
  • msgRate=<rate to produce in msgs/second>: The rate at which to produce messages, in messages per second. Default: -1, which means unpaced, as fast as possible.
  • propertiesFile=<path-to-properties-file>: Path to the ksql-datagen properties file. Default: <path-to-confluent>/etc/ksqldb/datagen.properties.
  • schemaRegistryUrl=<schema registry url>: URL of Schema Registry, used when a format is avro. Default: http://localhost:8081.
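
The iterations and msgRate options bound how many records are produced and how fast. The following Python sketch illustrates that pacing behavior conceptually; the produce function and its send callback are illustrative stand-ins, not part of ksql-datagen:

```python
import time

def produce(iterations, msg_rate, send):
    """Send up to `iterations` records, paced at `msg_rate` per second.

    A msg_rate of -1 (the ksql-datagen default) means unpaced:
    records are sent as fast as possible.
    """
    interval = 0.0 if msg_rate <= 0 else 1.0 / msg_rate
    for i in range(iterations):
        send(i)  # stand-in for producing one generated record
        if interval:
            time.sleep(interval)

sent = []
produce(iterations=5, msg_rate=-1, send=sent.append)
print(len(sent))  # 5 records, produced without pacing
```

For example, msgRate=2 would sleep for 0.5 seconds between records, and iterations=10 would stop after 10 records regardless of rate.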

Tip

For usage information, enter ksql-datagen help.

Generate Records From a Predefined Schema

The ksql-datagen tool provides some simple schemas for generating example orders, users, and pageviews data.

Generate Example Order Records With Structured Data

The orders quickstart option produces records that simulate orders, with orderid, itemid, price, and location columns. The location column is a STRUCT with city, state, and zipcode fields. The orderid column is stored in the topic's message key.

The following command generates example order records to a Kafka topic named orders_topic:

<path-to-confluent>/bin/ksql-datagen quickstart=orders topic=orders_topic

In the ksqlDB CLI or in Control Center, register a stream on orders_topic:

CREATE STREAM orders_raw (
    orderId VARCHAR KEY,
    itemid VARCHAR,
    price DOUBLE,
    location STRUCT<
        city VARCHAR,
        state VARCHAR,
        zipcode INT>,
    timestamp VARCHAR
  ) WITH (
    KAFKA_TOPIC='orders_topic',
    VALUE_FORMAT='JSON'
  );
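
Each generated order arrives in the topic as a JSON value whose nested object maps onto the STRUCT declared above. The following Python sketch shows one such value; the shape mirrors the stream's columns, but the specific field values are invented for illustration:

```python
import json

# Illustrative orders_topic value: the nested "location" object
# corresponds to the STRUCT<city, state, zipcode> column.
order_value = json.dumps({
    "itemid": "Item_3",
    "price": 19.89,
    "location": {"city": "City_95", "state": "State_12", "zipcode": 21305},
    "timestamp": "2020-08-12T10:15:30Z",
})

record = json.loads(order_value)
print(record["location"]["city"])  # City_95
```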

Inspect the schema of the orders_raw stream by using the DESCRIBE statement:

DESCRIBE orders_raw;

Your output should resemble:

Name                 : ORDERS_RAW
 Field     | Type                                                                 
----------------------------------------------------------------------------------
 ORDERID   | VARCHAR(STRING)  (key)
 ITEMID    | VARCHAR(STRING)                                                      
 PRICE     | DOUBLE                                                               
 LOCATION  | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE INTEGER> 
 TIMESTAMP | VARCHAR(STRING)                                                      
----------------------------------------------------------------------------------

For more information, see How to query structured data.

Generate Example User Records

The users quickstart option produces records that simulate user data, with registertime, gender, regionid, and userid fields. You can join userid values with the page view records generated by the pageviews quickstart option.

The following command generates example user records:

<path-to-confluent>/bin/ksql-datagen quickstart=users

In this example, no topic name is specified, so ksql-datagen creates a topic named users_kafka_topic_json.

In the ksqlDB CLI or in Control Center, register a table on users_kafka_topic_json:

CREATE TABLE users_original (
    userid VARCHAR PRIMARY KEY,
    registertime BIGINT,
    gender VARCHAR,
    regionid VARCHAR
  ) WITH (
    kafka_topic='users_kafka_topic_json',
    value_format='JSON'
  );

Inspect the schema of the users_original table by using the DESCRIBE statement:

DESCRIBE users_original;

Your output should resemble:

Name                 : USERS_ORIGINAL
 Field        | Type                      
------------------------------------------
 USERID       | VARCHAR(STRING)  (key)
 REGISTERTIME | BIGINT                    
 GENDER       | VARCHAR(STRING)           
 REGIONID     | VARCHAR(STRING)           
------------------------------------------

Generate Example User Records With Complex Data

The users_ quickstart option produces records that simulate user data, with registertime, gender, regionid, userid, interests, and contactInfo columns. The interests column is an ARRAY, and the contactInfo column is a MAP.

You can join userid values with the page view records generated by the pageviews quickstart option.

The following command generates example user records that have complex data:

<path-to-confluent>/bin/ksql-datagen quickstart=users_ topic=users_extended

In the ksqlDB CLI or in Control Center, register a table on users_extended:

CREATE TABLE users_extended (
    userid VARCHAR PRIMARY KEY,
    registertime BIGINT,
    gender VARCHAR,
    regionid VARCHAR,
    interests ARRAY<STRING>,
    contactInfo MAP<STRING, STRING>
  ) WITH (
    kafka_topic='users_extended',
    value_format='JSON'
  );
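
In the JSON values, the ARRAY column serializes as a JSON list and the MAP column as a JSON object. The following Python sketch parses one users_extended value; the field values are invented for illustration:

```python
import json

# Illustrative users_extended value: "interests" is a JSON list
# (ARRAY<STRING>) and "contactInfo" a JSON object (MAP<STRING, STRING>).
user_value = json.loads("""
{
  "registertime": 1511161242309,
  "gender": "FEMALE",
  "regionid": "Region_2",
  "interests": ["Game", "Travel"],
  "contactInfo": {"city": "Palo Alto", "zipcode": "94301"}
}
""")

print(user_value["interests"][0])         # Game
print(user_value["contactInfo"]["city"])  # Palo Alto
```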

Inspect the schema of the users_extended table by using the DESCRIBE statement:

DESCRIBE users_extended;

Your output should resemble:

Name                 : USERS_EXTENDED
 Field        | Type                         
----------------------------------------------
 USERID       | VARCHAR(STRING)  (primary key)
 REGISTERTIME | BIGINT                       
 GENDER       | VARCHAR(STRING)              
 REGIONID     | VARCHAR(STRING)              
 INTERESTS    | ARRAY<VARCHAR(STRING)>       
 CONTACTINFO  | MAP<STRING, VARCHAR(STRING)> 
----------------------------------------------

For more information, see How to query structured data.

Generate Example User Page Views

The pageviews quickstart option produces records that simulate page views, with viewtime, userid, and pageid columns. You can join userid values with the user records generated by the users quickstart option.

The following command generates example pageview records to a Kafka topic named pageviews:

<path-to-confluent>/bin/ksql-datagen quickstart=pageviews topic=pageviews

In the ksqlDB CLI or in Control Center, register a stream on pageviews:

CREATE STREAM pageviews_original (
    viewtime bigint,
    userid varchar,
    pageid varchar
  ) WITH (
    kafka_topic='pageviews',
    value_format='DELIMITED'
  );
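
With VALUE_FORMAT='DELIMITED', each message value is a comma-separated line whose fields align positionally with the declared columns. The following Python sketch parses one such value; the sample line is invented for illustration:

```python
# Invented sample of a DELIMITED pageviews value: fields appear in the
# same order as the columns declared in the stream above.
line = "1528756317023,User_9,Page_12"

viewtime, userid, pageid = line.split(",")
pageview = {"viewtime": int(viewtime), "userid": userid, "pageid": pageid}
print(pageview["pageid"])  # Page_12
```

Because the mapping is positional, the column order in the CREATE STREAM statement must match the field order in the produced values.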

Inspect the schema of the pageviews_original stream by using the DESCRIBE statement:

DESCRIBE pageviews_original;

Your output should resemble:

Name                 : PAGEVIEWS_ORIGINAL
 Field    | Type                      
--------------------------------------
 VIEWTIME | BIGINT                    
 USERID   | VARCHAR(STRING)           
 PAGEID   | VARCHAR(STRING)           
--------------------------------------

Generate Records From an Avro Schema

Define a Custom Schema

In this example, you download a custom Avro schema and generate matching test data. The schema file is named impressions.avro, and it represents advertisements delivered to users.

Download impressions.avro and copy it to your home directory. It's used by ksql-datagen when you start generating test data.

curl https://raw.githubusercontent.com/apurvam/streams-prototyping/master/src/main/resources/impressions.avro > impressions.avro

Generate Test Data

When you have downloaded the custom schema, you can generate test data made up of random values that satisfy the schema's constraints. In the impressions schema, advertisement identifiers are two-digit random numbers from 10 to 99, as specified by the regular expression ad_[1-9][0-9].
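
Conceptually, datagen draws each constrained string field from a generator that satisfies the schema's pattern; for ad_[1-9][0-9] that means an "ad_" prefix followed by a number from 10 to 99. The following Python sketch illustrates the idea; the random_ad_id function is illustrative, not part of ksql-datagen:

```python
import random
import re

AD_PATTERN = re.compile(r"ad_[1-9][0-9]")

def random_ad_id(rng=random):
    # Any integer in [10, 99] matches [1-9][0-9] after the "ad_" prefix.
    return f"ad_{rng.randint(10, 99)}"

sample = [random_ad_id() for _ in range(100)]
print(all(AD_PATTERN.fullmatch(ad) for ad in sample))  # True
```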

Open a new command shell and start generating test values by using the ksql-datagen command. In this example, the schema file, impressions.avro, is in your home directory.

<path-to-confluent>/bin/ksql-datagen schema=~/impressions.avro value-format=delimited topic=impressions key=impressionid

After a few startup messages, your output should resemble:

impression_796 --> ([ 1528756317023 | 'impression_796' | 'user_41' | 'ad_29' ])
impression_341 --> ([ 1528756317446 | 'impression_341' | 'user_34' | 'ad_32' ])
impression_419 --> ([ 1528756317869 | 'impression_419' | 'user_58' | 'ad_74' ])
impression_399 --> ([ 1528756318146 | 'impression_399' | 'user_32' | 'ad_78' ])

Consume the Test Data Stream

In the ksqlDB CLI or in Control Center, register the impressions stream:

CREATE STREAM impressions (viewtime BIGINT, key VARCHAR, userid VARCHAR, adid VARCHAR) WITH (KAFKA_TOPIC='impressions', VALUE_FORMAT='DELIMITED');

Create the impressions2 persistent streaming query:

CREATE STREAM impressions2 AS SELECT * FROM impressions EMIT CHANGES;

Last update: 2020-08-12