User-defined functions
User-defined functions enable you to extend ksqlDB's suite of built-in functions using Java hooks. This section is a reference for how they work. Use the how-to guide to learn how to use them.
Data type mapping
Because SQL has a type system that is independent from Java’s, user-defined functions (UDFs) need to use specific Java types so that ksqlDB can manage the correspondence from SQL to Java. Below is the mapping to use for all UDF parameters and return types. Use boxed types when you want to tolerate null values.
SQL Type | Java Type |
---|---|
INT | int, java.lang.Integer |
BOOLEAN | boolean, java.lang.Boolean |
BIGINT | long, java.lang.Long |
DOUBLE | double, java.lang.Double |
DECIMAL | java.math.BigDecimal |
VARCHAR | java.lang.String |
BYTES | java.nio.ByteBuffer |
TIME | java.sql.Time |
DATE | java.sql.Date |
TIMESTAMP | java.sql.Timestamp |
ARRAY | java.util.List |
MAP | java.util.Map |
STRUCT | org.apache.kafka.connect.data.Struct |
Note
Using Struct or BigDecimal in your functions requires specifying the schema by using paramSchema, returnSchema, aggregateSchema, or a schema provider.
Classloading
How does ksqlDB choose which classes to load as user-defined functions? At startup, ksqlDB scans the jars in its extensions directory for classes that carry UDF annotations. Each function that it finds is parsed and, if parsing succeeds, loaded into ksqlDB.
Each function instance has its own child-first ClassLoader that is isolated from other functions. If you need to use any third-party libraries with your functions, they should also be part of your jar, which means that you should create an uberjar. The classes in your uberjar are loaded in preference to any classes on the ksqlDB classpath, excluding anything vital to the running of ksqlDB, that is, classes that are part of org.apache.kafka and io.confluent.
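Child-first delegation can be sketched as follows. This is a minimal illustration under stated assumptions, not ksqlDB's actual loader: the names ChildFirstSketch, ChildFirstClassLoader, and isParentFirst are hypothetical, and the java. prefix is included only because the JVM does not let application class loaders define core classes.

```java
import java.net.URL;
import java.net.URLClassLoader;

final class ChildFirstSketch {

  /** Hypothetical child-first loader: tries the function's own jar before the parent. */
  static final class ChildFirstClassLoader extends URLClassLoader {

    ChildFirstClassLoader(URL[] jarUrls, ClassLoader parent) {
      super(jarUrls, parent);
    }

    /** Packages that always come from the parent, mirroring the exclusions described above. */
    static boolean isParentFirst(String className) {
      return className.startsWith("java.")
          || className.startsWith("org.apache.kafka.")
          || className.startsWith("io.confluent.");
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
      synchronized (getClassLoadingLock(name)) {
        Class<?> loaded = findLoadedClass(name);
        if (loaded == null && !isParentFirst(name)) {
          try {
            // Child first: look in the uberjar before asking the parent.
            loaded = findClass(name);
          } catch (ClassNotFoundException notInJar) {
            // Not in the jar; fall through to the parent.
          }
        }
        if (loaded == null) {
          loaded = super.loadClass(name, false);
        }
        if (resolve) {
          resolveClass(loaded);
        }
        return loaded;
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ClassLoader parent = ChildFirstSketch.class.getClassLoader();
    // No jars are supplied here, so every lookup ends up delegating to the parent.
    try (ChildFirstClassLoader loader = new ChildFirstClassLoader(new URL[0], parent)) {
      System.out.println(loader.loadClass("java.lang.String") == String.class);
    }
  }
}
```

Because each function would get its own loader instance, two functions could bundle different versions of the same library without conflict.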
Annotations
Annotations not only help ksqlDB figure out which classes will be used as UDFs, they also help commands like DESCRIBE FUNCTION display helpful metadata.
Scalar functions
When a class is annotated with @UdfDescription, it's scanned for any public methods that are annotated with @Udf. If it has such methods, the class is loaded as a scalar function. Each method's parameters may optionally be annotated with @UdfParameter. The following sections describe the fields that each of these annotations accepts.
@UdfDescription
The @UdfDescription annotation is applied at the class level.
Field | Description | Required |
---|---|---|
name | The case-insensitive name of the UDF(s) represented by this class. | Yes |
description | A string describing generally what the function(s) in this class do. | Yes |
category | For grouping similar functions in the output of SHOW FUNCTIONS. | No |
author | The author of the UDF. | No |
version | The version of the UDF. | No |
@Udf
The @Udf annotation is applied to public methods of a class annotated with @UdfDescription. Each annotated method becomes an invocable function in SQL.
Field | Description | Required |
---|---|---|
description | A string describing generally what a particular version of the UDF does. | No |
schema | The ksqlDB schema for the return type of this UDF. | For complex types, like STRUCT, if schemaProvider is not provided. |
schemaProvider | A reference to a method that computes the return schema of this UDF (e.g. dynamic return type). | For complex types, like STRUCT, if schema is not provided. |
@UdfParameter
The @UdfParameter annotation is applied to parameters of methods annotated with @Udf. ksqlDB uses the information in the @UdfParameter annotation to specify the parameter schema (if it can't be inferred from the Java type) and to convey metadata.
Field | Description | Required |
---|---|---|
value | The case-insensitive name of the parameter. | Required if the UDF JAR was not compiled with the -parameters javac argument. |
description | A string describing generally what the parameter represents. | No |
schema | The ksqlDB schema for the parameter. | For complex types, like STRUCT |
Note
If schema is supplied in the @UdfParameter annotation for a STRUCT, it is considered "strict": any inputs must match exactly, including the order and names of the fields.
If your Java 8 class is compiled with the -parameters compiler flag, the name of the parameter will be inferred from the method declaration.
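The scalar-function annotations described in this section can be sketched as follows. To keep the example compilable without the ksqlDB jars, it declares local stand-ins for @UdfDescription, @Udf, and @UdfParameter that carry only some of the fields from the tables above; the multiply function and the udfSignatures helper are hypothetical, and the reflection scan only illustrates the idea of discovering annotated methods, not ksqlDB's loader.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ScalarUdfSketch {

  // Local stand-ins for the ksqlDB annotations (assumption: simplified field sets).
  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
  @interface UdfDescription { String name(); String description(); String author() default ""; }

  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
  @interface Udf { String description() default ""; }

  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER)
  @interface UdfParameter { String value() default ""; String description() default ""; }

  /** Hypothetical scalar function with two overloads sharing one SQL name. */
  @UdfDescription(name = "multiply", description = "Multiplies two numbers.")
  public static final class Multiply {

    @Udf(description = "Multiplies two non-null BIGINT values.")
    public long multiply(@UdfParameter("v1") long v1, @UdfParameter("v2") long v2) {
      return v1 * v2;
    }

    @Udf(description = "Multiplies two DOUBLE values; null if either input is null.")
    public Double multiply(@UdfParameter("v1") Double v1, @UdfParameter("v2") Double v2) {
      return (v1 == null || v2 == null) ? null : v1 * v2;
    }

    // Public but not annotated with @Udf, so the scan below skips it.
    public String helper() {
      return "not a UDF";
    }
  }

  /** Lists the @Udf-annotated public methods of a class annotated with @UdfDescription. */
  static List<String> udfSignatures(Class<?> candidate) {
    List<String> found = new ArrayList<>();
    if (!candidate.isAnnotationPresent(UdfDescription.class)) {
      return found;
    }
    for (Method method : candidate.getMethods()) {
      if (method.isAnnotationPresent(Udf.class)) {
        found.add(method.getReturnType().getSimpleName() + " " + method.getName());
      }
    }
    Collections.sort(found);
    return found;
  }

  public static void main(String[] args) {
    System.out.println(udfSignatures(Multiply.class));
    System.out.println(new Multiply().multiply(3L, 4L));
  }
}
```

The primitive overload signals that nulls are not expected, while the boxed overload tolerates them, in line with the data type mapping earlier on this page.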
Tabular functions
When a class is annotated with @UdtfDescription, it's scanned for any public methods that are annotated with @Udtf. If it has such methods, the class is loaded as a tabular function. Each method's parameters may optionally be annotated with @UdfParameter. The following sections describe the fields that each of these annotations accepts.
@UdtfDescription
The @UdtfDescription annotation is applied at the class level.
Field | Description | Required |
---|---|---|
name | The case-insensitive name of the UDTF(s) represented by this class. | Yes |
description | A string describing generally what the function(s) in this class do. | Yes |
author | The author of the UDTF. | No |
version | The version of the UDTF. | No |
@Udtf
The @Udtf annotation is applied to public methods of a class annotated with @UdtfDescription. Each annotated method becomes an invocable function in SQL. This annotation supports the following fields:
Field | Description | Required |
---|---|---|
description | A string describing generally what a particular version of the UDTF does. | No |
schema | The ksqlDB schema for the return type of this UDTF. | For complex types, like STRUCT, if schemaProvider is not provided. |
schemaProvider | A reference to a method that computes the return schema of this UDTF (e.g. dynamic return type). | For complex types, like STRUCT, if schema is not provided. |
@UdfParameter
You can use the @UdfParameter annotation to provide extra information for UDTF parameters. This is the same annotation that is used for UDFs; see the @UdfParameter description under Scalar functions for details.
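A tabular function can be sketched in the same way. The example below uses local stand-ins for @UdtfDescription and @Udtf so that it compiles on its own, and the split_words function is hypothetical. It relies on the convention that a UDTF method returns a List, with each element becoming one output row.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class TabularUdfSketch {

  // Local stand-ins for the ksqlDB annotations (assumption: simplified field sets).
  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE)
  @interface UdtfDescription { String name(); String description(); }

  @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD)
  @interface Udtf { String description() default ""; }

  /** Hypothetical tabular function: one input row yields one output row per word. */
  @UdtfDescription(name = "split_words", description = "Splits a sentence into words.")
  public static final class SplitWords {

    @Udtf(description = "Returns each whitespace-separated word as its own row.")
    public List<String> splitWords(String sentence) {
      if (sentence == null || sentence.trim().isEmpty()) {
        // No words means no output rows.
        return Collections.emptyList();
      }
      return Arrays.asList(sentence.trim().split("\\s+"));
    }
  }

  public static void main(String[] args) {
    System.out.println(new SplitWords().splitWords("user defined  functions"));
  }
}
```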
Aggregation functions
When a class is annotated with @UdafDescription, it's scanned for any public static methods that are annotated with @UdafFactory and that return either Udaf or TableUdaf. If it has such methods, the class is loaded as an aggregation function. The factory methods represent a collection of UDAFs that all share the same name but may have different arguments and return types. The following sections describe the fields that each of these annotations accepts.
Both Udaf and TableUdaf are parameterized by three generic types:
- I is the input type of the UDAF. I can be a tuple type, one of Pair, Triple, Quadruple, or Quintuple, when there are multiple column arguments. VariadicArgs can be nested inside a tuple to create a variadic column argument. A function can have at most one variadic argument anywhere in its signature (including the parameters of UdafFactory). A variadic column argument may have Object as its type parameter to accept any number of columns of any type, though a variadic Object factory argument is not supported. A variadic column argument outside a tuple is not supported.
- A is the data type of the intermediate storage used to keep track of the state of the UDAF.
- O is the data type of the return value.
Decoupling the data types of the state and return value enables you to define UDAFs like average, whose intermediate state (for example, a running sum and count) differs from the value it returns.
When you create a UDAF, you can use the map method to provide the logic that transforms an intermediate aggregate value to the returned value.
The merge method is called only to merge sessions when session windowing is used.
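The separation between the intermediate type A and the return type O can be sketched with an average over BIGINT values. The Udaf interface below is a local stand-in that mirrors the initialize, aggregate, merge, and map methods so the example runs without ksqlDB; SumAndCount, averageLong, and average are hypothetical names. In a real UDAF the state would have to use a type from the data type mapping, for example a Struct with an aggregateSchema.

```java
import java.util.Arrays;
import java.util.List;

final class AverageUdafSketch {

  /** Local stand-in shaped like ksqlDB's Udaf<I, A, O>; not the real interface. */
  interface Udaf<I, A, O> {
    A initialize();
    A aggregate(I current, A aggregate);
    A merge(A aggOne, A aggTwo);
    O map(A agg);
  }

  /** Intermediate state (type A): enough information to compute an average later. */
  static final class SumAndCount {
    final long sum;
    final long count;

    SumAndCount(long sum, long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  /** Factory for an average with I = Long, A = SumAndCount, O = Double. */
  static Udaf<Long, SumAndCount, Double> averageLong() {
    return new Udaf<Long, SumAndCount, Double>() {
      @Override public SumAndCount initialize() {
        return new SumAndCount(0L, 0L);
      }

      @Override public SumAndCount aggregate(Long current, SumAndCount agg) {
        // Null inputs are skipped rather than counted.
        return current == null ? agg : new SumAndCount(agg.sum + current, agg.count + 1);
      }

      @Override public SumAndCount merge(SumAndCount aggOne, SumAndCount aggTwo) {
        // Combines two partial states, as when two session windows are merged.
        return new SumAndCount(aggOne.sum + aggTwo.sum, aggOne.count + aggTwo.count);
      }

      @Override public Double map(SumAndCount agg) {
        // Converts the intermediate state into the returned value.
        return agg.count == 0 ? 0.0 : (double) agg.sum / agg.count;
      }
    };
  }

  /** Drives the UDAF over a list of values the way an aggregation would, one value at a time. */
  static Double average(List<Long> values) {
    Udaf<Long, SumAndCount, Double> udaf = averageLong();
    SumAndCount state = udaf.initialize();
    for (Long value : values) {
      state = udaf.aggregate(value, state);
    }
    return udaf.map(state);
  }

  public static void main(String[] args) {
    System.out.println(average(Arrays.asList(1L, 2L, 3L, 6L)));
  }
}
```

Keeping the sum and count in the state is what makes merge possible: two averages alone cannot be combined correctly, but two sum-and-count pairs can.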
@UdafDescription
The @UdafDescription annotation is applied at the class level.
Field | Description | Required |
---|---|---|
name | The case-insensitive name of the UDAF(s) represented by this class. | Yes |
description | A string describing generally what the function(s) in this class do. | Yes |
author | The author of the UDAF. | No |
version | The version of the UDAF. | No |
@UdafFactory
The @UdafFactory annotation is applied to public static methods of a class annotated with @UdafDescription. The method must return either Udaf or, if it supports table aggregations, TableUdaf. Each annotated method is a factory for an invocable aggregate function in SQL. The annotation supports the following fields:
Field | Description | Required |
---|---|---|
description | A string describing generally what the function(s) in this class do. | Yes |
paramSchema | The ksqlDB schema(s) for the input parameter(s). If you provide fewer schemas than there are parameters, the schemas for the remaining parameters default to being empty. If you provide more schemas than there are parameters, the extra schemas are ignored. | For complex types, like STRUCT |
aggregateSchema | The ksqlDB schema for the intermediate state. | For complex types, like STRUCT |
returnSchema | The ksqlDB schema for the return value. | For complex types, like STRUCT |
Note
If paramSchema, aggregateSchema, or returnSchema is supplied in the @UdafFactory annotation for a STRUCT, it's considered "strict": any inputs must match exactly, including the order and names of the fields.
Null values
If a user-defined function uses primitive types in its signature, it indicates that the parameter should never be null. Conversely, using boxed types indicates that the function can accept null values for the parameter. It's up to the implementer of the UDF to choose which is more appropriate. A common pattern is to return null if the input is null, though generally this applies only to parameters that are expected to be supplied from the source row being processed.
For example, a substring(String str, int pos) UDF might return null if str is null, but a null value for the pos parameter would be treated as an error, and so pos should be a primitive. (In fact, the built-in substring is more lenient and would return null if pos is null.)
The return type of a UDF can also be a primitive or boxed type. A primitive return type indicates that the function will never return null, whereas a boxed type indicates that it may return null.
ksqlDB checks the value that's passed to each parameter and reports an error to the server log for any null values that are passed to a primitive type. The associated column in the output row will be null.
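The substring example above can be sketched as follows. The substring method and the invoke wrapper are hypothetical: the method assumes a 1-based start position, and the wrapper only imitates the documented behavior of reporting an error and producing null when a null reaches a primitive parameter. It is not how ksqlDB invokes functions internally.

```java
final class NullHandlingSketch {

  /**
   * Hypothetical UDF body. str is a reference type, so null is tolerated and propagated.
   * pos is a primitive, which signals that a null position is not acceptable.
   * Assumption: pos is a 1-based start position.
   */
  static String substring(String str, int pos) {
    if (str == null) {
      return null;
    }
    int start = Math.max(pos - 1, 0);
    return start >= str.length() ? "" : str.substring(start);
  }

  /** Imitates the null check described above for arguments bound to primitive parameters. */
  static String invoke(String str, Integer pos) {
    if (pos == null) {
      System.err.println("null passed to primitive parameter 'pos'; result column will be null");
      return null;
    }
    return substring(str, pos);
  }

  public static void main(String[] args) {
    System.out.println(invoke("stream", 3));
    System.out.println(invoke(null, 3));
    System.out.println(invoke("stream", null));
  }
}
```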
Dynamic types
UDFs support dynamic return types that are resolved at runtime. This is useful if you want to implement a UDF whose return type is not fixed in advance, like DECIMAL or STRUCT. For example, a UDF that returns BigDecimal, which maps to the SQL DECIMAL type, may vary the precision and scale of the output based on the input schema.
To use this functionality, specify a method with the signature public SqlType <your-method-name>(final List<SqlArgument> params) and annotate it with @UdfSchemaProvider. Then link it to the corresponding UDF by using the schemaProvider=<your-method-name> parameter of the @Udf annotation.
When implementing dynamic returns for a UDTF, if your method returns a value of type List<T>, the type referred to by the schema provider method is the type T, not the type List<T>.
For dynamic UDAFs, the aggregate or map methods may depend on the input SQL type, so implementations of the Udaf interface override some of the following three methods: initializeTypeArguments(List<SqlArgument> argTypeList), getAggregateSqlType(), and getReturnSqlType().
Generics
A UDF declaration can use generics if they meet the following conditions:
- Any generic in the return value of a method must appear in at least one of the method parameters.
- The generic must not adhere to any interface. For example, <T extends Number> is not valid.
- The generic does not support type coercion or inheritance. For example, add(T a, T b) will accept BIGINT, BIGINT but not INT, BIGINT.
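A method shape that satisfies the conditions above might look like the following sketch. The IfNull class is hypothetical and the ksqlDB annotations are omitted so the example compiles on its own; the comments note which variations the rules above would rule out.

```java
final class GenericUdfSketch {

  /** Hypothetical generic function: returns the first argument unless it is null. */
  public static final class IfNull {

    // T appears in the parameters as well as the return type, and it is unbounded.
    public <T> T ifNull(T value, T fallback) {
      return value != null ? value : fallback;
    }

    // Shapes that the conditions above would not allow (shown as comments only):
    //   public <T> T make()                          -> T never appears in a parameter
    //   public <T extends Number> T pick(T a, T b)   -> T is bounded
  }

  public static void main(String[] args) {
    IfNull udf = new IfNull();
    System.out.println(udf.ifNull(null, 5L));
    System.out.println(udf.ifNull("left", "right"));
  }
}
```

Note that the Java compiler would accept a call that mixes an Integer and a Long by inferring a common supertype for T, whereas the rule above says that ksqlDB does not coerce INT to BIGINT when resolving the SQL call.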
External parameters
If the UDF class needs access to the ksqlDB Server configuration, it can implement org.apache.kafka.common.Configurable. configure() will be invoked with the map of server parameters. This can be useful for parameterizing a function on a per-deployment basis.
For security reasons, only settings whose name is prefixed with ksql.functions.<lowercase-udfname>. or ksql.functions._global_. are propagated to the UDF.
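A configurable function can be sketched as follows. The Configurable interface is a local stand-in with the same configure(Map<String, ?>) method as org.apache.kafka.common.Configurable, so the example runs without the Kafka jars. The greet function, its prefix setting, and the visibleTo helper are hypothetical; the helper only illustrates the prefix rule described above.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

final class ConfigurableUdfSketch {

  /** Local stand-in for org.apache.kafka.common.Configurable. */
  interface Configurable {
    void configure(Map<String, ?> configs);
  }

  /** Hypothetical UDF whose greeting is set per deployment. */
  static final class Greeter implements Configurable {
    // Hypothetical setting name that follows the ksql.functions.<lowercase-udfname>. prefix.
    static final String PREFIX_KEY = "ksql.functions.greet.prefix";

    private String prefix = "Hello";

    @Override
    public void configure(Map<String, ?> configs) {
      Object configured = configs.get(PREFIX_KEY);
      if (configured != null) {
        prefix = configured.toString();
      }
    }

    public String greet(String name) {
      return name == null ? null : prefix + ", " + name;
    }
  }

  /** Illustrates the filtering rule: only function-scoped and global settings are kept. */
  static Map<String, Object> visibleTo(String udfName, Map<String, ?> serverConfig) {
    String ownPrefix = "ksql.functions." + udfName.toLowerCase(Locale.ROOT) + ".";
    String globalPrefix = "ksql.functions._global_.";
    Map<String, Object> visible = new TreeMap<>();
    for (Map.Entry<String, ?> entry : serverConfig.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(ownPrefix) || key.startsWith(globalPrefix)) {
        visible.put(key, entry.getValue());
      }
    }
    return visible;
  }

  public static void main(String[] args) {
    Map<String, Object> serverConfig = new HashMap<>();
    serverConfig.put("ksql.functions.greet.prefix", "Hi");
    serverConfig.put("ksql.functions._global_.locale", "en");
    serverConfig.put("ksql.service.id", "not visible to the function");

    Greeter greeter = new Greeter();
    greeter.configure(visibleTo("GREET", serverConfig));
    System.out.println(greeter.greet("ksqlDB"));
  }
}
```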
Security
Blacklisting
In some deployment environments, it may be necessary to restrict the classes that UDFs have access to, as they may represent a security risk. To reduce the attack surface of ksqlDB user-defined functions, you can optionally blacklist classes and packages so that they can't be used from a UDF. An example blacklist is in a file named resource-blacklist.txt in the extensions directory. All of the entries in the default version of the file are commented out, but it shows how you can use the blacklist.
This file contains one entry per line, where each line is a class or package that should be blacklisted. Names are matched by regular expression, so an entry like java.lang.Process matches any path that begins with java.lang.Process, such as java.lang.Process and java.lang.ProcessBuilder.
If you want to blacklist a single class, for example java.lang.Compiler, add an entry that matches only that exact name.
Any blank lines or lines beginning with # are ignored. If the file is not present, or is empty, then no classes are blacklisted.
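The matching rules above can be sketched with java.util.regex. This is an illustration under stated assumptions rather than ksqlDB's implementation: the parse and isBlacklisted helpers are hypothetical, dots in entries are treated as literal dots, entries are matched from the start of the class name, and a trailing $ is assumed to be the way an entry is limited to a single class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class BlacklistSketch {

  /** Turns blacklist lines into patterns, skipping blank lines and # comments. */
  static List<Pattern> parse(List<String> lines) {
    List<Pattern> patterns = new ArrayList<>();
    for (String line : lines) {
      String entry = line.trim();
      if (entry.isEmpty() || entry.startsWith("#")) {
        continue;
      }
      // Assumption: a dot in an entry means a literal package separator.
      patterns.add(Pattern.compile(entry.replace(".", "\\.")));
    }
    return patterns;
  }

  /** True if any pattern matches a prefix of the class name. */
  static boolean isBlacklisted(String className, List<Pattern> patterns) {
    for (Pattern pattern : patterns) {
      // lookingAt() anchors the match at the start of the name but not at the end.
      if (pattern.matcher(className).lookingAt()) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<Pattern> patterns = parse(Arrays.asList(
        "# blocked classes",
        "",
        "java.lang.Process",
        "java.lang.Compiler$"));

    for (String name : Arrays.asList(
        "java.lang.ProcessBuilder", "java.lang.Compiler", "java.lang.String")) {
      System.out.println(name + " blacklisted: " + isBlacklisted(name, patterns));
    }
  }
}
```

With an empty list of patterns, isBlacklisted returns false for every class, which corresponds to the case of a missing or empty file.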
Security Manager
By default, ksqlDB installs a simple Java security manager for executing user-defined functions. The security manager blocks attempts by any function to fork processes from the ksqlDB Server. It also prevents functions from calling System.exit(..).
You can disable the security manager by setting ksql.udf.enable.security.manager to false.
Disabling ksqlDB Custom Functions
You can disable the loading of all UDFs in the extensions directory by setting ksql.udfs.enabled to false. By default, they are enabled.
Metrics
You can enable metric collection by setting ksql.udf.collect.metrics to true. This setting defaults to false, and enabling it is generally not recommended for production use, because metrics are collected on each invocation and add some overhead to processing time. For more details, see the UDF metrics reference section.