Дросселирование шага в приложении луча - PullRequest
0 голосов
/ 05 сентября 2018

Я использую Python Beam в потоке данных Google, мой конвейер выглядит так:

Чтение URL-адресов изображений из файла >> Загрузка изображений >> Обработка изображений

Проблема в том, что я не могу позволить скачивать изображения пошаговое масштабирование столько, сколько нужно, потому что мое приложение может быть заблокировано сервером изображений.

Это способ, которым я могу задушить шаг? Либо на входе или выходе в минуту.

Спасибо.

1 Ответ

0 голосов
/ 11 сентября 2018

Одна возможность, может быть наивная, состоит в том, чтобы ввести сон в шаг. Для этого вам нужно знать максимальное количество экземпляров ParDo, которые могут быть запущены одновременно. Если autoscalingAlgorithm установлено на NONE, вы можете получить его из numWorkers и workerMachineType (DataflowPipelineOptions). Точно, эффективная ставка будет разделена на общее количество потоков: desired_rate/(num_workers*num_threads(per worker)). Время сна будет обратно пропорционально этой эффективной скорости:

Integer desired_rate = 1; // QPS limit

if (options.getNumWorkers() == 0) { num_workers = 1; }
else { num_workers = options.getNumWorkers(); }

if (options.getWorkerMachineType() != null) { 
    machine_type = options.getWorkerMachineType();
    num_threads = Integer.parseInt(machine_type.substring(machine_type.lastIndexOf("-") + 1));
}
else { num_threads = 1; }

Double sleep_time = (double)(num_workers * num_threads) / (double)(desired_rate);

Тогда вы можете использовать TimeUnit.SECONDS.sleep(sleep_time.intValue()); или его эквивалент в ограниченном Fn. В моем примере, как пример использования, я хотел прочитать из открытого файла, разобрать пустые строки и вызвать API обработки естественного языка с максимальной скоростью 1 QPS (я инициализировал desired_rate до 1 ранее):

p
    .apply("Read Lines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
    .apply("Omit Empty Lines", ParDo.of(new OmitEmptyLines()))
    .apply("NLP requests", ParDo.of(new ThrottledFn()))
    .apply("Write Lines", TextIO.write().to(options.getOutput()));

Ограниченная скорость Fn равна ThrottledFn, обратите внимание на функцию sleep:

static class ThrottledFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {

        // Instantiates a client
        try (LanguageServiceClient language = LanguageServiceClient.create()) {

          // The text to analyze
          String text = c.element();
          Document doc = Document.newBuilder()
              .setContent(text).setType(Type.PLAIN_TEXT).build();

          // Detects the sentiment of the text
          Sentiment sentiment = language.analyzeSentiment(doc).getDocumentSentiment();                 
          String nlp_results = String.format("Sentiment: score %s, magnitude %s", sentiment.getScore(), sentiment.getMagnitude());

          TimeUnit.SECONDS.sleep(sleep_time.intValue());

          Log.info(nlp_results);
          c.output(nlp_results);
        }
    }
}

При этом я получаю скорость 1 элемент / с, как показано на рисунке ниже, и избегаю введения квоты при использовании нескольких рабочих, даже если запросы на самом деле не распределены (вы можете получить 8 одновременных запросов, а затем 8 снов и т. Д. И т. Д.). ). Это был всего лишь тест, возможно, лучшей реализацией было бы использование гуавы rateLimiter .

enter image description here

Если в конвейере используется автоматическое масштабирование (THROUGHPUT_BASED), то это будет более сложным, и число работников должно быть обновлено (например, Stackdriver Monitoring имеет метрику job/current_num_vcpus). Другими общими соображениями было бы управление количеством параллельных ParDos с помощью фиктивного GroupByKey или разделение источника с помощью splitIntoBundles и т. Д. Я хотел бы посмотреть, есть ли другие более хорошие решения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...