Одна возможность, может быть наивная, состоит в том, чтобы ввести сон в шаг. Для этого вам нужно знать максимальное количество экземпляров ParDo, которые могут быть запущены одновременно. Если autoscalingAlgorithm
установлено на NONE
, вы можете получить его из numWorkers
и workerMachineType
(DataflowPipelineOptions). Точно, эффективная ставка будет разделена на общее количество потоков: desired_rate/(num_workers*num_threads(per worker))
. Время сна будет обратно пропорционально этой эффективной скорости:
Integer desired_rate = 1; // QPS limit
if (options.getNumWorkers() == 0) { num_workers = 1; }
else { num_workers = options.getNumWorkers(); }
if (options.getWorkerMachineType() != null) {
machine_type = options.getWorkerMachineType();
num_threads = Integer.parseInt(machine_type.substring(machine_type.lastIndexOf("-") + 1));
}
else { num_threads = 1; }
Double sleep_time = (double)(num_workers * num_threads) / (double)(desired_rate);
Тогда вы можете использовать TimeUnit.SECONDS.sleep(sleep_time.intValue());
или его эквивалент в ограниченном Fn. В моем примере, как пример использования, я хотел прочитать из открытого файла, разобрать пустые строки и вызвать API обработки естественного языка с максимальной скоростью 1 QPS (я инициализировал desired_rate
до 1 ранее):
p
.apply("Read Lines", TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
.apply("Omit Empty Lines", ParDo.of(new OmitEmptyLines()))
.apply("NLP requests", ParDo.of(new ThrottledFn()))
.apply("Write Lines", TextIO.write().to(options.getOutput()));
Ограниченная скорость Fn равна ThrottledFn
, обратите внимание на функцию sleep
:
static class ThrottledFn extends DoFn<String, String> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
// Instantiates a client
try (LanguageServiceClient language = LanguageServiceClient.create()) {
// The text to analyze
String text = c.element();
Document doc = Document.newBuilder()
.setContent(text).setType(Type.PLAIN_TEXT).build();
// Detects the sentiment of the text
Sentiment sentiment = language.analyzeSentiment(doc).getDocumentSentiment();
String nlp_results = String.format("Sentiment: score %s, magnitude %s", sentiment.getScore(), sentiment.getMagnitude());
TimeUnit.SECONDS.sleep(sleep_time.intValue());
Log.info(nlp_results);
c.output(nlp_results);
}
}
}
При этом я получаю скорость 1 элемент / с, как показано на рисунке ниже, и избегаю введения квоты при использовании нескольких рабочих, даже если запросы на самом деле не распределены (вы можете получить 8 одновременных запросов, а затем 8 снов и т. Д. И т. Д.). ). Это был всего лишь тест, возможно, лучшей реализацией было бы использование гуавы rateLimiter .
Если в конвейере используется автоматическое масштабирование (THROUGHPUT_BASED
), то это будет более сложным, и число работников должно быть обновлено (например, Stackdriver Monitoring имеет метрику job/current_num_vcpus
). Другими общими соображениями было бы управление количеством параллельных ParDos с помощью фиктивного GroupByKey или разделение источника с помощью splitIntoBundles и т. Д. Я хотел бы посмотреть, есть ли другие более хорошие решения.