В Flip-106
есть пример того, как вызвать пользовательскую функцию python в пакетном задании java приложение через SQL Function DDL ...
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");
tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();
I Я пытался воспроизвести этот же пример в приложении java потокового задания, и это мой код:
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");
fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
Table table = fsTableEnv.fromValues("1", "2", "3").as("str").select("func1(str)");
/* Missing line */
для этой конкретной строки в пакетном задании:
tEnv.toDataSet(table, String.class).collect();
Я не нашел эквивалента для задания потоковой передачи
1. Можете ли вы помочь мне сопоставить этот пример flip-106 от партии к потоку?
В конечном итоге я хочу вызвать с помощью flink 1.11 функцию python в потоковом задании java переверните приложение следующим образом:
final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");
fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
final Table table = fsTableEnv.fromDataStream(stream_filtered.map(x->x.idsUmid)).select("func1(f0)").as("umid");
System.out.println("Result --> " + table.select($("umid")) + " --> End of Result");
и используйте результат этого udf для дальнейшего процесса (не обязательно печатать его в консоли)
Я редактировал test.py
файл, чтобы увидеть, хотя бы независимо от безымянной таблицы что-то делается в python.
from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
from os import getcwd
@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
print(line)
print(getcwd())
with open("test.txt", "a") as myfile:
myfile.write(line)
return line
и ничего не печатается, файл test.txt не создается и значение не вернулся к потоковой работе. Таким образом, эта функция python не вызывается.
2. Что мне здесь не хватает?
Спасибо Дэвиду, Вей и Синбо за поддержку, потому что каждая предложенная деталь сработала для меня.
С уважением,
Джонатан