Я пытался прочитать авро-сериализованные сообщения Кафки из структурированной потоковой передачи (2.4.4) с Scala 2.11. Для этого я использовал spark-avro (зависимость ниже). Я генерирую сообщения kafka из python, используя библиотеку confluent-kafka. Spark Streaming может использовать сообщения со схемой, но не может правильно прочитать значения полей. Я подготовил простой пример, чтобы показать проблему, код доступен здесь: https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala
Я создаю записи в python, схема записей:
{
"type": "record",
"namespace": "example",
"name": "RawRecord",
"fields": [
{"name": "int_field","type": "int"},
{"name": "string_field","type": "string"}
]
}
И они генерируются так:
from time import sleep
from confluent_kafka.avro import AvroProducer, load, loads
def generate_records():
avro_producer_settings = {
'bootstrap.servers': "localhost:19092",
'group.id': 'groupid',
'schema.registry.url': "http://127.0.0.1:8081"
}
producer = AvroProducer(avro_producer_settings)
key_schema = loads('"string"')
value_schema = load("schema.avsc")
i = 1
while True:
row = {"int_field": int(i), "string_field": str(i)}
producer.produce(topic="avro_topic", key="key-{}".format(i),
value=row, key_schema=key_schema, value_schema=value_schema)
print(row)
sleep(1)
i+=1
Потребление от потоковой структурированной искры (в Scala) выполняется следующим образом:
import org.apache.spark.sql.{ Dataset, Row}
import org.apache.spark.sql.streaming.{ OutputMode, StreamingQuery}
import org.apache.spark.sql.avro._
...
try {
log.info("----- reading schema")
val jsonFormatSchema = new String(Files.readAllBytes(
Paths.get("./src/main/resources/schema.avsc")))
val ds:Dataset[Row] = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", topic)
.load()
val output:Dataset[Row] = ds
.select(from_avro(ds.col("value"), jsonFormatSchema) as "record")
.select("record.*")
output.printSchema()
var query: StreamingQuery = output.writeStream.format("console")
.option("truncate", "false").outputMode(OutputMode.Append()).start();
query.awaitTermination();
} catch {
case e: Exception => log.error("onApplicationEvent error: ", e)
//case _: Throwable => log.error("onApplicationEvent error:")
}
...
Печать схема в искре, странно, что поля обнуляются, хотя схема avro не позволяет этого. Spark показывает это:
root
|-- int_field: integer (nullable = true)
|-- string_field: string (nullable = true)
Я проверил сообщения с другим получателем в python, и сообщения в порядке, но независимо от содержимого сообщения, искра показывает это.
+---------+------------+
|int_field|string_field|
+---------+------------+
|0 | |
+---------+------------+
Основные используемые зависимости:
<properties>
<spark.version>2.4.4</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
Кто-нибудь знает, почему это может происходить?
Заранее спасибо. Код для воспроизведения ошибки здесь:
https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala
РЕШЕНИЕ
Проблема заключалась в том, что я использовал библиотеку confluent_kafka в python и я читал авро-сообщения в структурированной потоковой передаче с использованием библиотеки spark-avro.
В библиотеке Confluent_kafka используется авро-формат confluent, а при автовом чтении avro используется стандартный формат avro.
Разница является то, что для использования реестра схемы, Confluent Avro добавляет в сообщение четыре байта, которые указывают, какую схему следует использовать.
Источник: https://www.confluent.io/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/
Для того, чтобы быть в состоянии использовать конфлюэнтное avro и читать его из потоковой структурированной искры, я заменил библиотеку spark-avro на Abris (abris позволяет интегрировать avro и confluent avro с искрой). https://github.com/AbsaOSS/ABRiS