Как обогатить поток событий большим файлом в Apache Flink? - PullRequest
1 голос
/ 18 марта 2020

У меня есть приложение Flink для сбора и обработки потока кликов. Приложение состоит из Kafka в качестве источника событий, функции карты и приемника, как показано на рисунке ниже:

Flink, huge, static enrichment data

Я хочу обогатить данные входящего потока кликов с IP-адресом пользователя на основе поля userIp в необработанном событии, полученном из Kafka.

упрощенный фрагмент файла CSV, как показано ниже

   start_ip,end_ip,country
   "1.1.1.1","100.100.100.100","United States of America"
   "100.100.100.101","200.200.200.200","China"

Я провел несколько исследований и нашел несколько потенциальные решения:

1. Решение: Передача данных обогащения и соединение с потоком событий с некоторыми логиками сопоставления IP c.

1. Результат: Это хорошо сработало для нескольких примеров данных IP-местоположения, но не для всех данных CSV. Куча JVM достигла 3,5 ГБ, и из-за состояния широковещания нет возможности поместить состояние широковещания на диск (для RocksDb)

2. Решение: Загрузка данных CSV в методе open() в RichFlatMapFunction в состояние (ValueState) перед началом обработки события и обогащение данных события в методе flatMap.

2. Результат: Из-за того, что данные обогащения настолько велики для хранения в куче JVM, их невозможно загрузить в ValueState. А также де-сериализация через ValueState - плохая практика для данных в ключе-значении.

3. Решение: Чтобы избежать ограничения кучи JVM, я попытался поместить данные обогащения в RocksDB (использует диск) как состояние с MapState.

3. Результат: Попытка загрузить CSV-файл в MapState в методе open() привела к ошибке, которая говорит о том, что вы не можете поместить в MapState в open() метод, потому что я не был в контексте с ключами в методе open(), как этот вопрос : Ключ потока Flink key имеет значение null

4. Решение: Из-за необходимости ключевого контекста для MapState (чтобы поместить RocksDB), я попытался загрузить весь CSV-файл в локальный экземпляр RocksDB (диск) в функции процесса после создания DataStream в KeyedStream:

class KeyedIpProcess extends KeyedProcessFunction[Long, Event, Event] {

  var ipMapState: MapState[String, String] = _
  var csvFinishedFlag: ValueState[Boolean] = _

  override def processElement(event: Event,
                              ctx: KeyedProcessFunction[Long, Event, Event]#Context,
                              out: Collector[Event]): Unit = {

    val ipDescriptor = new MapStateDescriptor[String, String]("ipMapState", classOf[String], classOf[String])
    val csvFinishedDescriptor = new ValueStateDescriptor[Boolean]("csvFinished", classOf[Boolean])

    ipMapState = getRuntimeContext.getMapState(ipDescriptor)
    csvFinishedFlag = getRuntimeContext.getState(csvFinishedDescriptor)

    if (!csvFinishedFlag.value()) {
      val csv = new CSVParser(defaultCSVFormat)

      val fileSource = Source.fromFile("/tmp/ip.csv", "UTF-8")
      for (row <- fileSource.getLines()) {
        val Some(List(start, end, country)) = csv.parseLine(row)
        ipMapState.put(start, country)
      }
      fileSource.close()
      csvFinishedFlag.update(true)
    }

    out.collect {
      if (ipMapState.contains(event.userIp)) {
        val details = ipMapState.get(event.userIp)
        event.copy(data =
          event.data.copy(
            ipLocation = Some(details.country)
          ))
      } else {
        event
      }
    }
  }
}

4. Результат: Он слишком хакерский и мешает обработке событий из-за блокировки операции чтения файла.

Не могли бы вы сказать мне, что я могу сделать для этой ситуации?

Спасибо

1 Ответ

0 голосов
/ 19 марта 2020

Что вы можете сделать, это реализовать собственный разделитель и загрузить фрагмент данных обогащения в каждый раздел. Вот пример такого подхода здесь ; Я приведу некоторые ключевые части:

Работа организована следующим образом:

DataStream<SensorMeasurement> measurements = env.addSource(new SensorMeasurementSource(100_000));

DataStream<EnrichedMeasurements> enrichedMeasurements = measurements
  .partitionCustom(new SensorIdPartitioner(), measurement -> measurement.getSensorId())
  .flatMap(new EnrichmentFunctionWithPartitionedPreloading());

Пользовательский разделитель должен знать, сколько существует разделов, и детерминистически назначать каждое событие определенной c partition:

private static class SensorIdPartitioner implements Partitioner<Long> {
    @Override
    public int partition(final Long sensorMeasurement, final int numPartitions) {
        return Math.toIntExact(sensorMeasurement % numPartitions);
    }
}

И тогда функция обогащения использует знание того, как было выполнено разбиение для загрузки только соответствующего фрагмента в каждый экземпляр:

public class EnrichmentFunctionWithPartitionedPreloading extends RichFlatMapFunction<SensorMeasurement, EnrichedMeasurements> {

    private Map<Long, SensorReferenceData> referenceData;

    @Override
    public void open(final Configuration parameters) throws Exception {
        super.open(parameters);
        referenceData = loadReferenceData(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
    }


    @Override
    public void flatMap(
            final SensorMeasurement sensorMeasurement,
            final Collector<EnrichedMeasurements> collector) throws Exception {
        SensorReferenceData sensorReferenceData = referenceData.get(sensorMeasurement.getSensorId());
        collector.collect(new EnrichedMeasurements(sensorMeasurement, sensorReferenceData));
    }

    private Map<Long, SensorReferenceData> loadReferenceData(
            final int partition,
            final int numPartitions) {
        SensorReferenceDataClient client = new SensorReferenceDataClient();
        return client.getSensorReferenceDataForPartition(partition, numPartitions);
    }

}

Обратите внимание, что обогащение не выполняется в потоке с ключами, поэтому вы не можете использовать состояние ключа или таймеры в функции обогащения.

...