У меня есть конвейер Apache Beam, работающий в потоке данных Google, работа которого довольно проста:
- Он считывает отдельные объекты JSON из Pub / Sub
- Анализирует их
- И отправляет их через HTTP на некоторый API
Этот API требует, чтобы я отправлял элементы партиями по 75 штук. Поэтому я создал DoFn
, который накапливает события в списке и публикует их через этот APIкак только они получаются 75. Это приводит к слишком медленным результатам, поэтому я подумал вместо того, чтобы выполнять эти HTTP-запросы в разных потоках с использованием пула потоков.
Реализация того, что у меня сейчас есть, выглядит так:
private class WriteFn : DoFn<TheEvent, Void>() {
@Transient var api: TheApi
@Transient var currentBatch: MutableList<TheEvent>
@Transient var executor: ExecutorService
@Setup
fun setup() {
api = buildApi()
executor = Executors.newCachedThreadPool()
}
@StartBundle
fun startBundle() {
currentBatch = mutableListOf()
}
@ProcessElement
fun processElement(processContext: ProcessContext) {
val record = processContext.element()
currentBatch.add(record)
if (currentBatch.size >= 75) {
flush()
}
}
private fun flush() {
val payloadTrack = currentBatch.toList()
executor.submit {
api.sendToApi(payloadTrack)
}
currentBatch.clear()
}
@FinishBundle
fun finishBundle() {
if (currentBatch.isNotEmpty()) {
flush()
}
}
@Teardown
fun teardown() {
executor.shutdown()
executor.awaitTermination(30, TimeUnit.SECONDS)
}
}
Кажется, это работает "отлично" в том смысле, что данные поступают в API.Но я не знаю, является ли это правильным подходом, и у меня есть чувство, что это очень медленно.
Причина, по которой я думаю, что это медленно, заключается в том, что при нагрузочном тестировании (отправка нескольких миллионов событий в Pub /Sub), конвейеру нужно пересылать эти сообщения в API (который имеет время отклика менее 8 мс) в 8 раз больше, чем моему ноутбуку, чтобы передать их в Pub / Sub.
IsЕсть ли проблемы с моей реализацией?Это то, как я должен это делать?
Также ... я должен ждать завершения всех запросов в моем методе @FinishBundle
(то есть путем получения фьючерсов, возвращаемых исполнителем, и ожидания наих)?