Учитывая, что в кадре данных есть параметры для вызовов Http в доступный через Интернет API, я хочу распределить эти вызовы API по всем узлам кластера, выполнить вызовы и вернуть новый кадр данных с результатами вызовов. Это относительно просто, и у меня есть решение, которое работает.
Что не является простым, так это ограничение скорости выполнения вызовов API. Если, например, вы можете совершать только 100 вызовов API в секунду, я хочу каким-то образом убедиться, что мы не превышаем ограничение. Кроме того, при запуске на более крупном кластере я хочу каким-то образом убедиться, что этот распределенный вызывающий API не превратится в низкокачественную DDoS-атаку.
Одно работоспособное решение состоит в том, чтобы каждый поток спал, когда он получает сообщение об отказе от сервера (HTTP 429 - слишком много запросов). Однако на данный момент вы уже выполнили DDoS-атаку; Я бы хотел помедленнее, прежде чем мы доберемся до этой точки.
Я попытался реализовать это с помощью аккумулятора и широковещательной переменной. В каждом вызове реализован аккумулятор, а широковещательная переменная была временем начала. Затем каждый работник может разделить накопитель по времени, чтобы увидеть, была ли частота запросов слишком высокой. К сожалению, вы не можете прочитать аккумулятор от рабочего . Это не работает, и я не вижу способа заставить это работать.
Я мог бы использовать то же решение, кроме как контролировать скорость, читая из драйвера. Я мог бы разделить набор данных на несколько небольших разделов, по 10 или 100 в каждом. Затем водитель может проверить скорость, прежде чем наметить каждый раздел. Однако я не знаю, как ввести оператор условного сна, выполняемый на стороне драйвера, в вызов .mapPartition()
.
Рабочий процесс выглядит примерно так (в Spark 1.6.3)
input.repartition(repartitionSetter(n))
.select(...fields of interest...)
.as[(... Tuple DataSet specification ...)]
.mapPartitions(distributedApiCalls)
.toDF().toDF( ... column name specification ...)
Условное выражение будет работать так:
while (tooManyCalls()) {
logger.log("Throttling API rate in APPNAME")
Thread.sleep(1000)
}
def tooManyCalls(): Boolean = {
val now = Calendar.getInstance.getTimeinMillis
val timeElapsed = (now - broadcastStartTime.value) / 1000
(accumulator.value + n) > (timeElapsed * rateLimitPerSec) // true if going too fast
}
Здесь repartitionSetter
делит набор данных на разделы размером n
, а distributedAPICalls
- это функция, передаваемая каждому разделу для доступа к API.
Есть ли способ включить условный оператор в рабочий процесс распределенного вызова API до MapPartion?
Мы находимся в процессе обновления до Spark 2.X, но эта функция должна появиться до этого обновления, поэтому идеальное решение будет работать как для Spark 1.6.3, так и для Spark 2.X