Структурированная потоковая передача с mapGroupState, вызывающая проблемы с ГХ и производительностью - PullRequest
0 голосов
/ 27 сентября 2018

В нашем приложении мы используем структурированную потоковую передачу с MapGroupWithState в сочетании с чтением из Kafka.

После запуска приложения во время начальных пакетов производительность хорошая, если я вижу, что kafka lastProgress почти 65K в секунду,После нескольких пакетов производительность полностью снижается примерно до 2000 в секунду.

В функции MapGroupWithState происходит обновление и сравнение со значением из хранилища состояний (фрагмент кода представлен ниже).

Числосмещений от Kafka - 100000

После запуска приложения во время начальных пакетов производительность хорошая, если я вижу, что kafka lastProgress почти 65K в секунду.После нескольких пакетов производительность полностью снижается примерно до 2000 в секунду.

Если мы видим дамп потока от одного из исполнителей, то нет никаких подозрений, кроме заблокированных потоков из пользовательского интерфейса spark

Thread Count

Thread Block

GC Статистика от одного из исполнителей, как показано ниже, кажется

Не вижу большой разницыпосле GC

Before GC

After GC

Фрагмент кода

case class MonitoringEvent(InternalID: String, monStartTimestamp: Timestamp, EndTimestamp: Timestamp, Stream: String, ParentID: Option[String])


val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", Config.uatKafkaUrl)
      .option("subscribe", Config.interBranchInputTopic)
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", "true")
      .option("maxOffsetsPerTrigger", "100000")
      .option("request.required.acks", "all")
      .load()
      .selectExpr("CAST(value AS STRING)")    


        val me: Dataset[MonitoringEvent] = df.select(from_json($"value", schema).as("data")).select($"data.*").as[MonitoringEvent]

        val IB =   me.groupByKey(x => (x.ParentID.getOrElse(x.InternalID)))
        .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(IBTransformer.mappingFunctionIB _)
        .flatMap(x => x)


    val IBStream = IB
      .select(to_json(struct($"*")).as("value"), $"InternalID".as("key"))
      .writeStream
      .format("kafka")
      .queryName("InterBranch_Events_KafkaWriter")
      .option("kafka.bootstrap.servers", Config.uatKafkaUrl)
      .option("topic", Config.interBranchTopicComplete)
      .option("checkpointLocation", Config.interBranchCheckPointDir)
      .outputMode("update")
      .start()



object IBTransformer extends Serializable {

  case class IBStateStore(InternalID: String, monStartTimestamp: Timestamp)

  def mappingFunctionIB(intrKey: String, intrValue: Iterator[MonitoringEvent], intrState: GroupState[IBStateStore]): Seq[MonitoringEvent] = {

    try {
      if (intrState.hasTimedOut) {
            intrState.remove()
               Seq.empty
      } else {

        val events = intrValue.toSeq
        if (events.map(_.Status).contains(Started)) {

           val tmp = events.filter(x => (x.Status == Started && x.InternalID == intrKey)).head
           val toStore = IBStateStore(tmp.InternalID, tmp.monStartTimestamp)
            intrState.update(toStore)
            intrState.setTimeoutDuration(1200000) 
        }
        val IB = events.filter(_.ParentID.isDefined)
        if (intrState.exists && IB.nonEmpty) {
          val startEvent = intrState.get
          val IBUpdate = IB.map {x => x.copy(InternalID = startEvent.InternalID, monStartTimestamp = startEvent.monStartTimestamp) }
          IBUpdate.foreach(id => intrState.update((IBStateStore(id.InternalID, id.monStartTimestamp)))) // updates the state with new IDs
          IBUpdate
        } else {
          Seq.empty
        }
      }
    }

    catch
    .
    .
    .       
  }
}

Количество используемых исполнителей - 8 Exector Memory - 8G Memory Driver - 8G

Параметры и память Java, которые я предоставляю в моей искре Отправить скрипт

 --executor-memory 8G \
 --executor-cores 8 \
 --num-executors 4 \
 --driver-memory 8G \
--driver-java-options "-Dsun.security.krb5.debug=true -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dconfig.file=configIB.conf -Dlog4j.configuration=IBprocessor.log4j.properties" \

Пробовал с помощью G1GC в javaварианты, но улучшения нет.Ключи, которые мы держим, также меньше предоставленного размера, поэтому не уверены, где они работают неправильно.

Есть предложения по улучшению производительности и устранению проблем с сборкой мусора?

...