Я использую testHarness для проверки своего пользовательского триггера. Упрощенный фрагмент приведен ниже:
public class CustomTrigger extends Trigger<InputPOJO, TimeWindow> {
private final ReducingStateDescriptor<Long> previousTriggerDesc = new ReducingStateDescriptor<>( "previous-trigger", new Max(),LongSerializer.INSTANCE);
private final long allowedLatenessMillis;
public CustomTrigger(long allowedLatenessMillis) {
this.allowedLatenessMillis = allowedLatenessMillis;
}
@Override
public TriggerResult onElement(InputPOJO inputPOJO, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> previousTriggerState = ctx.getPartitionedState(previousTriggerDesc);
Long previousTriggerTime = ctx.getPartitionedState(previousTriggerDesc).get();
// Remove previous Timer trigger. else it will invoke twice.
if (previousTriggerTime != null) {
ctx.deleteProcessingTimeTimer(previousTriggerTime); //NOTE
System.out.println("deleteProcessingTimeTimer(previousTriggerTime)"+previousTriggerTime); // Invoked
}
// register new trigger for current InputPOJO.
long currentTriggerTime = ctx.getCurrentProcessingTime() + allowedLatenessMillis;
ctx.registerProcessingTimeTimer(currentTriggerTime);
// Update currentTriggerTime in previousTriggerState.
previousTriggerTimeState.add(currentTriggerTime);
return TriggerResult.CONTINUE;
}
...
}
В пользовательском триггере я регистрирую новый таймер для каждого нового InputPOJO. Когда я регистрирую таймер, я удаляю предыдущий таймер (основанный на previousTimerTriggerTime, сохраненном в сокращенном состоянии).
Я проверяю счетчик таймера (вместе с окном), используя приведенный ниже фрагмент.
private OneInputStreamOperatorTestHarness testHarness;
private CustomWindowFunction customWindowFunction;
@Before
public void setup_testHarness() throws Exception {
KeySelector keySelector = InputPOJO::getKey;
TypeInformation STRING_INT_TUPLE = TypeInformation.of(new TypeHint() {}); // Any suggestion ?
ListStateDescriptor stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?
/**
* Creating windowOperator for the below function
*
*
*
* DataStream OutputPOJOStream =
* inputPOJOStream
* .keyBy(InputPOJO::getKey)
* .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
* .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
* .process(new CustomWindowFunction(windowListStateTtlMillis));
*
* / customWindowFunction = new CustomWindowFunction (secondsToMillis (windowListStateTtlMillis)); WindowOperator operator = new WindowOperator <> (// настройка .window (ProcessingTimeSession Windows .withGap (maxTimeout)) ProcessingTimeSession Windows .withGap (Time.seconds (triggerMaximumTimeoutSeconds)), новый TimeWindow.Serializer (), // установка .keyBy (InputPOJO :: getKey) keySelector, BasicTypeInfo.STRING_TYPE_INFO.createSerializer (new ExecutionConfig ()), stateDes c, // setting .process (новая функция CustomWindowFunction (windowListStateTtlMillis)), новый InternalIterableProcessWindowFunction > <>. (новый CustomTrigger (allowLateness)) новый CustomTrigger (secondsToMillis (allowLatenessSeconds)), 0, ноль); // Создание testHarness для оператора окна testHarness = new KeyedOneInputStreamOperatorTestHarness <> (operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO); // Настройка и открытие тестового жгута testHarness.setup (); testHarness.open (); } @Test publi c void test_allowedLateness_extension_on_second_pojo () выдает исключение {int enabledLatenessSeconds = 3; int listStateTTL = 10; // 1. Arrange InputPOJO listStateInput1 = new InputPOJO (1, "Arjun"); InputPOJO listStateInput2 = новый InputPOJO (2, «Арун»); // 2. Act // listStateInput1 происходит через 1 секунду testHarness.setProcessingTime (secondsToMillis (1)); testHarness.processElement (new StreamRecord <> (listStateInput1)); // listStateInput2 имеет значение 2 se c, ie в периоде allowLateness для listStateInput1 testHarness.setProcessingTime (secondsToMillis (2)); testHarness.processElement (new StreamRecord <> (listStateInput1)); // Ожидание: listStateInput2 удаляет существующий неиспользуемый таймер listStateInput1 и регистрирует новый таймер. // Actual: listStateInput2 зарегистрировал новый таймер и общее количество равно 3. // ПРИМЕЧАНИЕ: // 1. Здесь я использую SessionWindow, поэтому по умолчанию 1 таймер будет зарегистрирован для SessionGap. // 2. Второй таймер должен быть зарегистрированным таймером InputPOJO. Assert.assertEquals (2, testHarness.numProcessingTimeTimers ()); // FAILS}
Здесь запускаются функции, ctx.deleteProcessingTimeTimer(previousTriggerTime);
. Но все же timerCount в testHarness показывает 3.
Это ошибка в testHarness?
Пожалуйста, предоставьте подход к подсчету таймера теста с помощью testHarness.
PS:
Хотя это может показаться типичной функциональностью SessionWindow.Gap (), я использую этот пользовательский триггер в сложном расчете. Для простоты я уменьшил лог c до вышеупомянутого.
Я использую ListStateDescriptor
при создании WindowOperator
для testHarness.