У меня есть приложение Flink для сбора и обработки потока кликов. Приложение состоит из Kafka в качестве источника событий, функции карты и приемника, как показано на рисунке ниже:
Я хочу обогатить данные входящего потока кликов с IP-адресом пользователя на основе поля userIp в необработанном событии, полученном из Kafka.
упрощенный фрагмент файла CSV, как показано ниже
start_ip,end_ip,country
"1.1.1.1","100.100.100.100","United States of America"
"100.100.100.101","200.200.200.200","China"
Я провел несколько исследований и нашел несколько потенциальные решения:
1. Решение: Передача данных обогащения и соединение с потоком событий с некоторыми логиками сопоставления IP c.
1. Результат: Это хорошо сработало для нескольких примеров данных IP-местоположения, но не для всех данных CSV. Куча JVM достигла 3,5 ГБ, и из-за состояния широковещания нет возможности поместить состояние широковещания на диск (для RocksDb)
2. Решение: Загрузка данных CSV в методе open()
в RichFlatMapFunction
в состояние (ValueState) перед началом обработки события и обогащение данных события в методе flatMap
.
2. Результат: Из-за того, что данные обогащения настолько велики для хранения в куче JVM, их невозможно загрузить в ValueState. А также де-сериализация через ValueState - плохая практика для данных в ключе-значении.
3. Решение: Чтобы избежать ограничения кучи JVM, я попытался поместить данные обогащения в RocksDB (использует диск) как состояние с MapState.
3. Результат: Попытка загрузить CSV-файл в MapState в методе open()
привела к ошибке, которая говорит о том, что вы не можете поместить в MapState в open()
метод, потому что я не был в контексте с ключами в методе open()
, как этот вопрос : Ключ потока Flink key имеет значение null
4. Решение: Из-за необходимости ключевого контекста для MapState (чтобы поместить RocksDB), я попытался загрузить весь CSV-файл в локальный экземпляр RocksDB (диск) в функции процесса после создания DataStream в KeyedStream:
class KeyedIpProcess extends KeyedProcessFunction[Long, Event, Event] {
var ipMapState: MapState[String, String] = _
var csvFinishedFlag: ValueState[Boolean] = _
override def processElement(event: Event,
ctx: KeyedProcessFunction[Long, Event, Event]#Context,
out: Collector[Event]): Unit = {
val ipDescriptor = new MapStateDescriptor[String, String]("ipMapState", classOf[String], classOf[String])
val csvFinishedDescriptor = new ValueStateDescriptor[Boolean]("csvFinished", classOf[Boolean])
ipMapState = getRuntimeContext.getMapState(ipDescriptor)
csvFinishedFlag = getRuntimeContext.getState(csvFinishedDescriptor)
if (!csvFinishedFlag.value()) {
val csv = new CSVParser(defaultCSVFormat)
val fileSource = Source.fromFile("/tmp/ip.csv", "UTF-8")
for (row <- fileSource.getLines()) {
val Some(List(start, end, country)) = csv.parseLine(row)
ipMapState.put(start, country)
}
fileSource.close()
csvFinishedFlag.update(true)
}
out.collect {
if (ipMapState.contains(event.userIp)) {
val details = ipMapState.get(event.userIp)
event.copy(data =
event.data.copy(
ipLocation = Some(details.country)
))
} else {
event
}
}
}
}
4. Результат: Он слишком хакерский и мешает обработке событий из-за блокировки операции чтения файла.
Не могли бы вы сказать мне, что я могу сделать для этой ситуации?
Спасибо