Как использовать SparkContext.submitJob для вызова REST API - PullRequest
0 голосов
/ 12 марта 2020

Может ли кто-нибудь предоставить пример вызова метода submitJob

Найдена ссылка здесь: Как выполнить асинхронные c операции (т.е. возвращение будущего) из map / filter / et c. ?

Я полагаю, что могу реализовать это для своего варианта использования

В моей текущей реализации я использую партиции для вызова параллельных вызовов, но они ждут ответа, прежде чем вызвать следующую call

Dataframe.rdd.reparition(TPS allowed on API)
.map(row => {
            val response = callApi(row)
            parse(response)
    })

Но из-за задержки в конце API я жду ответа 10 секунд перед синтаксическим анализом и затем выполняю следующий вызов. У меня 100 TPS, но текущие логи c я вижу только 4-7 TPS

Если кто-то использовал SparkContext.submitJob , чтобы выполнять асинхронные вызовы, приведите пример, поскольку я новичок spark и scala

Я хочу вызывать вызовы, не ожидая ответа, обеспечивая 100 TPS, а затем, как только я получу ответ, я хочу проанализировать и создать Dataframe поверх него.

I ранее пытался собирать строки и вызывать вызовы API из главного узла, похоже, ограничен оборудованием для создания большого пула потоков

submitJob [T, U, R] (rdd: RDD [T], processPartition: (Iterator [T]) ⇒ U, разделы: Seq [Int], resultHandler: (Int, U) ⇒ Единица, resultFun c: ⇒ R): SimpleFutureAction [R]

Rdd - rdd из моего Dataframe

paritition - мой rdd уже разделен, я предоставляю диапазон от 0 до No.of.partitions в моем rdd?

processPartition - это мой callApi () ?

resultHandler - не уверен, что здесь делать

resultFun c - я думаю, что это будет анализ моего ответа

Как создать Dataframe после SimpleFutureAction

Может кто-нибудь помочь, пожалуйста

1 Ответ

0 голосов
/ 12 марта 2020

submitJob не сделает ваши вызовы API автоматически быстрее. Это часть низкоуровневой реализации параллельной обработки Spark - Spark разделяет действия на задания и затем отправляет их в любой планировщик кластера. Вызов submitJob подобен запуску потока Java - задание будет выполняться асинхронно, но не быстрее, чем если бы вы просто вызвали действие на фрейме данных / RDD.

ИМХО, ваш лучший вариант - использовать mapPartitions который позволяет запускать функцию в контексте каждого раздела. У вас уже есть данные, разделенные таким образом, чтобы обеспечить максимальный параллелизм, просто убедитесь, что у вас достаточно исполнителей Spark, чтобы фактически запустить эти разделы:

df.rdd.repartition(#concurrent API calls)
  .mapPartitions(partition => {
    partition.map(row => {
      val response = callApi(row)
      parse(response)
    })
  })
  .toDF("col1", "col2", ...)

mapPartitions ожидает функцию, которая отображает Iterator[T] (все данные в одном разделе) в Iterator[U] (преобразованный раздел) и возвращает RDD[U]. Преобразование обратно в фрейм данных - это вопрос объединения в цепочку вызова toDF() с соответствующими именами столбцов.

Вы можете с помощью sh реализовать какое-либо ограничение скорости на поток в callApi, чтобы сделать конечно, ни один исполнитель не запускает большое количество запросов в секунду. Помните, что исполнители могут работать как в отдельных потоках, так и в отдельных JVM.

Конечно, простой вызов mapPartitions ничего не делает. Вам нужно инициировать действие на результирующем кадре данных, чтобы вызовы API действительно запускались.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...