Я тестирую оконную функцию с listState с включенным TTL.
Фрагмент оконной функции:
public class CustomWindowFunction extends ProcessWindowFunction<InputPOJO, OutputPOJO, String, TimeWindow> {
...
@Override
public void open(Configuration config) {
StateTtlConfig ttlConfig =
StateTtlConfig.newBuilder(listStateTTl)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // NOTE: NeverReturnExpired
.build();
listStateDescriptor = new ListStateDescriptor<>("unprocessedItems", InputPOJO.class);
listStateDescriptor.enableTimeToLive(ttlConfig);
}
@Override
public void process( String key, Context context, Iterable<InputPOJO> windowElements, Collector<OutputPOJO> out) throws Exception {
ListState<InputPOJO> listState = getRuntimeContext().getListState(listStateDescriptor);
....
Iterator<InputPOJO> iterator;
// Getting unexpired listStateItems for computation.
iterator = listState.get().iterator();
while (iterator.hasNext()) {
InputPOJO listStateInput = iterator.next();
System.out.println("There are unexpired elements in listState");
/** Bussiness Logic to compute result using the unexpired values in listState**/
}
/** Bussiness Logic to compute result using the current window elements.*/
// Adding unProcessed WindowElements to ListState(with TTL)
// NOTE: processed WindowElements are removed manually.
iterator = windowElements.iterator();
while (iterator.hasNext()) {
System.out.println("unProcessed Item added to ListState.")
InputPOJO unprocessedItem = iterator.next();
listState.add(unprocessedItem); // This part gets executed for listStateInput1
}
}
....
}
Я использую testHarness
для выполнения интеграционного теста. Я проверяю количество элементов listState, когда истекает TTL для listState. Ниже приведен фрагмент кода моей тестовой функции.
ПРИМЕЧАНИЕ:
- Существует настраиваемая allowLateness, которая реализуется с помощью настраиваемого таймера.
private OneInputStreamOperatorTestHarness testHarness;
private CustomWindowFunction customWindowFunction;
@Before
public void setup_testHarness() throws Exception {
KeySelector keySelector = InputPOJO::getKey;
TypeInformation STRING_INT_TUPLE = TypeInformation.of(new TypeHint() {}); // Any suggestion ?
ListStateDescriptor stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?
/**
* Creating windowOperator for the below function
*
*
*
* DataStream OutputPOJOStream =
* inputPOJOStream
* .keyBy(InputPOJO::getKey)
* .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
* .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
* .process(new CustomWindowFunction(windowListStateTtlMillis));
*
* / customWindowFunction = new CustomWindowFunction (secondsToMillis (windowListStateTtlMillis)); WindowOperator operator = new WindowOperator <> (// настройка .window (ProcessingTimeSession Windows .withGap (maxTimeout)) ProcessingTimeSession Windows .withGap (Time.seconds (triggerMaximumTimeoutSeconds)), новый TimeWindow.Serializer (), // установка .keyBy (InputPOJO :: getKey) keySelector, BasicTypeInfo.STRING_TYPE_INFO.createSerializer (new ExecutionConfig ()), stateDes c, // setting .process (новая функция CustomWindowFunction (windowListStateTtlMillis)), новый InternalIterableProcessWindowFunction > <> (новый CustomTrigger (allowLateness)) новый CustomTrigger (secondsToMillis (allowLatenessSeconds)), 0, ноль); // Создание testHarness для оператора окна testHarness = new KeyedOneInputStreamOperatorTestHarness <> (operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO); // Настройка и открытие тестового жгута testHarness.setup (); testHarness.open (); } @Test publi c void test_listStateTtl_exclusion () throws Exception {int enabledLatenessSeconds = 3; int listStateTTL = 10; // 1. Arrange InputPOJO listStateInput1 = new InputPOJO (1, "Arjun"); InputPOJO listStateInput2 = новый InputPOJO (2, «Арун»); // 2. Act // listStateInput1 происходит через 1 секунду testHarness.setProcessingTime (secondsToMillis (1)); testHarness.processElement (new StreamRecord <> (listStateInput1)); // Установка текущего времени обработки на 1 + 3 = 4> allowLateness. // Window.process () вызывается, и окно очищается (FIRE_AND_PURGE) // Ожидание: listStateInput1 помещается в listState с TTL (10 секунд) до завершения process (). testHarness.setProcessingTime (secondsToMillis (4)); // Установка времени обработки после listStateTTL, ie 4 + listStateTTL (10) + 1 = 15 // Ожидание: listStateInput1 исключается из listState (Fails) testHarness.setProcessingTime (secondsToMillis (15)); // Используя sleep (), listStateTTL применяется к listState, а listStateInput1 удаляется (Pass) //Thread.sleep(secondsToMillis(15)) // Передача listStateInput2 тесту Harness testHarness.setProcessingTime (secondsToMillis (16)); testHarness.processElement (new StreamRecord <> (listStateInput2)); // Установка времени обработки после allowLateness = 16 + 3 + 1 = 20 testHarness.setProcessingTime (secondsToMillis (20)); // 3. Список утверждений > streamRecords = testHarness.extractOutputStreamRecords (); // Ожидание: streamRecords будет содержать только listStateInput2, так как listStateInput1 был исключен. // Actual: получение обоих listStateInput1 и listStateInput2 в выходных данных. }
Я заметил, что TTL не применяется при установке времени обработки. Когда я попробовал ту же функцию с Thread.sleep (TTL), результат был, как и ожидалось.
Использует ли listState TTL системное время для выселения (с testHarness)?
Есть ли способ проверить listStateTTL с помощью testHarness?