Порядок событий с цепочкой keyBy звонки на тот же ключ - PullRequest
0 голосов
/ 15 мая 2018

Из по этой ссылке Я понимаю, что порядок событий, поступающих из некоторого inDataStream, сохраняется для ключа в следующем результирующем outDataStream:

outDataStream = inDataStream.keyBy(...)
    .timeWindow(...)
    .reduce(...)

Так, например, если мыиметь следующие события, введенные из inDataStream (мы делаем keyBy на клавишах):

(1, key1), (2, key1), (3, key2), (4, key1), (5, key2)

Тогда outDataStream сохранит одинаковый порядок для событий key1 и событий key2.Таким образом, такой результат outDataStream НИКОГДА не может произойти:

(2, ключ1), (1, ключ1), (3, ключ2), (4, ключ1), (5, ключ2)

(Потому что 1 и 2 поменялись).

Пока я прав?Тогда, если мы соединяем в цепочку другой keyBy / process, мы снова получаем результат в том же порядке, верно?Потому что мы просто снова применяем ту же гарантию. Поскольку порядок одинаковых ключей для нас важен, то чтобы убедиться, что мы находимся на одной странице, я сделал упрощенную версию того, что у нас есть:

// incoming events. family is used for keyBy grouping.
case class Event(id: Int, family: String, value: Double)
// the aggregation of events
case class Aggregation(latsetId: Int, family: String, total: Double)

// simply adding events into total aggregation
object AggFunc extends AggregateFunction[Event, Aggregation, Aggregation] {
override def add(e: Event, acc: Aggregation) = Aggregation(e.id, e.family, e.value + acc.total)
override def createAccumulator() = Aggregation(-1, null, 0.0)
override def getResult(acc: Aggregation) = acc
}

object ProcessFunc extends ProcessFunction[Aggregation, String] {
override def processElement(agg: Aggregation, ctx: ProcessFunction[Aggregation, String]#Context, out: Collector[String]) =
  out.collect(s"Received aggregation combined with event ${agg.latsetId}. New total=${agg.total}")
}

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
// incoming events from a source have 2 families: "A", and "B"
env.fromElements(Event(1, "A", 6.0), Event(2, "B", 4.0), Event(3, "A", -2.0), Event(4, "B", 3.0),
    Event(5, "A", 8.0), Event(6, "B", 1.0), Event(7, "A", -10.0))
  .keyBy(_.family)
  .timeWindow(Time.seconds(1))
  .trigger(CountTrigger.of(1)) // FIRE any incoming event for immediate aggregation and ProcessFunc application
  .aggregate(AggFunc)
  .keyBy(_.family)
  .process(ProcessFunc)
  .print()
}

Итак, длятакие события, поступающие в первый keyBy в таком порядке - для любого параллелизма оператора и развертывания кластера мы гарантируем, что Sink (здесь print ()) всегда получит следующие агрегаты семейства «A» и в этом порядке (но, возможно, смешанныес совокупностями семейства "B"):

"Received aggregation combined with event 1. New total=6.0"
"Received aggregation combined with event 3. New total=4.0"
"Received aggregation combined with event 5. New total=12.0"
"Received aggregation combined with event 7. New total=2.0"

Это правильно?

Ответы [ 2 ]

0 голосов
/ 23 мая 2018

Flink гарантирует только порядок в параллельном разделе, то есть он не связывается между разделами и удерживает данные для гарантии порядка.

Это означает, что если у вас есть следующие операторы:

map(Mapper1).keyBy(0).map(Mapper2)

и запустите его с параллелизмом 2, то есть

Mapper1(1) -\-/- Mapper2(1)
             X
Mapper1(2) -/-\- Mapper2(2)

Тогда все записи с одинаковым ключом из Mapper1(1) поступят в порядке либо Mapper2(1), либо Mapper2(2) в зависимости отключ.Конечно, то же самое верно для всех записей с одним и тем же ключом от Mapper1(2).

Так что, как только записи с одним и тем же ключом распределяются по нескольким разделам (здесь Mapper1(1) и Mapper1(2)), тамнет никаких гарантий упорядочения для записей разных разделов, но только для тех, которые находятся в одном разделе.

Если важен порядок, вы можете либо уменьшить параллелизм до 1, либо внедрить в свои операторы семантику времени-события и использовать водяные знаки для рассуждения о неупорядоченности записей.

0 голосов
/ 15 мая 2018

Не думаю, что вы можете с уверенностью предположить, что абсолютный порядок элементов потока будет поддерживаться с параллелизмом> 1.

Кроме того, я не думаю, что порядок вообще можно предположить, как только он достигнет оператора агрегации. Выходные данные оператора агрегирования основаны на внутренних таймерах окна, и ключи не должны храниться в каком-либо конкретном порядке.

Если вам требуется заказ, то вам лучше всего отсортировать данные после того, как они попадут в то, что вам нужно.

...