Spark не может читать из топика Embedded Kafka - PullRequest
0 голосов
/ 07 марта 2020

Я пытаюсь написать интеграционный тест с использованием Embedded Kafka, но получаю исключение NullPointerException. Мой тестовый пример очень прост. Он имеет следующие шаги:

  1. Считывание файла JSON и запись сообщений в inputTopic.
  2. Выполнение операции readStream.
  3. Выполнение операции select над потоком. Именно здесь возникает исключение NullPointerException.

Что я делаю не так? Код указан ниже:

  /*
   * Integration test: publishes a JSON fixture to an embedded Kafka topic, then
   * reads it back through Spark Structured Streaming and parses each message
   * value with a schema inferred from a sample JSON file.
   */
  "My Test which runs with Embedded Kafka" should "Generate correct Result" in {

    implicit val config: EmbeddedKafkaConfig =
      EmbeddedKafkaConfig(
        kafkaPort = 9066,
        zooKeeperPort = 2066,
        // NOTE(review): this points the broker's log directory into the source
        // tree, so topic data may survive between runs; consider dropping it and
        // letting embedded-kafka use its temporary directory.
        Map("log.dir" -> "./src/test/resources/")
      )

    withRunningKafka {
      createCustomTopic(inputTopic)

      // Publish every non-empty line of the fixture as one Kafka message.
      // try/finally releases the file handle even if publishing fails.
      val source = Source.fromFile("src/test/resources/test1.json")
      try {
        source.getLines().filterNot(_.isEmpty).foreach { line =>
          publishStringMessageToKafka(inputTopic, line)
        }
      } finally {
        source.close()
      }

      // Presumably used by the (elided) code that consumes outputTopic.
      implicit val deserializer: StringDeserializer = new StringDeserializer

      createCustomTopic(outputTopic)

      // NOTE(review): relative path; resolved against the JVM working directory.
      val schema = spark.read.json("my.json").schema

      val myStream = spark
        .readStream
        .format("kafka")
        // Derive the port from the embedded broker config, so the two cannot drift.
        .option("kafka.bootstrap.servers", s"localhost:${config.kafkaPort}")
        .option("subscribe", inputTopic)
        // Streaming queries default to "latest", which would skip the messages
        // published above, before the query started.
        .option("startingOffsets", "earliest")
        .load()

      // Schema looks good
      myStream.printSchema()

      // The column was previously built with `$"value"` from `spark2.implicits._`.
      // That import is resolved lazily, so a null/uninitialised `spark2` blows up
      // with a NullPointerException exactly here. Resolving the column from the
      // streaming Dataset itself needs no implicits from any session.
      val df = myStream.select(from_json(myStream("value").cast("string"), schema).alias("value"))

      // There's more code... but let's not worry about that for now.
    }

  }

По запросу — вот трассировка стека:

at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningKafka$1$$anonfun$apply$1.apply(EmbeddedKafka.scala:220)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningKafka$1$$anonfun$apply$1.apply(EmbeddedKafka.scala:213)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$class.withTempDir(EmbeddedKafka.scala:279)
at com.MyTestClass.withTempDir(MyTestClass.scala:18)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningKafka$1.apply(EmbeddedKafka.scala:213)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningKafka$1.apply(EmbeddedKafka.scala:212)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningZooKeeper$1.apply(EmbeddedKafka.scala:268)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$$anonfun$withRunningZooKeeper$1.apply(EmbeddedKafka.scala:265)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$class.withTempDir(EmbeddedKafka.scala:279)
at com.MyTestClass.withTempDir(MyTestClass.scala:18)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$class.withRunningZooKeeper(EmbeddedKafka.scala:265)
at com.MyTestClass.withRunningZooKeeper(MyTestClass.scala:18)
at net.manub.embeddedkafka.EmbeddedKafkaSupport$class.withRunningKafka(EmbeddedKafka.scala:212)
at com.MyTestClass.withRunningKafka(MyTestClass.scala:18)
at com.MyTestClass$$anonfun$1.apply$mcV$sp(MyTestClass.scala:47)
at com.MyTestClass$$anonfun$1.apply(MyTestClass.scala:38)
at com.MyTestClass$$anonfun$1.apply(MyTestClass.scala:38)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
at com.MyTestClass.org$scalatest$BeforeAndAfter$$super$runTest(MyTestClass.scala:18)
at org.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:203)
at com.MyTestClass.runTest(MyTestClass.scala:18)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:393)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:370)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:407)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:376)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1685)
at org.scalatest.Suite$class.run(Suite.scala:1124)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1795)
at com.MyTestClass.org$scalatest$BeforeAndAfterAll$$super$run(MyTestClass.scala:18)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at com.MyTestClass.org$scalatest$BeforeAndAfter$$super$run(MyTestClass.scala:18)
at org.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:258)
at com.MyTestClass.run(MyTestClass.scala:18)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1349)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1343)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1012)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:133)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
...