Структурированный поток: чтение из нескольких тем Kafka одновременно - PullRequest
2 голосов
/ 13 апреля 2020

У меня есть Spark Structured Streaming Application, который должен читать сразу из 12 тем Kafka (различные схемы, формат Avro), десериализовать данные и хранить в HDFS. Когда я читаю из одной топи c, используя мой код, он работает нормально и без ошибок, но при одновременном выполнении нескольких запросов я получаю следующую ошибку

java.lang.IllegalStateException: Race while writing batch 0

Мой код выглядит следующим образом:

def main(args: Array[String]): Unit = {


  val kafkaProps = Util.loadProperties(kafkaConfigFile).asScala
  val topic_list = ("topic1", "topic2", "topic3", "topic4")

  topic_list.foreach(x => {
kafkaProps.update("subscribe", x)

val source= Source.fromInputStream(Util.getInputStream("/schema/topics/" + x)).getLines.mkString
val schemaParser = new Schema.Parser
val schema = schemaParser.parse(source)
val sqlTypeSchema = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

val kafkaStreamData = spark
  .readStream
  .format("kafka")
  .options(kafkaProps)
  .load()

val udfDeserialize = udf(deserialize(source), DataTypes.createStructType(sqlTypeSchema.fields))

val transformedDeserializedData = kafkaStreamData.select("value").as(Encoders.BINARY)
  .withColumn("rows", udfDeserialize(col("value")))
  .select("rows.*")

val query = transformedDeserializedData
  .writeStream
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .outputMode("append")
  .format("parquet")
  .option("path", "/output/topics/" + x)
  .option("checkpointLocation", checkpointLocation + "//" + x)
  .start()  
})  
spark.streams.awaitAnyTermination()  
 }

1 Ответ

0 голосов
/ 13 апреля 2020

Alternative. Вы можете использовать KAFKA Connect (из Confluent), NIFI, StreamSets и др. c. как ваш вариант использования, кажется, подходит "dump / persist to HDFS". Тем не менее, вы должны иметь эти инструменты (установлены). Проблема с небольшими файлами, которую вы заявляете, не является проблемой, так что будьте.

Начиная с Apache Kafka 0.9 или более поздней версии, вы можете использовать Kafka Connect API для KAFKA -> HDFS Sink (различные поддерживаемые форматы HDFS). Тем не менее, вам нужен KAFKA Connect Cluster, но в любом случае он основан на вашем существующем кластере, так что это не имеет большого значения. Но кто-то должен поддерживать.

Некоторые ссылки, чтобы помочь вам в вашем пути:

...