Ошибка java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer - PullRequest
0 голосов
/ 21 февраля 2019

Подключение к потоковой передаче с использованием внешнего источника, такого как сервер MS SQL, и публикация данных таблиц в Kafka.

Получение

java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer error.

Пожалуйста, ознакомьтесь с нижеприведенными ссылками.

    **CustomReceiver.sacla**
    package com.sparkdemo.app
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    import java.util.List
    import java.util.Map
    import com.sparkdemo.entity.Inventory
    import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
    import java.net.ConnectException
    import scala.util.{Try, Success, Failure}
    import collection.JavaConversions._


    class CustomReceiver(topic: String, kafkaParams: Map[String, Object]) extends Receiver[Inventory](StorageLevel.MEMORY_AND_DISK_2) {

      override def onStart(): Unit = {

        val dataService = new DataService()
        var records: Inventory = dataService.selectAll()

        new Thread("Socket Receiver") {

          override def run() {
            Try {
              val consumer = new KafkaConsumer(kafkaParams)
              consumer.subscribe(topic)
              while (!isStopped && records!=null) {

                // store(tokenData)
                // tokenData = new DataService().selectAll();
                val records = new DataService().selectAll();
                store(records)
              }
            } match {
              case e: ConnectException =>
                restart("Error connecting to...", e)
              case t: Throwable =>
                restart("Error receiving data", t)
            }

          }
        }.start()

      }

      override def onStop(): Unit = {
        println("Nothing")
      }
    }

    **DataService.scala**
    package com.sparkdemo.app;
    import java.sql.Connection
    import java.sql.DriverManager
    import java.sql.ResultSet
    import java.sql.Statement
    import java.util._
    import scala.collection.JavaConversions._
    import com.sparkdemo.entity.Inventory

    class DataService {

      var connect: Connection = DriverManager.getConnection(
        "jdbc:sqlserver://localhost;databaseName=TestDB;user=SA;password=Sqlserver@007")

      var statement: Statement = connect.createStatement()

      var resultSet: ResultSet = null

      var inv: Inventory = new Inventory()

      Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")

      def selectAll(): Inventory = {
        resultSet = statement.executeQuery("select * from Inventory")
        while (resultSet.next()) {
          val inv: Inventory = new Inventory()
          inv.setId(resultSet.getInt("id"))
          inv.setName(resultSet.getString("name"))
          inv.setQuantity(resultSet.getInt("quantity"))
        }
        inv
      }

    }
  Scala main class   **Stream.scala**
    package com.sparkdemo.app
    import org.apache.spark.streaming.dstream.DStream
    import com.sparkdemo.config.Config
    import org.apache.kafka.common.serialization.{ StringDeserializer, StringSerializer }
    import org.apache.kafka.clients.producer.{ ProducerRecord, KafkaProducer }
    import java.util.Properties
    import collection.JavaConversions._
    import com.sparkdemo.entity.Inventory


    object Stream extends Serializable{

      def main(args: Array[String]) {
        import org.apache.spark.streaming._

        def getKafkaParams: Map[String, Object] = {
          Map[String, Object](
            "auto.offset.reset" -> "earliest",
            "bootstrap.servers" -> "localhost:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "group3")
        }

        val properties = new Properties()
        properties.put("bootstrap.servers", "localhost:9092")
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        val topic1 = "topic1"
        val topic2 = "topic2"
        val producer :KafkaProducer[String, Object] = new KafkaProducer(properties)

        val ssc = Config.configReceiver()
        val stream = ssc.receiverStream(new CustomReceiver(topic1, getKafkaParams))

        stream.map(Inventory=>producer.send(new ProducerRecord[String,Object](topic2,Inventory)))

        stream.print()


        ssc.start()
        ssc.awaitTermination()

      }

    }

Entity class: **Inventory.scala**
    package com.sparkdemo.entity

    import scala.beans.{BeanProperty}


    class Inventory extends Serializable{

      @BeanProperty
      var id: Int = _

      @BeanProperty
      var name: String = _

      @BeanProperty
      var quantity: Int = _

    }

Ошибка:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:547)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:547)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:546)
    at com.sparkdemo.app.Stream$.main(Stream.scala:36)
    at com.sparkdemo.app.Stream.main(Stream.scala)
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
Serialization stack:
    - object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@557286ad)
    - field (class: com.sparkdemo.app.Stream$$anonfun$main$1, name: producer$1, type: class org.apache.kafka.clients.producer.KafkaProducer)
    - object (class com.sparkdemo.app.Stream$$anonfun$main$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 12 more

Ответы [ 2 ]

0 голосов
/ 22 февраля 2019

Вы столкнулись с проблемой, когда kafkaproducer непреднамеренно отправляется исполнителю из-за приведенного ниже кода stream.map (Inventory => provider.send (new ProducerRecordString, Object)) *

Вы можете отображать и создавать производителя в отображениях так,что оно не отправлено исполнителям.

0 голосов
/ 21 февраля 2019

Проблема в типе Сериализатора , который вы используете для значения Типа объекта.

properties.put ("value.serializer", "org.apache.kafka.common.serialization).StringSerializer ")

Пожалуйста, напишите сериализатор, чтобы прочитать значения типа объекта. Вы можете обратиться по ссылке ниже Отправка пользовательских объектов Java в тему Kafka

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...