Выполнение нескольких запросов в Spark структурированном потоке с водяными знаками и оконными агрегатами - PullRequest
0 голосов
/ 09 апреля 2020

Моя цель - прочитать данные из нескольких тем Kafka, собрать их и записать в hdfs. Я просмотрел список тем kafka для создания нескольких запросов. Код работает нормально при выполнении одного запроса, но выдает ошибку при выполнении нескольких запросов. Я сохранил каталоги контрольных точек для всех тем, поскольку я прочитал во многих постах, что это может вызвать похожую проблему.

Код выглядит следующим образом:

object CombinedDcAggStreaming {

  def main(args: Array[String]): Unit = {


    val jobConfigFile = "configPath"

    /* Read input configuration */
    val jobProps = Util.loadProperties(jobConfigFile).asScala

    val sparkConfigFile = jobProps.getOrElse("spark_config_file", throw new RuntimeException("Can't find spark property file"))
    val kafkaConfigFile = jobProps.getOrElse("kafka_config_file", throw new RuntimeException("Can't find kafka property file"))

    val sparkProps = Util.loadProperties(sparkConfigFile).asScala
    val kafkaProps = Util.loadProperties(kafkaConfigFile).asScala

    val topicList = Seq("topic_1", "topic_2")
    val avroSchemaFile = jobProps.getOrElse("schema_file", throw new RuntimeException("Can't find schema file..."))
    val checkpointLocation = jobProps.getOrElse("checkpoint_location", throw new RuntimeException("Can't find check point directory..."))
    val triggerInterval = jobProps.getOrElse("triggerInterval", throw new RuntimeException("Can't find trigger interval..."))
    val outputPath = jobProps.getOrElse("output_path", throw new RuntimeException("Can't find output directory..."))
    val outputFormat = jobProps.getOrElse("output_format", throw new RuntimeException("Can't find output format...")) //"parquet"
    val outputMode = jobProps.getOrElse("output_mode", throw new RuntimeException("Can't find output mode...")) //"append"
    val partitionByCols = jobProps.getOrElse("partition_by_columns", throw new RuntimeException("Can't find partition by columns...")).split(",").toSeq

    val spark = SparkSession.builder.appName("streaming").master("local[4]").getOrCreate()
    sparkProps.foreach(prop => spark.conf.set(prop._1, prop._2))

    topicList.foreach(
      topicId => {

        kafkaProps.update("subscribe", topicId)


        val schemaPath = avroSchemaFile + "/" + topicId + ".avsc"


        val dimensionMap = ConfigUtils.getDimensionMap(jobConfig)
        val measureMap = ConfigUtils.getMeasureMap(jobConfig)

        val source= Source.fromInputStream(Util.getInputStream(schemaPath)).getLines.mkString
        val schemaParser = new Schema.Parser
        val schema = schemaParser.parse(source)
        val sqlTypeSchema = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]

        val kafkaStreamData = spark
          .readStream
          .format("kafka")
          .options(kafkaProps)
          .load()

        val udfDeserialize = udf(deserialize(source), DataTypes.createStructType(sqlTypeSchema.fields))

        val transformedDeserializedData = kafkaStreamData.select("value").as(Encoders.BINARY)
          .withColumn("rows", udfDeserialize(col("value")))
          .select("rows.*")
          .withColumn("end_time", (col("end_time") / 1000).cast(LongType))
          .withColumn("timestamp", from_unixtime(col("end_time"),"yyyy-MM-dd HH").cast(TimestampType))
          .withColumn("year", from_unixtime(col("end_time"),"yyyy").cast(IntegerType))
          .withColumn("month", from_unixtime(col("end_time"),"MM").cast(IntegerType))
          .withColumn("day", from_unixtime(col("end_time"),"dd").cast(IntegerType))
          .withColumn("hour",from_unixtime(col("end_time"),"HH").cast(IntegerType))
          .withColumn("topic_id", lit(topicId))

        val groupBycols: Array[String] = dimensionMap.keys.toArray[String] ++ partitionByCols.toArray[String]
)

        val aggregatedData = AggregationUtils.aggregateDFWithWatermarking(transformedDeserializedData, groupBycols, "timestamp", "10 minutes", measureMap) //Watermarking time -> 10. minutes, window => window("timestamp", "5 minutes")

        val query = aggregatedData
          .writeStream
          .trigger(Trigger.ProcessingTime(triggerInterval))
          .outputMode("update")
          .format("console")
          .partitionBy(partitionByCols: _*)
          .option("path", outputPath)
          .option("checkpointLocation", checkpointLocation + "//" + topicId)
          .start()
      })

    spark.streams.awaitAnyTermination()

    def deserialize(source: String): Array[Byte] => Option[Row] = (data: Array[Byte]) => {
      try {
        val parser = new Schema.Parser
        val schema = parser.parse(source)
        val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
        val record = recordInjection.invert(data).get
        val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
        record.getSchema.getFields.asScala.foreach(field => {
          val fieldVal = record.get(field.pos()) match {
            case x: org.apache.avro.util.Utf8 => x.toString
            case y: Any => y
            case _ => None
          }
          objectArray(field.pos()) = fieldVal
        })
        Some(Row(objectArray: _*))
      } catch {
        case ex: Exception => {
          log.info(s"Failed to parse schema with error: ${ex.printStackTrace()}")
          None
        }
      }
    }
  }
}

Я получаю следующая ошибка при выполнении задания:

java.lang.IllegalStateException: Race while writing batch 0

Но задание выполняется нормально, когда я выполняю один запрос вместо нескольких. Любые предложения о том, как решить эту проблему?

...