Apache DoFn с разделением лучей не распределяет работу между несколькими рабочими потоками данных - PullRequest
0 голосов
/ 26 мая 2020

Я учусь использовать разделяемую DoFn. Я ожидаю, что моя работа будет распределена между 500 рабочими, но Dataflow выполняла ее только с 1 или 2 рабочими. Я понимаю или неправильно реализую разделяемый DoFn?

моя версия луча - 2.16.0

мой DoFn

    class Calculate extends DoFn<String, String> {

    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
            try {
                c.output(i + "_" + InetAddress.getLocalHost().getHostName() + "_" + Math.random() + "_" +  c.element());
                for (int j = 0; j < 10000000; j++ ) {
                    Math.sqrt(j);
                }
            } catch (UnknownHostException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    @GetInitialRestriction
    public OffsetRange getInitialRange(String element) {
        return new OffsetRange(1L, 50000000L);
    }
    @SplitRestriction
    public void splitRestriction(String element, OffsetRange restriction, OutputReceiver<OffsetRange> receiver) {

        for (long i = restriction.getFrom(); i < restriction.getTo(); i += 100000) {
            receiver.output(new OffsetRange(i, i + 100000));
        }

    }
   }

моя основная функция

public static void main(String[] args) {

    //PipelineOptions options = PipelineOptionsFactory.create();
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://mybucket/EMEA/20200525/singleposition.csv"))
     .apply(ParDo.of(new Calculate()))
     .apply(TextIO.write().to("gs://mybucket/EMEA/20200525/obligor.csv"));

    p.run().waitUntilFinish();
  }

1 Ответ

1 голос
/ 26 мая 2020

Я обнаружил два основных пробела в вашем понимании.

  1. Как мы можем контролировать количество рабочих в облачном потоке данных (то есть вы хотите создать 500 рабочих для вашего Calculate () DoFn ())?

Cloud Dataflow предлагает 2 алгоритма масштабирования.

a) На основе пропускной способности: в этом сервисе Dataflow автоматически выбирается соответствующее количество рабочих экземпляров, необходимых для выполнения вашего задания. Однако вы можете ограничить максимальное количество рабочих, которые он будет порождать, используя --max_num_workers.

Алгоритм на основе пропускной способности является настройкой по умолчанию для облачного потока данных.

b) Нет (--autoscaling_algorithm = NONE): здесь вы можете указать количество рабочих, которых вы хотите создать для своих заданий вручную. Вы можете указать количество рабочих, используя параметр --num_worker.

В вашем случае использования вы используете режим по умолчанию. Следовательно, вы получаете только 2 рабочих в зависимости от вашей нагрузки.

Прочтите эту ссылку для получения более подробной информации.

Что такое Splittable DoFn () и когда его использовать?

Рассмотрим следующие два варианта использования для чтения данных из источника:

- Чтение имена файлов из Kafka и читать каждый файл с помощью textIO (ограниченный источник)

- читать весь список разделов для kafka topi c и читать данные из каждого раздела (ограниченный источник)

Приведенные выше варианты использования очень сложно реализовать с помощью ParDo (читайте старый ParDO), потому что

функции ParDo монолитны c - единственный processElement () вызов, который может выводить только ограниченное количество элементов, но не предоставляет информацию о прогрессе или размере, принимает подсказки параллелизации, разделение или контрольную точку.

SDF - это DoFn с немонолитным c элементом обработка - при обработке одного элемента исполнитель может делать запросы к SDF, чтобы немонолитно выполнять работу, связанную с этим элементом, по нескольким, возможно, параллельным процессам. () вызывает

Итак, вы можете читать несколько текстовых файлов параллельно с Splittable DoFn для описанного выше варианта использования.

В дополнение к вышесказанному вы можете обратиться к этим двум документам для получения более подробной информации: SDFProposal , Dynami c балансировка

Теперь в вашем сценарии я не вижу необходимости в немонолитных c обработка. Здесь вы не добьетесь какого-либо дополнительного преимущества в производительности, используя SDF.

...