Я бы хотел использовать модель tenorflow.keras в pysark pandas_udf. Тем не менее, я получаю ошибку рассола при сериализации модели перед отправкой рабочим. Я не уверен, что я использую лучший метод для выполнения того, что я хочу, поэтому я приведу минимальный, но полный пример. ошибка срабатывает также для всех предыдущих версий)
pyspark-2.4.5
Операторы импорта:
import pandas as pd
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from pyspark.sql import SparkSession, functions as F, types as T
UDF Pyspark - это pandas_udf:
def compute_output_pandas_udf(model):
'''Spark pandas udf for model prediction.'''
@F.pandas_udf(T.DoubleType(), F.PandasUDFType.SCALAR)
def compute_output(inputs1, inputs2, inputs3):
pdf = pd.DataFrame({
'input1': inputs1,
'input2': inputs2,
'input3': inputs3
})
pdf['predicted_output'] = model.predict(pdf.values)
return pdf['predicted_output']
return compute_output
Основной код:
# Model parameters
weights = np.array([[0.5], [0.4], [0.3]])
bias = np.array([1.25])
activation = 'linear'
input_dim, output_dim = weights.shape
# Initialize model
model = Sequential()
layer = Dense(output_dim, input_dim=input_dim, activation=activation)
model.add(layer)
layer.set_weights([weights, bias])
# Initialize Spark session
spark = SparkSession.builder.appName('test').getOrCreate()
# Create pandas df with inputs and run model
pdf = pd.DataFrame({
'input1': np.random.randn(200),
'input2': np.random.randn(200),
'input3': np.random.randn(200)
})
pdf['predicted_output'] = model.predict(pdf[['input1', 'input2', 'input3']].values)
# Create spark df with inputs and run model using udf
sdf = spark.createDataFrame(pdf)
sdf = sdf.withColumn('predicted_output', compute_output_pandas_udf(model)('input1', 'input2', 'input3'))
sdf.limit(5).show()
Эта ошибка срабатывает, когда compute_output_pandas_udf (модель) вызывается:
PicklingError: Could not serialize object: TypeError: can't pickle _thread.RLock objects
I нашел эту страницу о выборе модели keras и попробовал ее на tenorflow.keras, но я получил следующую ошибку при вызове функции вести модели в UDF (поэтому сериализация работала, но десериализации нет?):
AttributeError: 'Sequential' object has no attribute '_distribution_strategy'
У кого-нибудь есть идеи о том, как поступить? Заранее спасибо!
PS: Обратите внимание, что я не использовал модель напрямую из библиотеки keras, потому что периодически появляется другая ошибка, и кажется, что ее труднее решить. Однако сериализация модели не приводит к ошибке, как в случае с моделью tenorflow.keras.