ksqlDB is a simple and powerful tool for building streaming applications on top of Apache Kafka®. This guide helps you plan for provisioning your ksqlDB deployment and answers these questions:
- What server specification should I use to run ksqlDB?
- Approximately how many ksqlDB server nodes do I need?
- Do I need to provision additional Kafka brokers to support ksqlDB?
- What kind of deployment mode makes sense for me?
- How do I know whether my ksqlDB queries are handling the incoming message rate, and how can I tune ksqlDB if it's falling behind?
Because the underlying implementation of ksqlDB uses the Kafka Streams API for query processing, the details covered in the Streams documentation apply to ksqlDB as well. The Kafka Streams capacity planning guide is another useful resource for ksqlDB capacity planning.
Approach To Sizing¶
This document provides you with a rough estimate of the computing resources required to run your SQL queries in ksqlDB. There are many factors that determine your actual required resources, and the best practice is to stage and test your queries against realistic loads first, and then adjust your planned provisioning accordingly.
CPU: ksqlDB consumes CPU to serialize and deserialize messages into the declared stream and table schemas, and then process each message as required by the query. At least 4 cores are recommended.
Disk: ksqlDB uses local disk to persist temporary state for aggregations and joins. An SSD with at least 100 GB is recommended.
Memory: ksqlDB memory usage is dominated by on-heap message processing and off-heap state for aggregations, like SUM, COUNT, TOPKDISTINCT, and joins. Usage for message processing scales with message throughput, while aggregation and join state is a function of topic partition count, key space size, and windowing. A good starting point here is 32 GB.
Network: ksqlDB relies heavily on Kafka, so a fast and reliable network is important for optimal throughput. A 1 Gbit NIC is a good starting point.
General guidelines for a basic ksqlDB server are:
- 4 cores
- 32 GB RAM
- 100 GB SSD
- 1 Gbit network
Don't deploy multi-tenant ksqlDB Server instances.
We recommend against using ksqlDB in a multi-tenant fashion. For example, if you have two ksqlDB applications running on the same node, and one is greedy, you're likely to encounter resource issues related to multi-tenancy. We recommend using a single pool of ksqlDB Server instances per use case. You should deploy separate applications onto separate ksqlDB nodes, because it becomes easier to reason about scaling and resource utilization. Also, deploying per use case makes it easier to reason about failovers and replication.
ksqlDB consumes resources on your Kafka cluster.
ksqlDB creates the following types of topics on your Kafka cluster:
Every query started by a CREATE STREAM AS SELECT or CREATE TABLE AS SELECT statement writes its results to an output topic. The created topic is configured with the following properties:
- Name: By default, ksqlDB creates the output topic with the same name as the stream or table created by the statement. You can specify a custom name in the KAFKA_TOPIC property of the statement's WITH clause.
- Partitions: By default, ksqlDB creates an output topic with 4 partitions. You can specify a custom partition count in the PARTITIONS property of the statement's WITH clause.
- Replication Factor: By default, ksqlDB creates the output topic with a replication factor of 1. You can specify a custom replication factor in the REPLICAS property of the statement's WITH clause.
Internal Topics for Repartitioning¶
Some queries require that the input stream be repartitioned so that all
messages being aggregated or joined together reside in the same
partition. Repartitioning means that an intermediate topic is created
and every record is produced and consumed to and from that topic with a
key that ensures the locality requirements of the query are satisfied.
The intermediate topic is named with the suffix -repartition. The
repartition topic has the same number of partitions and replicas as the
input stream for aggregations, or the input table for joins.
To determine if your query needs a repartition, you can use the DESCRIBE EXTENDED and EXPLAIN statements. For example, consider the following table created from the quickstart:
CREATE TABLE pageviews_by_page AS
  SELECT pageid, COUNT(*)
  FROM pageviews_original
  GROUP BY pageid
  EMIT CHANGES;
DESCRIBE EXTENDED pageviews_by_page;
Your output should resemble:
...
Queries that write into this TABLE
-----------------------------------
id:CTAS_PAGEVIEWS_BY_PAGE - CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>
The DESCRIBE EXTENDED output includes the Query ID for the query populating the table. You can run EXPLAIN against the query to print the underlying streams topology:
EXPLAIN CTAS_PAGEVIEWS_BY_PAGE;
Your output should resemble:
Type : QUERY
SQL : CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES;

Execution plan
--------------
 > [ PROJECT ] Schema: [PAGEID : STRING , KSQL_COL_1 : INT64].
   > [ AGGREGATE ] Schema: [PAGEVIEWS_ORIGINAL.PAGEID : STRING , PAGEVIEWS_ORIGINAL.ROWTIME : INT64 , KSQL_AGG_VARIABLE_0 : INT64].
     > [ PROJECT ] Schema: [PAGEVIEWS_ORIGINAL.PAGEID : STRING , PAGEVIEWS_ORIGINAL.ROWTIME : INT64].
       > [ SOURCE ] Schema: [PAGEVIEWS_ORIGINAL.ROWTIME : INT64 , PAGEVIEWS_ORIGINAL.ROWKEY : STRING , PAGEVIEWS_ORIGINAL.VIEWTIME : INT64 , PAGEVIEWS_ORIGINAL.USERID : STRING , PAGEVIEWS_ORIGINAL.PAGEID : STRING].

Processing topology
-------------------
Topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [pageviews])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: )
      --> KSTREAM-TRANSFORMVALUES-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-TRANSFORMVALUES-0000000002 (stores: )
      --> KSTREAM-MAPVALUES-0000000003
      <-- KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000003 (stores: )
      --> KSTREAM-FILTER-0000000004
      <-- KSTREAM-TRANSFORMVALUES-0000000002
    Processor: KSTREAM-FILTER-0000000004 (stores: )
      --> Aggregate-groupby
      <-- KSTREAM-MAPVALUES-0000000003
    Processor: Aggregate-groupby (stores: )
      --> KSTREAM-FILTER-0000000009
      <-- KSTREAM-FILTER-0000000004
    Processor: KSTREAM-FILTER-0000000009 (stores: )
      --> KSTREAM-SINK-0000000008
      <-- Aggregate-groupby
    Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)
      <-- KSTREAM-FILTER-0000000009
  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])
      --> KSTREAM-AGGREGATE-0000000007
    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])
      --> KTABLE-MAPVALUES-0000000011
      <-- KSTREAM-SOURCE-0000000010
    Processor: KTABLE-MAPVALUES-0000000011 (stores: )
      --> KTABLE-TOSTREAM-0000000012
      <-- KSTREAM-AGGREGATE-0000000007
    Processor: KTABLE-TOSTREAM-0000000012 (stores: )
      --> KSTREAM-MAPVALUES-0000000013
      <-- KTABLE-MAPVALUES-0000000011
    Processor: KSTREAM-MAPVALUES-0000000013 (stores: )
      --> KSTREAM-SINK-0000000014
      <-- KTABLE-TOSTREAM-0000000012
    Sink: KSTREAM-SINK-0000000014 (topic: PAGEVIEWS_BY_PAGE)
      <-- KSTREAM-MAPVALUES-0000000013
Observe that there are 2 sub-topologies. This means that the stream is being re-partitioned.
State Store Changelog Topics¶
ksqlDB uses an embedded storage engine to manage state locally for
operations such as aggregations. For fault-tolerance reasons it also
persists the state for aggregations, like SUM, COUNT, and TOPKDISTINCT in a
compacted changelog topic. The changelog topic has the same number of
partitions as the input stream. It defaults to a single replica, but
this can be explicitly set via the
The amount of data stored in the changelog topic depends on the number of keys, key size, aggregate size, and whether the aggregation is windowed and if so, what the window retention time is.
For un-windowed aggregations the total size should be roughly the (key size + aggregate size) multiplied by the number of keys.
For windowed aggregates the size is determined by the number of outstanding windows multiplied by the size of each window. The number of outstanding windows is bound by the window retention time. The size of each window depends on message throughput, key space size and the average key size. If you have a large key space then each window's size will likely be a multiple of the throughput, window size, and average key size. If your key space is small then the window's size is bound by the number of keys multiplied by the average key size.
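The two changelog estimates above can be written down as a small calculator. This is only a back-of-the-envelope sketch: the function names and the example numbers are hypothetical, and real topics also carry per-record overhead that is not modeled here.

```python
def unwindowed_changelog_bytes(num_keys, key_bytes, aggregate_bytes):
    """(key size + aggregate size) multiplied by the number of keys."""
    return num_keys * (key_bytes + aggregate_bytes)


def windowed_changelog_bytes(outstanding_windows, bytes_per_window):
    """Number of outstanding windows multiplied by the size of each window."""
    return outstanding_windows * bytes_per_window


# Hypothetical workload: 1,000,000 keys, 32-byte keys, 8-byte aggregates.
unwindowed = unwindowed_changelog_bytes(1_000_000, 32, 8)
print(f"un-windowed changelog: about {unwindowed / 1_000_000:.0f} MB")

# Hypothetical windowed workload: 24 retained windows of about 40 MB each.
windowed = windowed_changelog_bytes(24, 40_000_000)
print(f"windowed changelog: about {windowed / 1_000_000:.0f} MB")
```

The number of outstanding windows is whatever the window retention time allows, so shortening retention shrinks the second estimate directly.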
ksqlDB Command Topics¶
When run in interactive configuration, a ksqlDB cluster creates an
internal topic (whose name is derived from the
setting) to persist the log of queries to run across all the servers in
this ksqlDB cluster. These special-purpose topics for ksqlDB are called
command topics. Command topics have a single partition and default to a
replication factor of 1.
Headless mode deployments don't have a command topic.
Consumption and Production¶
You might need to provision additional Kafka brokers to accommodate ksqlDB production and consumption to and from your Kafka cluster.
Minimally, each query consumes each record from an input Kafka topic and produces records to an output Kafka topic.
Some queries require that the input stream be repartitioned so that all messages being aggregated or joined together reside in the same partition. Each repartition produces and consumes every record.
Finally, stateful queries such as aggregations and joins produce records to a changelog topic for their respective state stores.
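As a rough illustration, the extra broker traffic from one query can be tallied from the four flows described above. The helper name and its parameters are hypothetical, and the rates you pass in are your own estimates in MB/s.

```python
def kafka_traffic_mb_per_s(input_rate, output_rate,
                           repartition_rate=0.0, changelog_rate=0.0):
    """Return (consumed, produced) MB/s that one query adds to the brokers.

    A repartition topic is both written and read in full, so its rate
    counts on both sides. A changelog topic is only written during
    normal processing.
    """
    consumed = input_rate + repartition_rate
    produced = output_rate + repartition_rate + changelog_rate
    return consumed, produced


# Stateless filter: reads 50 MB/s and writes 45 MB/s.
print(kafka_traffic_mb_per_s(50, 45))

# Hypothetical aggregation that repartitions its full 50 MB/s input.
print(kafka_traffic_mb_per_s(50, 5, repartition_rate=50, changelog_rate=5))
```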
Important Sizing Factors¶
This section describes the important factors to consider when scoping out your ksqlDB deployment.
Throughput: In general, higher throughput requires more resources.
Query Types: Your realized throughput will largely be a function of the type of queries you run. You can think of ksqlDB queries as falling into these categories:
- Project/Filter, e.g. SELECT <columns> FROM <table/stream> WHERE <condition>
- Joins
- Aggregations, e.g. SUM, COUNT, TOPK, TOPKDISTINCT
A project/filter query reads records from an input stream or table, may filter the records according to some predicate, and performs stateless transformations on the columns before writing out records to a sink stream or table. Project/filter queries require the fewest resources. For a single project/filter query running on an instance provisioned as recommended above you can expect to realize from ~40 MB/second up to the rate supported by your network. The throughput depends largely on the average message size and complexity. Processing small messages with many columns is CPU intensive and will saturate your CPU. Processing large messages with fewer columns requires less CPU and ksqlDB will start saturating the network for such workloads.
Stream-table joins read and write to Kafka Streams state stores and require around twice the CPU of project/filter. Though Kafka Streams state stores are stored on disk, we recommend that you provision sufficient memory to keep the working set memory-resident to avoid expensive disk I/O. So expect around half the throughput and expect to provision higher-memory instances.
Aggregations read from and may write to a state store for every record. They consume around twice the CPU of joins. The CPU required increases if the aggregation uses a window as the state store must be updated for every window.
Number of Queries: The available resources on a server are shared across all queries, so expect the processing throughput per server to decrease proportionally with the number of queries it is executing (see the notes on vertically and horizontally scaling a ksqlDB cluster in this document to add more processing capacity in such situations). Furthermore, SQL queries run as Kafka Streams applications. Each query starts its own Kafka Streams worker threads, and uses its own consumers and producers. This adds a little bit of CPU overhead per query. You should avoid running a large number of queries on one ksqlDB cluster. Instead, use interactive mode to play with your data and develop sets of queries that function together. Then, run these in their own headless cluster. Check out the Recommendations and Best Practices section for more details.
Data Schema: ksqlDB handles mapping serialized Kafka records to columns in a stream or table's schema. In general, more complex schemas with a higher ratio of columns to bytes of data require more CPU to process.
Number of Partitions: Kafka Streams creates one RocksDB state store instance for aggregations and joins for every topic partition processed by a given ksqlDB server. Each RocksDB state store instance has a memory overhead of 50 MB for its cache plus the data actually stored.
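A quick way to see how partition count drives this overhead is to multiply it out. The sketch below assumes one state store instance per partition for each stateful operation (aggregation or join) in the query; the function name is hypothetical, and the data actually stored comes on top of this figure.

```python
ROCKSDB_CACHE_MB = 50  # per-instance cache overhead quoted above


def state_store_cache_mb(partitions_on_server, stateful_operations=1):
    """Up-front cache memory for the state stores hosted on one server."""
    return partitions_on_server * stateful_operations * ROCKSDB_CACHE_MB


# One aggregation over 64 partitions, all processed by a single server.
print(state_store_cache_mb(64))

# The same partitions with a join and an aggregation.
print(state_store_cache_mb(64, stateful_operations=2))
```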
Key Space: For aggregations and joins, Kafka Streams/RocksDB tries to keep the working set of a state store in memory to avoid I/O operations. If there are many keys, this requires more memory. It also makes reads and writes to the state store more expensive. Note that the size of the data in a state store is not limited by memory (RAM) but only by available disk space on a ksqlDB server.
Recommendations and Best Practices¶
Interactive ksqlDB Servers vs. Non-Interactive (Headless) Servers¶
By default, ksqlDB Server is configured for interactive use, which means you can use the ksqlDB CLI to interact with a ksqlDB cluster in order to, for example, execute new queries. Interactive ksqlDB usage allows for easy and quick iterative development and testing of your SQL queries via the ksqlDB CLI.
You can also configure the servers for headless, non-interactive operation, where servers collaboratively run only a predefined list of queries. The result is essentially a scalable, fault-tolerant, and distributed stream processing application that communicates to the outside world by reading from and writing to Kafka topics. Sizing, deploying, and managing in this scenario is similar to a Kafka Streams application. You should integrate ksqlDB deployments with your own CI/CD pipeline, for example, to version-control the .sql file.
Here are some guidelines for choosing between the configuration types:
- For production deployments, headless, non-interactive ksqlDB clusters are recommended. This configuration provides the best isolation and, unlike interactive ksqlDB clusters, minimizes the likelihood of operator error and human mistakes.
- For exploring and experimenting with your data, interactive ksqlDB clusters are recommended. With this method you can quickly create queries for your use case that will function as a streaming "application" to produce meaningful results. You can then run this "application" with headless, non-interactive ksqlDB clusters in production.
- For interactive ksqlDB usage, you should deploy an interactive ksqlDB cluster per project or per team instead of a single, large ksqlDB cluster for your organization.
You can scale ksqlDB by adding more capacity per server (scaling vertically) or by adding more servers (scaling horizontally). You can scale ksqlDB clusters during live operations without loss of data. For example, you can add and remove ksqlDB servers to increase or decrease processing capacity without disturbing running queries. When scaling vertically, configure servers with a larger number of stream threads. For more information, see ksql.streams.num.stream.threads. If you're scaling past eight cores, we recommend scaling horizontally by adding servers.
Similar to Kafka Streams, ksqlDB throughput scales well as resources are added, if your Kafka topics have enough partitions to increase parallelism. For example, if your input topic has five partitions, the maximum parallelism is also five; a maximum of five cores/threads would execute a query on this topic in parallel. If you want to increase the maximum level of parallelism, you must increase the number of partitions that are being processed by using one of these methods:
- Re-partition your input data into a new stream with the CREATE STREAM AS SELECT statement and then write subsequent queries against the repartitioned stream. Also, if you want to save storage space in your Kafka cluster, consider lowering the data retention configuration for that underlying stream topic.
- Increase the number of partitions in the input topic.
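The partition ceiling on parallelism described above amounts to taking a minimum. The sketch below is illustrative only, and the function and parameter names are hypothetical.

```python
def effective_parallelism(input_partitions, servers, threads_per_server):
    """Threads that can actually do work for one query on one input topic.

    Each partition is processed by at most one thread, so any threads
    beyond the partition count sit idle.
    """
    return min(input_partitions, servers * threads_per_server)


# Five partitions cap parallelism at five, however many threads exist.
print(effective_parallelism(5, servers=2, threads_per_server=4))

# With 64 partitions, the 8 available threads are all used.
print(effective_parallelism(64, servers=2, threads_per_server=4))
```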
To scale ksqlDB horizontally, run additional ksqlDB servers with the same
You can add ksqlDB Server instances continuously to scale load horizontally, as long as there are more partitions than consumers.
How to Know When to Scale¶
If ksqlDB can't keep up with the production rate of your Kafka topics, it will start to fall behind in processing the incoming data. Consumer lag is the Kafka terminology for describing how far a Kafka consumer, including ksqlDB, has fallen behind. It's important to monitor consumer lag on your topics and add resources if you observe that the lag is growing. Confluent Control Center is the recommended tool for monitoring. You can also check out Confluent Platform documentation for details on metrics exposed by Kafka that can be used to monitor lag.
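To make "the lag is growing" concrete, here is a minimal sketch of the arithmetic. It assumes you already collect per-partition end offsets and committed offsets from your monitoring tool; the dictionaries and function names are hypothetical and are not a ksqlDB or Kafka API.

```python
def total_lag(end_offsets, committed_offsets):
    """Sum of (log end offset - committed offset) over all partitions."""
    return sum(end_offsets[p] - committed_offsets.get(p, 0)
               for p in end_offsets)


def lag_is_growing(lag_samples):
    """True when each sample is larger than the one before it."""
    return len(lag_samples) >= 2 and all(
        later > earlier
        for earlier, later in zip(lag_samples, lag_samples[1:]))


# Hypothetical snapshot of a two-partition input topic.
print(total_lag({0: 100, 1: 200}, {0: 90, 1: 150}))

# Hypothetical lag readings taken a minute apart.
print(lag_is_growing([60, 400, 1500]))
```

A single large reading is not necessarily a problem; a steady upward trend over several samples is the signal to add resources.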
Your workload may involve multiple queries, perhaps with some queries feeding data into others in a streaming pipeline, for example, a project/filter to transform some data that is then aggregated. Monitoring consumer lag for each query's input topic is especially important for such workloads. ksqlDB currently doesn't have a mechanism to guarantee resource utilization fairness between queries. So a faster query like a project/filter may "starve" a more expensive query like a windowed aggregate if the production rate into the source topics is high. If this happens, you will observe growing lag on the source topic for the more expensive queries and very low throughput to their sink topics.
You can fix this situation by using either of these methods:
- Tune the cheaper queries to consume less CPU by decreasing ksql.streams.num.stream.threads for that query.
- Add resources to reduce the per-CPU usage of the cheaper queries, which in turn will increase the throughput for the more expensive queries.
ksqlDB Capacity Sizing Examples¶
This section provides sizing scenarios with examples of how to think
about sizing. These examples analyze a stream of pageviews.
The environment and numbers in this section are hypothetical and only meant for illustration purposes. You should perform your own benchmarking and testing to match your use cases and environments.
The examples assume the following DDL statements to declare the schema for the input data:
CREATE STREAM pageviews_original (viewtime BIGINT, userid VARCHAR, pageid VARCHAR, client_ip INT, url VARCHAR, duration BIGINT, from_url VARCHAR, analytics VARCHAR)
  WITH (kafka_topic='pageviews', value_format='JSON', KEY='userid');

CREATE TABLE users (registertime BIGINT, gender VARCHAR, city INT, country INT, userid VARCHAR, email VARCHAR)
  WITH (kafka_topic='users', value_format='JSON', key='userid');
The following assumptions are also made:
- The production rate into the pageviews topic is 50 MB/s.
- The messages in pageviews average 256 bytes.
- The pageviews topic has 64 partitions.
- The messages are in JSON format. Serialization to JSON adds some space overhead. You can assume an extra 25 percent to account for this.
Scenario 1: Project/Filter Only (Stateless Queries)¶
In this scenario, my application is a single project/filter query that tries to capture meaningful pageviews by filtering out all the views that lasted less than 10 seconds:
CREATE STREAM pageviews_meaningful WITH (PARTITIONS=64) AS SELECT * FROM pageviews_original WHERE duration > 10 EMIT CHANGES;
The example pageviews messages are under 256 bytes. For smaller messages, in this hypothetical environment, you can assume each 4-core ksqlDB Server is CPU-bound at around 50 MB/s. This throughput can be managed with a single ksqlDB Server. For increased fault tolerance, you can run a second server.
Project/Filter is stateless and therefore doesn't need to account for state store memory. 8 GB are recommended for the Java heap space for record processing.
ksqlDB uses the network to consume records from the Kafka input topic and produce records to the output topic. In this example query 50 MB/s are received. If you assume that 90 percent of the page views are meaningful, then you would produce 45 MB/s as output.
On the Kafka side you would need to provision for the additional production and consumption bandwidth as calculated above. Additionally, you would need to account for the output topic itself, which would add 64 partitions to the Kafka cluster.
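The Scenario 1 numbers can be reproduced with a few lines of arithmetic. This sketch assumes 1 MB = 1,000,000 bytes, and the function name is hypothetical.

```python
def filter_query_traffic(input_mb_per_s, pass_fraction, avg_message_bytes):
    """Return (output MB/s, input messages/s) for a project/filter query."""
    output_mb_per_s = input_mb_per_s * pass_fraction
    messages_per_s = input_mb_per_s * 1_000_000 / avg_message_bytes
    return output_mb_per_s, messages_per_s


# 50 MB/s of 256-byte pageviews, 90 percent of which pass the filter.
out_rate, msg_rate = filter_query_traffic(50, 0.9, 256)
print(f"output: {out_rate:.0f} MB/s")
print(f"input: {msg_rate / 1000:.0f} thousand messages/s")
```

The message rate is worth computing because small messages make the server CPU-bound: the per-message cost, not the byte rate, dominates.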
Scenario 2: Large Messages¶
In this example, the same query as Scenario 1 is performed, but each
message is 8 KB. For larger messages, each ksqlDB node is usually
network-bound instead of CPU-bound. One node with a 1 Gb/s NIC should be
able to manage the original 50 MB/s (400 Mb/s) of throughput coming into
the pageviews_original topic. You can assume the production throughput is
larger at 256 MB/s. A 1 Gb/s full-duplex NIC can handle 1 Gb/s, or 128
MB/s in each direction. You can estimate 2-3 ksqlDB nodes are required to
manage this load.
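The node count for a network-bound workload follows from dividing the required throughput by what one NIC can carry in one direction. The sketch below uses the 128 MB/s figure from the text and a hypothetical function name; it gives a lower bound, and the 2-3 node estimate above leaves headroom on top of it.

```python
import math


def network_bound_nodes(throughput_mb_per_s, nic_mb_per_s_per_direction=128):
    """Minimum number of nodes needed to receive the given throughput."""
    return math.ceil(throughput_mb_per_s / nic_mb_per_s_per_direction)


# The original 50 MB/s fits on one node.
print(network_bound_nodes(50))

# 256 MB/s needs at least two nodes before any headroom is added.
print(network_bound_nodes(256))
```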
Scenario 3: More Advanced Usage¶
In this example, the messages are 256 bytes and you want to enrich
pageviews_meaningful with information about the user, and then count
up views by city:
CREATE STREAM pageviews_meaningful_with_user_info WITH (PARTITIONS=64) AS
  SELECT pv.viewtime, pv.userid, pv.pageid, pv.client_ip, pv.url, pv.duration, pv.from_url, u.city, u.country, u.gender, u.email
  FROM pageviews_meaningful pv LEFT JOIN users u ON pv.userid = u.userid
  EMIT CHANGES;

CREATE TABLE pageview_counts_by_city WITH (PARTITIONS=64) AS
  SELECT country, city, count(*)
  FROM pageviews_meaningful_with_user_info
  GROUP BY country, city
  EMIT CHANGES;
Since the example messages are small, you can expect ksqlDB to be
CPU-bound. To estimate the throughput from each ksqlDB Server, first
estimate the throughput each query would get from a single server if run
in isolation. The rule-of-thumb heuristic is that the join will consume
about twice the CPU of the project/filter. In this hypothetical
environment, you can estimate 25 MB/s for it. Aggregations consume
around twice the CPU of joins, so you can estimate 12.5 MB/s for the
aggregation.
To estimate the cumulative throughput from this pipeline, you can use the following:
- The ksqlDB nodes are CPU-bound, and for a query to process R bytes/second each byte consumes 1/R CPU-seconds.
- Three queries with rates R1, R2, and R3 each process every byte of the pipeline, so one byte of input takes 1/R1 + 1/R2 + 1/R3 CPU-seconds.
- The expected throughput should be 1/(1/R1 + 1/R2 + 1/R3).
With R1 = 50 MB/s, R2 = 25 MB/s, and R3 = 12.5 MB/s, this gives an expected throughput of approximately 7.1 MB/s per node, so you would need about seven 4-core ksqlDB nodes to handle 50 MB/s.
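The pipeline formula can be checked numerically. The sketch below plugs in the isolated rates from this scenario (50, 25, and 12.5 MB/s), which evaluates to roughly 7.1 MB/s per node; the function names are hypothetical.

```python
import math


def pipeline_throughput(rates_mb_per_s):
    """Combined rate of CPU-bound queries sharing a node: 1 / sum(1 / R_i)."""
    return 1 / sum(1 / rate for rate in rates_mb_per_s)


def nodes_needed(input_mb_per_s, rates_mb_per_s):
    """Nodes required to keep up with the input rate."""
    per_node = pipeline_throughput(rates_mb_per_s)
    # The small tolerance guards against floating-point noise when the
    # ratio is a whole number.
    return math.ceil(input_mb_per_s / per_node - 1e-9)


rates = [50, 25, 12.5]  # project/filter, join, aggregation
print(f"per-node throughput: {pipeline_throughput(rates):.2f} MB/s")
print(f"nodes for 50 MB/s: {nodes_needed(50, rates)}")
```

Because the slowest query dominates the sum of reciprocals, speeding up the aggregation helps far more than speeding up the filter.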
To calculate how much memory is required per server, consider the following:
- You should reserve 8 GB for the Java heap.
- You must account for up-front state store memory overhead.
Across the join and aggregate, ksqlDB creates 128 state store instances, one for each partition of each operation. Each state store allocates a 50 MB cache, which adds up to 6.25 GB total, and therefore a little under 1 GB per ksqlDB server.
To make joins as fast as possible, you should make sure that the
users table fits in the page cache. To estimate the size of
users, you need to
know the number of registered users and the size of each user record and
key. Each entry in the users table contains a
registertime (13 bytes),
gender (1 byte),
city ID (10 bytes),
country ID (10 bytes),
email (32 bytes), and
user ID, which is estimated at 32 bytes. If your site has 100,000,000
registered users, it will require approximately 14.4 GB to store your
whole table, and about 2.1 GB per ksqlDB server.
To make aggregation as fast as possible, you should ensure that all of
the aggregates fit in the page cache. To estimate the size of the
aggregates, you need the number of aggregates and the size of the state
required to store each one. Each aggregate requires storing a
country ID (10 bytes), a city ID (10 bytes), and a
count (20 bytes), adding up to 40
bytes. With overhead, you can estimate 50 bytes. The number of
aggregates is the number of cities with registered users. You can
estimate 50,000 cities. Storing all the aggregates requires
approximately 2.4 MB of memory, which is negligible.
Each ksqlDB Server should have at least about 12 GB of memory.
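The per-server memory figure can be reconstructed by adding up the pieces discussed in this scenario. The sketch below takes the 2.1 GB users-table share directly from the text rather than recomputing it, assumes the seven-node cluster estimated earlier, and treats 1 GB as 1024 MB; the function name is hypothetical.

```python
def memory_per_server_gb(heap_gb=8,
                         state_stores=128,
                         cache_mb_per_store=50,
                         servers=7,
                         users_table_gb_per_server=2.1,
                         aggregates=50_000,
                         bytes_per_aggregate=50):
    """Sum of heap, state store caches, join table, and aggregate state."""
    cache_gb = state_stores * cache_mb_per_store / 1024 / servers
    aggregates_gb = aggregates * bytes_per_aggregate / 1024 ** 3
    return heap_gb + cache_gb + users_table_gb_per_server + aggregates_gb


total = memory_per_server_gb()
print(f"estimated memory per server: {total:.1f} GB")
```

The sum comes to roughly 11 GB, which is in line with provisioning at least about 12 GB once some headroom is added.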
ksqlDB would create 5 new topics (3 output topics, 1 repartition topic, and 1 changelog topic), each with 64 partitions. You would have to account for 320 additional partitions in the Kafka cluster.
Page last revised on: 2019-12-12