UDF python должен быть обернут декоратором udf в pyflink.table.udf
, например:
from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a):
return a + 1
И flink- python jar должен быть загружен при запуске sql -client, например:
$ cd $FLINK_HOME/bin
$ ./start-cluster.sh
$ ./sql-client.sh embedded -pyfs xxx.py -j ../opt/flink-python_2.11-1.11.0.jar
Кроме того, вам нужно добавить taskmanager.memory.task.off-heap.size: 79mb
в $FLINK_HOME/conf/flink-conf.yaml
или другие файлы, которые можно использовать для установки конфигураций (например, файл среды sql клиента ), иначе вы получите ошибку при выполнении python udf:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key'taskmanager.memory .task.off-heap.size'.
Best,
Wei