десериализация avro в pyspark с помощью сливной кафки с использованием AbasOSS / ABRis от ConfluentAvro - PullRequest
0 голосов
/ 28 сентября 2019

У меня есть приложение pyspark, которое должно десериализовать слитые сообщения kafka avro в структурированной потоковой передаче с искрой.

Пробовал эту опцию:

Интеграция потоковой передачи Spark со структурой реестра Confluent *

def expand_avro(spark_context, sql_context, data_frame, schema_registry_url, topic):
    j = spark_context._gateway.jvm
    dataframe_deserializer = j.za.co.absa.abris.avro.AvroSerDe.DataframeDeserializer(data_frame._jdf)
    naming_strategy = getattr(
        getattr(j.za.co.absa.abris.avro.read.confluent.SchemaManager,
                "SchemaStorageNamingStrategies$"), "MODULE$").TOPIC_NAME()
    conf = getattr(getattr(j.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.url", schema_registry_url))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.topic", topic))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.id", "latest"))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.naming.strategy", naming_strategy))
    schema_path = j.scala.Option.apply(None)
    conf = j.scala.Option.apply(conf)
    policy = getattr(j.za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies, "RETAIN_SELECTED_COLUMN_ONLY$")()
    data_frame = dataframe_deserializer.fromConfluentAvro("value", schema_path, conf, policy)
    data_frame = DataFrame(data_frame, sql_context)
    return data_frame

Приложение работает около получаса, изатем завершается с этим сообщением:

IllegalArgumentException: u'Неверная политика хранения схемы: RETAIN_SELECTED_COLUMN_ONLY '

spark 2.4.3, python 2.7, структурированный поток с readStream и writeStream.foreachBatch, который используетФункция десериализации, описанная выше, обрабатывает данные.

добавлено предложение try / exception, добавлена ​​повторная попытка с 10-секундным сном.Когда приложение перезапускается, оно работает нормально и принимает тему, по которой ранее не удавалось (снова работает нормально около 30 минут, а затем завершается с тем же сообщением).Ошибка не относится к теме, каждый раз происходит сбой на разных темах.Попытка проверки связи с URL-адресом реестра схемы во время сбоя, URL-адрес реестра схемы отвечает 200.

Озадаченный тем, что происходит - как он может получить политику хранения схемы в течение 30 минут (обработано несколько потоковых микропакетов), а затем пожаловаться на то, что он недействителен.

Также пытался использовать RETAIN_ORIGINAL_SCHEMA вместо RETAIN_SELECTED_COLUMN_ONLY, та же ошибка:

    IllegalArgumentException: u'Invalid Schema Retention Policy: RETAIN_ORIGINAL_SCHEMA'

    at py4j.Protocol.getReturnValue(Protocol.java:473)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy38.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

Еще одна вещь, которую стоит отметить, - это то, что я читаю несколько тем, и так как яЯ использую RETAIN_SELECTED_COLUMN_ONLY, результирующая схема будет отличаться для каждой темы.Кроме того, можно иметь несколько версий схем для каждой темы.

...