Объедините список ZIO ZStreams в один - PullRequest
0 голосов
/ 11 февраля 2020

Мой старый код поддерживал использование одной очереди SQS с SqsStream. Я должен обновить его, чтобы он поддерживал несколько очередей, учитывая список URL-адресов очередей.

Содержимое метода:

for {
  sqs <- Sqs.>.async // async client
  urls <- Sqs.>.queueUrls // List[String] of multiple queues
  _ <- {
    urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          .mapMParUnordered(10)(handleMessage) // run "handleMessage" up to 10 times concurrently, ZStream[Env, Throwable, Unit]
          .runDrain // ZIO[Env, Throwable, Unit]
          .forever // ZIO[Env, Throwable, Nothing]
      })
} yield ()

, но компилятор жалуется, поскольку ожидает (ZIO, ZIO, ZIO), тогда как Я дал ему (ZIO, ZIO, список). Я предполагаю, что мне нужно свести все эффекты в этом списке к одному эффекту, который будет выполняться handleMessage параллельно во всех очередях, но я не уверен насчет синтаксиса, поскольку у меня нет опыта работы с ZIO.

По сути, к этому моменту,

urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))

мой URL стал ZStream. Я думаю, что мне нужно вызвать ZStream.flatMapPar, используя этот элемент и следующий, и так далее, до тех пор, пока все они не будут сведены вместе. Как бы я это сделал?

1 Ответ

4 голосов
/ 11 февраля 2020

runDrain вернет ZIO, который вы можете запустить и забыть с помощью foreachPar_.

for {
  sqs <- Sqs.>.async
  urls <- Sqs.>.queueUrls
  // Returns ZIO[R, E, Unit] and executes each effect in parallel while discarding the results
  _ <- ZIO.foreachPar_(urls) { url =>
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          // Handles up to 10 messages at a time in parallel.
          .mapMParUnordered(10)(handleMessage)
          // The stream is already unbounded so no need to have `.forever`
          .runDrain
      }
} yield ()

Я бы также уточнил, что SqsStream уже должен быть неограниченным, поэтому вы не должны не нужно использовать forever, а параметр mapMParUnordered относится к максимальному параллелизму, а не к общему количеству обработанных событий.

...