Я пытаюсь декодировать данные в теме Kafka, которая закодирована в avro с помощью Spark Steaming (PySpark) и получаю следующую ошибку:
2019-05-19 14:00:46 ERROR PythonRunner:91 - Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 238, in main
eval_type = read_int(infile)
File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 692, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:332)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:471)
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:454)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:286)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:223)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:440)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:249)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1992)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:172)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 240, in main
func, profiler, deserializer, serializer = read_command(pickleSer, infile)
File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 60, in read_command
command = serializer._read_with_length(file)
File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 171, in _read_with_length
return self.loads(obj)
File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 566, in loads
return pickle.loads(obj, encoding=encoding)
File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/avro/schema.py", line 173, in __setitem__
% (key, value, self))
Exception: Attempting to map key 'name' to value <avro.schema.Field object at 0x1112bbb70> in ImmutableDict {}
Все решения, которые я нашел до сих пор, говорятоб использовании библиотеки Python от Confluent, для которой требуется реестр схемы, чего у меня нет и не будет в моей среде.С учетом сказанного, я сделал это, используя библиотеку, и она не работала, потому что сообщения не кодируются с использованием библиотеки.
Кроме того, этот же код работает вне контекста Spark, и я могу декодироватьсообщения, так что я не уверен, является ли декодирование проблемой или что-то с Spark.
Мой код Python:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import avro.schema
from avro.io import BinaryDecoder, DatumReader
import os
cwd = os.getcwd()
schema_path = os.path.join(cwd, 'avro/schemas/user.avsc')
schema = avro.schema.Parse(open(schema_path).read())
def decoder(msg):
bytes_reader = io.BytesIO(msg)
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(schema)
user = reader.read(decoder)
return user
sc = SparkContext(appName="SparkStreamingTest")
ssc = StreamingContext(sc, 5)
broker = "localhost:9092"
kafka_params = {
"bootstrap.servers": broker,
"auto.offset.reset": "smallest",
"group.id": "test.group"
}
kafka_stream = KafkaUtils.createDirectStream(ssc,
topics=['test.avro'],
kafkaParams=kafka_params,
valueDecoder=decoder
)
messages = kafka_stream.map(lambda x: x[1])
messages.pprint()
ssc.start()
ssc.awaitTermination()
Я вызываю этот файл Python через скрипт оболочки:
#!/usr/bin/env bash
spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 ./avro/avro_stream.py
Моя схема avro, содержащаяся в файле:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
Моя цель здесь заключается в том, чтобы иметь возможность декодировать сообщения в кодировке avro, чтобы я мог затем преобразовать их в JSON и выполнитьпреобразования данных.
Я ценю вашу помощь!