Проблема UDF десериализации PySpark и Protobuf - PullRequest
0 голосов
/ 03 мая 2020

Я получаю эту ошибку

Can't pickle <class 'google.protobuf.pyext._message.CMessage'>: it's not found as google.protobuf.pyext._message.CMessage

при попытке создать UDF в PySpark. По-видимому, он использует CloudPickle для сериализации команды, однако, я знаю, что сообщения protobuf содержат C++ реализаций, что означает, что его нельзя засечь.

Я пытался найти способ возможно переопределить CloudPickleSerializer, однако я не смог найти способ.

Вот мой пример кода:

from MyProject.Proto import MyProtoMessage
from google.protobuf.json_format import MessageToJson
import pyspark.sql.functions as F

def proto_deserialize(body):
  msg = MyProtoMessage()
  msg.ParseFromString(body)
  return MessageToJson(msg)

from_proto = F.udf(lambda s: proto_deserialize(s))

base.withColumn("content", from_proto(F.col("body")))

Заранее спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...