Я пытаюсь написать некоторый код в рамках более крупного интеграционного теста.
У меня есть функция, которая создает KinesisSource
типа Source[KinesisRecord, Future[Done]
:
protected def createKinesisSource(config: KinesisClientLibConfiguration): Source[KinesisRecord, Future[Done]]#Repr[MyFile.KinesisFlow] = {
KinesisSource(config).mapAsync(1)(processRecord)
}
protected def processRecord(record: KinesisRecord): Future[KinesisFlow] = {
Future { validateRecord(record) }
}
Я хочу создать поддельный источник того же типа из файла.У меня есть (простите код, я новичок в Scala):
val records = scala.io.Source.fromFile("testFile.txt").getLines.toList
.map(i => {
new KinesisRecord( ByteString.fromString(i), "x", None, "x", None, Instant.now(), "x")
})
Source(records).mapAsync(1)(processRecord)
Это дает мне Source[KinesisRecord, NotUsed]
.Как я могу изменить это на Source[KinesisRecord, Future[Done]]
?Я понимаю, что вторым параметром является материализованное значение ( см. Этот пост ), но я не уверен, как указать это значение без фактического применения функции run
.