Как пакетировать элементы из PySpark DataFrame - PullRequest
0 голосов
/ 04 мая 2019

У меня есть фрейм данных PySpark, и для каждой (партии) записи я хочу вызвать API.В общем, скажем, у меня есть 100000k записей, я хочу объединить элементы в группы, скажем, 1000 и вызвать API.Как я могу сделать это с PySpark?Причина пакетирования заключается в том, что API, вероятно, не примет огромный кусок данных из системы больших данных.

Сначала я подумал о LIMIT, но это не будет "детерминистическим".Кроме того, кажется, что это будет неэффективно?

Ответы [ 2 ]

1 голос
/ 04 мая 2019
df.foreachPartition { ele =>
   ele.grouped(1000).foreach { chunk =>
   postToServer(chunk)
}

Код в Scala, вы можете проверить это в Python.Будет создано 1000 партий.

1 голос
/ 04 мая 2019

Использование foreachPartition, а затем что-то вроде , как разделить итерируемое на куски постоянного размера для пакетирования итерируемых групп по 1000, возможно, является наиболее эффективным способом сделать это с точки зрения ресурса Spark. использование.

def handle_iterator(it):
    # batch the iterable and call API
    pass
df.foreachPartition(handle_iterator)

Примечание: это будет делать параллельные вызовы API от исполнителей и может не быть практическим способом, если, например, ограничение скорости является проблемой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...