Получение значения предыдущего окна для обработки поздних событий - PullRequest
0 голосов
/ 17 мая 2018

Я ищу способ настройки управления окнами, чтобы учесть задержки, а также позволить мне рассчитывать значения на основе предыдущих значений, рассчитанных для сеанса.

В целом значения моих сеансов являются уникальным идентификатором и никогда не должныесть столкновения, но технически сессии могут прийти в любое время.В большинстве сеансов большинство событий обрабатываются в течение 5 минут. Допустимое опоздание на 1 день должно удовлетворять любые поздние события.

  stream
    .keyBy { jsonEvent => jsonEvent.findValue("session").toString }
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(5)))
    .allowedLateness(Time.days(1))
    .process { new SessionProcessor }
    .addSink { new HttpSink }

Для каждого сеанса я нахожу максимальное значение поля и проверяю, что несколькособытия не произошли (если они произойдут, то поле max value обнулится).Я решил создать ProcessWindowFunction для этого.

Class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {

   override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {
      //Parse and calculate data
      maxValue = if(badEvent1) 0 else maxValue
      maxValue = if(badEvent2) 0 else maxValue          
      out.collect((string1,string2,string3, maxValue))
   }
}

Это прекрасно работает до учета поздних событий.Когда происходит позднее событие, maxValue пересчитывается и снова выводится в HttpSink.Я ищу способ, чтобы я мог рассчитать дельту предыдущего maxValue и позднего maxValue.

Я ищу способ определить:

  1. Если вызов функции выполнен из позднего события (я не хочу удваивать общее количество сеансов)
  2. Что такое новые данные или если есть способ сохранить предыдущее вычисленное значение.

Любая помощь с этим будет принята с благодарностью.

Редактировать:Новый код, используемый для ValueState

KafkaConsumer.scala

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization.JSONDeserializationSchema
import org.apache.flink.streaming.api.scala._
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time


object KafkaConsumer {
   def main(args: Array[String]) {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
      val properties = getServerProperties
      val consumer = new FlinkKafkaConsumer010[ObjectNode]("test-topic", new JSONDeserializationSchema, properties)
      consumer.setStartFromLatest()
      val stream = env.addSource(consumer)

      stream
        .keyBy { jsonEvent => jsonEvent.findValue("data").findValue("query").findValue("session").toString }
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.days(1))
        .process {
          new SessionProcessor
        }
        .print
      env.execute("Kafka APN Consumer")
    }
  }

SessionProcessor.scala

import org.apache.flink.util.Collector
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

class SessionProcessor extends ProcessWindowFunction[ObjectNode, (String, String, String, Long), String, TimeWindow] {

  final val previousValue = new ValueStateDescriptor("previousValue", classOf[Long])

  override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[(String, String, String, Long)]): Unit = {

    val previousVal: ValueState[Long] = context.windowState.getState(previousValue)
    val pVal: Long = previousVal.value match {
      case i: Long => i
    }
    var session = ""
    var user = ""
    var department = ""
    var lVal: Long = 0

    elements.foreach( value => {
      var jVal: String = "0"
      if (value.findValue("data").findValue("query").has("value")) {
        jVal = value.findValue("data").findValue("query").findValue("value").toString replaceAll("\"", "")
      }
      session = value.findValue("data").findValue("query").findValue("session").toString replaceAll("\"", "")
      user = value.findValue("data").findValue("query").findValue("user").toString replaceAll("\"", "")
      department = value.findValue("data").findValue("query").findValue("department").toString replaceAll("\"", "")
      lVal = if (jVal.toLong > lVal) jVal.toLong else lVal
    })

    val increaseTime = lVal - pVal
    previousVal.update(increaseTime)
    out.collect((session, user, department, increaseTime))
  }
}

1 Ответ

0 голосов
/ 17 мая 2018

Вот пример, который делает нечто подобное.Надеюсь, это достаточно понятно, и должно быть достаточно легко адаптироваться к вашим потребностям.

Основная идея здесь заключается в том, что вы можете использовать context.windowState(), то есть состояние для каждого окна, доступное через контекст, передаваемый вProcessWindowFunction.Этот windowState на самом деле полезен только для окон, которые запускаются несколько раз, поскольку каждый новый экземпляр окна имеет только что инициализированное (и пустое) хранилище windowState.Для состояния, которое используется всеми окнами (но все еще имеет ключ), используйте context.globalState().

private static class DifferentialWindowFunction
  extends ProcessWindowFunction<Long, Tuple2<Long, Long>, String, TimeWindow> {

  private final static ValueStateDescriptor<Long> previousFiringState =
    new ValueStateDescriptor<>("previous-firing", LongSerializer.INSTANCE);

  private final static ReducingStateDescriptor<Long> firingCounterState =
    new ReducingStateDescriptor<>("firing-counter", new Sum(), LongSerializer.INSTANCE);

  @Override
  public void process(
      String key, 
      Context context, 
      Iterable<Long> values, 
      Collector<Tuple2<Long, Long>> out) {

    ValueState<Long> previousFiring = context.windowState().getState(previousFiringState);
    ReducingState<Long> firingCounter = context.windowState().getState(firingCounterState);

    Long output = Iterables.getOnlyElement(values);
    if (firingCounter.get() == null) {
      // first firing
      out.collect(Tuple2.of(0L, output));
    } else {
      // subsequent firing
      out.collect(Tuple2.of(firingCounter.get(), output - previousFiring.value()));    
    } 
    firingCounter.add(1L);
    previousFiring.update(output);
  }

  @Override
  public void clear(Context context) {
    ValueState<Long> previousFiring = context.windowState().getState(previousFiringState);
    ReducingState<Long> firingCounter = context.windowState().getState(firingCounterState);

    previousFiring.clear();
    firingCounter.clear();
  }
}
...