Соединения Flink Statefun с Flink Table API - PullRequest
2 голосов
/ 11 апреля 2020

Мы заинтересованы в подключении к обычному приложению Flink Streaming из новых Stateful функций ? , в идеале с использованием Table API. Идея состоит в том, чтобы обратиться к таблицам, зарегистрированным во Flink, из Statefun, возможно ли это, и как правильно это сделать?

До сих пор моя идея состояла в том, чтобы инициализировать мой поток таблиц в некоторые основные функции и зарегистрируйте провайдера функций с состоянием для соединения с таблицей:

@AutoService(StatefulFunctionModule.class)
public class Module implements StatefulFunctionModule {

  @Override
  public void configure(Map<String, String> globalConfiguration, Binder binder) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // ingest a DataStream from an external source
    DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

    // SQL query with an inlined (unregistered) table
    Table myTable = tableEnv.fromDataStream(ds, "user, product, amount");
    tableEnv.createTemporaryView("my_table", myTable);

    TableFunctionProvider tableProvider = new TableFunctionProvider();
    binder.bindFunctionProvider(FnEnrichmentCallback.TYPE, tableProvider);

    //continue registering my other messages
    //...
  }
}

Провайдер функций с состоянием будет возвращать FnTableQuery, который просто запрашивает таблицу при получении сообщения:

public class TableFunctionProvider implements StatefulFunctionProvider {

  @Override
  public StatefulFunction functionOfType(FunctionType type) {
    return new FnTableQuery();
  }
}

Объект функции запроса будет тогда действовать как субъект для каждого установленного процесса и просто запрашивать таблицу при вызове:

public class FnTableQuery extends StatefulMatchFunction {

  static final FunctionType TYPE = new FunctionType(Identifiers.NAMESPACE, "my-table");

  private Table myTable;

  @Override
  public void configure(MatchBinder binder) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    myTable = tableEnv.from("my_table");

    binder
        .otherwise(this::catchAll);
  }

  private void catchAll(Context context, Object message) {
    context.send(FnEnrichmentCallback.TYPE, myTable.select("max(amount)").toString(), message);
  }
}

Заранее извиняюсь, если этот подход не имеет смысла, потому что Я не знаю, могут ли:

  1. Приложения Flink и Statefun работать вместе вне области источников / приемников, тем более что эта конкретная функция не имеет состояния и таблица имеет состояние

  2. Мы можем запросить таблицы Flink, как это, я только запросил их как промежуточный объект для отправки в приемник или да tastream

  3. Имеет смысл инициализировать вещи в Module.configure, и если и поставщик функций с состоянием, и его функция сопоставления вызываются один раз для параллельного рабочего

1 Ответ

0 голосов
/ 14 апреля 2020

Сообщество Apache Flink намерено поддерживать Flink DataStreams как вход / выход StateFun в будущем.

Это означает, что вы можете использовать потоки результатов с помощью API Flink Table / Flink CEP / DataStream API et c. И вызывать функции, используя события в потоках.

...