Условное выполнение mapPartition в Spark? - PullRequest
0 голосов
/ 08 ноября 2018

Учитывая, что в кадре данных есть параметры для вызовов Http в доступный через Интернет API, я хочу распределить эти вызовы API по всем узлам кластера, выполнить вызовы и вернуть новый кадр данных с результатами вызовов. Это относительно просто, и у меня есть решение, которое работает.

Что не является простым, так это ограничение скорости выполнения вызовов API. Если, например, вы можете совершать только 100 вызовов API в секунду, я хочу каким-то образом убедиться, что мы не превышаем ограничение. Кроме того, при запуске на более крупном кластере я хочу каким-то образом убедиться, что этот распределенный вызывающий API не превратится в низкокачественную DDoS-атаку.

Одно работоспособное решение состоит в том, чтобы каждый поток спал, когда он получает сообщение об отказе от сервера (HTTP 429 - слишком много запросов). Однако на данный момент вы уже выполнили DDoS-атаку; Я бы хотел помедленнее, прежде чем мы доберемся до этой точки.

Я попытался реализовать это с помощью аккумулятора и широковещательной переменной. В каждом вызове реализован аккумулятор, а широковещательная переменная была временем начала. Затем каждый работник может разделить накопитель по времени, чтобы увидеть, была ли частота запросов слишком высокой. К сожалению, вы не можете прочитать аккумулятор от рабочего . Это не работает, и я не вижу способа заставить это работать.

Я мог бы использовать то же решение, кроме как контролировать скорость, читая из драйвера. Я мог бы разделить набор данных на несколько небольших разделов, по 10 или 100 в каждом. Затем водитель может проверить скорость, прежде чем наметить каждый раздел. Однако я не знаю, как ввести оператор условного сна, выполняемый на стороне драйвера, в вызов .mapPartition().

Рабочий процесс выглядит примерно так (в Spark 1.6.3)

input.repartition(repartitionSetter(n))
  .select(...fields of interest...)
  .as[(... Tuple DataSet specification ...)]
  .mapPartitions(distributedApiCalls)
  .toDF().toDF( ... column name specification ...)

Условное выражение будет работать так:

while (tooManyCalls()) {
  logger.log("Throttling API rate in APPNAME")
  Thread.sleep(1000)
}

def tooManyCalls(): Boolean = {
  val now = Calendar.getInstance.getTimeinMillis
  val timeElapsed = (now - broadcastStartTime.value) / 1000
  (accumulator.value + n) > (timeElapsed * rateLimitPerSec) // true if going too fast
}

Здесь repartitionSetter делит набор данных на разделы размером n, а distributedAPICalls - это функция, передаваемая каждому разделу для доступа к API.

Есть ли способ включить условный оператор в рабочий процесс распределенного вызова API до MapPartion?

Мы находимся в процессе обновления до Spark 2.X, но эта функция должна появиться до этого обновления, поэтому идеальное решение будет работать как для Spark 1.6.3, так и для Spark 2.X

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...