Как распараллелить HTTP-запросы на шаге Apache Beam? - PullRequest
0 голосов
/ 15 октября 2018

У меня есть конвейер Apache Beam, работающий в потоке данных Google, работа которого довольно проста:

  • Он считывает отдельные объекты JSON из Pub / Sub
  • Анализирует их
  • И отправляет их через HTTP на некоторый API

Этот API требует, чтобы я отправлял элементы партиями по 75 штук. Поэтому я создал DoFn, который накапливает события в списке и публикует их через этот APIкак только они получаются 75. Это приводит к слишком медленным результатам, поэтому я подумал вместо того, чтобы выполнять эти HTTP-запросы в разных потоках с использованием пула потоков.

Реализация того, что у меня сейчас есть, выглядит так:

private class WriteFn : DoFn<TheEvent, Void>() {
  @Transient var api: TheApi

  @Transient var currentBatch: MutableList<TheEvent>

  @Transient var executor: ExecutorService

  @Setup
  fun setup() {
    api = buildApi()
    executor = Executors.newCachedThreadPool()
  }

  @StartBundle
  fun startBundle() {
    currentBatch = mutableListOf()
  }

  @ProcessElement
  fun processElement(processContext: ProcessContext) {
    val record = processContext.element()

    currentBatch.add(record)

    if (currentBatch.size >= 75) {
      flush()
    }
  }

  private fun flush() {
    val payloadTrack = currentBatch.toList()
    executor.submit {
      api.sendToApi(payloadTrack)
    }
    currentBatch.clear()
  }

  @FinishBundle
  fun finishBundle() {
    if (currentBatch.isNotEmpty()) {
      flush()
    }
  }

  @Teardown
  fun teardown() {
    executor.shutdown()
    executor.awaitTermination(30, TimeUnit.SECONDS)
  }
}

Кажется, это работает "отлично" в том смысле, что данные поступают в API.Но я не знаю, является ли это правильным подходом, и у меня есть чувство, что это очень медленно.

Причина, по которой я думаю, что это медленно, заключается в том, что при нагрузочном тестировании (отправка нескольких миллионов событий в Pub /Sub), конвейеру нужно пересылать эти сообщения в API (который имеет время отклика менее 8 мс) в 8 раз больше, чем моему ноутбуку, чтобы передать их в Pub / Sub.

IsЕсть ли проблемы с моей реализацией?Это то, как я должен это делать?

Также ... я должен ждать завершения всех запросов в моем методе @FinishBundle (то есть путем получения фьючерсов, возвращаемых исполнителем, и ожидания наих)?

1 Ответ

0 голосов
/ 18 октября 2018

У вас есть два взаимосвязанных вопроса:

  1. Вы делаете это правильно / вам нужно что-то изменить?
  2. Вам нужно подождать в @FinishBundle?

Второй ответ: да.Но на самом деле вам нужно выполнить более тщательную очистку, как станет ясно.

Как только ваш метод @FinishBundle будет успешным, бегун Beam будет считать, что пакет успешно завершен.Но ваш @FinishBundle только отправляет запросы - это не гарантирует, что они были успешными.Таким образом, вы можете потерять данные таким образом, если впоследствии запросы не будут выполнены.Ваш метод @FinishBundle должен фактически блокировать и ждать подтверждения успеха от TheApi.Кстати, все вышеперечисленное должно быть идемпотентным, так как после завершения связки может произойти землетрясение и вызвать повторную попытку; -)

Итак, чтобы ответить на первый вопрос: следует ли что-то изменить?Просто выше.Практика пакетных запросов таким способом может работать до тех пор, пока вы уверены, что результаты зафиксированы до того, как пакет будет принят.

Вы можете обнаружить, что это приведет к замедлению конвейера, поскольку происходит @FinishBundleчаще, чем @Setup.Для пакетирования запросов по пакетам необходимо использовать низкоуровневые функции состояний и таймеров.Я написал надуманную версию вашего сценария использования по адресу https://beam.apache.org/blog/2017/08/28/timely-processing.html.. Мне было бы интересно узнать, как это работает для вас.

Возможно, просто то, что вы ожидаете крайне низкую задержку вмалый миллисекундный диапазон, недоступен, если в вашем конвейере имеется длительная тасовка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...