Я читаю простую строку JSON в качестве ввода и набираю поток на основе двух полей A
и B
.Но KeyBy генерирует один и тот же ключевой поток для разных значений B
, но для определенной комбинации A
и B
.
Ввод:
{
"A": "352580084349898",
"B": "1546559127",
"C": "A"
}
Это основная логика моего кода Flink:
DataStream<GenericDataObject> genericDataObjectDataStream = inputStream
.map(new MapFunction<String, GenericDataObject>() {
@Override
public GenericDataObject map(String s) throws Exception {
JSONObject jsonObject = new JSONObject(s);
GenericDataObject genericDataObject = new GenericDataObject();
genericDataObject.setA(jsonObject.getString("A"));
genericDataObject.setB(jsonObject.getString("B"));
genericDataObject.setC(jsonObject.getString("C"));
return genericDataObject;
}
});
DataStream<GenericDataObject> testStream = genericDataObjectDataStream
.keyBy("A", "B")
.map(new MapFunction<GenericDataObject, GenericDataObject>() {
@Override
public GenericDataObject map(GenericDataObject genericDataObject) throws Exception {
return genericDataObject;
}
});
testStream.print();
GenericDataObject - это POJO с тремя полями A
, B
и C
.
И это вывод консоли для разных значений поля B
.
5> GenericDataObject{A='352580084349898', B='1546559224', C='A'}
5> GenericDataObject{A='352580084349898', B='1546559127', C='A'}
4> GenericDataObject{A='352580084349898', B='1546559234', C='A'}
3> GenericDataObject{A='352580084349898', B='1546559254', C='A'}
Обратите внимание на строки 1 и 2. Даже если они имеют разные значения Bони помещаются в один и тот же ключевой поток (5).Я, должно быть, делаю что-то в корне не так, может кто-нибудь, пожалуйста, укажите мне правильное направление?