Первой мыслью было бы изменить конфигурацию планирования с 'FIFO' на 'FAIR':
spark.conf.set('spark.scheduler.mode', 'FAIR')
(при условии, что spark
является вашим объектом SparkSession).
Подробнее о настройка расписания здесь: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling -within-an-application
Я не думаю, что многопроцессорность будет иметь здесь смысл, так как она больше связана с планированием заданий на запуск (вычисление тяжелая работа, вероятно, выполняется компанией Spark). Другой идеей было бы потенциально использовать очередь с несколькими потоками:
def process_queue(queue, func, num_workers=None):
if not num_workers:
num_workers = 5
def process_elements(queue):
while True:
try:
item = queue.get(timeout=1)
func(item)
queue.task_done()
except Empty:
break
threads = [Thread(target=process_elements, args=(queue,)) for _ in range(num_workers)]
for t in threads:
t.start()
queue.join()
for t in threads:
t.join()
for i in x:
queue.put(i)
process_queue(queue, test)
Возможно, вы могли бы также сделать что-то с ThreadPoolExecutor в модуле concurrent.futures (https://docs.python.org/3/library/concurrent.futures.html):
with ThreadPoolExecutor(5) as pool:
pool.map(test, x) # maps the test function to all elements in x
Или даже:
with ThreadPoolExecutor(5) as pool:
[pool.submit(test, e) for e in x]
и использовать преимущества future
объектов, которые возвращает исполнитель. Поскольку я не очень хорошо знаком с требованиями приложения, я не уверен, насколько это будет полезно для вас, но я надеюсь, что выложил несколько потенциально полезных подходов с использованием многопоточности! Я лично использовал оба подхода в приложениях Spark и видел улучшения производительности.