Найти счет в WindowedStream - Flink - PullRequest
0 голосов
/ 01 мая 2019

Я довольно новичок в мире потоков и с первой попытки столкнулся с некоторыми проблемами.

В частности, я пытаюсь реализовать функции count и groupBy в скользящем окне с помощью Flink.

Я сделал это в обычном DateStream, но я не могу заставить его работать в WindowedStream.

Есть ли у вас какие-либо предложения о том, как я могу это сделать?

val parsedStream: DataStream[(String, Response)] = stream
      .mapWith(_.decodeOption[Response])
      .filter(_.isDefined)
      .map { record =>
        (
          s"${record.get.group.group_country}, ${record.get.group.group_state}, ${record.get.group.group_city}",
          record.get
        )
      }

val result: DataStream[((String, Response), Int)] = parsedStream
      .map((_, 1))
      .keyBy(_._1._1)
      .sum(1)

// The output of result is 
// ((us, GA, Atlanta,Response()), 14)
// ((us, SA, Atlanta,Response()), 4)

result
      .keyBy(_._1._1)
      .timeWindow(Time.seconds(5))

//the following part doesn't compile

      .apply(
        new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
          def apply(
                   key: Tuple,
                   window: TimeWindow,
                   values: Iterable[(String, Response)],
                   out: Collector[(String, Int)]
                   ) {}
        }
      )

Ошибка компиляции:

overloaded method value apply with alternatives:
  [R](function: (String, org.apache.flink.streaming.api.windowing.windows.TimeWindow, Iterable[((String, com.flink.Response), Int)], org.apache.flink.util.Collector[R]) => Unit)(implicit evidence$28: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R] <and>
  [R](function: org.apache.flink.streaming.api.scala.function.WindowFunction[((String, com.flink.Response), Int),R,String,org.apache.flink.streaming.api.windowing.windows.TimeWindow])(implicit evidence$27: org.apache.flink.api.common.typeinfo.TypeInformation[R])org.apache.flink.streaming.api.scala.DataStream[R]
 cannot be applied to (org.apache.flink.streaming.api.functions.windowing.WindowFunction[((String, com.flink.Response), Int),(String, com.flink.Response),String,org.apache.flink.streaming.api.windowing.windows.TimeWindow]{def apply(key: String,window: org.apache.flink.streaming.api.windowing.windows.TimeWindow,input: Iterable[((String, com.flink.Response), Int)],out: org.apache.flink.util.Collector[(String, com.flink.Response)]): Unit})
      .apply(

Ответы [ 2 ]

0 голосов
/ 02 мая 2019

Это более простой пример, с которым мы можем работать

val source: DataStream[(JsonField, Int)] = env.fromElements(("hello", 1), ("hello", 2))

    val window2 = source
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .apply(new WindowFunction[(JsonField, Int), Int, String, TimeWindow] {})

0 голосов
/ 02 мая 2019

Я попробовал Ваш код и обнаружил ошибки, похоже, у вас есть ошибка при объявлении типов для вашего WindowFunction.

В документации сказано, что ожидаемые типы для WindowFunction равны WindowFunction[IN, OUT, KEY, W <: Window].Теперь, если вы посмотрите на свой код, ваш IN - это тип потока данных, на котором вы рассчитываете окна.Тип потока ((String, Response), Int) и не соответствует объявленному в коде (String, Int).

Если вы измените часть, которая не компилируется, на:

.apply(new WindowFunction[((String, Response), Int), (String, Response), String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[((String, Response), Int)], out: Collector[(String, Response)]): Unit = ???
})

РЕДАКТИРОВАТЬ: Как и во втором примере, ошибка возникает по той же причине в целом.Когда вы используете keyBy с Tuple У вас есть две возможные функции для использования keyBy(fields: Int*), которые используют целое число для доступа к полю кортежа с использованием предоставленного индекса (это то, что вы использовали).А также keyBy(fun: T => K), где Вы предоставляете функцию для извлечения ключа, который будет использоваться.

Но есть одно важное различие между этими функциями, одна из которых возвращает ключ как JavaTuple, а другая возвращает ключ с его точным типом.Таким образом, в основном, если вы измените String на Tuple в своем упрощенном примере, он должен скомпилироваться четко.

...