Implement a User-defined Function (UDF, UDAF, and UDTF)
Prerequisites
- Apache Maven
- Confluent Platform installed locally
- Internet connectivity for downloading Confluent POM files
Create a user-defined function (UDF), a user-defined aggregation function (UDAF), or a user-defined table function (UDTF) by following these steps:
- Create the ksql extensions directory that contains your packages.
- Create Java source and project files for your implementation.
- Build the package for your function.
- Use your custom function in a SQL query or statement.
For more information on custom functions, see Functions.
Create the ksql Extensions Directory
When you create a user-defined function (UDF), you implement it in Java and deploy it as a JAR to the ksql extensions directory. By default, this directory doesn't exist, so you need to create it and set its location in the ksqlDB Server configuration properties.
Create the ksql extensions directory, <path-to-confluent>/etc/ksql/ext.
Edit the ksql-server.properties configuration file in <path-to-confluent>/etc/ksql and set the ksql.extension.dir property to the fully qualified path of the ext directory. For example:
ksql.extension.dir=<path-to-confluent>/etc/ksql/ext
Note
Use the fully qualified path or the relative path from <path-to-confluent>/bin, which is ../etc/ksql/ext. ksqlDB Server won't load extensions if the path begins with ~.
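The path rules in the note can be expressed as a small check. The class below is a hypothetical helper, not ksqlDB's own loading code: it reads ksql.extension.dir from a properties fragment, rejects a value that begins with ~, uses an absolute path as-is, and resolves a relative path against <path-to-confluent>/bin, as the note describes.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical helper (not part of ksqlDB) that applies the note's path rules. */
final class ExtensionDirCheck {

  static Path resolveExtensionDir(final String propertiesText, final Path confluentHome) {
    final Properties props = new Properties();
    try {
      props.load(new StringReader(propertiesText));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    final String value = props.getProperty("ksql.extension.dir");
    if (value == null) {
      throw new IllegalArgumentException("ksql.extension.dir is not set");
    }
    // Per the note, ksqlDB Server won't load extensions from a path that begins with ~.
    if (value.startsWith("~")) {
      throw new IllegalArgumentException("path must not begin with ~: " + value);
    }
    final Path dir = Paths.get(value);
    if (dir.isAbsolute()) {
      return dir.normalize();
    }
    // Relative paths are interpreted from <path-to-confluent>/bin, as the note states.
    return confluentHome.resolve("bin").resolve(dir).normalize();
  }
}
```

With an example Confluent home of /opt/confluent, the relative value ../etc/ksql/ext resolves to /opt/confluent/etc/ksql/ext, the same directory as the fully qualified form.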
Create the Source and Project Files
The following steps show how to implement your UDF in a Java class and build it by defining a Maven POM file.
- Create a root directory for your UDF's source code and project files.
- Create the source code directory, which has a path that corresponds with the package name.
- Create the Java source code file in the source code directory.
- Create a Project Object Model (POM) file that defines how Maven builds the source code.
Create a Project Root Directory
Create the directory that holds your UDF or UDAF project.
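Create the Source Code Directory
From the root directory for your UDF, create the source code directory, src/main/java/my/company/ksql/udfdemo. In this example, the package name is my.company.ksql.udfdemo, so each segment of the package name becomes one directory level under src/main/java.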
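The mapping from package name to directory is mechanical: each dot-separated segment becomes one directory under Maven's default src/main/java source root. The hypothetical helper below computes that path and creates it with Files.createDirectories, which, like mkdir -p, also creates any missing parent directories.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: maps a Java package name to its Maven source directory. */
final class SourceDirs {

  // Maven's default layout keeps main sources under src/main/java.
  static Path sourceDirFor(final Path projectRoot, final String packageName) {
    Path dir = projectRoot.resolve("src").resolve("main").resolve("java");
    for (final String segment : packageName.split("\\.")) {
      dir = dir.resolve(segment); // one directory per package segment
    }
    return dir;
  }

  // Creates the directory and any missing parents; succeeds if it already exists.
  static Path createSourceDir(final Path projectRoot, final String packageName) {
    try {
      return Files.createDirectories(sourceDirFor(projectRoot, packageName));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For the package my.company.ksql.udfdemo, sourceDirFor returns <project root>/src/main/java/my/company/ksql/udfdemo, which is where Multiply.java goes.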
Create the Java Source Code File
The Multiply class defines four overloads of a multiply function. The UdfDescription and Udf annotations tell ksqlDB Server to load the Multiply class and look for methods to add to its list of available functions. For more information, see Functions.
Create a new file named Multiply.java that contains the Multiply class.
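As a hedged sketch of what four overloads might look like, the class below keeps only the method bodies in plain Java, without the ksqlDB annotations, so it compiles and can be unit-tested without ksqlDB on the classpath. The signatures (INT, BIGINT, nullable BIGINT, and DOUBLE) are assumptions for illustration. The header comment lists the package, imports, and annotations that the deployable class needs.

```java
/*
 * Plain-Java sketch of four multiply overloads (signatures are assumptions).
 * In the deployable Multiply.java, declare "package my.company.ksql.udfdemo;",
 * make the class public, import io.confluent.ksql.function.udf.Udf and
 * io.confluent.ksql.function.udf.UdfDescription, annotate the class with
 * @UdfDescription(name = "multiply", description = "...") and each method
 * with @Udf(description = "...").
 */
class Multiply {

  // INT x INT: widen to long before multiplying so the product can't overflow int.
  public long multiply(final int v1, final int v2) {
    return (long) v1 * v2;
  }

  // BIGINT x BIGINT, non-nullable.
  public long multiply(final long v1, final long v2) {
    return v1 * v2;
  }

  // Nullable BIGINT: boxed parameters can be null, so return null instead of throwing.
  public Long multiply(final Long v1, final Long v2) {
    return (v1 == null || v2 == null) ? null : v1 * v2;
  }

  // DOUBLE x DOUBLE.
  public double multiply(final double v1, final double v2) {
    return v1 * v2;
  }
}
```

Because the bodies are ordinary Java methods, you can call them directly in unit tests. For example, new Multiply().multiply(Integer.MAX_VALUE, 2) returns 4294967294 rather than an overflowed int.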
Save the file to the source code directory that you created in the previous step, src/main/java/my/company/ksql/udfdemo.
Create the POM File
In the root directory for your custom UDF implementation, create the Project Object Model (POM) file for the Maven build, and name it pom.xml.
Important
For production environments, we strongly recommend that you write comprehensive tests to cover your custom functions.
Build the UDF Package
Use Maven to build the package and create a JAR, then copy the JAR to the ksql extensions directory.
In the root directory for your UDF, run mvn clean package to build the package.
Maven prints a detailed build log. If the build succeeds, the log ends with a BUILD SUCCESS message.
The Maven build creates a directory named target and saves the build output there. Copy the JAR file, ksqldb-udf-demo-1.0-jar-with-dependencies.jar, from the target directory to the ext directory that you created earlier. For example, if your Confluent Platform installation is at /home/my-home-dir/confluent-5.5.0, copy the JAR to /home/my-home-dir/confluent-5.5.0/etc/ksql/ext.
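Copying the JAR can also be scripted as part of a build or deployment tool. The class below is a hypothetical helper, not part of ksqlDB or Maven, that uses java.nio.file to copy the built JAR into the extensions directory and replace any older copy with the same file name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical deployment helper: copies a built UDF JAR into the extensions directory. */
final class DeployUdfJar {

  // Destination keeps the JAR's file name: <extDir>/<jar file name>.
  static Path targetFor(final Path builtJar, final Path extDir) {
    return extDir.resolve(builtJar.getFileName());
  }

  // Creates extDir if needed, then copies the JAR, overwriting an older build.
  // The running server doesn't see the new JAR until it restarts.
  static Path deploy(final Path builtJar, final Path extDir) {
    try {
      Files.createDirectories(extDir);
      return Files.copy(builtJar, targetFor(builtJar, extDir), StandardCopyOption.REPLACE_EXISTING);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A call such as DeployUdfJar.deploy(jarUnderTarget, extDir), where both arguments are Path values for your environment, returns the path of the copied JAR.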
The custom UDF is deployed and ready to run.
Use Your Custom UDF in a SQL Query
When your custom UDF is deployed in the ksql extensions directory, ksqlDB Server loads it automatically at startup, and you can use it like any other ksqlDB function.
Note
ksqlDB loads user-defined functions only on startup, so when you change your UDF code and redeploy the JAR, you must restart ksqlDB Server to get the latest version of your UDF.
Start Confluent Platform and ksqlDB Server.
Start the ksqlDB CLI by running <path-to-confluent>/bin/ksql.
In the ksqlDB CLI, run SHOW FUNCTIONS; to list the available functions and confirm that ksqlDB Server loaded the MULTIPLY user-defined function.
MULTIPLY should appear in the list of functions, with the type SCALAR.
Inspect the details of the MULTIPLY function by running DESCRIBE FUNCTION MULTIPLY;.
The output shows the function's description and lists each of its overloads as a separate variation, with its argument types and return type.
Use the MULTIPLY function in a query. If you followed the steps in Write Streaming Queries Against Apache Kafka® Using ksqlDB (Local), you can multiply the two BIGINT fields in the pageviews_original stream.
The query keeps running and prints a result row for each new event in the stream.
Press Ctrl+C to terminate the query.
User-Defined Aggregation Function (UDAF)
Implementing a user-defined aggregation function (UDAF) is similar to implementing a UDF. You use the UdafDescription and UdafFactory annotations in your Java code, and you deploy a JAR to the ksql extensions directory. For more information, see UDAFs.
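An aggregation is easiest to reason about as four steps: create empty state, fold in one value, merge two partial states, and convert the final state into the result. The sketch below shows that shape for a hypothetical average function in plain Java. AggregateFunction is a local stand-in modeled on, not imported from, ksqlDB's io.confluent.ksql.function.udaf.Udaf interface, so check the real interface's methods for your ksqlDB version before porting the code.

```java
import java.util.List;

/** Hypothetical stand-in for a UDAF contract; not ksqlDB's own interface. */
interface AggregateFunction<I, A, O> {
  A initialize();                      // state before any rows arrive
  A aggregate(I current, A aggregate); // fold one input value into the state
  A merge(A aggOne, A aggTwo);         // combine two partial states
  O map(A aggregate);                  // turn the final state into the result
}

final class AverageSketch {

  /** Running state: sum and count of the non-null values seen so far. */
  static final class SumCount {
    final long sum;
    final long count;

    SumCount(final long sum, final long count) {
      this.sum = sum;
      this.count = count;
    }
  }

  // In a real UDAF, a static factory like this is annotated with @UdafFactory
  // inside a class annotated with @UdafDescription.
  static AggregateFunction<Long, SumCount, Double> average() {
    return new AggregateFunction<Long, SumCount, Double>() {
      @Override
      public SumCount initialize() {
        return new SumCount(0L, 0L);
      }

      @Override
      public SumCount aggregate(final Long current, final SumCount agg) {
        if (current == null) {
          return agg; // ignore NULL inputs
        }
        return new SumCount(agg.sum + current, agg.count + 1);
      }

      @Override
      public SumCount merge(final SumCount aggOne, final SumCount aggTwo) {
        return new SumCount(aggOne.sum + aggTwo.sum, aggOne.count + aggTwo.count);
      }

      @Override
      public Double map(final SumCount agg) {
        return agg.count == 0 ? null : (double) agg.sum / agg.count; // NULL when no rows
      }
    };
  }

  /** Test helper: folds a list of values into a fresh state, one value at a time. */
  static SumCount fold(final AggregateFunction<Long, SumCount, Double> fn, final List<Long> values) {
    SumCount state = fn.initialize();
    for (final Long v : values) {
      state = fn.aggregate(v, state);
    }
    return state;
  }
}
```

Keeping the state (sum and count) separate from the result (the average) is what makes merge correct: averaging two averages would weight the partial groups incorrectly, while adding sums and counts does not.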
User-Defined Table Function (UDTF)
Implementing a user-defined table function (UDTF) is similar to implementing a UDF. You use the UdtfDescription and Udtf annotations in your Java code, and you deploy a JAR to the ksql extensions directory. For more information, see UDTFs.
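A table function returns a list, and ksqlDB emits one output row for each element of that list. The sketch below shows the body of a hypothetical SPLIT_TOKENS function in plain Java, without annotations. In a deployable UDTF, the class would carry @UdtfDescription and the method @Udtf, both from the io.confluent.ksql.function.udtf package.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java body of a hypothetical SPLIT_TOKENS table function. */
class SplitTokens {

  // Each element of the returned list would become one output row.
  public List<String> splitTokens(final String input) {
    if (input == null) {
      return Collections.emptyList(); // NULL input produces no rows
    }
    final List<String> tokens = new ArrayList<>();
    for (final String raw : input.split(",")) {
      final String token = raw.trim();
      if (!token.isEmpty()) {
        tokens.add(token); // skip blank entries such as the gap in "b,,c"
      }
    }
    return tokens;
  }
}
```

For the input "a, b,,c", the method returns [a, b, c], so one input row would fan out into three output rows.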
Next Steps
- How to Build a UDF and/or UDAF in KSQL 5.0
- Aggregate Streaming Data With ksqlDB
- Join Event Streams with ksqlDB
Page last revised on: 2020-05-06