Pyspark UDF Pickling error, не могу выбрать объекты SwigPyObject - PullRequest
0 голосов
/ 08 октября 2019

Я пытаюсь применить функции udf к столбцу данных, который состоит из строк. Функция использует Tensorflow GUSE и преобразует строку в массив с плавающей точкой.

import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import tf_sentencepiece
# Graph set up.
g = tf.Graph()
with g.as_default():
  text_input = tf.placeholder(dtype=tf.string, shape=[None])
  embed = hub.Module("https://tfhub.dev/google/universal-sentence-encoder-multilingual-large/1")
  embedded_text = embed(text_input)
  init_op = tf.group([tf.global_variables_initializer(), tf.tables_initializer()])
g.finalize()

# Initialize session.
session = tf.Session(graph=g)
session.run(init_op)

def embed_mail(x): 
    embedding = session.run(embedded_text, feed_dict={text_input:[x]})
    embedding = flatten(embedding)
    result = [np.float32(i).item() for i in embedding]
    return result

Но всякий раз, когда я пытаюсь запустить эту функцию с:

embed_mail_udf = udf(embed_mail, ArrayType(FloatType()))
df = df.withColumn('embedding',embed_mail_udf(df.text))

Я получаю сообщение об ошибке: Не удалось сериализоватьobject: TypeError: невозможно выбрать объекты SwigPyObject. Что я делаю не так?

1 Ответ

1 голос
/ 08 октября 2019

Для запуска кода вашей UDF в кластере Spark необходимо иметь возможность сериализации всех данных, «привязанных» к этой функции. Ваш UDF embed_mail содержит ссылку на сеанс TF, поэтому функция имеет значение closure, и Spark сначала должен сериализовать содержимое tf.Session. Могу поспорить, что это является причиной проблемы. К сожалению, у меня нет опыта работы с TF, но кажется, что вы можете получить свои почтовые данные от TF до запуска Spark, транслировать их и затем использовать в своем udf?

...