У нас есть прототип esper run, но производительности значительно не хватает. Я полагаю, что это моя вина, а не проблема, связанная с esper, поэтому я искал помощи в поиске проблемы с производительностью.
Я запускаю один экземпляр службы esper, и я выделил ограничения памяти следующим образом: -Xmx6G -Xms1G (я пробовал различные комбинации этих значений). И он может использовать 4 ядра процессора. На момент проведения этих тестов другие сервисы не работали, только esper, kafka, zookeeper.
Я использую Akka Streams для потоковой передачи событий в Esper, сервис очень простой, он передается из kafka, вставляет события в Esper Runtime, Esper уже протестировал и работает 3 EPStatements. Есть один слушатель, и я добавляю его ко всем 3 операторам, слушатель выводит сопоставленные события в kafka.
Некоторые вещи, которые я пытался выделить, где проблема производительности:
- Удалить некоторые EPStatements
- Удалить все EPStatements
- Удалить слушателя
- Удалить EPStatements и Listener
- Удалить esper .sendEvent (...) (Это значительно повышает производительность, поэтому кажется, что проблема esper, а не проблема akka)
Только номер 4, приведенный выше, обеспечил какое-либо заметное улучшение производительности.
Ниже приведен пример запроса, который мы выполняем через esper. Он протестирован и работает, я прочитал раздел документации по настройке производительности, и он мне кажется нормальным. Все мои запросы имеют одинаковый формат:
select * from EsperEvent#time(5 minutes)
match_recognize (
partition by asset_id
measures A as event1, B as event2, C as event3
pattern (A Z* B Z* C)
interval 10 seconds or terminated
define
A as A.eventtype = 13 AND A.win_EventID = "4624" AND A.win_LogonType = "3",
B as B.eventtype = 13 AND B.win_EventID = "4672",
C as C.eventtype = 13 AND (C.win_EventID = "4697" OR C.win_EventID = "7045")
)
Какой-то код ..
Вот мой поток akka:
kafkaConsumer
.via(parsing) // Parse the json event to a POJO for esper. Have tried without this step also, no performance impact
.via(esperFlow) // mapAsync call to sendEvent(...)
//Here I am using kafka to measure the flow throughput rate. This is where I establish my throughput rate, based on the rate messages are written to "esper_flow_through" topic.
.map(rec => new ProducerRecord[Array[Byte], String]("esper_flow_through", Serialization.write(rec)))
.runWith(sink)
esperFlow (Параллелизм = 4 по умолчанию):
val esperFlow = Flow[EsperEvent]
.mapAsync(Parallelism)(event => Future {
engine.getEPRuntime.sendEvent(event)
event
})
Слушатель:
override def update(newEvents: Array[EventBean], oldEvents: Array[EventBean], statement: EPStatement, epServiceProvider: EPServiceProvider): Unit = Future {
logger.info(s"Received Listener updates: Query Name: ${statement.getName} ---- ${newEvents.map(_.getUnderlying)}, $oldEvents")
statement.getName match {
case "SERVICE_INSTALL" => serviceInstall.increment(newEvents.length)
case "ADMIN_GROUP" => adminGroup.increment(newEvents.length)
case "SMB_SHARE" => smbShare.increment(newEvents.length)
}
newEvents.map(_.getUnderlying.toString).toList
.foreach(queryMatch => {
val record: ProducerRecord[Array[Byte], String] = new ProducerRecord[Array[Byte], String]("esper_output", queryMatch)
producer.send(record)
})
}
Результаты наблюдений:
- Входной поток имеет скорость ~ 2,4 Кбит / с.
- Мы видим, что эспер не может идти в ногу с самого начала. Максимальная скорость ~ 600 в секунду
- Эспер постепенно уменьшается в пропускной способности
- В конечном итоге пропускная способность esper стабилизируется <100 в секунду </li>
Профилирование, здесь нет ничего необычного:
Скорость кажется очень низкой, поэтому я предполагаю, что мне чего-то не хватает в отношении некоторой конфигурации esper?
Наша целевая пропускная способность должна составлять ~ 10k в секунду. Мы далеки от этого, и у нас есть похожий POC в Spark, который приближается к этой цели.
Обновление:
После комментариев @ user650839 я смог улучшить свою пропускную способность до стабильного 1k в секунду. Оба эти запроса производят одинаковую пропускную способность:
select * from EsperEvent(eventtype = 13 and win_EventID in ("4624", "4672", "4697", "7045"))#time(5 minutes)
match_recognize (
partition by asset_id
measures A as event1, B as event2, C as event3
pattern (A B C)
interval 10 seconds or terminated
define
A as A.eventtype = 13 AND A.win_EventID = "4624" AND A.win_LogonType = "3",
B as B.eventtype = 13 AND B.win_EventID = "4672",
C as C.eventtype = 13 AND (C.win_EventID = "4697" OR C.win_EventID = "7045"))
create context NetworkLogonThenInstallationOfANewService
start EsperEvent(eventtype = 13 AND win_EventID = "4624" AND win_LogonType = "3")
end pattern [
b=EsperEvent(eventtype = 13 AND win_EventID = "4672") ->
c=EsperEvent(eventtype = 13 AND (win_EventID = "4697" OR win_EventID = "7045"))
where timer:within(5 minutes)
]
context NetworkLogonThenInstallationOfANewService select * from EsperEvent output when terminated
Однако 1 кбит / с все еще слишком медленна для наших нужд.