IMPORTANT: Do not upgrade existing ksqlDB installations in-place
## Why does ksqlDB not currently support in-place upgrades?
Past releases of KSQL were backwards compatible. However, this backwards compatibility came at a cost: progress was slower and the code base grew more complex. ksqlDB is a young product and we want to move fast, so we have decided to choose speed of development over strong backwards compatibility guarantees for a few releases.
Until version 1.0 of ksqlDB, each minor release may contain breaking changes, which means you cannot simply update the ksqlDB binaries and restart the server(s).
The data models and binary formats used within ksqlDB are in flux. This means data local to each ksqlDB node and stored centrally within internal Kafka topics may not be compatible with the new version you are trying to deploy.
## Should I upgrade?
It's great that you're interested in trying out the new features and fixes that new versions of ksqlDB bring. However, before rushing off to upgrade all your ksqlDB clusters, ask yourself: "Do I need to upgrade this cluster?"
If you're running ksqlDB in production and you don't yet need the features or fixes the new version brings, then consider delaying any upgrade until either another release has features or fixes you need, or until ksqlDB reaches version 1.0 and therefore promises backwards compatibility.
## How to upgrade
Upgrading a cluster involves leaving the old cluster running on the old version, bringing up a new cluster on the new version, porting across your database schema and finally thinking about your data.
### Porting the database schema
To port your database schema from one cluster to another you need to recreate, on the new cluster, all of the streams, tables and types defined in the source cluster. This is currently a manual process, until support is added to dump the schema.
The recommended process is:
Note: you can use the [SPOOL](https://github.com/confluentinc/ksql/blob/master/docs-md/developer-guide/ksqldb-reference/spool.md) command to capture the output of the commands you run in the CLI to a file.
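For example, a minimal sketch of a CLI session that captures the output to a local file (the file name is arbitrary):

```sql
-- Write everything printed by the CLI from here on to a local file.
SPOOL output.log;

list streams extended;
list tables extended;
list types;

-- Stop capturing; the CREATE statements can then be copied out of output.log.
SPOOL OFF;
```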
- Capture streams SQL:
    - Run `list streams extended;` to list all of the streams.
    - Grab the SQL statement that created each stream from the output, ignoring `KSQL_PROCESSING_LOG`.
- Capture tables SQL:
    - Run `list tables extended;` to list all of the tables.
    - Grab the SQL statement that created each table from the output.
- Capture custom types SQL:
    - Run `list types;` to list all of the custom types.
    - Convert the output into `CREATE TYPE <name> AS <schema>` syntax by grabbing the name from the first column and the schema from the second column of the output.
- Order by dependency: you'll now have the list of SQL statements to rebuild the schema, but they are not yet ordered in terms of dependencies. You will need to reorder the statements to ensure each statement comes after any other statements it depends on.
- Update the script to take into account any changes in syntax or functionality between the old and new clusters. The release notes can help here. It can also be useful to have a test ksqlDB cluster, pointing to a different test Kafka cluster, where you can try running the script to get feedback on any errors. Note: you may want to temporarily add `PARTITIONS=1` to the `WITH` clause of any `CREATE TABLE` or `CREATE STREAM` command, so that the command will run without requiring you to first create the necessary topics in the test Kafka cluster.
- Stop the old cluster: if you do not do so, then both the old and new cluster will be publishing to sink topics, resulting in undefined behaviour.
- Build the schema in the new instance: now that you have the SQL file, you can run it against the new cluster to build a copy of the schema. This is best achieved with the [RUN SCRIPT](https://github.com/confluentinc/ksql/blob/master/docs-md/developer-guide/ksqldb-reference/run-script.md) command, which takes a SQL file as an input; see the sketch after this list.
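For illustration, a minimal sketch of what a ported schema file might look like; every stream, table, type, and file name here is hypothetical:

```sql
-- schema_dump.sql: statements ordered so that each one comes after anything it depends on.
CREATE TYPE ADDRESS AS STRUCT<street STRING, city STRING>;

-- PARTITIONS=1 added temporarily so the topic is created in the test Kafka cluster.
CREATE STREAM ORDERS (ORDER_ID BIGINT, CITY STRING, ADDR ADDRESS)
  WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON', PARTITIONS=1);

CREATE TABLE ORDERS_BY_CITY AS
  SELECT CITY, COUNT(*) AS CNT
  FROM ORDERS
  GROUP BY CITY;
```

The file can then be run against the new cluster from the CLI:

```sql
RUN SCRIPT 'schema_dump.sql';
```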
### Rebuilding state
Porting the database schema to the new cluster will cause ksqlDB to start processing data. As this is a new cluster it will start processing all data from the start, i.e. it will likely be processing data the old cluster has already processed.
IMPORTANT: while ksqlDB is processing historic data, it will output historic results to sink topics. Such historic results may cause issues with downstream consumers of the data. The historic results will be correctly timestamped, allowing well-behaved consumers to correctly process or ignore them.
NOTE: source data that the old cluster processed may no longer be available in Kafka for the new cluster to process, e.g. topics with limited retention. It is therefore possible for the new cluster to produce different results from the old one.
It is possible to monitor how far behind the processing is through JMX. Monitor the `kafka.consumer/consumer-fetch-manager-metrics/<consumer-name>/records-lag-max` metric to observe the new nodes processing the historic data.
### Destroy the old cluster
Once you're happy with your new cluster you can destroy the old one using the terminate endpoint. This will stop all processing and delete any internal topics in Kafka.
## Upgrade notes
### Upgrading from ksqlDB 0.6.0 to 0.7.0
IMPORTANT: ksqlDB 0.7.0 is not backwards compatible. Do not upgrade in-place.
The following changes in SQL syntax and functionality may mean SQL statements that previously ran no longer run:
#### `PARTITION BY` and `GROUP BY` result schema changes
Materialized views created with `CREATE TABLE AS SELECT` or `CREATE STREAM AS SELECT` that include a `PARTITION BY` or `GROUP BY` on a single column may fail or result in a different result schema.
The statement may fail if the type of the single column is not a supported primitive type: `INT`, `BIGINT`, `DOUBLE` or `STRING`.
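For example, a minimal sketch, assuming a hypothetical `ORDERS` stream with a `BOOLEAN` column `SHIPPED`:

```sql
-- Fails on ksqlDB 0.7 because BOOLEAN is not a supported primitive key type.
CREATE TABLE ORDERS_BY_SHIPPED AS
  SELECT SHIPPED, COUNT(*) AS CNT
  FROM ORDERS
  GROUP BY SHIPPED;
```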
Workaround: Change the statement to `CAST` the `GROUP BY` or `PARTITION BY` column to a `STRING`.
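For example, continuing the hypothetical sketch above:

```sql
-- Casting the BOOLEAN column to a STRING gives the key a supported type.
CREATE TABLE ORDERS_BY_SHIPPED AS
  SELECT CAST(SHIPPED AS STRING) AS SHIPPED_STR, COUNT(*) AS CNT
  FROM ORDERS
  GROUP BY CAST(SHIPPED AS STRING);
```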
The statement will result in a stream or table with a different schema if the single column has a supported primitive key type other than `STRING`. The type of the `ROWKEY` system column in the resulting table or stream will match the type of the single column. This can cause downstream statements to fail.
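For example, a sketch assuming a hypothetical `ORDERS` stream with an `INT` column `USER_ID`:

```sql
-- On 0.6 the resulting ROWKEY was a STRING; on 0.7 it is an INT, matching USER_ID.
CREATE TABLE ORDERS_BY_USER AS
  SELECT USER_ID, COUNT(*) AS CNT
  FROM ORDERS
  GROUP BY USER_ID;

-- A downstream statement that assumed ROWKEY was a STRING, e.g. one comparing
-- ROWKEY to a string literal such as '42', may now fail.
```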
Workaround 1: Fix the downstream queries to use the new SQL type.
Workaround 2: Change the statement to `CAST` the `GROUP BY` or `PARTITION BY` column to a `STRING`. This allows the query to operate as it did previously.
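For example, continuing the sketch above:

```sql
-- Casting keeps ROWKEY a STRING, as it was on 0.6.
CREATE TABLE ORDERS_BY_USER AS
  SELECT CAST(USER_ID AS STRING) AS USER_ID_STR, COUNT(*) AS CNT
  FROM ORDERS
  GROUP BY CAST(USER_ID AS STRING);
```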
#### Joins may result in different schema or may fail
Some existing joins may now fail, and others may see the type of `ROWKEY` in the result schema change, due to primitive key support.
The statement may fail if the two sides of the join have a different SQL type.
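For example, a sketch assuming a hypothetical `ORDERS` stream with an `INT` column `USER_ID`, joined to a hypothetical `USERS` table whose key column `USER_ID` is a `STRING`:

```sql
-- Fails on 0.7 because the join keys have different SQL types (INT vs STRING).
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.ORDER_ID, U.NAME
  FROM ORDERS O
  JOIN USERS U ON O.USER_ID = U.USER_ID;
```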
The join now fails because joins require the join keys to match exactly, which cannot be the case if they have different SQL types.
Workaround: ksqlDB now supports arbitrary expressions in the join criteria, allowing you to `CAST` either, or both, sides to the same SQL type.
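For example, continuing the sketch above:

```sql
-- Casting the INT side to a STRING gives both join keys the same SQL type.
CREATE STREAM ORDERS_ENRICHED AS
  SELECT O.ORDER_ID, U.NAME
  FROM ORDERS O
  JOIN USERS U ON CAST(O.USER_ID AS STRING) = U.USER_ID;
```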
The statement will result in a stream or table with a different schema if the join column has a supported primitive key type other than `STRING`. The type of the `ROWKEY` system column in the resulting table or stream will match the type of the join column. This can cause downstream statements to fail.
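For example, a sketch of a stream-stream join on `INT` columns, using hypothetical `ORDERS` and `SHIPMENTS` streams:

```sql
-- The join column is an INT, so from 0.7 the result's ROWKEY is an INT rather than
-- a STRING; downstream statements that treat ROWKEY as a STRING may now fail.
CREATE STREAM ORDERS_WITH_SHIPMENTS AS
  SELECT O.ORDER_ID, S.SHIPMENT_ID
  FROM ORDERS O
  JOIN SHIPMENTS S WITHIN 1 HOUR ON O.ORDER_ID = S.ORDER_ID;
```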
Workaround 1: Fix the downstream queries to use the new SQL type.
Workaround 2: Change the statement to `CAST` the join criteria to a `STRING`. This allows the query to operate as it did previously.
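For example, continuing the sketch above:

```sql
-- Casting both join columns to STRING keeps the result's ROWKEY a STRING, as on 0.6.
CREATE STREAM ORDERS_WITH_SHIPMENTS AS
  SELECT O.ORDER_ID, S.SHIPMENT_ID
  FROM ORDERS O
  JOIN SHIPMENTS S WITHIN 1 HOUR
    ON CAST(O.ORDER_ID AS STRING) = CAST(S.ORDER_ID AS STRING);
```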
#### Incompatibility between `ROWKEY` type and `WITH(KEY)` type
`CREATE TABLE` and `CREATE STREAM` statements accept an optional `KEY` property in their `WITH` clause. ksqlDB now requires the column identified in the `KEY` property to have the same SQL type as the key of the Kafka message, i.e. the same type as `ROWKEY`. This makes sense, given that the column identified in the `KEY` property is expected to hold the exact same data as `ROWKEY` if things are to work correctly.
For example, a statement like the following would previously have executed, but will now fail.
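A minimal sketch, assuming the underlying Kafka topic is keyed by an `INT` while `ROWKEY` is left undeclared, so it defaults to `STRING` (all names are hypothetical):

```sql
-- Fails on 0.7: the KEY column ID is an INT, but ROWKEY defaults to STRING.
CREATE STREAM USER_EVENTS (ID INT, EVENT STRING)
  WITH (KAFKA_TOPIC='user_events', VALUE_FORMAT='JSON', KEY='ID');
```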
Workaround 1: Update the statement to explicitly set the type of `ROWKEY`. If the key type matches the column type, e.g. in the example above the key is an `INT`, then the statement can be updated to explicitly set the `ROWKEY` type.
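For example, a sketch continuing the hypothetical stream above, declaring the key column's type explicitly:

```sql
-- Declaring ROWKEY as an INT makes it match the type of the ID column.
CREATE STREAM USER_EVENTS (ROWKEY INT KEY, ID INT, EVENT STRING)
  WITH (KAFKA_TOPIC='user_events', VALUE_FORMAT='JSON', KEY='ID');
```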
#### Removal of `WindowStart()` and `WindowEnd()` UDAFs

The `WindowStart()` and `WindowEnd()` window bound UDAFs have been removed in ksqlDB 0.7.
Workaround: use the `WINDOWSTART` and `WINDOWEND` system columns instead.
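For example, a sketch of a windowed aggregation over a hypothetical `PAGE_VIEWS` stream:

```sql
-- Select the window bound system columns instead of calling the removed UDAFs.
SELECT WINDOWSTART AS WSTART, WINDOWEND AS WEND, USER_ID, COUNT(*) AS CNT
  FROM PAGE_VIEWS
  WINDOW TUMBLING (SIZE 1 HOUR)
  GROUP BY USER_ID
  EMIT CHANGES;
```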
#### Windowed `ROWKEY` data change
Any query of a windowed source that uses `ROWKEY` in the `SELECT` projection will see the contents of `ROWKEY` change.
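For example, a sketch that copies `ROWKEY` into a value column, assuming a hypothetical windowed table `PAGE_VIEWS_BY_USER`:

```sql
SELECT ROWKEY AS KEY_COPY, CNT
  FROM PAGE_VIEWS_BY_USER
  EMIT CHANGES;
```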
For previous versions of ksqlDB, `KEY_COPY` would be of type `STRING` and would contain data in the format `<key> : Window{start=<window-start>, end=<window-end>}`. From v0.7 onwards the type of `KEY_COPY` will match the type and contents of `ROWKEY`.
Workaround: if required, the statement can be updated to reconstruct the old string value by accessing the window bounds using the `WINDOWSTART` and `WINDOWEND` system columns.
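For example, a sketch continuing the hypothetical query above (the exact formatting expression may need adjusting):

```sql
-- Rebuild the old '<key> : Window{start=..., end=...}' string from the window bounds.
SELECT CAST(ROWKEY AS STRING) + ' : Window{start=' + CAST(WINDOWSTART AS STRING)
         + ', end=' + CAST(WINDOWEND AS STRING) + '}' AS KEY_COPY,
       CNT
  FROM PAGE_VIEWS_BY_USER
  EMIT CHANGES;
```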
#### Change in array base index
Previous versions of ksqlDB used base-zero indexing when accessing array elements.
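For example, a sketch with a hypothetical stream `S` and array column `COL0`:

```sql
-- On 0.6 and earlier, this returned the first element of the array.
SELECT COL0[0] FROM S EMIT CHANGES;
```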
Starting from v0.7, ksqlDB uses the more standard base-one indexing.
Workaround: update the statements to use base-one indexing.
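For example, the same hypothetical query updated for v0.7:

```sql
-- On 0.7 the first element is at index 1.
SELECT COL0[1] FROM S EMIT CHANGES;
```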
#### Change in required order for `EMIT CHANGES` and `PARTITION BY`
Previous releases of ksqlDB required the `EMIT CHANGES` before the `PARTITION BY`.
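For example, a sketch with hypothetical names showing the old ordering:

```sql
SELECT * FROM S EMIT CHANGES PARTITION BY COL0;
```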
Starting from v0.7, ksqlDB requires the `PARTITION BY` to come before the `EMIT CHANGES`.
Workaround: update the statement to reflect the new required order.
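For example, the same hypothetical query in the new order:

```sql
SELECT * FROM S PARTITION BY COL0 EMIT CHANGES;
```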
#### `ALL`, `WINDOWSTART` and `WINDOWEND` are now reserved identifiers
Any query using these identifiers will need to be changed either to use some other identifier or to quote them.
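For example, a sketch assuming a hypothetical stream `S` with a value column literally named `WINDOWSTART`, quoted using backticks:

```sql
SELECT `WINDOWSTART` AS WS FROM S EMIT CHANGES;
```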