После прочтения официальной документации по тестированию Flink (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/testing.html) я смог разработать тесты для ProcessFunction, используя Test Harness, что-то вроде этого:
pendingPartitionBuilder = new PendingPartitionBuilder(":::some_name", "")
testHarness =
new OneInputStreamOperatorTestHarness[StaticAdequacyTilePublishedData, PendingPartition](
new ProcessOperator[StaticAdequacyTilePublishedData, PendingPartition](pendingPartitionBuilder)
)
testHarness.open()
теперь япытаясь сделать то же самое для ProcessAllWindowFunction, которая выглядит следующим образом:
class MapVersionValidationDistributor(batchSize: Int) extends
ProcessAllWindowFunction[MapVersionValidation, Seq[StaticAdequacyTilePublishedData],TimeWindow] {
lazy val state: ValueState[Long] = getRuntimeContext .getState(new ValueStateDescriptor[Long]("latestMapVersion", classOf[Long]))
(...)
Сначала я понял, что не могу использовать TestHarness для ProcessAllWindowFunction, потому что у него нет метода processElement. В этом случае, чтоследует ли мне следовать стратегии модульного тестирования?
РЕДАКТИРОВАТЬ: В данный момент мой тестовый код выглядит следующим образом:
val collector = mock[Collector[Seq[StaticAdequacyTilePublishedData]]]
val mvv = new MapVersionValidationDistributor(1)
val input3 = Iterable(new MapVersionValidation("123",Seq(TileValidation(1,true,Seq(1,3,4)))))
val ctx = mock[mvv.Context]
val streamContext = mock[RuntimeContext]
mvv.setRuntimeContext(streamContext)
mvv.open(mock[Configuration])
mvv.process(ctx,input3,collector)
, и я получаю эту ошибку:
Unexpected call: <mock-3> RuntimeContext.getState[T](ValueStateDescriptor{name=latestMapVersion, defaultValue=null, serializer=null}) Expected: inAnyOrder { }