Я разрабатываю скрипт прогнозирования Python с использованием потоковой передачи Spark (PySpark) и Keras.Предсказание происходит на исполнителе, где я вызываю model.predict ().
Импортированные мной модули:
from keras.layers.core import Dense, Activation, Dropout
from keras.layers.recurrent import LSTM
from keras.models import Sequential
Я проверил, и этот импорт занимает 2,5 секунды на Sparkдрайвер (2 cor + 2gb) для загрузки.Что меня удивляет, так это то, что каждый раз, когда исполнитель получает работу, он автоматически выполняет этот импорт снова.Причина, по которой я уверен, что этот импорт происходит каждый раз, когда задание отправляется исполнителю, заключается в том, что я вижу нижеприведенные инструкции в журналах исполнителя для задания, которые появляются только тогда, когда я выполняю импорт вышеуказанных модулей.
/opt/conda/lib/python3.6/site-packages/h5py/__init__.py:36: FutureWarning: Conversion of the second argument of issubdtype fromfloattonp.floatingis deprecated. In future, it will be treated asnp.float64 == np.dtype(float).type. from ._conv import register_converters as _register_converters Using TensorFlow backend.
Моя цель - сделать прогноз за 1 секунду, но сам импорт занимает 2,5 секунды (каждый раз, когда импорт выполняется на искровом исполнителе).Это предполагаемое поведение?Могу ли я что-нибудь сделать, чтобы минимизировать это время, скажем, в миллисекундах?
Обновление 1
В течение последних нескольких дней я анализировал его и обнаружил, что есть 2основные проблемы.
- Я нашел способ обернуть модель keras и протравить ее на драйвере и открепить на исполнителе.Это сокращает время на 1 сек.
- Но в драйвере, когда mapPartition () выполняется для каждого пакета, вся кераза и инициализация тензорного потока происходят каждый раз для исполнителя (для каждого задания), который занимает 2,5 секунды.Есть ли способ инициализировать этот импорт один раз для каждого исполнителя, а не для каждого задания.Может быть какой-нибудь файл в pyspark, куда я могу поместить эти импорты (при условии, что эти файлы выполнялись один раз, когда executor подходит)