Я следовал за примерами для fs2-kafka .Тем не менее, я довольно застрял на примере для потребителя.Проблема, которую я получаю, заключается в несоответствии типов между fs2.stream и cats.effect.IO (ошибка ниже)
Код: NB: Теперь обновлено с рекомендацией @AlexeyNovakov, чтобы предоставить рабочий пример
package pb.streams
import cats.effect.{ContextShift, Timer}
import fs2.kafka._
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.duration._
import cats.implicits._
import cats.effect.IO
object Consumer {
implicit val ec: ExecutionContextExecutor =
ExecutionContext.fromExecutor(new ForkJoinPool(4))
implicit val contextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val timer: Timer[IO] = IO.timer(ec)
def main(args: Array[String]): Unit = {
consumeFeed()
()
}
def processRecord(record: ConsumerRecord[String, String]): IO[Unit] = {
println(s"${record.key()} => ${record.value()}")
IO.unit
}
def consumeFeed()= {
val consumerSettings = (executionContext: ExecutionContext) ⇒
ConsumerSettings(
keyDeserializer = new StringDeserializer,
valueDeserializer = new StringDeserializer,
executionContext = executionContext
)
.withAutoOffsetReset(AutoOffsetReset.Earliest)
.withBootstrapServers("localhost:9092")
.withPollTimeout(250.milliseconds)
.withGroupId("group")
for {
executionContext ← consumerExecutionContextStream[IO]
consumer ← consumerStream[IO].using(consumerSettings(executionContext))
_ ← fs2.Stream.eval(consumer.subscribeTo("topic-inbox"))
_ ← consumer.stream
.mapAsync( 4) { message ⇒
processRecord(message.record)
.as(message.committableOffset)
}
.groupWithin(500, 15.seconds)
.map(_.foldLeft(CommittableOffsetBatch.empty[IO])(_ updated _))
.evalMap(_.commit)
} yield ()
}
}
Ошибка, с которой я сталкиваюсь при компиляции:
Error:(55, 29) type mismatch;
found : fs2.Stream[[x]cats.effect.IO[x],Unit]
required: cats.effect.IO[?]
_ ← consumer.stream
Error:(54, 29) type mismatch;
found : cats.effect.IO[Nothing]
required: fs2.Stream[?,?]
_ ← consumer.subscribeTo("topic-inbox")
Error:(55, 9) parameter value consumer in value $anonfun is never used
consumer ← consumerStream[IO].using(consumerSettings(executionContext))
Может кто-нибудь предложить какую-нибудь информацию, которая поможет мне понять и исправить эту загадочную ошибку?Я пробовал разные попытки решить эту проблему, но безрезультатно.Буду признателен за любую помощь, так как я не могу гуглить подобную ситуацию.