Импорт заявлений, отнимающих время у искровых исполнителей (Pyspark Executors) - PullRequest
0 голосов
/ 09 октября 2018

Я разрабатываю скрипт прогнозирования Python с использованием потоковой передачи Spark (PySpark) и Keras.Предсказание происходит на исполнителе, где я вызываю model.predict ().

Импортированные мной модули:

from keras.layers.core import Dense, Activation, Dropout 
from keras.layers.recurrent import LSTM 
from keras.models import Sequential

Я проверил, и этот импорт занимает 2,5 секунды на Sparkдрайвер (2 cor + 2gb) для загрузки.Что меня удивляет, так это то, что каждый раз, когда исполнитель получает работу, он автоматически выполняет этот импорт снова.Причина, по которой я уверен, что этот импорт происходит каждый раз, когда задание отправляется исполнителю, заключается в том, что я вижу нижеприведенные инструкции в журналах исполнителя для задания, которые появляются только тогда, когда я выполняю импорт вышеуказанных модулей.

/opt/conda/lib/python3.6/site-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype fromfloattonp.floatingis deprecated. In future, it will be treated asnp.float64 == np.dtype(float).type. from ._conv import register_converters as _register_converters Using TensorFlow backend.

Моя цель - сделать прогноз за 1 секунду, но сам импорт занимает 2,5 секунды (каждый раз, когда импорт выполняется на искровом исполнителе).Это предполагаемое поведение?Могу ли я что-нибудь сделать, чтобы минимизировать это время, скажем, в миллисекундах?

Обновление 1

В течение последних нескольких дней я анализировал его и обнаружил, что есть 2основные проблемы.

  1. Я нашел способ обернуть модель keras и протравить ее на драйвере и открепить на исполнителе.Это сокращает время на 1 сек.
  2. Но в драйвере, когда mapPartition () выполняется для каждого пакета, вся кераза и инициализация тензорного потока происходят каждый раз для исполнителя (для каждого задания), который занимает 2,5 секунды.Есть ли способ инициализировать этот импорт один раз для каждого исполнителя, а не для каждого задания.Может быть какой-нибудь файл в pyspark, куда я могу поместить эти импорты (при условии, что эти файлы выполнялись один раз, когда executor подходит)
...