Потребитель fs2-kafka обнаружил ошибку несоответствия типов fs2.stream требуется cats.effect.IO - PullRequest
0 голосов
/ 01 февраля 2019

Я следовал за примерами для fs2-kafka .Тем не менее, я довольно застрял на примере для потребителя.Проблема, которую я получаю, заключается в несоответствии типов между fs2.stream и cats.effect.IO (ошибка ниже)

Код: NB: Теперь обновлено с рекомендацией @AlexeyNovakov, чтобы предоставить рабочий пример

package pb.streams

import cats.effect.{ContextShift, Timer}

import fs2.kafka._
import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization._

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.forkjoin.ForkJoinPool
import scala.concurrent.duration._

import cats.implicits._
import cats.effect.IO

object Consumer {

  implicit val ec: ExecutionContextExecutor =
    ExecutionContext.fromExecutor(new ForkJoinPool(4))

  implicit val contextShift: ContextShift[IO] = IO.contextShift(ec)
  implicit val timer: Timer[IO] = IO.timer(ec)


  def main(args: Array[String]): Unit = {
    consumeFeed()
    ()
  }

  def processRecord(record: ConsumerRecord[String, String]): IO[Unit] = {
    println(s"${record.key()} => ${record.value()}")

    IO.unit
  }

  def consumeFeed()= {


    val consumerSettings = (executionContext: ExecutionContext) ⇒
      ConsumerSettings(
        keyDeserializer   = new StringDeserializer,
        valueDeserializer = new StringDeserializer,
        executionContext  = executionContext
      )
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers("localhost:9092")
        .withPollTimeout(250.milliseconds)
        .withGroupId("group")

      for {
        executionContext    ← consumerExecutionContextStream[IO]
        consumer            ← consumerStream[IO].using(consumerSettings(executionContext))
        _                   ← fs2.Stream.eval(consumer.subscribeTo("topic-inbox"))
        _                   ← consumer.stream
                              .mapAsync( 4) { message ⇒
                                processRecord(message.record)
                                  .as(message.committableOffset)
                              }
                              .groupWithin(500, 15.seconds)
                              .map(_.foldLeft(CommittableOffsetBatch.empty[IO])(_ updated _))
                              .evalMap(_.commit)
      } yield ()

  }
}

Ошибка, с которой я сталкиваюсь при компиляции:

Error:(55, 29) type mismatch;
 found   : fs2.Stream[[x]cats.effect.IO[x],Unit]
 required: cats.effect.IO[?]
        _                   ← consumer.stream

Error:(54, 29) type mismatch;
 found   : cats.effect.IO[Nothing]
 required: fs2.Stream[?,?]
        _                   ← consumer.subscribeTo("topic-inbox")

Error:(55, 9) parameter value consumer in value $anonfun is never used
        consumer            ← consumerStream[IO].using(consumerSettings(executionContext))

Может кто-нибудь предложить какую-нибудь информацию, которая поможет мне понять и исправить эту загадочную ошибку?Я пробовал разные попытки решить эту проблему, но безрезультатно.Буду признателен за любую помощь, так как я не могу гуглить подобную ситуацию.

...