В нашем приложении мы используем структурированную потоковую передачу с MapGroupWithState в сочетании с чтением из Kafka.
После запуска приложения во время начальных пакетов производительность хорошая, если я вижу, что kafka lastProgress почти 65K в секунду,После нескольких пакетов производительность полностью снижается примерно до 2000 в секунду.
В функции MapGroupWithState происходит обновление и сравнение со значением из хранилища состояний (фрагмент кода представлен ниже).
Числосмещений от Kafka - 100000
После запуска приложения во время начальных пакетов производительность хорошая, если я вижу, что kafka lastProgress почти 65K в секунду.После нескольких пакетов производительность полностью снижается примерно до 2000 в секунду.
Если мы видим дамп потока от одного из исполнителей, то нет никаких подозрений, кроме заблокированных потоков из пользовательского интерфейса spark
GC Статистика от одного из исполнителей, как показано ниже, кажется
Не вижу большой разницыпосле GC
Фрагмент кода
case class MonitoringEvent(InternalID: String, monStartTimestamp: Timestamp, EndTimestamp: Timestamp, Stream: String, ParentID: Option[String])
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", Config.uatKafkaUrl)
.option("subscribe", Config.interBranchInputTopic)
.option("startingOffsets", "earliest")
.option("failOnDataLoss", "true")
.option("maxOffsetsPerTrigger", "100000")
.option("request.required.acks", "all")
.load()
.selectExpr("CAST(value AS STRING)")
val me: Dataset[MonitoringEvent] = df.select(from_json($"value", schema).as("data")).select($"data.*").as[MonitoringEvent]
val IB = me.groupByKey(x => (x.ParentID.getOrElse(x.InternalID)))
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(IBTransformer.mappingFunctionIB _)
.flatMap(x => x)
val IBStream = IB
.select(to_json(struct($"*")).as("value"), $"InternalID".as("key"))
.writeStream
.format("kafka")
.queryName("InterBranch_Events_KafkaWriter")
.option("kafka.bootstrap.servers", Config.uatKafkaUrl)
.option("topic", Config.interBranchTopicComplete)
.option("checkpointLocation", Config.interBranchCheckPointDir)
.outputMode("update")
.start()
object IBTransformer extends Serializable {
case class IBStateStore(InternalID: String, monStartTimestamp: Timestamp)
def mappingFunctionIB(intrKey: String, intrValue: Iterator[MonitoringEvent], intrState: GroupState[IBStateStore]): Seq[MonitoringEvent] = {
try {
if (intrState.hasTimedOut) {
intrState.remove()
Seq.empty
} else {
val events = intrValue.toSeq
if (events.map(_.Status).contains(Started)) {
val tmp = events.filter(x => (x.Status == Started && x.InternalID == intrKey)).head
val toStore = IBStateStore(tmp.InternalID, tmp.monStartTimestamp)
intrState.update(toStore)
intrState.setTimeoutDuration(1200000)
}
val IB = events.filter(_.ParentID.isDefined)
if (intrState.exists && IB.nonEmpty) {
val startEvent = intrState.get
val IBUpdate = IB.map {x => x.copy(InternalID = startEvent.InternalID, monStartTimestamp = startEvent.monStartTimestamp) }
IBUpdate.foreach(id => intrState.update((IBStateStore(id.InternalID, id.monStartTimestamp)))) // updates the state with new IDs
IBUpdate
} else {
Seq.empty
}
}
}
catch
.
.
.
}
}
Количество используемых исполнителей - 8 Exector Memory - 8G Memory Driver - 8G
Параметры и память Java, которые я предоставляю в моей искре Отправить скрипт
--executor-memory 8G \
--executor-cores 8 \
--num-executors 4 \
--driver-memory 8G \
--driver-java-options "-Dsun.security.krb5.debug=true -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dconfig.file=configIB.conf -Dlog4j.configuration=IBprocessor.log4j.properties" \
Пробовал с помощью G1GC в javaварианты, но улучшения нет.Ключи, которые мы держим, также меньше предоставленного размера, поэтому не уверены, где они работают неправильно.
Есть предложения по улучшению производительности и устранению проблем с сборкой мусора?