Тестирование Flink со встроенным Kafka - PullRequest
1 голос
/ 01 марта 2020

У меня есть простое приложение Flink, которое суммирует события с тем же идентификатором и отметкой времени за последнюю минуту:

DataStream<String> input = env
                .addSource(consumerProps)
                .uid("app");

DataStream<Event> events = input.map(record -> mapper.readValue(record, Event.class));

pixels
        .assignTimestampsAndWatermarks(new TimestampsAndWatermarks())
        .keyBy("id")
        .timeWindow(Time.minutes(1))
        .sum("constant")
        .addSink(simpleNotificationServiceSink);


env.execute(jobName);


private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Pixel> {
        public TimestampsAndWatermarks() {
            super(Time.seconds(90));
        }

        // timestampReadable is timestamp rounded on minutes, in format yyyyMMddhhmm
        @Override
        public long extractTimestamp(Pixel pixel) {
            return Long.parseLong(pixel.timestampReadable);
        }
    }

Я хотел бы реализовать сценарий:

  1. Запустите встроенный Kafka

  2. Publi sh пару сообщений в топи c

  3. Используйте сообщения с Flink

  4. Проверьте правильность вывода, произведенного Flink

Предоставляет ли Flink утилиты для тестирования работы со встроенным Kafka? Если да, каков рекомендуемый подход?

Спасибо.

1 Ответ

0 голосов
/ 01 марта 2020

Есть правило JUnit, которое вы можете использовать для вызова встроенного Kafka - см. (См. https://github.com/charithe/kafka-junit).

Чтобы тесты завершались без ошибок, попробуйте что-то вроде этого:

public class TestDeserializer extends YourKafkaDeserializer<T> {
  public final static String END_APP_MARKER = "END_APP_MARKER"; // tests send as last record

  @Override
  public boolean isEndOfStream(ParseResult<T> nextElement) {
    if (nextElement.getParseError() == null)
      return false;

    if (END_APP_MARKER.equals(nextElement.getParseError().getRawData()))
      return true;

    return false;
  }
}
...