Экземпляр объекта, относящегося к flink Parallelism & Apply Method - PullRequest
1 голос
/ 27 мая 2020
  • Сначала позвольте мне задать свой вопрос, тогда не могли бы вы прояснить мое предположение о методе apply?

  • Вопрос: если мое приложение создает 1.500.000 (приблизительно ) записи с интервалом в одну минуту, и задание flink считывает эти записи от потребителя kafka с помощью, скажем, 15 ++ различных операторов, тогда этот logi c может создавать задержку, противодавление и т. д. c ..? (вы можете предположить, что параллелизм равен 16)

public class Sample{
  //op1 = 
     kafkaSource
                .keyBy(something)
                .timeWindow(Time.minutes(1))
                .apply(new ApplySomething())
                .name("Name")
                          .addSink(kafkaSink);
  //op2 = 
    kafkaSource
                .keyBy(something2)
                .timeWindow(Time.seconds(1)) // let's assume that this one second
                .apply(new ApplySomething2())
                .name("Name")
                          .addSink(kafkaSink);
 // ...

  //op16 = 
    kafkaSource
                .keyBy(something16)
                .timeWindow(Time.minutes(1)) 
                .apply(new ApplySomething16())
                .name("Name")
                          .addSink(kafkaSink);

}
// ..
public class ApplySomething ... {
  private AnyObject object;
  private int threshold = 30, 40, 100 ...;

      @Override
    public void open(Configuration parameters) throws Exception{
        object = new AnyObject();
    }

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Record> input, Collector<Result> out) throws Exception{
        int counter = 0;
        for (Record each : input){
          counter += each.getValue();
          if (counter > threshold){
            out.collec(each.getResult());
            return;
          }
        }
    }
}
  • Если да, следует ли мне использовать flatMap с состоянием (RockDB) вместо timeWindow?
  • Мой прогноз - «ДА». Позвольте мне объяснить, почему я так думаю:
    • Если параллелизм равен 16, то будет 16 различных экземпляров индивидуального ApplySomething1(), ApplySomething2()...ApplySomething16(), а также будет шестнадцать AnyObject() экземпляров для каждого ApplySomething..() классов.
    • Когда приложение работает, если keyBy(something) номер раздела больше 16 (предполагается, что мое приложение имеет 1.000.000 различных something в день), то некоторые из ApplySomething..() экземпляров будут обрабатывать разные ключи поэтому один apply() должен ждать циклов другими перед обработкой. Тогда это создаст задержку ?

1 Ответ

2 голосов
/ 27 мая 2020

Время windows Flink выровнено по эпохе (например, если у вас есть набор почасовых windows, все они будут срабатывать по часу). Поэтому, если вы намерены использовать в своей работе несколько разных windows, вам следует настроить их так, чтобы они имели отдельные смещения, чтобы они не запускались одновременно. Это позволит распределить нагрузку. Это будет выглядеть примерно так:

.window(TumblingProcessingTimeWindows.of(Time.minutes(1), Time.seconds(15))

(или используйте TumblingEventTimeWindows в зависимости от обстоятельств). Это создаст минутную windows, которая будет запускаться через 15 секунд после каждой минуты.

Если позволяет ваш вариант использования, вы должны использовать инкрементную агрегацию (через reduce или aggregate), а не использовать WindowFunction (или ProcessWindowFunction), который должен собрать все события, назначенные каждому окну в списке, прежде чем обрабатывать их как своего рода мини-пакет.

Временное окно с ключом будет сохранять свое состояние в RocksDB, если вы настроили RocksDB в качестве бэкэнда состояния. Вам не нужно переключаться на использование RichFlatMap, чтобы получить доступ к RocksDB. (Более того, поскольку flatMap не может использовать таймеры, я предполагаю, что вы действительно в конечном итоге используете вместо этого функцию процесса.)

Пока любой из параллельных экземпляров оконного оператора занят выполнением своей оконной функции (один из ApplySomethings) вы правы, полагая, что эта задача не будет делать ничего другого - и, таким образом, она (если не завершится очень быстро) создаст временное противодавление. При необходимости вы захотите увеличить параллелизм, чтобы задание могло удовлетворить ваши требования к пропускной способности и задержке.

...