Мы заинтересованы в подключении к обычному приложению Flink Streaming из новых Stateful функций ? , в идеале с использованием Table API. Идея состоит в том, чтобы обратиться к таблицам, зарегистрированным во Flink, из Statefun, возможно ли это, и как правильно это сделать?
До сих пор моя идея состояла в том, чтобы инициализировать мой поток таблиц в некоторые основные функции и зарегистрируйте провайдера функций с состоянием для соединения с таблицей:
@AutoService(StatefulFunctionModule.class)
public class Module implements StatefulFunctionModule {
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// SQL query with an inlined (unregistered) table
Table myTable = tableEnv.fromDataStream(ds, "user, product, amount");
tableEnv.createTemporaryView("my_table", myTable);
TableFunctionProvider tableProvider = new TableFunctionProvider();
binder.bindFunctionProvider(FnEnrichmentCallback.TYPE, tableProvider);
//continue registering my other messages
//...
}
}
Провайдер функций с состоянием будет возвращать FnTableQuery
, который просто запрашивает таблицу при получении сообщения:
public class TableFunctionProvider implements StatefulFunctionProvider {
@Override
public StatefulFunction functionOfType(FunctionType type) {
return new FnTableQuery();
}
}
Объект функции запроса будет тогда действовать как субъект для каждого установленного процесса и просто запрашивать таблицу при вызове:
public class FnTableQuery extends StatefulMatchFunction {
static final FunctionType TYPE = new FunctionType(Identifiers.NAMESPACE, "my-table");
private Table myTable;
@Override
public void configure(MatchBinder binder) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
myTable = tableEnv.from("my_table");
binder
.otherwise(this::catchAll);
}
private void catchAll(Context context, Object message) {
context.send(FnEnrichmentCallback.TYPE, myTable.select("max(amount)").toString(), message);
}
}
Заранее извиняюсь, если этот подход не имеет смысла, потому что Я не знаю, могут ли:
Приложения Flink и Statefun работать вместе вне области источников / приемников, тем более что эта конкретная функция не имеет состояния и таблица имеет состояние
Мы можем запросить таблицы Flink, как это, я только запросил их как промежуточный объект для отправки в приемник или да tastream
Имеет смысл инициализировать вещи в Module.configure, и если и поставщик функций с состоянием, и его функция сопоставления вызываются один раз для параллельного рабочего