Python Spark-Streaming Kafka Ошибка: исключение: Попытка сопоставить ключ 'name' со значением <объект avro.schema.Field в 0x1112bbb70> - PullRequest
0 голосов
/ 19 мая 2019

Я пытаюсь декодировать данные в теме Kafka, которая закодирована в avro с помощью Spark Steaming (PySpark) и получаю следующую ошибку:

2019-05-19 14:00:46 ERROR PythonRunner:91 - Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 238, in main
    eval_type = read_int(infile)
  File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 692, in read_int
    raise EOFError
EOFError
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:332)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:471)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:454)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:286)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:223)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:440)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:249)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1992)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:172)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 240, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 60, in read_command
    command = serializer._read_with_length(file)
  File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 171, in _read_with_length
    return self.loads(obj)
  File "/Users/robert.dempsey/Applications/spark-2.3.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 566, in loads
    return pickle.loads(obj, encoding=encoding)
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/avro/schema.py", line 173, in __setitem__
    % (key, value, self))
Exception: Attempting to map key 'name' to value <avro.schema.Field object at 0x1112bbb70> in ImmutableDict {}

Все решения, которые я нашел до сих пор, говорятоб использовании библиотеки Python от Confluent, для которой требуется реестр схемы, чего у меня нет и не будет в моей среде.С учетом сказанного, я сделал это, используя библиотеку, и она не работала, потому что сообщения не кодируются с использованием библиотеки.

Кроме того, этот же код работает вне контекста Spark, и я могу декодироватьсообщения, так что я не уверен, является ли декодирование проблемой или что-то с Spark.

Мой код Python:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import avro.schema
from avro.io import BinaryDecoder, DatumReader
import os

cwd = os.getcwd()
schema_path = os.path.join(cwd, 'avro/schemas/user.avsc')
schema = avro.schema.Parse(open(schema_path).read())

def decoder(msg):
    bytes_reader = io.BytesIO(msg)
    decoder = BinaryDecoder(bytes_reader)
    reader = DatumReader(schema)
    user = reader.read(decoder)
    return user

sc = SparkContext(appName="SparkStreamingTest")
ssc = StreamingContext(sc, 5)

broker = "localhost:9092"

kafka_params = {
    "bootstrap.servers": broker,
    "auto.offset.reset": "smallest",
    "group.id": "test.group"
}

kafka_stream = KafkaUtils.createDirectStream(ssc,
                                             topics=['test.avro'],
                                             kafkaParams=kafka_params,
                                             valueDecoder=decoder
)

messages = kafka_stream.map(lambda x: x[1])
messages.pprint()

ssc.start()
ssc.awaitTermination()

Я вызываю этот файл Python через скрипт оболочки:

#!/usr/bin/env bash

spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 ./avro/avro_stream.py

Моя схема avro, содержащаяся в файле:

{
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number",  "type": ["int", "null"]},
        {"name": "favorite_color", "type": ["string", "null"]}
    ]
}

Моя цель здесь заключается в том, чтобы иметь возможность декодировать сообщения в кодировке avro, чтобы я мог затем преобразовать их в JSON и выполнитьпреобразования данных.

Я ценю вашу помощь!

...