Apache Flink, количество слотов задач против env.setParallelism - PullRequest
1 голос
/ 14 января 2020

Не могли бы вы объяснить различия между слотом задач и параллелизмом в Apache Flink v1.9?

  • Вот мое понимание до сих пор

    • Флинк говорит, что TaskManager - это рабочий ПРОЦЕСС. И обычно у вас должен быть один TaskManager на один компьютер.
    • Допустим, у меня есть 3 компьютера, и у них обоих по 16 процессорных ядер. Каждый компьютер будет TaskManager. Поэтому у меня будет 3 TaskManager
    • Я думал, что если на одном компьютере установлено 16 ядер процессора, то TaskManager может создать до 16 слотов задач. Поэтому там есть изоляция процессора. Однако Флинк говорит, что link => " Обратите внимание, что здесь не происходит изоляция ЦП; в настоящее время слоты отделяют только управляемую память задач. "
    • Это означает, что 16 слотов = 16 потоков? А также numberOfSlot can be >= numberOfCpuCores?
  • Если временные интервалы означают поток, это может привести к «общей проблеме доступа к данным, состоянию гонки» и т. Д. c ..? Это мой первый вопрос.

  • Второй вопрос - тот, который я написал в начале моего поста => различий между слотом задач и параллелизмом. Я говорю о env.setparalellism (число).
    • Допустим, мой номер параллелизма = 2
    • Тогда для каждого слота задачи (потока или чего бы то ни было) будет выполняться 2 потока?
      • если это так, это может привести к «проблеме доступа к данным, состоянию гонки» и т. Д. c ..?
      • , если это не так, что означает параллелизм?
  • Вот пример. В этом примере я должен заботиться о написании метода apply() из-за среды потоков ?:

public class AverageSensorReadings {
 public static void main(String[] args) throws Exception {
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  int paralellism = env.getParallelism();
  int maxParal = env.getMaxParallelism();

  // ingest sensor stream
  DataStream < SensorReading > sensorData = env
   // SensorSource generates random temperature readings
   .addSource(new SensorSource())
   // assign timestamps and watermarks which are required for event time
   .assignTimestampsAndWatermarks(new SensorTimeAssigner());

  DataStream < SensorReading > avgTemp = sensorData
   // convert Fahrenheit to Celsius using and inlined map function
   .map(r -> new SensorReading(r.id, r.timestamp, (r.temperature - 32) * (5.0 / 9.0)))
   // organize stream by sensor
   .keyBy(r -> r.id)
   // group readings in 1 second windows
   .timeWindow(Time.seconds(4))
   // compute average temperature using a user-defined function
   .apply(new TemperatureAverager());

  // print result stream to standard out
  //avgTemp.print();
  System.out.println("paral: " + paralellism + " max paral: " + maxParal);
  // execute application
  env.execute("Compute average sensor temperature");
 }

 public static class TemperatureAverager extends RichWindowFunction < SensorReading, SensorReading, String, TimeWindow > {

  /**
   * apply() is invoked once for each window.
   *
   * @param sensorId the key (sensorId) of the window
   * @param window meta data for the window
   * @param input an iterable over the collected sensor readings that were assigned to the window
   * @param out a collector to emit results from the function
   */
  @Override
  public void apply(String sensorId, TimeWindow window, Iterable < SensorReading > input, Collector < SensorReading > out) {
   System.out.println("APPLY FUNCTION START POINT");
   System.out.println("sensorId: " + sensorId + "\n");

   // compute the average temperature
   int cnt = 0;
   double sum = 0.0;
   for (SensorReading r: input) {
    System.out.println("collected item: " + r);
    cnt++;
    sum += r.temperature;
   }
   double avgTemp = sum / cnt;
   System.out.println("APPLY FUNCTION END POINT");
   System.out.println("----------------------------\n\n");
   // emit a SensorReading with the average temperature
   out.collect(new SensorReading(sensorId, window.getEnd(), avgTemp));
  }
 }
}

1 Ответ

3 голосов
/ 14 января 2020

Обычно каждый слот запускает один параллельный экземпляр вашего конвейера. Таким образом, параллелизм задания равен количеству слотов, необходимых для его выполнения. (Используя группы совместного использования слотов, вы можете принудительно задать указанные c задачи в их собственные слоты, что затем увеличит количество требуемых слотов.)

Каждая задача (которая состоит из одного или нескольких операторов, соединенных вместе) выполняется в один Java поток.

Менеджер задач может создать столько слотов, сколько вы хотите. В типичных конфигурациях используется 1 ядро ​​ЦП на слот, но для конвейеров с высокими требованиями к обработке может потребоваться 2 или более ядер на слот, а для конвейеров, которые в основном простаивают, вы можете go в другом направлении и настроить несколько слотов на ядро .

Все задачи / потоки, работающие в диспетчере задач, будут просто конкурировать за ресурсы ЦП, которые диспетчер задач может получить от машины или контейнера, в котором он размещен.

Все состояния локальные к одному экземпляру оператора (задаче), который его использует, поэтому весь доступ происходит внутри этого одного потока. Единственное место, где гипотетически может быть условие гонки, - это обратные вызовы onTimer и processElement в ProcessFunction, но эти методы синхронизированы, поэтому вам не нужно об этом беспокоиться. Поскольку весь доступ к состоянию является локальным, это приводит к высокой пропускной способности, низкой задержке и высокой масштабируемости.

В вашем примере, если параллелизм равен двум, у вас будет два слота, независимо выполняющих один и тот же лог c на разных кусках ваших данных. Если они используют состояние, то это будет состояние с разделением ключей, которым управляет Flink, который можно рассматривать как хранилище ключей / значений.

В случае данных датчика во времени windows вам не нужно беспокоиться о многопоточности. KeyBy разделит данные так, что один экземпляр будет обрабатывать все события и windows для некоторых датчиков, а другой экземпляр (при условии, что их два) будет обрабатывать остальные.

...