Проверьте RichCoFlatMapFunction в Apache Flink - PullRequest
0 голосов
/ 03 апреля 2020

Я пытаюсь протестировать RichCoFlatMapFunction, которую я использую для левого соединения двух потоков, это что-то вроде этого:

    private ValueState<Card> currentValueState;
    private ListState<Card> historicListState;

    @Override
    public void open(Configuration parameters) throws Exception {
        currentValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("Current State", Card.class));
        historicListState = getRuntimeContext().getListState(new ListStateDescriptor<>("historic state", Card.class));
    }

    @Override
    public void flatMap1(Card currentCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        Iterable<Card> historicCardList =  historicListState.get();
        if (Iterables.size(historicCardList) > 0) {
            out.collect(new Tuple2<>(currentCard, Lists.newArrayList(historicCardList) ));
        } else {
            currentValueState.update(currentCard);
            out.collect(new Tuple2<>(currentCard, null));
        }
    }

    @Override
    public void flatMap2(Card historicCard, Collector<Tuple2<Card, List<Card>>> out) throws Exception {
        historicListState.add(historicCard);
    }

В методе flatMap1 я возвращаю ноль, когда История c Карта не найдена

out.collect(new Tuple2<>(currentCard, null));

Проблема в том, что, когда я пытаюсь проверить всю эту функциональность, я получаю эту ошибку:

Автомат c извлечение типа невозможно для кандидатов с нулевыми значениями. Пожалуйста, укажите типы напрямую.

Вот как я пытаюсь протестировать функцию richCoFlatMapFunction

    @Test
    public void testFlatMap() throws Exception {
        final Card current = currentCard(2L);
        final Card historic = historicCard(2L);
        final List<Card> historicList = new ArrayList<>();
        historicList.add(historic);
        CoStreamFlatMap<Card, Card, Tuple2<Card, List<Card>>> operator = new CoStreamFlatMap<>(new LeftJoin());
        KeyedTwoInputStreamOperatorTestHarness<Long, Card, Card, Tuple2<Card, List<Card>>> testHarness =
                new KeyedTwoInputStreamOperatorTestHarness<>(
                        operator,
                        (Card c) -> c.getCardHash(),
                        (Card h) -> h.getCardHash(),
                        BasicTypeInfo.LONG_TYPE_INFO);
        testHarness.setup();
        testHarness.open();
        testHarness.processElement1(new StreamRecord<>(current));
        testHarness.processElement2(new StreamRecord<>(historic));
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
        expectedOutput.add(new StreamRecord<>(new Tuple2<>(current, historicList)));
        // Check that the result is correct
        ConcurrentLinkedQueue<Object> actualOutput = testHarness.getOutput();
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, actualOutput);
    }

Любая помощь, я бы очень признателен, я новичок в Apache Flink и модульное тестирование с его помощью. Спасибо.

1 Ответ

1 голос
/ 03 апреля 2020

Проблема в том, что KeyedTwoInputStreamOperatorTestHarness не знает, как сериализовать выходы вашего оператора LeftJoin. Вы можете указать выходной сериализатор через KeyedTwoInputStreamOperatorTestHarness.setup(TypeSerializer<OUT> outputSerializer).

В вашем случае это будет:

testHarness.setup(TypeInformation.of(new TypeHint<Tuple2<Card, List<Card>>>() {}).createSerializer(new ExecutionConfig()));
...