Я пытаюсь использовать KeyedStream с Tuple для обработки различных типов Tuple, включая Tuple6.Продолжайте получать Исключение: Исключение в потоке "main" org.apache.flink.api.common.functions.InvalidTypesException: Использование класса Tuple как типа недопустимо.Вместо этого используйте конкретный подкласс (например, Tuple1, Tuple2 и т. Д.) .
Есть ли способ обойти Erasure здесь?
Я хочуиспользовать KeyedStream, чтобы я мог передать его для обработки Tuple6 как Tuple, например, monitorTupleKeyedStream.
Код ниже:
KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
String keyOperationType = ....;//provided
if (StringUtils.isNotEmpty(keyOperationType)) {
if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
} else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance");
} else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
public Tuple getKey(Monitoring mon) throws Exception {
String key = "";
String keyName = "";
final String eventName = mon.getEventName();
if (eventName != null && ((eventName.equalsIgnoreCase(INGRESS_FPS)))
)) {
keyName = PCAM_ID;
key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
} else if (eventName != null && (eventName.equalsIgnoreCase(EGRESS_FPS))) {
keyName = OUT_BITRATE;
key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
}
mon.setKeyName(keyName);
mon.setKeyValue(key);
return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
}
}); //, info)
} else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance", "container"); //<== this is also a Tuple6 but no complaints ?
}
}
Для этого примера ниже необходимо, чтобы monitorTupleKeyedStream был KeyedStream типа [Monitoring, Tuple6 [String, String, String, String, String, String]]
TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
@Override
public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon) throws Exception {
String key = "";
String keyName = "";
//TODO: extract to a method to pull key to use from a config file
final String eventName = mon.getEventName();
if (eventName != null && ((eventName.equalsIgnoreCase(INGRESS_FPS)))
)) {
keyName = PCAM_ID;
key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
} else if (eventName != null && (eventName.equalsIgnoreCase(EGRESS_FPS))) {
keyName = OUT_BITRATE;
key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
}
mon.setKeyName(keyName);
mon.setKeyValue(key);
return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
}
}, info);
TIA,