Можно ли контролировать время обработки ввода с помощью scio JobTest? - PullRequest
0 голосов
/ 19 апреля 2019

Мы используем com.spotify.scio.testing.JobTest для сквозного тестирования нашего конвейера scio. Конвейер включает в себя DoFn, который чувствителен к последовательности данных, в потоке данных конфигурации, который поступает редко.

Мы передаем упорядоченный список значений конфигурации combinedSampleConfig в качестве входных данных для JobTest Builder. Есть ли способ заставить JobTest сохранить порядок этого входного потока CustomIO, когда мы запускаем сквозное тестирование?

Я вижу, что среда тестирования позволяет точно контролировать время поступления источника (используя advanceProcessingTime) при тестировании отдельных компонентов, но не вижу, как применить это для сквозного тестирования с использованием JobTest.

    JobTest[MyApp.type]
      .args(commonArgs ++ Seq(
        "--numWorkers=1",
        "--maxNumWorkers=1",
      ): _*
      )
      .input(CustomIO[PubsubMessage](CONFIG_ID), combinedSampleConfig)
      .input(CustomIO[IndicatorEntry](INPUT_ID), sampleInput)
      .output(CustomIO[EnrichedIndicatorEntry](AGG_ID)) {
        _ should containInAnyOrder (expectedAggs)
      }
      .output(CustomIO[EnrichedIndicatorEntry](EVENT_ID)) {
        _ should containInAnyOrder (expectedEvents)
      }
      .run()

1 Ответ

0 голосов
/ 13 июня 2019

https://github.com/spotify/scio/pull/1905

Этот PR был недавно объединен и должен допускать такой вариант использования.Можешь попробовать?

...