How to create a user-defined function
Context
You have a piece of logic for transforming or aggregating events that ksqlDB can't currently express. You want to extend ksqlDB to apply that logic in your queries. To do that, ksqlDB exposes hooks through Java programs. This functionality is broadly called user-defined functions, or UDFs for short.
In action
Set up a Java project
To implement a user-defined function, start by creating a Java project with a dependency on ksqlDB's UDF library. This library contains the annotations you use to signal that the classes you're implementing are UDFs specifically. You can manage your Java project with any build tool, but this guide demonstrates how it works with Gradle. If you like, you can use the Maven archetype instead. What matters is that you can put an uberjar in ksqlDB's extension directory.
In a fresh directory, create the following build.gradle file to set up the Java project:
Notice that:
- Dependencies are also declared on kafka and connect-api. You need both of these dependencies to make your UDFs capable of being externally configured or able to handle structs. This guide does both of those things.
- The Java compiler is invoked with the -parameters flag. This makes the compiler store method parameter names in the class files so that they are available at runtime, which ksqlDB can use as auto-generated documentation in DESCRIBE FUNCTION statements.
Important
Parameter names must be supplied using one of two approaches. You can either compile your jar with the -parameters flag to infer them from the Java code, or you can explicitly name them with the @UdfParameter annotation, as seen below.
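To see why one of these two approaches is needed, the following minimal sketch uses plain Java reflection to check whether parameter names survived compilation. It does not use any ksqlDB classes, and the class and method names are hypothetical, chosen only for illustration.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; it does not use or depend on ksqlDB.
final class ParameterNameCheck {

    // A method whose parameter names we try to recover at runtime.
    static long formula(int v1, int v2) {
        return (long) v1 * v2;
    }

    // Returns the parameter names that reflection reports for formula(int, int).
    static List<String> reportedNames() {
        try {
            Method method =
                ParameterNameCheck.class.getDeclaredMethod("formula", int.class, int.class);
            List<String> names = new ArrayList<>();
            for (Parameter parameter : method.getParameters()) {
                // isNamePresent() is true only when the class file stores the real
                // names, which is what compiling with -parameters provides.
                String prefix = parameter.isNamePresent() ? "" : "(synthesized) ";
                names.add(prefix + parameter.getName());
            }
            return names;
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(reportedNames());
    }
}
```

Compiled with the -parameters flag, the real names are reported. Without the flag, reflection only has placeholder names to offer, which is why an explicit @UdfParameter name is the alternative.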
Implement the classes
There are three kinds of UDFs which manipulate rows in different ways: scalar functions, tabular functions, and aggregation functions. Each is demonstrated below with simple examples using a variety of features. You can learn about more sophisticated usage in the concepts section.
Start by creating a directory for the class files:
Scalar functions
A scalar function consumes one row as input and produces one row as output. Use this when you simply want to transform a value.
Create a file at src/main/java/com/example/FormulaUdf.java and populate it with the following code. This UDF takes two parameters and executes a simple formula.
Some important points to notice:
- The @UdfDescription annotation marks the class as a scalar UDF. The name parameter gives the function a name so you can refer to it in SQL. You can optionally add a version, which is a string of your choosing. You might find this useful if you evolve a function over time and want to know which particular version a server is currently using.
- The @Udf annotation marks a method as a body of code to invoke when the function is called. Because ksqlDB is strongly typed, you need to supply multiple signatures if you want your function to work with different column types. This UDF has two signatures: one that takes integer parameters and another that takes doubles.
- The @UdfParameter annotation marks each parameter of the method. This can be used for providing some amount of optional metadata about each parameter, but it's mainly useful so that ksqlDB can infer information at runtime.
- This UDF uses an external parameter, ksql.functions.formula.base.value. When a UDF implements the Configurable interface, its configure() method is invoked once as the server starts up. configure() supplies a map of the parameters that ksqlDB server was started with. You will see how this value is populated later in the guide.
Warning
External parameters aren't yet supported for tabular functions.
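The logic described in this section can be sketched in plain Java as follows. This is a minimal illustration rather than the guide's own class: it leaves out the @UdfDescription, @Udf, and @UdfParameter annotations and the Configurable interface so that it compiles without the ksqlDB library, and the class name, the long return type, and the default base value of 0 are assumptions.

```java
import java.util.Map;

// Plain-Java sketch of the formula logic. A real UDF class would also carry the
// @UdfDescription, @Udf, and @UdfParameter annotations and implement Configurable.
final class FormulaSketch {

    // The key comes from this guide; the default of 0 below is an assumption.
    static final String BASE_VALUE_KEY = "ksql.functions.formula.base.value";

    private long baseValue = 0L;

    // Mirrors configure(): reads the external parameter from the server's config map.
    void configure(Map<String, ?> config) {
        Object raw = config.get(BASE_VALUE_KEY);
        if (raw != null) {
            baseValue = Long.parseLong(raw.toString());
        }
    }

    // Integer signature: multiply, then add the configured base value.
    long formula(int v1, int v2) {
        return (long) v1 * v2 + baseValue;
    }

    // Double signature: take the ceiling of each argument before multiplying.
    long formula(double v1, double v2) {
        return (long) (Math.ceil(v1) * Math.ceil(v2)) + baseValue;
    }

    public static void main(String[] args) {
        FormulaSketch udf = new FormulaSketch();
        udf.configure(java.util.Collections.singletonMap(BASE_VALUE_KEY, "5"));
        System.out.println(udf.formula(2, 3));
        System.out.println(udf.formula(1.2, 2.1));
    }
}
```

Java overload resolution picks the signature from the argument types, which parallels how ksqlDB needs one signature per combination of column types.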
Either continue following this guide by implementing more functions or skip ahead to compiling the classes so you can use the functions in ksqlDB.
Tabular functions
A tabular function (UDTF for short) takes one row as input and produces zero or more rows as output. This is sometimes called "flat map" or "mapcat" in different programming languages. Use this when a value represents many smaller values and needs to be "exploded" into its individual parts to be useful.
Create a file at src/main/java/com/example/IndexSequenceUdtf.java and populate it with the following code. This UDTF takes one parameter as input, an array of any type, and returns a sequence of rows, where each element is the element in the array concatenated with its index position as a string.
Notice how:
- The UDTF returns a Java List. This is the collection type that ksqlDB expects all tabular functions to return.
- This UDTF uses Java generics, which enable operating over any ksqlDB supported types. Use this if you want to express logic that operates uniformly over many different column types. The generic parameter must be declared at the head of the method, since you can have multiple signatures, each with a different generic type parameter.
Info
Java arrays and Lists are not interchangeable in user-defined functions, which is especially important to remember when working with UDTFs. ksqlDB arrays correspond to the Java List type, not native Java arrays.
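As a rough illustration of the points above, here is a plain-Java sketch of the index-sequence logic with a method-level generic parameter and List types on both sides. The class name and the "-" separator between element and index are assumptions, and the ksqlDB annotations are omitted so the snippet stands alone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the tabular logic; not the guide's annotated UDTF class.
final class IndexSequenceSketch {

    // The type parameter <E> is declared at the head of the method, so each
    // signature could declare its own generic parameter.
    static <E> List<String> indexSequence(List<E> values) {
        List<String> rows = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            // One output row per input element. The "-" separator is an assumption.
            rows.add(values.get(i) + "-" + i);
        }
        return rows;
    }

    public static void main(String[] args) {
        // The input is a List, not a native Java array.
        System.out.println(indexSequence(Arrays.asList("a", "b", "c")));
    }
}
```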
Either continue following this guide by implementing more functions or skip ahead to compiling the classes so you can use the functions in ksqlDB.
Aggregation functions
An aggregation function (UDAF for short) consumes one row at a time and maintains a stateful representation of all historical data. Use this when you want to compound data from multiple rows together.
Create a file at src/main/java/com/example/RollingSumUdaf.java and populate it with the following code. This UDAF maintains a rolling sum of the last 3 integers in a stream, discarding the oldest values as new ones arrive.
There are many things to observe in this class:
- Aggregation functions are designated by a static method with the @UdafFactory annotation, which differs from scalar and tabular functions. Because aggregations must implement multiple methods, this helps ksqlDB differentiate aggregations when multiple type signatures are used.
- The static factory method must either return Udaf or TableUdaf in package io.confluent.ksql.function.udaf.
  - Use the Udaf return value, as shown in this example, to aggregate streams into tables.
  - Use the TableUdaf return value, which derives from Udaf, to aggregate tables into other tables. Also, you must implement the undo() method.
- ksqlDB decouples the internal representation of an aggregate from its use in an operation. This is useful because aggregations can maintain complex state and expose it in a simpler way in a query. In this example, the internal representation is a LinkedList, as indicated by the initialize() method. But when ksqlDB interacts with the aggregation value, map() is called, which sums the values in the list. The List is needed to keep a running history of values, but the summed value is needed for the query itself.
- UDAFs must be parameterized with three generic types. In this example, they are <Integer, List<Integer>, Integer>. The first parameter represents the type of the column to aggregate over. The second parameter represents the internal representation of the aggregation, which is established in initialize(). The third parameter represents the type that the query interacts with, which is converted by map().
- All types, including inputs, intermediate representations, and final representations, must be types that ksqlDB supports.
- The merge method controls how two session windows fuse together when one extends and overlaps another. In this example, the content of the "later" aggregate is simply taken since it by definition contains values from a later window of time. If you're using session windows, consider what good merge semantics are for your aggregation.
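The separation between intermediate state and query-facing value described above can be sketched in plain Java. This is an illustration under assumptions, not the guide's class: it omits the @UdafFactory factory method and the Udaf interface so that it compiles on its own, the class name is hypothetical, and the method name aggregate() is assumed (the text above names only initialize(), map(), merge(), and undo()).

```java
import java.util.LinkedList;
import java.util.List;

// Plain-Java sketch of a rolling sum over the last three values.
final class RollingSumSketch {

    static final int CAPACITY = 3;

    // initialize(): the intermediate representation starts as an empty list.
    List<Integer> initialize() {
        return new LinkedList<>();
    }

    // aggregate() (assumed name): add the new value and evict the oldest ones.
    List<Integer> aggregate(Integer newValue, List<Integer> aggregate) {
        aggregate.add(newValue);
        while (aggregate.size() > CAPACITY) {
            aggregate.remove(0); // remove by index: drops the oldest element
        }
        return aggregate;
    }

    // map(): convert the intermediate list into the value the query sees.
    Integer map(List<Integer> intermediate) {
        int sum = 0;
        for (int value : intermediate) {
            sum += value;
        }
        return sum;
    }

    // merge(): keep the "later" aggregate, as the text above describes.
    List<Integer> merge(List<Integer> aggOne, List<Integer> aggTwo) {
        return aggTwo;
    }

    public static void main(String[] args) {
        RollingSumSketch udaf = new RollingSumSketch();
        List<Integer> state = udaf.initialize();
        for (int value : new int[] {3, 5, 7, 9}) {
            state = udaf.aggregate(value, state);
            System.out.println(udaf.map(state));
        }
    }
}
```

Keeping the list as state and summing only in map() is what lets the oldest value drop out cleanly: a single running total could not "forget" a value without knowing what it was.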
Dynamic UDAFs
If a UDAF's aggregate or return types vary based on the input type, you can either write a separate function annotated with @UdafFactory per type or override the following three methods: initializeTypeArguments(List<SqlArgument> argTypeList), getAggregateSqlType(), and getReturnSqlType(). To see a concrete example in the ksqlDB codebase, check out the implementation of latest_by_offset or collect_list.
Add the uberjar to ksqlDB server
In order for ksqlDB to be able to load your UDFs, they need to be compiled from classes into an uberjar. Run the following command to build an uberjar:
You should now have a directory, extensions, with a file named example-udfs-0.0.1.jar in it.
In order to use the uberjar, you need to make it available to ksqlDB server. Create the following docker-compose.yml file:
Notice that:
- A volume is mounted from the local extensions directory (containing your uberjar) to the container /opt/ksqldb-udfs directory. The latter can be any directory that you like. This mount effectively puts the uberjar on ksqlDB server's file system.
- The environment variable KSQL_KSQL_EXTENSION_DIR is configured to the same path that was set for the container in the volume mount. This is the path that ksqlDB will look for UDFs in.
- The environment variable KSQL_KSQL_FUNCTIONS_FORMULA_BASE_VALUE is set to 5. Recall that in the UDF example, the function loads an external parameter named ksql.functions.formula.base.value. All KSQL_ environment variables are converted automatically to server configuration properties, which is where UDF parameters are looked up.
Info
Although this is a single-node setup, remember that every node in your ksqlDB cluster needs to have the environment variable parameters configured, since any node can handle any query at any time.
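The naming pattern suggested by the variables above can be sketched as follows. The rule used here (drop the leading KSQL_, lowercase the rest, and turn underscores into dots) is inferred from this guide's two examples; it is an assumption for illustration, not the container image's actual startup logic, and the class and method names are hypothetical.

```java
import java.util.Locale;

// Hypothetical helper that mirrors the naming pattern seen in this guide.
final class EnvNameSketch {

    private static final String PREFIX = "KSQL_";

    // Converts a KSQL_-prefixed environment variable name to a property name.
    static String toPropertyName(String envName) {
        if (!envName.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected a KSQL_ prefix: " + envName);
        }
        return envName.substring(PREFIX.length())
            .toLowerCase(Locale.ROOT)
            .replace('_', '.');
    }

    public static void main(String[] args) {
        System.out.println(toPropertyName("KSQL_KSQL_FUNCTIONS_FORMULA_BASE_VALUE"));
        System.out.println(toPropertyName("KSQL_KSQL_EXTENSION_DIR"));
    }
}
```

Under this rule, the doubled KSQL_KSQL_ prefix makes sense: the first KSQL_ marks the variable for conversion, and the second becomes the ksql. prefix of the property itself.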
Invoke the functions
Bring up your stack by running:
And connect to ksqlDB's server by using its interactive CLI:
Verify that your functions have been loaded by running the following ksqlDB command:
You should see a long list of built-in functions, including your own FORMULA, INDEX_SEQ, and ROLLING_SUM (which are listed as SCALAR, TABLE, and AGGREGATE respectively). If they aren't there, check that your uberjar was correctly mounted into the container. Be sure to check the log files of ksqlDB server, too, using docker logs -f ksqldb-server. You should see log lines similar to:
Info
UDFs are loaded only once as ksqlDB server starts up. ksqlDB does not support hot-reloading UDFs. If you want to change the code of a UDF, you must create a new uberjar, replace the one that is available to ksqlDB, and restart the server. Keep in mind that in a multi-node setup, different nodes may be running different versions of a UDF at the same time.
Before you run any queries, be sure to have ksqlDB start all queries from the earliest point in each topic.
Invoke the scalar function
Inspect the formula function by running:
Your output should resemble:
Notice that the formula function has two type signatures.
Create a stream named s1:
Insert some rows into the stream:
Execute a push query. The formula function multiplies two integers and adds the value of the parameter ksql.functions.formula.base.value, which is set to 5 in your Docker Compose file:
Your output should resemble:
Try the other variant of the formula function, which takes two doubles. This implementation takes the ceiling of a and b before multiplying. Notice how you can use constants instead of column names as arguments to the function:
Your output should resemble:
Invoke the tabular function
Inspect the index_seq function by running:
Your output should resemble:
The DESCRIBE FUNCTION statement shows that index_seq is a generic function with the type parameter E, which means that this UDTF can take a parameter that is an array of any type.
Create a stream named s2:
Insert some rows into the stream:
Execute a push query. The index_seq function creates one row for each element in an array, concatenated with the element's index position.
Your output should resemble:
Invoke the aggregation function
Inspect the rolling_sum function by running:
Your output should resemble:
Create a stream named s3:
Insert some rows into the stream:
Execute a push query. The rolling_sum function keeps the previous three elements, sums them, and emits the result.
Your output should resemble:
k1 sums 3, 5, and 7 together to get a result of 15. k2 sums 6 and 2 together to get a result of 8.
Insert more rows to shift older elements out of the aggregate:
Run the query again:
Your output should resemble:
The output from the rolling_sum function has changed. In k1, the previous three values are now 5, 7, and 9. In k2, the elements are now 2, 1, and 6.
Using structs and decimals
Working with structs and decimals in UDFs requires a more specific type contract with ksqlDB. Both of these types are different from the other Java types that ksqlDB interfaces with because their typing is more dynamic. In the case of structs, fields can be added and removed, and their types are inferred on the fly. In the case of decimals, their precision and scale can change based on the inputs they are computed against. Because of this dynamism, UDFs need to be more explicit in their type contract with ksqlDB.
ksqlDB has two mechanisms for handling these situations: explicitly provided schemas and dynamic schemas. The former is generally used for working with structs, and the latter is generally used for working with decimals. This guide only uses explicitly provided schemas, but you can read more about dynamic schema returns using the @UdfSchemaProvider annotation in the concepts section.
As an example of explicitly provided schemas, create a simple function that maintains simple statistics. This example uses a UDAF, but the concepts are applicable for UDFs and UDTFs. Although the example is a bit contrived, it is useful because it demonstrates using a struct in all possible positions.
Implement the class
Create a file at src/main/java/com/example/StatsUdaf.java and populate it with the following code. This UDAF maintains the minimum, maximum, count, and difference between the min and max for a series of numbers.
There are a few important things to call out in this class:
- Schemas are declared for the input struct parameter, intermediate aggregation struct value, and output struct value. These structs hold different data, so they each need their own schema.
- Descriptor strings are created for each schema, too. This communicates the underlying types to ksqlDB in a way that its type system can understand.
- The schemas, and all of the contained fields, are declared as optional. ksqlDB doesn't have null constraints, meaning that any value can be null. To handle this, all schemas and field values must be marked as optional.
- The field names in the descriptors must match their declared field names in ksqlDB exactly, including their casing. By default, ksqlDB uppercases all field names, which is why the descriptors are also uppercased. If you use backticks for your field names to preserve casing, you must use backticks in the descriptors, too.
- Explicitly declaring schemas is only needed when using structs and decimals. But because this example makes use of structs in all possible places (input, intermediate, and output values), schemas are declared for all of them.
Info
If you're using a struct with a UDF or UDTF, you can set the schema using the Udf, Udtf, and UdfParameter annotations. Each provides the option to supply a schema for various positions.
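The aggregation logic itself, separate from the schema declarations, can be sketched in plain Java. This is a simplified illustration under several assumptions: a small value class and a map stand in for Kafka Connect structs so that the snippet compiles without extra libraries, the uppercased field names MIN, MAX, COUNT, and DIFFERENTIAL are hypothetical, and null inputs are skipped rather than counted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of min/max/count statistics; not the guide's StatsUdaf class.
final class StatsSketch {

    // Intermediate aggregate. min and max are nullable, mirroring optional fields.
    static final class Agg {
        final Long min;
        final Long max;
        final long count;

        Agg(Long min, Long max, long count) {
            this.min = min;
            this.max = max;
            this.count = count;
        }
    }

    static Agg initialize() {
        return new Agg(null, null, 0L);
    }

    // Folds one value into the aggregate. Skipping nulls is an assumption.
    static Agg aggregate(Long value, Agg agg) {
        if (value == null) {
            return agg;
        }
        Long min = agg.min == null ? value : Long.valueOf(Math.min(agg.min, value));
        Long max = agg.max == null ? value : Long.valueOf(Math.max(agg.max, value));
        return new Agg(min, max, agg.count + 1);
    }

    // Converts the intermediate value to the output, adding the max-min difference.
    // Keys are uppercased to mirror ksqlDB's default field-name casing.
    static Map<String, Long> map(Agg agg) {
        Map<String, Long> out = new LinkedHashMap<>();
        out.put("MIN", agg.min);
        out.put("MAX", agg.max);
        out.put("COUNT", agg.count);
        out.put("DIFFERENTIAL", agg.min == null ? null : Long.valueOf(agg.max - agg.min));
        return out;
    }

    public static void main(String[] args) {
        Agg agg = initialize();
        for (long value : new long[] {3L, 1L, 7L}) {
            agg = aggregate(value, agg);
        }
        System.out.println(map(agg));
    }
}
```

In the struct-based version that this section describes, the intermediate and output values would each need their own schema and descriptor string, because the output carries one more field than the intermediate value.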
Create an uberjar in the same manner. If you already have ksqlDB running, be sure to restart it and remount the jar so that it picks up the new code. When you restart the server, keep an eye on the log files. If any of the type schemas are missing or incoherent, ksqlDB will log an error, such as:
Invoke the function
Try using the UDAF. Create the stream s4:
And insert some rows into it:
Remember to have ksqlDB start all queries from the earliest point in each topic:
Execute the following push query:
Your output should resemble:
If you like, you can destructure the output into individual columns. Try following the query structured data guide.
Tear down the stack
When you're done, tear down the stack by running: