Когда вы указываете keyBy (0), вы вводите поток с помощью первого элемента кортежей, находящихся в потоке, или, другими словами, вы вводите поток с помощью строки слова. Однако компилятор не может выяснить, что ключ является строкой, поэтому эта версия keyBy всегда обрабатывает ключ как кортеж, содержащий некоторый объект (который является фактическим ключом).
Если переписать keyBy как keyBy(_._1)
, то компилятор сможет определить тип ключа, и y будет KeyedStream[(String, Int), String]
, что должно быть лучше.
То, что выполняет управление потоком, - это разделение потока, аналогично тому, как groupBy в SQL разбивает таблицу на непересекающиеся непересекающиеся группы. Таким образом, в этом случае поток ("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1), ("c" 1) логически разделена на три группы:
("a",1), ("a",1)
("b",1)
("c",1), ("c",1), ("c",1)
Затем вычисление суммы (1) для каждого из них приводит к уменьшению (в смысле карты / уменьшения) каждого из них путем сложения вторых полей во всех кортежах в каждой группе. Итак, («a», 1), («a», 1) становится («a», 2) и т. Д.
Вместо того, чтобы использовать z=y.sum(1)
, было бы легче понять это записанное более полно как
val z: DataStream[(String, Int)] = y.reduce(new ReduceFunction[(String, Int)] {
override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) =
(t1._1, t1._2 + t2._2)
})
Вы можете точно увидеть, как выглядит z, если вы запустите код. Если вы предоставите ему достаточно ресурсов, он может работать в трех отдельных потоках (так как есть три разных ключа). Я получил эти результаты только сейчас:
3> (a,1)
2> (c,1)
1> (b,1)
2> (c,2)
2> (c,3)
3> (a,2)
где 1>, 2> и 3> указывают, какой поток отвечал за эту строку вывода.