Scala: Ошибка чтения сообщений Kafka Avro из структурированной потоковой передачи - PullRequest
0 голосов
/ 29 февраля 2020

Я пытался прочитать авро-сериализованные сообщения Кафки из структурированной потоковой передачи (2.4.4) с Scala 2.11. Для этого я использовал spark-avro (зависимость ниже). Я генерирую сообщения kafka из python, используя библиотеку confluent-kafka. Spark Streaming может использовать сообщения со схемой, но не может правильно прочитать значения полей. Я подготовил простой пример, чтобы показать проблему, код доступен здесь: https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala

Я создаю записи в python, схема записей:

{
    "type": "record",
    "namespace": "example",
    "name": "RawRecord",
    "fields": [
        {"name": "int_field","type": "int"},
        {"name": "string_field","type": "string"}
    ]
}

И они генерируются так:

from time import sleep
from confluent_kafka.avro import AvroProducer, load, loads

def generate_records():
    avro_producer_settings = {
        'bootstrap.servers': "localhost:19092",
        'group.id': 'groupid',
        'schema.registry.url': "http://127.0.0.1:8081"
    }
    producer = AvroProducer(avro_producer_settings)
    key_schema = loads('"string"')
    value_schema = load("schema.avsc")
    i = 1
    while True:
        row = {"int_field": int(i), "string_field": str(i)}
        producer.produce(topic="avro_topic", key="key-{}".format(i), 
                         value=row, key_schema=key_schema, value_schema=value_schema)
        print(row)
        sleep(1)
        i+=1

Потребление от потоковой структурированной искры (в Scala) выполняется следующим образом:

import org.apache.spark.sql.{ Dataset, Row}
import org.apache.spark.sql.streaming.{ OutputMode, StreamingQuery}
import org.apache.spark.sql.avro._
...
        try {

            log.info("----- reading schema")
            val jsonFormatSchema = new String(Files.readAllBytes(
                                                    Paths.get("./src/main/resources/schema.avsc")))

            val ds:Dataset[Row] = sparkSession
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", kafkaServers)
                .option("subscribe", topic)
                .load()

            val output:Dataset[Row] = ds
                .select(from_avro(ds.col("value"), jsonFormatSchema) as "record")
                .select("record.*")

            output.printSchema()

            var query: StreamingQuery = output.writeStream.format("console")
                .option("truncate", "false").outputMode(OutputMode.Append()).start();


            query.awaitTermination();

        } catch {
            case e: Exception => log.error("onApplicationEvent error: ", e)
            //case _: Throwable => log.error("onApplicationEvent error:")
        }
...

Печать схема в искре, странно, что поля обнуляются, хотя схема avro не позволяет этого. Spark показывает это:

root
 |-- int_field: integer (nullable = true)
 |-- string_field: string (nullable = true)

Я проверил сообщения с другим получателем в python, и сообщения в порядке, но независимо от содержимого сообщения, искра показывает это.

+---------+------------+
|int_field|string_field|
+---------+------------+
|0        |            |
+---------+------------+

Основные используемые зависимости:

<properties>
    <spark.version>2.4.4</spark.version>
    <scala.version>2.11</scala.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

Кто-нибудь знает, почему это может происходить?

Заранее спасибо. Код для воспроизведения ошибки здесь:

https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala

РЕШЕНИЕ

Проблема заключалась в том, что я использовал библиотеку confluent_kafka в python и я читал авро-сообщения в структурированной потоковой передаче с использованием библиотеки spark-avro.

В библиотеке Confluent_kafka используется авро-формат confluent, а при автовом чтении avro используется стандартный формат avro.

Разница является то, что для использования реестра схемы, Confluent Avro добавляет в сообщение четыре байта, которые указывают, какую схему следует использовать.

Источник: https://www.confluent.io/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/

Для того, чтобы быть в состоянии использовать конфлюэнтное avro и читать его из потоковой структурированной искры, я заменил библиотеку spark-avro на Abris (abris позволяет интегрировать avro и confluent avro с искрой). https://github.com/AbsaOSS/ABRiS

1 Ответ

0 голосов
/ 05 марта 2020

SOLUTION

Проблема заключалась в том, что я использовал библиотеку confluent_kafka в python и читал сообщения avro в структурированной потоковой передаче с использованием библиотеки spark-avro.

Библиотека confluent_kafka использует конфронтный формат avro и запускает авро-чтение с использованием стандартного формата avro.

Разница заключается в том, что для использования реестра схемы, confluent avro добавляет в сообщение четыре байта, которые указывают, какую схему следует использовать.

Источник: https://www.confluent.io/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/

Для возможности использовать конфлуентное avro и читать его из потоковой структурированной искры я заменил библиотеку spark-avro для Abris (abris позволяет интегрировать avro и сливное авро с искрой). https://github.com/AbsaOSS/ABRiS

Мои зависимости изменились следующим образом:

<properties>
        <spark.version>2.4.4</spark.version>
        <scala.version>2.11</scala.version>
</properties>
<!-- SPARK- AVRO -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- SPARK -AVRO AND CONFLUENT-AVRO -->
<dependency>
    <groupId>za.co.absa</groupId>
    <artifactId>abris_2.11</artifactId>
    <version>3.1.1</version>
</dependency>

И здесь вы можете увидеть простой пример, который получает сообщение и десериализует его значения как авро и сливной авро.

var input: Dataset[Row] = sparkSession.readStream
    //.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaServers)
    .option("subscribe", topicConsumer)
    .option("failOnDataLoss", "false")
    // .option("startingOffsets", "latest")
    // .option("startingOffsets", "earliest")
    .load();


// READ WITH spark-avro library (standard avro)

val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./src/main/resources/schema.avsc")))

var inputAvroDeserialized: Dataset[Row] = input
    .select(from_avro(functions.col("value"), jsonFormatSchema) as "record")
    .select("record.*")

//READ WITH Abris library (confuent avro) 

val schemaRegistryConfig = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topicConsumer,
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME, // choose a subject name strategy
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest" // set to "latest" if you want the latest schema version to used
)

var inputConfluentAvroDeserialized: Dataset[Row] = inputConfluentAvroSerialized
    .select(from_confluent_avro(functions.col("value"), schemaRegistryConfig) as "record")
    .select("record.*")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...