Пустой столбец при десериализации avro из apache kafka с pyspark - PullRequest
0 голосов
/ 25 июня 2019

Я делаю проверку концепции с ноутбуками Kafka, Spark и Jupyter, и у меня странная проблема.Я пытаюсь прочитать записи Avro от Кафки до pyspark.Я использую реестр слитых схем, чтобы получить схему для десериализации сообщений avro.После десериализации сообщений avro в кадре данных искры результирующий столбец будет пустым, без каких-либо ошибок.Столбец должен содержать данные, потому что при приведении к строке некоторые из полей avro доступны для чтения.

Я также пытался сделать это на оболочке spark в Scala (без jupyter). Я пробовал использовать как основанный на докереspark, а также отдельная установка spark

Я следовал этой теме SO, чтобы получить функции from_avro и to_avro: Pyspark 2.4.0, чтение avro из kafka с потоком чтения - Python

jars = ["kafka-clients-2.0.0.jar", "spark-avro_2.11-2.4.3.jar", "spark-        
sql-kafka-0-10_2.11-2.4.3.jar"]
jar_paths = ",".join(["/home/jovyan/work/jars/{}".format(jar) for jar in 
jars])

conf = SparkConf()
conf.set("spark.jars", jar_paths)

spark_session = SparkSession \
    .builder \
    .config(conf=conf)\
    .appName("TestStream") \
    .getOrCreate()

def from_avro(col, jsonFormatSchema): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema)) 


def to_avro(col): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col))) 

schema_registry_url = "http://schema-registry.org"
transaction_schema_name = "Transaction"

transaction_schema = requests.get(" 
{}/subjects/{}/versions/latest/schema".format(schema_registry_url, 
transaction_schema_name)).text


raw_df = spark_session.read.format("kafka") \
# SNIP
    .option("subscribe", "transaction") \
    .option("startingOffsets", "earliest").load()
raw_df = raw_df.limit(1000).cache()

extract_df = raw_df.select(
    raw_df["key"].cast("String"),
    from_avro(raw_df["value"], transaction_schema).alias("value")
)

# This shows data and fields
raw_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(3, truncate=False)

extract_df.show()

Содержимое столбца значения пусто.Я ожидаю либо ошибку, так как декодирование не удалось, либо данные будут там.Кто-нибудь знает, что может вызвать это, или как его отладить?

+---+-----+
|key|value|
+---+-----+
|...| [[]]|
|...| [[]]|
|...| [[]]|
|...| [[]]|
...