Флинк, как не сохранить состояние для оператора? - PullRequest
0 голосов
/ 22 апреля 2019

У меня есть настройка checkpointing в моей работе Flink, и у нее есть 2 скользящих окна (это не соединения) и 1 соединение с переворачивающимися окнами.Идея заключается в том, что мне не нужно сохранять состояние для самого join, так как достаточно сохранить состояние для самих 2 раздвижных окон.Join в конечном итоге находится в состоянии 20-30 ГБ, что приводит к задержке и аварийному завершению задания, а контрольная точка никогда не заканчивается сохранением.

Как я могу это сделать?

Я пытаюсь что-то вроде:

public class CustomJoin implements JoinFunction<A, A, A>, ListCheckPointed<A> {

@Override
public A join(A a, A b){
 // Some irrelevant join logic
}

@Override
    public List<A> snapshotState(long l, long l1) throws Exception {
      return new ArrayList<>();
    }

    @Override
    public void restoreState(List<A> list) throws Exception {

    }
}

Действительно ли это позволяет избежать сохранения состояния для соединения?Это называется как:

stream
.assignTimestampsAndWatermarks(...)
.join(secondStream.assingTimestampsAndWatermarks(...))
.where(KeySelector...)
.equalTo(KeySelector...)
.window(TumblingEventTimeWindows.of(Time.minutes(1L))
.trigger(EventTimeTrigger.create())
.apply(new CustomJoin());

Это работает на практике?Каков наилучший способ избежать сохранения состояния?

Ответы [ 2 ]

0 голосов
/ 01 мая 2019

В оконном соединении JoinFunction выполняется оператором окна.У него нет своего государства.То, что вы пытаетесь, не поможет.

Более того, раздвижные окна используют гораздо больше состояний, чем вы можете себе представить.Каждый перекрывающийся экземпляр имеет свою собственную копию содержимого окна.Так, например, если у вас есть часовые окна, которые сдвигаются на 1 минуту, то каждое событие копируется 60 раз.

0 голосов
/ 23 апреля 2019

согласно моему пониманию Flink, контрольная точка должна гарантировать, что весь расчет может быть восстановлен безопасно и эффективно, поэтому это глобальное состояние неизбежно. Но собственную контрольную точку Флинка можно закрыть (она основана на алгоритме ABS , который имеет небольшую потерю производительности, я не рекомендую его), но использует SavePoint , предоставленный Flink для пользовательских снимки, но контрольная точка Flink является инкрементной. Сохранить, а SavePoint - это полное сохранение. Я бы посоветовал вам взглянуть на эти материалы: 1. Распределенные моментальные снимки, определяющие глобальные состояния распределенной системы. 2. Легкие асинхронные моментальные снимки для распределенных потоков данных. 3 https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html Я думаю, что это может решить вашу проблему очень хорошо.

...