org.apache.spark.SparkException: java.nio.channels.ClosedChannelException

I am trying to run a project that fetches tweets with Apache Kafka, processes them with Spark Streaming, and finally stores them in MongoDB (for educational purposes). The project is here: https://github.com/alonsoir/hello-kafka-twitter-scala

I follow all the instructions:

1) Start the ZooKeeper server and the Kafka server.
2) Go to the project directory and run 'sbt' (Scala Build Tool), then 'pack', which builds the project successfully.
3) Run .\twitter-producer, which starts printing tweets in a new command prompt.

  • Up to this point everything works as expected.

4) Run .\kafka-connector, which should initialize the Spark Context, but instead I get the following:

.\kafka-connector 192.168.59.3:9092 Obq6c
Initializing Streaming Spark Context and kafka connector...
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/05/25 15:00:53 INFO SparkContext: Running Spark version 1.6.1
19/05/25 15:00:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/05/25 15:00:53 INFO SecurityManager: Changing view acls to: Denis.Denchev
19/05/25 15:00:53 INFO SecurityManager: Changing modify acls to: Denis.Denchev
19/05/25 15:00:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Denis.Denchev); users with modify permissions: Set(Denis.Denchev)
19/05/25 15:00:54 INFO Utils: Successfully started service 'sparkDriver' on port 54008.
19/05/25 15:00:55 INFO Slf4jLogger: Slf4jLogger started
19/05/25 15:00:55 INFO Remoting: Starting remoting
19/05/25 15:00:55 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.0.100:54021]
19/05/25 15:00:55 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 54021.
19/05/25 15:00:55 INFO SparkEnv: Registering MapOutputTracker
19/05/25 15:00:55 INFO SparkEnv: Registering BlockManagerMaster
19/05/25 15:00:55 INFO DiskBlockManager: Created local directory at C:\Users\Denis.Denchev\AppData\Local\Temp\blockmgr-221b28fc-2862-4b29-933d-d48c5f914759
19/05/25 15:00:55 INFO MemoryStore: MemoryStore started with capacity 1125.8 MB
19/05/25 15:00:55 INFO SparkEnv: Registering OutputCommitCoordinator
19/05/25 15:00:55 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/05/25 15:00:55 INFO SparkUI: Started SparkUI at http://192.168.0.100:4040
19/05/25 15:00:55 INFO Executor: Starting executor ID driver on host localhost
19/05/25 15:00:55 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54040.
19/05/25 15:00:55 INFO NettyBlockTransferService: Server created on 54040
19/05/25 15:00:55 INFO BlockManagerMaster: Trying to register BlockManager
19/05/25 15:00:55 INFO BlockManagerMasterEndpoint: Registering block manager localhost:54040 with 1125.8 MB RAM, BlockManagerId(driver, localhost, 54040)
19/05/25 15:00:55 INFO BlockManagerMaster: Registered BlockManager
19/05/25 15:00:56 INFO VerifiableProperties: Verifying properties
19/05/25 15:00:56 INFO VerifiableProperties: Property group.id is overridden to
19/05/25 15:00:56 INFO VerifiableProperties: Property zookeeper.connect is overridden to
19/05/25 15:01:17 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at example.spark.KafkaConnector$.main(KafkaConnectorWithMongo.scala:87)
        at example.spark.KafkaConnector.main(KafkaConnectorWithMongo.scala)
19/05/25 15:01:38 INFO SparkContext: Invoking stop() from shutdown hook
19/05/25 15:01:38 INFO SparkUI: Stopped Spark web UI at http://192.168.0.100:4040
19/05/25 15:01:38 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/05/25 15:01:38 INFO MemoryStore: MemoryStore cleared
19/05/25 15:01:38 INFO BlockManager: BlockManager stopped
19/05/25 15:01:38 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/25 15:01:38 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/25 15:01:38 INFO SparkContext: Successfully stopped SparkContext
19/05/25 15:01:38 INFO ShutdownHookManager: Shutdown hook called
19/05/25 15:01:38 INFO ShutdownHookManager: Deleting directory C:\Users\Denis.Denchev\AppData\Local\Temp\spark-10efc5c8-a803-49cd-b458-2044bd91c557
19/05/25 15:01:38 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
19/05/25 15:01:38 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

and Spark shuts down; nothing is written to the MongoDB collection.
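
From the stack trace my understanding (not from the project docs) is that the failure happens before any batch runs: KafkaUtils.createDirectStream calls getFromOffsets, which connects to the brokers in metadata.broker.list to fetch topic metadata and starting offsets, and it is that connection that gets closed. As a standalone sanity check, here is a minimal sketch that only tests whether the broker address passed to .\kafka-connector is reachable over TCP (plain java.net, nothing Spark- or Kafka-specific; the host and port are just the ones from my command line):

import java.net.{InetSocketAddress, Socket}

// Hypothetical standalone check: can this machine open a TCP connection to the
// broker address that kafka-connector was given? If this fails, the
// ClosedChannelException would be explained before Spark or MongoDB are involved.
object BrokerReachability {
  def main(args: Array[String]): Unit = {
    val host = "192.168.59.3"   // same broker host as in the kafka-connector call
    val port = 9092             // same broker port
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), 5000) // 5-second connect timeout
      println(s"TCP connection to $host:$port succeeded")
    } catch {
      case e: Exception => println(s"TCP connection to $host:$port failed: $e")
    } finally {
      socket.close()
    }
  }
}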

Here is the Scala class that is supposed to initialize the Spark Context and connect to MongoDB:

package example.spark

import java.io.File
import java.util.Date

import com.google.gson.{Gson,GsonBuilder, JsonParser}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

import com.mongodb.casbah.Imports._
import com.mongodb.QueryBuilder
import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.{MongoDBList, MongoDBObject}


import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

//import com.github.nscala_time.time.Imports._

/**
 * Collect at least the specified number of json tweets into cassandra, mongo...

on mongo shell:

 use alonsodb;
 db.tweets.find();
 */
object KafkaConnector {

  private var numTweetsCollected = 0L
  private var partNum = 0
  private val numTweetsToCollect = 10000000

  // these settings should be in reference.conf
  private val Database = "bigdata"
  private val Collection = "tweets"
  private val MongoHost = "127.0.0.1"
  private val MongoPort = 27017
  private val MongoProvider = "com.stratio.datasource.mongodb"

  private val jsonParser = new JsonParser()
  private val gson = new GsonBuilder().setPrettyPrinting().create()

  private def prepareMongoEnvironment(): MongoClient = {
      val mongoClient = MongoClient(MongoHost, MongoPort)
      mongoClient
  }

  private def closeMongoEnviroment(mongoClient : MongoClient) = {
      mongoClient.close()
      println("mongoclient closed!")
  }

  private def cleanMongoEnvironment(mongoClient: MongoClient) = {
      cleanMongoData(mongoClient)
      mongoClient.close()
  }

  private def cleanMongoData(client: MongoClient): Unit = {
      val collection = client(Database)(Collection)
      collection.dropCollection()
  }

  def main(args: Array[String]) {
    // Process program arguments and set properties

    if (args.length < 2) {
      System.err.println("Usage: " + this.getClass.getSimpleName +
        "<brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    println("Initializing Streaming Spark Context and kafka connector...")
    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("KafkaConnector").setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true")
   // val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
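    // Note: with no explicit fromOffsets, createDirectStream contacts the brokers in
    // metadata.broker.list right here (KafkaUtils.getFromOffsets) to fetch topic metadata
    // and starting offsets -- this is the call that surfaces the ClosedChannelException above.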

    println("Initialized Streaming Spark Context and kafka connector...")  

    println("Initializing mongodb connector...")

    val mongoClient = prepareMongoEnvironment()
    val collection = mongoClient(Database)(Collection)

    println("Initialized mongodb connector...")

    try {
        /*val sqlContext = new SQLContext(sc)
        println("Creating temporary table in mongo instance...")
        sqlContext.sql(
            s"""|CREATE TEMPORARY TABLE $Collection
              |(id STRING, tweets STRING)
              |USING $MongoProvider
              |OPTIONS (
              |host '$MongoHost:$MongoPort',
              |database '$Database',
              |collection '$Collection'
              |)
            """.stripMargin.replaceAll("\n", " "))*/

        messages.foreachRDD(rdd => {
          val count = rdd.count()
          if (count>0) {
            val topList = rdd.take(count.toInt)
            println("\nReading data from kafka broker... (%s total):".format(rdd.count()))
            topList.foreach(println)
            //println

            for (tweet <- topList) {
               collection.insert {MongoDBObject("id" -> new Date(),"tweets" -> tweet)}
            }//for (tweet <- topList)

            numTweetsCollected += count
            if (numTweetsCollected > numTweetsToCollect) {
              println
              println("numTweetsCollected > numTweetsToCollect condition is reached. Stopping..." + numTweetsCollected + " " + count)
              //cleanMongoEnvironment(mongoClient)
              closeMongoEnviroment(mongoClient)
              println("shutdown mongodb connector...")
              System.exit(0)
            }
          }//if(count>0)
        })//messages.foreachRDD(rdd =>

        //studentsDF.where(studentsDF("age") > 15).groupBy(studentsDF("enrolled")).agg(avg("age"), max("age")).show(5)
     //!val tweetsDF = sqlContext.read.format("com.stratio.datasource.mongodb").table(s"$Collection")
        //tweetsDF.show(numTweetsCollected.toInt)
       //! tweetsDF.show(5)
        println("tested a mongodb connection with stratio library...")
    } finally {
        //sc.stop()
        println("finished withSQLContext...")
    }

    ssc.start()
    ssc.awaitTermination()

    println("Finished!")
  }
}

To clarify:

// val sc = new SparkContext(sparkConf)

is commented out because earlier I got an error saying that only one SparkContext can run in this JVM.
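
If I do need the SparkContext again (for example for the SQLContext in the commented-out block), my understanding is that it can be obtained from the StreamingContext instead of constructing a second one, which would avoid both the "only one SparkContext per JVM" error and the spark.driver.allowMultipleContexts workaround. A minimal sketch, reusing the sparkConf defined above:

import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sparkConf, Seconds(2))
val sc = ssc.sparkContext           // reuse the SparkContext created by the StreamingContext
val sqlContext = new SQLContext(sc) // no second SparkContext is created in this JVM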

I have no idea what could be causing this. Any pointers would be greatly appreciated.
