Из по этой ссылке Я понимаю, что порядок событий, поступающих из некоторого inDataStream, сохраняется для ключа в следующем результирующем outDataStream:
outDataStream = inDataStream.keyBy(...)
.timeWindow(...)
.reduce(...)
Так, например, если мыиметь следующие события, введенные из inDataStream (мы делаем keyBy на клавишах):
(1, key1), (2, key1), (3, key2), (4, key1), (5, key2)
Тогда outDataStream сохранит одинаковый порядок для событий key1 и событий key2.Таким образом, такой результат outDataStream НИКОГДА не может произойти:
(2, ключ1), (1, ключ1), (3, ключ2), (4, ключ1), (5, ключ2)
(Потому что 1 и 2 поменялись).
Пока я прав?Тогда, если мы соединяем в цепочку другой keyBy / process, мы снова получаем результат в том же порядке, верно?Потому что мы просто снова применяем ту же гарантию. Поскольку порядок одинаковых ключей для нас важен, то чтобы убедиться, что мы находимся на одной странице, я сделал упрощенную версию того, что у нас есть:
// incoming events. family is used for keyBy grouping.
case class Event(id: Int, family: String, value: Double)
// the aggregation of events
case class Aggregation(latsetId: Int, family: String, total: Double)
// simply adding events into total aggregation
object AggFunc extends AggregateFunction[Event, Aggregation, Aggregation] {
override def add(e: Event, acc: Aggregation) = Aggregation(e.id, e.family, e.value + acc.total)
override def createAccumulator() = Aggregation(-1, null, 0.0)
override def getResult(acc: Aggregation) = acc
}
object ProcessFunc extends ProcessFunction[Aggregation, String] {
override def processElement(agg: Aggregation, ctx: ProcessFunction[Aggregation, String]#Context, out: Collector[String]) =
out.collect(s"Received aggregation combined with event ${agg.latsetId}. New total=${agg.total}")
}
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// incoming events from a source have 2 families: "A", and "B"
env.fromElements(Event(1, "A", 6.0), Event(2, "B", 4.0), Event(3, "A", -2.0), Event(4, "B", 3.0),
Event(5, "A", 8.0), Event(6, "B", 1.0), Event(7, "A", -10.0))
.keyBy(_.family)
.timeWindow(Time.seconds(1))
.trigger(CountTrigger.of(1)) // FIRE any incoming event for immediate aggregation and ProcessFunc application
.aggregate(AggFunc)
.keyBy(_.family)
.process(ProcessFunc)
.print()
}
Итак, длятакие события, поступающие в первый keyBy в таком порядке - для любого параллелизма оператора и развертывания кластера мы гарантируем, что Sink (здесь print ()) всегда получит следующие агрегаты семейства «A» и в этом порядке (но, возможно, смешанныес совокупностями семейства "B"):
"Received aggregation combined with event 1. New total=6.0"
"Received aggregation combined with event 3. New total=4.0"
"Received aggregation combined with event 5. New total=12.0"
"Received aggregation combined with event 7. New total=2.0"
Это правильно?