Мы используем com.spotify.scio.testing.JobTest для сквозного тестирования нашего конвейера scio. Конвейер включает в себя DoFn, который чувствителен к последовательности данных, в потоке данных конфигурации, который поступает редко.
Мы передаем упорядоченный список значений конфигурации combinedSampleConfig
в качестве входных данных для JobTest Builder. Есть ли способ заставить JobTest сохранить порядок этого входного потока CustomIO, когда мы запускаем сквозное тестирование?
Я вижу, что среда тестирования позволяет точно контролировать время поступления источника (используя advanceProcessingTime
) при тестировании отдельных компонентов, но не вижу, как применить это для сквозного тестирования с использованием JobTest.
JobTest[MyApp.type]
.args(commonArgs ++ Seq(
"--numWorkers=1",
"--maxNumWorkers=1",
): _*
)
.input(CustomIO[PubsubMessage](CONFIG_ID), combinedSampleConfig)
.input(CustomIO[IndicatorEntry](INPUT_ID), sampleInput)
.output(CustomIO[EnrichedIndicatorEntry](AGG_ID)) {
_ should containInAnyOrder (expectedAggs)
}
.output(CustomIO[EnrichedIndicatorEntry](EVENT_ID)) {
_ should containInAnyOrder (expectedEvents)
}
.run()