Как Flink и Beam SDK справляются с управлением окнами - что является более эффективным? - PullRequest
0 голосов
/ 05 сентября 2018

Я сравниваю Apache Beam SDK с Flink SDK для потоковой обработки, чтобы определить стоимость / преимущества использования Beam в качестве дополнительной платформы.

У меня очень простая установка, когда поток данных считывается из источника Kafka и обрабатывается параллельно кластером узлов, работающих под управлением Flink.

Из моего понимания того, как работают эти SDK, самый простой способ обработки потока данных окно за окном:

  1. Использование Apache Beam (работает на Flink):

    1.1. Создайте объект конвейера.

    1.2. Создать PC-коллекцию записей Кафки.

    1,3. Примените функцию управления окнами.

    1.4. Преобразовать конвейер в ключ по окну.

    1,5. Группировать записи по ключу (окну).

    1,6. Примените любую функцию, необходимую для оконных записей.

  2. Использование Flink SDK

    2,1. Создайте поток данных из источника Kafka.

    2,2. Превратите его в поток с ключами, обеспечив функцию ключа.

    2,3. Применить функцию управления окнами.

    * +1032 * 2.4. Примените любую функцию, необходимую для оконных записей.

Несмотря на то, что решение Flink выглядит программно более кратким, по моему опыту, оно менее эффективно при больших объемах данных. Я могу только предположить, что служебная нагрузка вводится функцией извлечения ключа, поскольку Beam не требует этого шага.

Мой вопрос: сравниваю ли я с подобным? Эти процессы не эквивалентны? Что может объяснить, как Beam-путь более эффективен, поскольку он использует Flink в качестве бегуна (а все остальные условия одинаковы)?

Это код, использующий Beam SDK

    PipelineOptions options = PipelineOptionsFactory.create();

    //Run with Flink
    FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class);
    flinkPipelineOptions.setRunner(FlinkRunner.class);
    flinkPipelineOptions.setStreaming(true);
    flinkPipelineOptions.setParallelism(-1); //Pick this up from the user interface at runtime

    // Create the Pipeline object with the options we defined above.
    Pipeline p = Pipeline.create(flinkPipelineOptions);

    // Create a PCollection of Kafka records
    PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.<Long, String>readBytes()
            .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT)
            .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC))
            .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP)));

    //Apply Windowing Function    
    PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

    //Transform the pipeline to key by window
    PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow =
            windowedKafkaCollection.apply(
                    ParDo.of(
                            new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() {
                                @ProcessElement
                                public void processElement(ProcessContext context, IntervalWindow window) {
                                    context.output(KV.of(window, context.element()));
                                }
                            }));
    //Group records by key (window)
    PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow
            .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create());

    //Process windowed data
    PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow
            .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

    // Run the pipeline.
    p.run().waitUntilFinish();

А это код, использующий Flink SDK

// Create a Streaming Execution Environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setParallelism(6);

//Connect to Kafka
Properties properties = new Properties();   
properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);
properties.setProperty("group.id", CONSUMER_GROUP);

DataStream<ObjectNode> stream = env
            .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties));

//Key by id
stream.keyBy((KeySelector<ObjectNode, Integer>) jsonNode -> jsonNode.get("id").asInt())

        //Set the windowing function.
        .timeWindow(Time.seconds(5L), Time.seconds(1L))

        //Process Windowed Data
        .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class));

// execute program
env.execute("Using Flink SDK");

Заранее большое спасибо за понимание.

Редактировать

Я подумал, что должен добавить некоторые показатели, которые могут иметь отношение к делу.

Полученные в сети байты

Flink SDK

  • taskmanager.2
    • 2644786446
  • taskmanager.3
    • 2645765232
  • taskmanager.1
    • 2827676598
  • taskmanager.6
    • 2422309148
  • taskmanager.4
    • 2428570491
  • taskmanager.5
    • 2431368644

Beam

  • taskmanager.2
    • 4092154160
  • taskmanager.3
    • 4435132862
  • taskmanager.1
    • 4766399314
  • taskmanager.6
    • 4425190393
  • taskmanager.4
    • 4096576110
  • taskmanager.5
    • 4092849114

загрузка процессора (макс.)

Flink SDK

  • taskmanager.2
    • 93,00%
  • taskmanager.3
    • 92,00%
  • taskmanager.1
    • 91,00%
  • taskmanager.6
    • 90,00%
  • taskmanager.4
    • 90,00%
  • taskmanager.5
    • 92,00%

Beam

  • taskmanager.2
    • 52,0%
  • taskmanager.3
    • 71,0%
  • taskmanager.1
    • 72,0%
  • taskmanager.6
    • 40,0%
  • taskmanager.4
    • 56,0%
  • taskmanager.5
    • 26,0%

Кажется, что в Beam используется гораздо больше сетевых возможностей, тогда как Flink использует значительно больше ЦП. Можно ли предположить, что Beam распараллеливает обработку более эффективно?

Ред. №2

Я почти уверен, что классы PueCalculatorFn эквивалентны, но я поделюсь здесь кодом, чтобы увидеть, станут ли очевидными расхождения между этими двумя процессами.

Beam

public class PueCalculatorFn extends DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>> implements Serializable {
private transient List<IKafkaConsumption> realEnergyRecords;
private transient List<IKafkaConsumption> itEnergyRecords;

@ProcessElement
public void procesElement(DoFn<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>, KV<IIntervalWindowResult, IPueResult>>.ProcessContext c, BoundedWindow w) {
    KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element();
    Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis());
    Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis());
    Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue();

    //Calculate Pue
    IPueResult result = calculatePue(element.getKey(), records);

    //Create IntervalWindowResult object to return
    DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
    IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
            formatter.format(windowEnd), realEnergyRecords, itEnergyRecords);

    //Return Pue keyed by Window
    c.output(KV.of(intervalWindowResult, result));
}

private PueResult calculatePue(IntervalWindow window, Iterable<KafkaRecord<byte[], byte[]>> records) {
    //Define accumulators to gather readings
    final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

    //Declare variable to store the result
    BigDecimal pue = BigDecimal.ZERO;

    //Initialise transient lists
    realEnergyRecords = new ArrayList<>();
    itEnergyRecords = new ArrayList<>();

    //Transform the results into a stream
    Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false);

    //Iterate through each reading and add to the increment count
    streamOfRecords
            .map(record -> {
                byte[] valueBytes = record.getKV().getValue();
                assert valueBytes != null;
                String valueString = new String(valueBytes);
                assert !valueString.isEmpty();
                return KV.of(record, valueString);
            }).map(kv -> {
        Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create();
        KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class);
        return KV.of(kv.getKey(), consumption);

    }).forEach(consumptionRecord -> {
                switch (consumptionRecord.getKey().getTopic()) {
                    case REAL_ENERGY_TOPIC:
                        totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                        realEnergyRecords.add(consumptionRecord.getValue());
                        break;
                    case IT_ENERGY_TOPIC:
                        totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed());
                        itEnergyRecords.add(consumptionRecord.getValue());
                        break;
                }
            }
    );

    assert totalRealIncrement.doubleValue() > 0.0;
    assert totalItIncrement.doubleValue() > 0.0;

    //Beware of division by zero
    if (totalItIncrement.doubleValue() != 0.0) {
        //Calculate PUE
        pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
    }

    //Create a PueResult object to return
    IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis());
    return new PueResult(intervalWindow, pue.stripTrailingZeros());
}

@Override
protected void finalize() throws Throwable {
    super.finalize();
    RecordSenderFactory.closeSender();
    WindowSenderFactory.closeSender();
}
} 

Flink

public class PueCalculatorFn extends ProcessWindowFunction<ObjectNode, ImmutablePair, Integer, TimeWindow> {
private transient List<KafkaConsumption> realEnergyRecords;
private transient List<KafkaConsumption> itEnergyRecords;

@Override
public void process(Integer integer, Context context, Iterable<ObjectNode> iterable, Collector<ImmutablePair> collector) throws Exception {
    Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
    Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd());
    BigDecimal pue = calculatePue(iterable);

    //Create IntervalWindowResult object to return
    DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC"));
    IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart),
            formatter.format(windowEnd), realEnergyRecords
            .stream()
            .map(e -> (IKafkaConsumption) e)
            .collect(Collectors.toList()), itEnergyRecords
            .stream()
            .map(e -> (IKafkaConsumption) e)
            .collect(Collectors.toList()));


    //Create PueResult object to return
    IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros());

    //Collect result
    collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult));

}

protected BigDecimal calculatePue(Iterable<ObjectNode> iterable) {
    //Define accumulators to gather readings
    final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);
    final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0);

    //Declare variable to store the result
    BigDecimal pue = BigDecimal.ZERO;

    //Initialise transient lists
    realEnergyRecords = new ArrayList<>();
    itEnergyRecords = new ArrayList<>();

    //Iterate through each reading and add to the increment count
    StreamSupport.stream(iterable.spliterator(), false)
            .forEach(object -> {
                switch (object.get("topic").textValue()) {
                    case REAL_ENERGY_TOPIC:
                        totalRealIncrement.accumulate(object.get("energyConsumed").asDouble());
                        realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                        break;
                    case IT_ENERGY_TOPIC:
                        totalItIncrement.accumulate(object.get("energyConsumed").asDouble());
                        itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object));
                        break;
                }

            });

    assert totalRealIncrement.doubleValue() > 0.0;
    assert totalItIncrement.doubleValue() > 0.0;

    //Beware of division by zero
    if (totalItIncrement.doubleValue() != 0.0) {
        //Calculate PUE
        pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP);
    }
    return pue;
}

}

А вот мой собственный десериализатор, используемый в примере с Beam.

KafkaConsumptionDeserialiser

public class KafkaConsumptionDeserialiser implements JsonDeserializer<KafkaConsumption> {

public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
    if(jsonElement == null) {
        return null;
    } else {
        JsonObject jsonObject = jsonElement.getAsJsonObject();
        JsonElement id = jsonObject.get("id");
        JsonElement energyConsumed = jsonObject.get("energyConsumed");
        Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create();
        Duration duration = (Duration)gson.fromJson(jsonObject.get("duration"), Duration.class);
        JsonElement topic = jsonObject.get("topic");
        Instant eventTime = (Instant)gson.fromJson(jsonObject.get("eventTime"), Instant.class);
        return new KafkaConsumption(Integer.valueOf(id != null?id.getAsInt():0), Double.valueOf(energyConsumed != null?energyConsumed.getAsDouble():0.0D), duration, topic != null?topic.getAsString():"", eventTime);
    }
  }

}

Ответы [ 2 ]

0 голосов
/ 07 сентября 2018

Для чего стоит, если обработка окна может быть предварительно агрегирована с помощью метода limit () или aggregate (), тогда собственное задание Flink должно работать лучше, чем в настоящее время.

Многие детали, такие как выбор состояния сервера, сериализация, контрольные точки и т. Д., Также могут оказать большое влияние на производительность.

Используется ли в обоих случаях один и тот же Flink - т.е. одна и та же версия, одна и та же конфигурация?

0 голосов
/ 07 сентября 2018

Не уверен, почему написанный вами конвейер Beam работает быстрее, но семантически он не совпадает с заданием Flink. Подобно тому, как работает управление окнами в Flink, после назначения окон в Beam все последующие операции автоматически учитывают управление окнами. Вам не нужно группировать по окну.

Ваше определение конвейера Beam можно упростить следующим образом:

// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(flinkPipelineOptions);

// Create a PCollection of Kafka records
PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = ...

//Apply Windowing Function
PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(
 Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1))));

//Process windowed data
PCollection<KV<IIntervalWindowResult, IPueResult>> processed = windowedKafkaCollection
    .apply("filterAndProcess", ParDo.of(new PueCalculatorFn()));

// Run the pipeline.
p.run().waitUntilFinish();

Что касается производительности, то она зависит от многих факторов, но имейте в виду, что Beam - это слой абстракции поверх Flink. Вообще говоря, я был бы удивлен, если бы вы увидели увеличение производительности с Beam на Flink.

edit: Просто для пояснения, вы не группируете поле JSON "id" в конвейере Beam, что вы делаете во фрагменте Flink.

...