таймер testHarness не удаляется в Flink 1.8.2 - PullRequest
1 голос
/ 05 апреля 2020

Я использую testHarness для проверки своего пользовательского триггера. Упрощенный фрагмент приведен ниже:

public class CustomTrigger extends Trigger<InputPOJO, TimeWindow> {

    private final ReducingStateDescriptor<Long> previousTriggerDesc = new ReducingStateDescriptor<>( "previous-trigger", new Max(),LongSerializer.INSTANCE);

    private final long allowedLatenessMillis;


    public CustomTrigger(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }


    @Override
    public TriggerResult onElement(InputPOJO inputPOJO, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

        ReducingState<Long> previousTriggerState = ctx.getPartitionedState(previousTriggerDesc);
        Long previousTriggerTime = ctx.getPartitionedState(previousTriggerDesc).get();

        // Remove previous Timer trigger. else it will invoke twice.
        if (previousTriggerTime != null) {
            ctx.deleteProcessingTimeTimer(previousTriggerTime); //NOTE
            System.out.println("deleteProcessingTimeTimer(previousTriggerTime)"+previousTriggerTime); // Invoked
        }

        // register new trigger for current InputPOJO.      
        long currentTriggerTime = ctx.getCurrentProcessingTime() + allowedLatenessMillis;
        ctx.registerProcessingTimeTimer(currentTriggerTime);

        // Update currentTriggerTime in previousTriggerState.
        previousTriggerTimeState.add(currentTriggerTime);

        return TriggerResult.CONTINUE;
    }

    ...
}

В пользовательском триггере я регистрирую новый таймер для каждого нового InputPOJO. Когда я регистрирую таймер, я удаляю предыдущий таймер (основанный на previousTimerTriggerTime, сохраненном в сокращенном состоянии).

Я проверяю счетчик таймера (вместе с окном), используя приведенный ниже фрагмент.

private OneInputStreamOperatorTestHarness testHarness;

private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {

    KeySelector keySelector = InputPOJO::getKey;

    TypeInformation STRING_INT_TUPLE = TypeInformation.of(new TypeHint() {}); // Any suggestion ?

    ListStateDescriptor stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?

    /**
     * Creating windowOperator for the below function
     *
     * 
     *
     *      DataStream OutputPOJOStream =
     *         inputPOJOStream
     *             .keyBy(InputPOJO::getKey)
     *             .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *             .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *             .process(new CustomWindowFunction(windowListStateTtlMillis));
     * 
* / customWindowFunction = new CustomWindowFunction (secondsToMillis (windowListStateTtlMillis)); WindowOperator operator = new WindowOperator <> (// настройка .window (ProcessingTimeSession Windows .withGap (maxTimeout)) ProcessingTimeSession Windows .withGap (Time.seconds (triggerMaximumTimeoutSeconds)), новый TimeWindow.Serializer (), // установка .keyBy (InputPOJO :: getKey) keySelector, BasicTypeInfo.STRING_TYPE_INFO.createSerializer (new ExecutionConfig ()), stateDes c, // setting .process (новая функция CustomWindowFunction (windowListStateTtlMillis)), новый InternalIterableProcessWindowFunction > <>. (новый CustomTrigger (allowLateness)) новый CustomTrigger (secondsToMillis (allowLatenessSeconds)), 0, ноль); // Создание testHarness для оператора окна testHarness = new KeyedOneInputStreamOperatorTestHarness <> (operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO); // Настройка и открытие тестового жгута testHarness.setup (); testHarness.open (); } @Test publi c void test_allowedLateness_extension_on_second_pojo () выдает исключение {int enabledLatenessSeconds = 3; int listStateTTL = 10; // 1. Arrange InputPOJO listStateInput1 = new InputPOJO (1, "Arjun"); InputPOJO listStateInput2 = новый InputPOJO (2, «Арун»); // 2. Act // listStateInput1 происходит через 1 секунду testHarness.setProcessingTime (secondsToMillis (1)); testHarness.processElement (new StreamRecord <> (listStateInput1)); // listStateInput2 имеет значение 2 se c, ie в периоде allowLateness для listStateInput1 testHarness.setProcessingTime (secondsToMillis (2)); testHarness.processElement (new StreamRecord <> (listStateInput1)); // Ожидание: listStateInput2 удаляет существующий неиспользуемый таймер listStateInput1 и регистрирует новый таймер. // Actual: listStateInput2 зарегистрировал новый таймер и общее количество равно 3. // ПРИМЕЧАНИЕ: // 1. Здесь я использую SessionWindow, поэтому по умолчанию 1 таймер будет зарегистрирован для SessionGap. // 2. Второй таймер должен быть зарегистрированным таймером InputPOJO. Assert.assertEquals (2, testHarness.numProcessingTimeTimers ()); // FAILS}

Здесь запускаются функции, ctx.deleteProcessingTimeTimer(previousTriggerTime);. Но все же timerCount в testHarness показывает 3.

  1. Это ошибка в testHarness?

  2. Пожалуйста, предоставьте подход к подсчету таймера теста с помощью testHarness.

PS:

  1. Хотя это может показаться типичной функциональностью SessionWindow.Gap (), я использую этот пользовательский триггер в сложном расчете. Для простоты я уменьшил лог c до вышеупомянутого.

  2. Я использую ListStateDescriptor при создании WindowOperator для testHarness.

...