Flink - как агрегировать и запрашивать расширенное состояние функций приемника по нескольким слотам задач - PullRequest
0 голосов
/ 29 ноября 2018

Я реализовал расширенную функцию приемника, которая выполняет некоторые сетевые вызовы для каждого вызываемого объекта.Я хотел бы иметь возможность подсчитывать некоторые метаданные по этим событиям, связанные с некоторой контекстной информацией, содержащейся в событии (batchID события), и предоставлять эти метаданные внешней системе.

Например, событие выглядиткак это:

case class MyEvent(batchId: String, eventId: String, moreInformation: ...)

class MySink(...) extends RichSinkFunction[MyEvent] 
{
override def open(parameters: Configuration): Unit = {
    ...
  }

  override def close(): Unit = {
    ...
  }

  override def invoke(event: MyEvent) = {
    // some processing is done here

    ....
   //
   ...
     if (success) {
        I want to save the meta data here per event.batchId
        state.count.number.of.events.processed.for.event.batchId
     }
  }
}

И в другом месте я хочу как-то иметь возможность запросить значение, сколько событий было обработано для batchId

1 Ответ

0 голосов
/ 29 ноября 2018

Несколько вариантов:

План A: Используйте объекты Metric и MetricReporter, чтобы предоставить данные внешней системе (системам).Это имеет недостаток, заключающийся в том, что метрики не являются контрольными точками, и, если batchIds много, вы, вероятно, в конечном итоге будете загрязнять систему метрик множеством метрик, которые не могут получить GC.

ПланированиеB: Переписать вашу RichSinkFunction как RichFlatMap (или ProcessFunction), которая испускает поток удержания кортежей (batchId, number.of.events.in.batchId).Вы можете задать этот поток с помощью batchId, а затем использовать состояние ключа в KeyedProcessFunction (например) для хранения и предоставления этого состояния через запрашиваемое состояние.Это имеет тот недостаток, что запрашиваемое состояние допускает только точечные запросы (по одному ключу за раз).

План C: в этом варианте внешние системы могут запрашивать состояние, созданное в Плане B, путем внедрения запросов в потокэто передается в KeyedBroadcastProcessFunction, которая содержит ключевые данные state.count.number.of.events.processed.for.event.batchId.Затем вы можете использовать ctx.applyToKeyedState в методе processBroadcastElement функции KeyedBroadcastProcessFunction для ответа на эти запросы.См., Например, одно из обучающих упражнений Flink .

План D: записать результаты из B (или C) в redis или эластичный поиск или в другое хранилище данных с запросами и иметьвнешние системы получают эту информацию оттуда.

...