Flink Type Erasure Исключение с вопросом о кортеже - PullRequest
0 голосов
/ 03 апреля 2019

Я пытаюсь использовать KeyedStream с Tuple для обработки различных типов Tuple, включая Tuple6.Продолжайте получать Исключение: Исключение в потоке "main" org.apache.flink.api.common.functions.InvalidTypesException: Использование класса Tuple как типа недопустимо.Вместо этого используйте конкретный подкласс (например, Tuple1, Tuple2 и т. Д.) .

Есть ли способ обойти Erasure здесь?

Я хочуиспользовать KeyedStream, чтобы я мог передать его для обработки Tuple6 как Tuple, например, monitorTupleKeyedStream.

Код ниже:

KeyedStream<Monitoring, Tuple> monitoringTupleKeyedStream = null;
String keyOperationType = ....;//provided        
if (StringUtils.isNotEmpty(keyOperationType)) {
    if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_OPERATION)) {
        monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_INSTANCE_OPERATION)) {
        monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance");
    } else if (keyOperationType.equalsIgnoreCase(Utils.COMPONENT_KEY_OPERATION)) {
        TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
        monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple>() {
            public Tuple getKey(Monitoring mon) throws Exception {
                String key = "";
                String keyName = "";
                final String eventName = mon.getEventName();
                if (eventName != null && ((eventName.equalsIgnoreCase(INGRESS_FPS)))
                )) {
                    keyName = PCAM_ID;
                    key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
                } else if (eventName != null && (eventName.equalsIgnoreCase(EGRESS_FPS))) {
                    keyName = OUT_BITRATE;
                    key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
                }
                mon.setKeyName(keyName);
                mon.setKeyValue(key);
                return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
            }
        }); //, info)
    } else if (keyOperationType.equalsIgnoreCase(COMPONENT_CONTAINER_OPERATION)) {
        monitoringTupleKeyedStream = kinesisStream.keyBy("deployment", "gameId", "eventName", "component", "instance", "container"); //<== this is also a Tuple6 but no complaints ?
    }
}

Для этого примера ниже необходимо, чтобы monitorTupleKeyedStream был KeyedStream типа [Monitoring, Tuple6 [String, String, String, String, String, String]]

TypeInformation<Tuple6<String, String, String, String, String, String>> info = TypeInformation.of(new TypeHint<Tuple6<String, String, String, String, String, String>>(){});
monitoringTupleKeyedStream = kinesisStream.keyBy(new KeySelector<Monitoring, Tuple6<String, String, String, String, String, String>>() {
                    @Override
                    public Tuple6<String, String, String, String, String, String> getKey(Monitoring mon) throws Exception {
                        String key = "";
                        String keyName = "";
                        //TODO: extract to a method to pull key to use from a config file
                        final String eventName = mon.getEventName();
                        if (eventName != null && ((eventName.equalsIgnoreCase(INGRESS_FPS)))
                        )) {
                            keyName = PCAM_ID;
                            key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(PCAM_ID) : "";
                        } else if (eventName != null && (eventName.equalsIgnoreCase(EGRESS_FPS))) {
                            keyName = OUT_BITRATE;
                            key = mon.getEventDataMap() != null ? (String) mon.getEventDataMap().get(OUT_BITRATE) : ""; //TODO: identify key to use
                        }
                        mon.setKeyName(keyName);
                        mon.setKeyValue(key);
                        return new Tuple6<>(mon.getDeployment(), mon.getGameId(), eventName, mon.getComponent(), mon.getKeyName(), mon.getKeyValue());
                    }
                }, info);

TIA,

...