Общая цель того, чего я пытаюсь достичь, - это отправка модели Keras каждому работнику искры, чтобы я мог использовать модель в UDF, примененную к столбцу DataFrame. Для этого модель Keras нужно будет отобрать.
Похоже, что многие люди добились успеха в травлении моделей керасами путем внесения изменений в класс Model, как показано по ссылке ниже:
http://zachmoshe.com/2017/04/03/pickling-keras-models.html
Однако я не видел ни одного примера того, как сделать это в тандеме со Spark. Моя первая попытка просто включила функцию make_keras_picklable()
в драйвере, которая позволяла мне выбирать модель в драйвере, но я не мог выбрать модель в UDF.
def make_keras_picklable():
"Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
...
make_keras_picklable()
model = Sequential() # etc etc
def score(case):
....
score = model.predict(case)
...
def scoreUDF = udf(score, ArrayType(FloatType()))
Ошибка, которую я получаю, говорит о том, что при расчесывании модели в UDF не используется класс Model с залатанными обезьянами.
AttributeError: 'Sequential' object has no attribute '_built'
Похоже, что другой пользователь столкнулся с подобными ошибками в этом посте SO , и ответом было "запустить make_keras_picklable()
также на каждом работнике". Пример того, как это сделать, не приводится.
У меня такой вопрос: как правильно позвонить make_keras_picklable()
на всех работников?
Я пытался использовать broadcast()
(см. Ниже), но получил ту же ошибку, что и выше.
def make_keras_picklable():
"Source: https://zachmoshe.com/2017/04/03/pickling-keras-models.html"
...
make_keras_picklable()
spark.sparkContext.broadcast(make_keras_picklable())
model = Sequential() # etc etc
def score(case):
....
score = model.predict(case)
...
def scoreUDF = udf(score, ArrayType(FloatType()))