testHarness ListState TTL не применяется на Flink 1.8.2 - PullRequest
0 голосов
/ 05 апреля 2020

Я тестирую оконную функцию с listState с включенным TTL.

Фрагмент оконной функции:

public class CustomWindowFunction extends ProcessWindowFunction<InputPOJO, OutputPOJO, String, TimeWindow> {

  ...

  @Override
  public void open(Configuration config) {
    StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(listStateTTl)
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // NOTE: NeverReturnExpired
            .build();
    listStateDescriptor =  new ListStateDescriptor<>("unprocessedItems", InputPOJO.class);
    listStateDescriptor.enableTimeToLive(ttlConfig);
  }

  @Override
  public void process( String key, Context context, Iterable<InputPOJO> windowElements, Collector<OutputPOJO> out) throws Exception {

        ListState<InputPOJO> listState = getRuntimeContext().getListState(listStateDescriptor);

        ....

        Iterator<InputPOJO> iterator;

        // Getting unexpired listStateItems for computation.
        iterator = listState.get().iterator();
        while (iterator.hasNext()) {
            InputPOJO listStateInput = iterator.next();
            System.out.println("There are unexpired elements in listState");

            /** Bussiness Logic to compute result using the unexpired values in listState**/
        }


        /** Bussiness Logic to compute result using the current window elements.*/


        // Adding unProcessed WindowElements to ListState(with TTL)
        // NOTE: processed WindowElements are removed manually.
        iterator = windowElements.iterator();
        while (iterator.hasNext()) {
            System.out.println("unProcessed Item added to ListState.")
            InputPOJO unprocessedItem = iterator.next();
            listState.add(unprocessedItem); // This part gets executed for listStateInput1
        }

    }

    ....
}

Я использую testHarness для выполнения интеграционного теста. Я проверяю количество элементов listState, когда истекает TTL для listState. Ниже приведен фрагмент кода моей тестовой функции.

ПРИМЕЧАНИЕ:

  1. Существует настраиваемая allowLateness, которая реализуется с помощью настраиваемого таймера.
private OneInputStreamOperatorTestHarness testHarness;

private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {

    KeySelector keySelector = InputPOJO::getKey;

    TypeInformation STRING_INT_TUPLE = TypeInformation.of(new TypeHint() {}); // Any suggestion ?

    ListStateDescriptor stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?

    /**
     * Creating windowOperator for the below function
     *
     * 
     *
     *      DataStream OutputPOJOStream =
     *         inputPOJOStream
     *             .keyBy(InputPOJO::getKey)
     *             .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *             .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *             .process(new CustomWindowFunction(windowListStateTtlMillis));
     * 
* / customWindowFunction = new CustomWindowFunction (secondsToMillis (windowListStateTtlMillis)); WindowOperator operator = new WindowOperator <> (// настройка .window (ProcessingTimeSession Windows .withGap (maxTimeout)) ProcessingTimeSession Windows .withGap (Time.seconds (triggerMaximumTimeoutSeconds)), новый TimeWindow.Serializer (), // установка .keyBy (InputPOJO :: getKey) keySelector, BasicTypeInfo.STRING_TYPE_INFO.createSerializer (new ExecutionConfig ()), stateDes c, // setting .process (новая функция CustomWindowFunction (windowListStateTtlMillis)), новый InternalIterableProcessWindowFunction > <> (новый CustomTrigger (allowLateness)) новый CustomTrigger (secondsToMillis (allowLatenessSeconds)), 0, ноль); // Создание testHarness для оператора окна testHarness = new KeyedOneInputStreamOperatorTestHarness <> (operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO); // Настройка и открытие тестового жгута testHarness.setup (); testHarness.open (); } @Test publi c void test_listStateTtl_exclusion () throws Exception {int enabledLatenessSeconds = 3; int listStateTTL = 10; // 1. Arrange InputPOJO listStateInput1 = new InputPOJO (1, "Arjun"); InputPOJO listStateInput2 = новый InputPOJO (2, «Арун»); // 2. Act // listStateInput1 происходит через 1 секунду testHarness.setProcessingTime (secondsToMillis (1)); testHarness.processElement (new StreamRecord <> (listStateInput1)); // Установка текущего времени обработки на 1 + 3 = 4> allowLateness. // Window.process () вызывается, и окно очищается (FIRE_AND_PURGE) // Ожидание: listStateInput1 помещается в listState с TTL (10 секунд) до завершения process (). testHarness.setProcessingTime (secondsToMillis (4)); // Установка времени обработки после listStateTTL, ie 4 + listStateTTL (10) + 1 = 15 // Ожидание: listStateInput1 исключается из listState (Fails) testHarness.setProcessingTime (secondsToMillis (15)); // Используя sleep (), listStateTTL применяется к listState, а listStateInput1 удаляется (Pass) //Thread.sleep(secondsToMillis(15)) // Передача listStateInput2 тесту Harness testHarness.setProcessingTime (secondsToMillis (16)); testHarness.processElement (new StreamRecord <> (listStateInput2)); // Установка времени обработки после allowLateness = 16 + 3 + 1 = 20 testHarness.setProcessingTime (secondsToMillis (20)); // 3. Список утверждений > streamRecords = testHarness.extractOutputStreamRecords (); // Ожидание: streamRecords будет содержать только listStateInput2, так как listStateInput1 был исключен. // Actual: получение обоих listStateInput1 и listStateInput2 в выходных данных. }

Я заметил, что TTL не применяется при установке времени обработки. Когда я попробовал ту же функцию с Thread.sleep (TTL), результат был, как и ожидалось.

  1. Использует ли listState TTL системное время для выселения (с testHarness)?

  2. Есть ли способ проверить listStateTTL с помощью testHarness?

...