Мой старый код поддерживал использование одной очереди SQS с SqsStream. Я должен обновить его, чтобы он поддерживал несколько очередей, учитывая список URL-адресов очередей.
Содержимое метода:
for {
sqs <- Sqs.>.async // async client
urls <- Sqs.>.queueUrls // List[String] of multiple queues
_ <- {
urls
.map(url => {
SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
.mapMParUnordered(10)(handleMessage) // run "handleMessage" up to 10 times concurrently, ZStream[Env, Throwable, Unit]
.runDrain // ZIO[Env, Throwable, Unit]
.forever // ZIO[Env, Throwable, Nothing]
})
} yield ()
, но компилятор жалуется, поскольку ожидает (ZIO, ZIO, ZIO), тогда как Я дал ему (ZIO, ZIO, список). Я предполагаю, что мне нужно свести все эффекты в этом списке к одному эффекту, который будет выполняться handleMessage
параллельно во всех очередях, но я не уверен насчет синтаксиса, поскольку у меня нет опыта работы с ZIO.
По сути, к этому моменту,
urls
.map(url => {
SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
мой URL стал ZStream. Я думаю, что мне нужно вызвать ZStream.flatMapPar
, используя этот элемент и следующий, и так далее, до тех пор, пока все они не будут сведены вместе. Как бы я это сделал?