Параллелизм в Писпарке - PullRequest
0 голосов
/ 07 апреля 2020

У меня есть программа Pyspark, в которой у меня есть функция, которая принимает строку в качестве параметра. Сама строка содержится в массиве строк. По сути, я перебираю строковый массив и вызываю функцию из l oop. При этом я добавляю результат String, возвращаемый функцией, в другой массив String.

Теперь в моей функции есть ряд операторов if - else, которые проверяют аргумент и выбирают l oop выполнить. Все эти блоки if являются независимыми кодами и имеют общий глобальный кэшированный фрейм данных и глобальный сеанс искры.

Я хочу, чтобы вызов функции выполнялся одновременно, а не в режиме FIFO, который происходит сейчас. Какой вариант лучше в Python для этого?

  • Многопоточность?
  • Многопроцессорность?
  • AsyncIO?

Было бы полезно, если можно предоставить пример кода!

Мой пример псевдокода:

global spark_session
global cached_dataframe

x = ["A","B","C"]
y=[]

function test(z):
   if z=="A":
      -------SOME CODE FOR "A" -------
   elif z=="B":
      -------SOME CODE FOR "B" -------
   elif z=="C":
      -------SOME CODE FOR "C" -------

for i in x:
   y.append(test(i))

Если параллелизм здесь невозможен, можете ли вы предложить лучший способ организовать мой код? Например, избегать циклов if else и так далее. Потому что в моем текущем требовании этот блок if else будет go бесконечным!

1 Ответ

1 голос
/ 08 апреля 2020

Первой мыслью было бы изменить конфигурацию планирования с 'FIFO' на 'FAIR':

spark.conf.set('spark.scheduler.mode', 'FAIR')

(при условии, что spark является вашим объектом SparkSession).

Подробнее о настройка расписания здесь: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling -within-an-application

Я не думаю, что многопроцессорность будет иметь здесь смысл, так как она больше связана с планированием заданий на запуск (вычисление тяжелая работа, вероятно, выполняется компанией Spark). Другой идеей было бы потенциально использовать очередь с несколькими потоками:

def process_queue(queue, func, num_workers=None):
    if not num_workers:
        num_workers = 5

    def process_elements(queue):
        while True:
            try:
                item = queue.get(timeout=1)
                func(item)
                queue.task_done()
            except Empty:
                break

    threads = [Thread(target=process_elements, args=(queue,)) for _ in range(num_workers)]
    for t in threads:
        t.start()
    queue.join()
    for t in threads:
        t.join()
for i in x:
   queue.put(i)
process_queue(queue, test)

Возможно, вы могли бы также сделать что-то с ThreadPoolExecutor в модуле concurrent.futures (https://docs.python.org/3/library/concurrent.futures.html):

with ThreadPoolExecutor(5) as pool:
    pool.map(test, x) # maps the test function to all elements in x

Или даже:

with ThreadPoolExecutor(5) as pool:
    [pool.submit(test, e) for e in x] 

и использовать преимущества future объектов, которые возвращает исполнитель. Поскольку я не очень хорошо знаком с требованиями приложения, я не уверен, насколько это будет полезно для вас, но я надеюсь, что выложил несколько потенциально полезных подходов с использованием многопоточности! Я лично использовал оба подхода в приложениях Spark и видел улучшения производительности.

...