Functions
ksqlDB has many built-in functions that help with processing records in streaming data, like ABS and SUM. Functions are used within a SQL query to filter, transform, or aggregate data.
With the ksqlDB API, you can implement custom functions that go beyond the built-in functions. For example, you can create a custom function that applies a pre-trained machine learning model to a stream.
ksqlDB supports these kinds of functions:
- Stateless scalar function (UDF)
- A scalar function that takes one input row and returns one output value. No state is retained between function calls. When you implement a custom scalar function, it's called a User-Defined Function (UDF). For more information, see Scalar Function.
- Stateful aggregate function (UDAF)
- An aggregate function that takes N input rows and returns one output value. During the function call, state is retained for all input records, which enables aggregating results. When you implement a custom aggregate function, it's called a User-Defined Aggregate Function (UDAF). For more information, see Aggregate Function.
- Table function (UDTF)
- A table function that takes one input row and returns zero or more output rows. No state is retained between function calls. When you implement a custom table function, it's called a User-Defined Table Function (UDTF). For more information, see Table Function.
Implement a Custom Function¶
Follow these steps to create your custom functions:
-
Write your UDF, UDAF, or UDTF class in Java.
- If your Java class is a UDF, mark it with the
@UdfDescription
and@Udf
annotations. - If your class is a UDAF, mark it with the
@UdafDescription
and@UdafFactory
annotations. - If your class is a UDTF, mark it with the
@UdtfDescription
and@UdtfFactory
annotations.
For more information, see Example UDF class and Example UDAF class.
- If your Java class is a UDF, mark it with the
-
Deploy the JAR file to the
ksql
extensions directory. For more information, see Deploying. - Use your function like any other ksqlDB function in your queries.
Tip
The SHOW FUNCTIONS statement lists the available functions in your ksqlDB Server, including your custom UDF and UDAF functions. Use the DESCRIBE FUNCTION statement to display details about your custom functions.
For a detailed walkthrough on creating a UDF, see the how-to guide for creating a user-defined function.
Creating UDFs, UDAFs, and UDTFs¶
ksqlDB supports creating User Defined Scalar Functions (UDFs), User Defined
Aggregate Functions (UDAFs), and User Defined Table Functions (UDTFs)
by using custom jars that are uploaded to the ext/
directory of the ksqlDB
installation. At start up time, ksqlDB scans the jars in the directory looking
for any classes that annotated with @UdfDescription
(UDF), @UdafDescription
(UDAF), or @UdtfDescription
(UDTF).
- Classes annotated with
@UdfDescription
are scanned for any public methods that are annotated with@Udf
. - Classes annotated with
@UdafDescription
are scanned for any public static methods that are annotated with@UdafFactory
. - Classes annotated with
@UdtfDescription
are scanned for any public methods that are annotated with@Udtf
.
Each function that is found is parsed and, if successful, loaded into ksqlDB.
Each function instance has its own child-first ClassLoader
that is
isolated from other functions. If you need to use any third-party
libraries with your functions, they should also be part of your jar,
which means that you should create an "uber-jar". The classes in your
uber-jar are loaded in preference to any classes on the ksqlDB classpath,
excluding anything vital to the running of ksqlDB, i.e., classes that are
part of org.apache.kafka
and io.confluent
. Further, the
ClassLoader
can restrict access to other classes via a blacklist. The
blacklist file is resource-blacklist.txt
. You can add any classes or
packages that you want blacklisted from UDF use. For example you may not
want a UDF to be able to fork processes. Further details on how to
blacklist are available below.
UDFs¶
To create a UDF you need to create a class that's annotated with
@UdfDescription
. Each method in the class that represents a UDF must
be public and annotated with @Udf
. The class you create represents a
collection of UDFs all with the same name but may have different
arguments and return types.
@UdfParameter
annotations can be added to method parameters to provide
users with richer information, including the parameter schema. This
annotation is required if the SQL type can't be inferred from the Java
type, for example, STRUCT
.
Null Handling¶
If a UDF uses primitive types in its signature it is indicating that the
parameter should never be null
. Conversely, using boxed types indicates
the function can accept null
values for the parameter. It's up to the
implementor of the UDF to chose which is the more appropriate. A common
pattern is to return null
if the input is null
, though generally
this is only for parameters that are expected to be supplied from the
source row being processed.
For example, a substring(String str, int pos)
UDF might return null
if str
is null
, but a null
value for the pos
parameter would be
treated as an error, and so should be a primitive. In fact, the built-in
substring is more lenient and would return null
if pos
is null
).
The return type of a UDF can also be a primitive or boxed type. A
primitive return type indicates the function will never return null
,
whereas a boxed type indicates that it may return null
.
The ksqlDB Server checks the value that's passed to each parameter and
reports an error to the server log for any null
values being passed to a
primitive type. The associated column in the output row will be null
.
Dynamic return type¶
UDFs support dynamic return types that are resolved at runtime. This is
useful if you want to implement a UDF with a non-deterministic return
type, like DECIMAL
or STRUCT
. For example, a UDF that returns
BigDecimal
, which maps to the SQL DECIMAL
type, may vary the
precision and scale of the output based on the input schema.
To use this functionality, you need to specify a method with signature
public SqlType <your-method-name>(final List<SqlType> params)
and
annotate it with @UdfSchemaProvider
. Also, you need to link it to the
corresponding UDF by using the schemaProvider=<your-method-name>
parameter of the @Udf
annotation.
Generics in UDFs¶
A UDF declaration can utilize generics if they match the following conditions:
- Any generic in the return value of a method must appear in at least one of the method parameters
- The generic must not adhere to any interface. For example,
<T extends Number>
is not valid). - The generic does not support type coercion or inheritance. For
example,
add(T a, T b)
will acceptBIGINT, BIGINT
but notINT, BIGINT
.
Example UDF class¶
The class below creates a UDF named multiply
. The name of the UDF is
provided in the name
parameter of the UdfDescription
annotation.
This name is case-insensitive and is what can be used to call the UDF.
As can be seen this UDF can be invoked in different ways:
- with two int parameters returning a long (BIGINT) result.
- with two long (BIGINT) parameters returning a long (BIGINT) result.
- with two nullable Long (BIGINT) parameters returning a nullable Long (BIGINT) result.
- with two double parameters returning a double result.
- with variadic double parameters returning a double result.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
|
If you're using Gradle to build your UDF or UDAF, specify the
ksqldb-udf
dependency:
1 |
|
To compile with the latest version of ksqldb-udf
:
1 |
|
If you're using Maven to build your UDF or UDAF, specify the ksqldb-udf
dependency in your POM file:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
|
UdfDescription Annotation¶
The @UdfDescription
annotation is applied at the class level and has
four fields, two of which are required. The information provided here is
used by the SHOW FUNCTIONS
and DESCRIBE FUNCTION <function>
commands.
Field | Description | Required |
---|---|---|
name | The case-insensitive name of the UDF(s) represented by this class. | Yes |
description | A string describing generally what the function(s) in this class do. | Yes |
author | The author of the UDF. | No |
version | The version of the UDF. | No |
Udf Annotation¶
The @Udf
annotation is applied to public methods of a class annotated
with @UdfDescription
. Each annotated method will become an invocable function
in SQL. This annotation supports the following fields:
Field | Description | Required |
---|---|---|
description | A string describing generally what a particular version of the UDF does (see the following example). | No |
schema | The ksqlDB schema for the return type of this UDF. | For complex types such as STRUCT if schemaProvider is not passed in. |
schemaProvider | A reference to a method that computes the return schema of this UDF. For more info, see Dynamic return type. | For complex types, like STRUCT, if schema is not provided. |
1 2 3 4 5 6 |
|
UdfParameter Annotation¶
The @UdfParameter
annotation is applied to parameters of methods
annotated with @Udf
. ksqlDB uses the additional information in the
@UdfParameter
annotation to specify the parameter schema (if it can't
be inferred from the Java type) or to provide users with richer
information about the method when, for example, they execute
DESCRIBE FUNCTION
on the method.
Field | Description | Required |
---|---|---|
value | The case-insensitive name of the parameter | Required if the UDF JAR was not compiled with the -parameters javac argument. |
description | A string describing generally what the parameter represents | No |
schema | The ksqlDB schema for the parameter. | For complex types, like STRUCT |
Note
If schema
is supplied in the @UdfParameter
annotation for a STRUCT
it is considered "strict" - any inputs must match exactly, including
order and names of the fields.
1 2 3 4 5 6 7 8 9 |
|
If your Java8 class is compiled with the -parameters
compiler flag,
the name of the parameter will be inferred from the method declaration.
Configurable UDF¶
If the UDF class needs access to the ksqlDB Server configuration it can
implement org.apache.kafka.common.Configurable
, for example:
1 2 3 4 5 6 7 8 9 10 11 |
|
For security reasons, only settings whose name is prefixed with
ksql.functions.<lowercase-udfname>.
or ksql.functions._global_.
are
propagated to the UDF.
UDAFs¶
To create a UDAF you need to create a class that's annotated with
@UdafDescription
. Each method in the class that's used as a factory
for creating an aggregation must be public static
, be annotated with
@UdafFactory
, and must return either Udaf
or TableUdaf
. The class
you create represents a collection of UDAFs all with the same name but
may have different arguments and return types.
Both Udaf
and TableUdaf
are parameterized by three types: I
is the
input type of the UDAF. A
is the data type of the intermediate storage
used to keep track of the state of the UDAF. O
is the data type of the
return value. Decoupling the data types of the state and return value
enables you to define UDAFs like average
, as shown in the following example.
When you create a UDAF, use the map
method to provide the logic that
transforms an intermediate aggregate value to the returned value.
Example UDAF class¶
The following class creates a UDAF named my_average
. The name of the UDAF
is provided in the name
parameter of the UdafDescription
annotation.
This name is case-insensitive and is what can be used to call the UDAF.
The class provides three factories that return a TableUdaf
, one for
each of the input types Long, Integer, and Double. Moreover, it provides
a factory that returns a Udaf
that doesn't support undo. Each method
defines a different type for the intermediate state based on the input
type (I
), which in this case is a STRUCT consisting of two fields, the
SUM, of type I
, and the COUNT, of type Long. To get the result of the
UDAF, each method implements a map
function that returns the Double
division of the accumulated SUM and COUNT.
The UDAF can be invoked in four ways:
- With a Long (BIGINT) column, returning the aggregated value as
Double. Defines the schema for intermediate state type using the
annotation parameter
parameterSchema
. The return type isTableUdaf
and therefore supports theundo
operation. - With an Integer column returning the aggregated value as Double. Likewise defines the schema of the Struct and supports undo.
- With a Double column, returning the aggregated value as Double. Likewise defines the schema of the Struct and supports undo.
- With a String (VARCHAR) column and an initializer that is a String (VARCHAR), returning the average String (VARCHAR) length as a Double.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 |
|
UdafDescription Annotation¶
The @UdafDescription
annotation is applied at the class level and has
four fields, two of which are required. The information provided here is
used by the SHOW FUNCTIONS
and DESCRIBE FUNCTION <function>
commands.
Field | Description | Required |
---|---|---|
name | The case-insensitive name of the UDAF(s) represented by this class. | Yes |
description | A string describing generally what the function(s) in this class do. | Yes |
author | The author of the UDF. | No |
version | The version of the UDF. | No |
UdafFactory Annotation¶
The @UdafFactory
annotation is applied to public static methods of a
class annotated with @UdafDescription
. The method must return either
Udaf
, or, if it supports table aggregations, TableUdaf
. Each
annotated method is a factory for an invocable aggregate function in
SQL. The annotation supports the following fields:
Field | Description | Required |
---|---|---|
description | A string describing generally what the function(s) in this class do. | Yes |
paramSchema | The ksqlDB schema for the input parameter. | For complex types, like STRUCT |
aggregateSchema | The ksqlDB schema for the intermediate state. | For complex types, like STRUCT |
returnSchema | The ksqlDB schema for the return value. | For complex types, like STRUCT |
Note
If paramSchema
, aggregateSchema
or returnSchema
is supplied in
the @UdfParameter
annotation for a STRUCT
, it's considered
"strict" - any inputs must match exactly, including order and names of
the fields.
You can use this to better describe what a particular version of the UDAF does, for example:
1 2 3 4 5 6 7 |
|
UDTFs¶
To create a UDTF you need to create a class that is annotated with
@UdtfDescription
. Each method in the class that represents a UDTF must be
public and annotated with @Udtf
. The class you create represents a collection
of UDTFs all with the same name but may have different arguments and return
types.
@UdfParameter
annotations can be added to method parameters to provide users
with richer information, including the parameter schema. This annotation is
required if the SQL type can't be inferred from the Java type, like STRUCT
.
For an example UDTF implementation, see Data Enrichment in ksqlDB Using UDTFs.
Null Handling¶
If a UDTF uses primitive types in its signature, this indicates that the
parameter should never be null
. Conversely, using boxed types indicates that
the function can accept null
values for the parameter. It's up to the
implementer of the UDTF to chose which is the most appropriate. A common
pattern is to return null
if the input is null
, though generally this is
only for parameters that are expected to be supplied from the source row being
processed.
For example, a substring(String str, int pos)
UDF might return null if str
is null, but a null pos
parameter would be treated as an error and so should
be a primitive. The built-in substring
function is more lenient and return
null
if pos is null
.
The return type of a UDTF can also be a primitive or boxed type. A primitive
return type indicates that the function never returns null
, and a boxed type
indicates that it may return null
.
ksqlDB server checks the value being passed to each parameter and reports an
error to the server log for any null
values being passed to a primitive type.
The associated column in the output row will be null
.
Dynamic return type¶
UDTFs support dynamic return types that are resolved at runtime. This is useful
if you want to implement a UDTF with a non-deterministic return type. For
example, a UDTF that returns a BigDecimal
may vary the precision and scale
of the output based on the input schema.
To use this functionality, specify a method with signature
public SqlType <your-method-name>(final List<SqlType> params)
and annotate it
with @UdfSchemaProvider
. Also, you need to link it to the corresponding UDF by
using the schemaProvider=<your-method-name>
parameter of the @Udtf
annotation.
If your UDTF method returns a value of type List<T>
, the type referred to
by the schema provider method is the type T
, not the type List<T>
.
Example UDTF class¶
The following class creates a UDTF named split_string
. The name of the UDTF
is provided in the name
parameter of the UdtfDescription
annotation. This
name is case-insensitive, and you can use it to call the UDTF.
UDTF methods must return a value of type List<T>
, where T
is any of the
supported SQL Java types.
You can invoke this UDTF in two different ways:
- with a single string containing the string to split;
- with a string containing the string to split and a regex to define the delimiter.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
|
If you're using Gradle to build your UDF or UDAF, specify the ksqldb-udf
dependency:
1 |
|
To compile with the latest version of ksqldb-udf
:
1 |
|
If you're using Maven to build your function, specify the ksqldb-udf
dependency in your POM file:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
|
UdtfDescription Annotation¶
The @UdtfDescription
annotation is applied at the class level and has four
fields, two of which are required. The information provided here is used by
the SHOW FUNCTIONS
and DESCRIBE FUNCTION <function>
commands.
Field | Description | Required |
---|---|---|
name | The case-insensitive name of the UDTF(s) represented by this class. | Yes |
description | A string describing generally what the function(s) in this class do. | Yes |
author | The author of the UDTF. | No |
version | The version of the UDTF. | No |
Udtf Annotation¶
The @Udtf
annotation is applied to public methods of a class annotated with
@UdtfDescription
. Each annotated method becomes an invocable function in
SQL. This annotation supports the following fields:
Field | Description | Required |
---|---|---|
description | A string describing generally what a particular version of the UDTF does (see example). | No |
schema | The ksqlDB schema for the return type of this UDTF. | For complex types such as STRUCT if schemaProvider is not passed in. |
schemaProvider | A reference to a method that computes the return schema of this UDTF. (See Dynamic Return Types for more info). | For complex types such as STRUCT if schema is not passed in. |
Annotating UDTF Parameters¶
You can use the @UdfParameter
annotation to provide extra information for
UDTF parameters. This is the same annotation as used for UDFs. Please see the
earlier documentation on this for further information.
Supported Types¶
ksqlDB supports the following Java types for UDFs, UDAFs, and UDTFs.
Java Type | SQL Type |
---|---|
int | INTEGER |
Integer | INTEGER |
boolean | BOOLEAN |
Boolean | BOOLEAN |
long | BIGINT |
Long | BIGINT |
double | DOUBLE |
Double | DOUBLE |
String | VARCHAR |
List | ARRAY |
Map | MAP |
Struct | STRUCT |
BigDecimal | DECIMAL |
Note
Using Struct
or BigDecimal
in your functions requires specifying the
schema by using paramSchema
, returnSchema
, aggregateSchema
, or a
schema provider.
Deploying¶
To deploy your user defined functions, you create a jar containing all of the
classes required by the functions. If you depend on third-party libraries,
this should be an uber-jar containing these libraries. Once the jar
is created, deploy it to each ksqlDB server instance. Copy the jar
to the ext/
directory that's part of the ksqlDB distribution. The ext/
directory can be configured via the ksql.extension.dir
.
The jars in the ext/
directory are scanned only at start-up, so you
must restart your ksqlDB Server instances to pick up new and updated UD(A)Fs.
It s important to ensure that you deploy the custom jars to each server instance. Failure to do so results in errors when processing any statements that try to use these functions. The errors may go unnoticed in the ksqlDB CLI if the ksqlDB Server instance it is connected to has the jar installed, but one or more other ksqlDB servers don't have it installed. In these cases, the errors will appear in the ksqlDB Server log (ksql.log) . The error would look something like:
1 2 |
|
The servers that don't have the jars don't process any queries using the custom UD(A)Fs. Processing will continue, but it's restricted to only the servers with the correct jars installed.
Usage¶
Once your functions are deployed, you can call them in the same way you
would invoke any of the ksqlDB built-in functions. The function names are
case-insensitive. For example, using the multiply
example:
1 2 3 4 |
|
ksqlDB Custom Functions and Security¶
Blacklisting¶
In some deployment environments, it may be necessary to restrict the
classes that UD(A)Fs have access to, as they may represent a security
risk. To reduce the attack surface of ksqlDB user defined functions you can
optionally blacklist classes and packages so that they can't be used from a
UD(A)F. An example blacklist is in a file named resource-blacklist.txt
in the ext/
directory. All of the entries in the default version of the
file are commented out, but it shows how you can use the blacklist.
This file contains one entry per line, where each line is a class or
package that should be blacklisted. The matching of the names is based
on a regular expression, so if you have an entry, java.lang.Process
like
this:
1 |
|
This matches any paths that begin with java.lang.Process
, like
java.lang.Process
, java.lang.ProcessBuilder
, etc.
If you want to blacklist a single class, for example,
java.lang.Compiler
, then you would add:
1 |
|
Any blank lines or lines beginning with #
are ignored. If the file is
not present, or is empty, then no classes are blacklisted.
Security Manager¶
By default, ksqlDB installs a simple Java security manager for executing
user defined functions. The security manager blocks attempts by any functions
to fork processes from the ksqlDB Server. It also prevents them from calling
System.exit(..)
.
You can disable the security manager by setting
ksql.udf.enable.security.manager
to false
.
Disabling ksqlDB Custom Functions¶
You can disable the loading of all UDFs in the ext/
directory by
setting ksql.udfs.enabled
to false
. By default, they are enabled.
Metric Collection¶
Metric collection can be enabled by setting the config
ksql.udf.collect.metrics
to true
. This defaults to false
and is
generally not recommended for production usage, as metrics are
collected on each invocation and introduce some overhead to
processing time.
Suggested Reading¶
- Data Enrichment in ksqlDB Using UDTFs
- ksqlDB UDFs and UDAFs Made Easy
- How to Build a UDF and/or UDAF in ksqlDB 5.0