Flink Несколько окон на одних и тех же данных - PullRequest
0 голосов
/ 30 января 2019

Мое приложение flink выполняет следующий

  1. источник: чтение данных в виде записей из Kafka
  2. split: на основе определенных критериев
  3. window: timewindow of10 секунд, чтобы объединиться в одну массовую запись
  4. приемник: сбросить эти массовые записи в эластичный поиск

Я столкнулся с проблемой, когда потребитель flink не может хранить данные в течение 10 секунд и выдает следующее исключение:

Причина: java.util.concurrent.ExecutionException: java.io.IOException: Размер состояния больше максимально допустимого состояния с поддержкой памяти.Size = 18340663, maxSize = 5242880

Я не могу применить countWindow, потому что, если частота записей слишком медленная, приемник упругого поиска может быть отложен на длительное время.

Мой вопрос:

Можно ли применить функцию ИЛИ TimeWindow и CountWindow, которая выглядит как

> if ( recordCount is 500 OR 10 seconds have elapsed)
>           then dump data to flink

Ответы [ 2 ]

0 голосов
/ 30 января 2019

Вы также можете использовать RocksDB State Backend , но пользовательский триггер будет работать лучше.

0 голосов
/ 30 января 2019

Не напрямую.Но вы можете использовать GlobalWindow с пользовательской логикой запуска.Взгляните на источник для счетчика триггеров здесь .

Ваша логика запуска будет выглядеть примерно так:

private final ReducingStateDescriptor<Long> stateDesc = 
    new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
private long triggerTimestamp = 0;

@Override
public TriggerResult onElement(String element, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {

    ReducingState<Long> count = triggerContext.getPartitionedState(stateDesc);

    // Increment window counter by one, when an element is received
    count.add(1L); 

    // Start the timer when the first packet is received
    if (count.get() == 1) {
        triggerTimestamp = triggerContext.getCurrentProcessingTime() + 10000; // trigger at 10 seconds from reception of first event
        triggerContext.registerProcessingTimeTimer(triggerTimestamp); // Override the onProcessingTime method to trigger the window at this time
    }

    // Or trigger the window when the number of packets in the window reaches 500
    if (count.get() >= 500) {
        // Delete the timer, clear the count and fire the window   
        triggerContext.deleteProcessingTimeTimer(triggerTimestamp);
        count.clear();
        return TriggerResult.FIRE;
    }

    return TriggerResult.CONTINUE;
}
...