Проблема из учебника Flink Training: LongRidesSolution. scala - PullRequest
0 голосов
/ 06 мая 2020

Что будет делать эта функция (ProcessElement), довольно ясно: на основе ключевого потока (с ключом rideId) она будет перебирать все элементы, чей rideId принадлежит этому ключу, и обновит состояние на основе условия

override def processElement(ride: TaxiRide,
                                context: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#Context,
                                out: Collector[TaxiRide]): Unit = {
      val timerService = context.timerService
      if (ride.isStart) {
        // the matching END might have arrived first; don't overwrite it
        if (rideState.value() == null) {
          rideState.update(ride)
        }
      }
      else {
        rideState.update(ride)
      }

      timerService.registerEventTimeTimer(ride.getEventTime + 120 * 60 * 1000)
    }

Таймер сработает, когда водяной знак достигнет отметки времени

    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Long, TaxiRide, TaxiRide]#OnTimerContext,
                         out: Collector[TaxiRide]): Unit = {
      val savedRide = rideState.value

      if (savedRide != null && savedRide.isStart) {
        out.collect(savedRide)
      }
      rideState.clear()
    }

Проблема: если сначала идет запись End, а затем на основе logi c, она будет не обновлять состояние поездки (связанный ключ), тогда он сработает через 2 часа, тогда он не будет собирать и не генерировать запись, но что, если эта запись соответствует нашим требованиям? ==> время начала записи произошло более 2 часов a go? Я думаю, что должно быть больше логи c, чтобы справиться с этим

1 Ответ

0 голосов
/ 06 мая 2020

Если запись КОНЕЦ обрабатывается перед записью НАЧАЛО, то, возможно, запись НАЧАЛО поступает очень поздно, и когда она поступает, она предоставляет доказательства того, что эта поездка длилась более двух часов.

Однако цель этого упражнения состоит не в том, чтобы найти все заезды продолжительностью более двух часов, а, скорее, в том, чтобы в режиме реального времени отметить заезды, которые должны были закончиться к настоящему моменту (потому что они начинались более чем через два часа go ), но не сделали. Поскольку эти поездки, о которых вы спрашиваете, закончились, остается спорным, заслуживают ли они предупреждений.

Вы подняли интересный момент, который, вероятно, следует добавить на страницу обсуждения упражнений.

...