Есть ли способ передать только отфильтрованные события из потока Apache Flink в поток AsyncDataStream / AsyncIO? - PullRequest
0 голосов
/ 29 апреля 2019

Итак, у меня есть несколько журналов в Json, и у меня есть поток, который проверяет / отфильтровывает требуемый Json и прекрасно работает!

Теперь я хочу использовать AsyncIO для поиска в БД из отфильтрованного Json, но кажется, что asyncInvoke выполняется на каждом входе потока вместо отфильтрованных результатов.

DataStream<String> stringInputStream = env.addSource(flinkKafkaConsumer);

stringInputStream
    .flatMap(stringToJsonObject()) // Make sure only JSON logs go through.
    .returns(JsonObject.class)
    .filter(filterLogs("my-app")) // Filter logs for my-app
    .flatMap(jsonStringToJsonObject("someJsonEncodedStringField"))
    .returns(JsonObject.class)
    .filter(filterSpecificEvent()); // This stream works as expected, putting print() here only prints filtered events.

DataStream<JsonObject> lookupCarrierCodeStream = 
    AsyncDataStream.orderedWait(stringInputStream, lookupCodesInDB(), 3000, TimeUnit.MILLISECONDS, 100);

private static RichAsyncFunction<String, JsonObject> lookupCodesInDB() {
  return new RichAsyncFunction<String, JsonObject>() {
      @Override
      public void asyncInvoke(String input, ResultFuture<JsonObject> resultFuture) throws Exception {
          // This seems to receive all events, rather then the filtered ones.
          System.out.println("Input:" + input);

          resultFuture.complete(Collections.singleton(new JsonObject(input)));
      }
  };
}

Обновление

Кажется, это работает, если я разделю потоки так ...

DataStream<String> kafkaStringInput = env.addSource(flinkKafkaConsumer);

DataStream<JsonObject> jsonLogsInput = ...;
DataStream<JsonObject> appLogsInput = ...;
DataStream<JsonObject> evenInput = ...;

DataStream<JsonObject> lookupStream = AsyncDataStream.orderedWait(evenInput, ...);

Не знаю, почему это не сработает, но хорошо.

1 Ответ

1 голос
/ 30 апреля 2019

Применение функции к потоку, как в

eventStream
  .flatmap()

, не изменяет eventStream, но вместо этого возвращает новый поток.

Итак, вы хотите сделать что-то вроде этого:

DataStream<JsonObject>filteredStream = stringInputStream
  .flatMap(stringToJsonObject())
  .returns(JsonObject.class)
  .filter(filterLogs("my-app"))
  .flatMap(jsonStringToJsonObject("someJsonEncodedStringField"))
  .returns(JsonObject.class)
  .filter(filterSpecificEvent());

DataStream<JsonObject> lookupCarrierCodeStream = 
  AsyncDataStream.orderedWait(filteredStream, lookupCodesInDB(), 3000, TimeUnit.MILLISECONDS, 100);
...