I am trying to run a project that receives tweets via Apache Kafka, processes them with Spark Streaming, and finally stores the tweets in MongoDB (for educational purposes). The project is here: https://github.com/alonsoir/hello-kafka-twitter-scala
I follow all the instructions:
1) Start the ZooKeeper server + the Kafka server
2) Go to the project directory and run 'sbt' (Scala Build Tool), then 'pack', which builds the project successfully
3) Run .\twitter-producer, which starts printing tweets in a new command prompt
- Up to this point everything works as expected.
4) Run .\kafka-connector, which should initialize the Spark Context, but instead I get the following:
.\kafka-connector 192.168.59.3:9092 Obq6c
Initializing Streaming Spark Context and kafka connector...
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/05/25 15:00:53 INFO SparkContext: Running Spark version 1.6.1
19/05/25 15:00:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/05/25 15:00:53 INFO SecurityManager: Changing view acls to: Denis.Denchev
19/05/25 15:00:53 INFO SecurityManager: Changing modify acls to: Denis.Denchev
19/05/25 15:00:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Denis.Denchev); users with modify permissions: Set(Denis.Denchev)
19/05/25 15:00:54 INFO Utils: Successfully started service 'sparkDriver' on port 54008.
19/05/25 15:00:55 INFO Slf4jLogger: Slf4jLogger started
19/05/25 15:00:55 INFO Remoting: Starting remoting
19/05/25 15:00:55 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.0.100:54021]
19/05/25 15:00:55 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 54021.
19/05/25 15:00:55 INFO SparkEnv: Registering MapOutputTracker
19/05/25 15:00:55 INFO SparkEnv: Registering BlockManagerMaster
19/05/25 15:00:55 INFO DiskBlockManager: Created local directory at C:\Users\Denis.Denchev\AppData\Local\Temp\blockmgr-221b28fc-2862-4b29-933d-d48c5f914759
19/05/25 15:00:55 INFO MemoryStore: MemoryStore started with capacity 1125.8 MB
19/05/25 15:00:55 INFO SparkEnv: Registering OutputCommitCoordinator
19/05/25 15:00:55 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/05/25 15:00:55 INFO SparkUI: Started SparkUI at http://192.168.0.100:4040
19/05/25 15:00:55 INFO Executor: Starting executor ID driver on host localhost
19/05/25 15:00:55 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54040.
19/05/25 15:00:55 INFO NettyBlockTransferService: Server created on 54040
19/05/25 15:00:55 INFO BlockManagerMaster: Trying to register BlockManager
19/05/25 15:00:55 INFO BlockManagerMasterEndpoint: Registering block manager localhost:54040 with 1125.8 MB RAM, BlockManagerId(driver, localhost, 54040)
19/05/25 15:00:55 INFO BlockManagerMaster: Registered BlockManager
19/05/25 15:00:56 INFO VerifiableProperties: Verifying properties
19/05/25 15:00:56 INFO VerifiableProperties: Property group.id is overridden to
19/05/25 15:00:56 INFO VerifiableProperties: Property zookeeper.connect is overridden to
19/05/25 15:01:17 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
        at scala.util.Either.fold(Either.scala:97)
        at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
        at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
        at example.spark.KafkaConnector$.main(KafkaConnectorWithMongo.scala:87)
        at example.spark.KafkaConnector.main(KafkaConnectorWithMongo.scala)
19/05/25 15:01:38 INFO SparkContext: Invoking stop() from shutdown hook
19/05/25 15:01:38 INFO SparkUI: Stopped Spark web UI at http://192.168.0.100:4040
19/05/25 15:01:38 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/05/25 15:01:38 INFO MemoryStore: MemoryStore cleared
19/05/25 15:01:38 INFO BlockManager: BlockManager stopped
19/05/25 15:01:38 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/25 15:01:38 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/25 15:01:38 INFO SparkContext: Successfully stopped SparkContext
19/05/25 15:01:38 INFO ShutdownHookManager: Shutdown hook called
19/05/25 15:01:38 INFO ShutdownHookManager: Deleting directory C:\Users\Denis.Denchev\AppData\Local\Temp\spark-10efc5c8-a803-49cd-b458-2044bd91c557
19/05/25 15:01:38 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
19/05/25 15:01:38 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
Spark shuts down and nothing is written to the MongoDB collection.
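Since the exception is a ClosedChannelException thrown while SimpleConsumer tries to reconnect, I suspect the driver simply cannot reach the broker at the address I pass in (192.168.59.3:9092), or the broker advertises a host/port the driver cannot use. As a quick sanity check (just a sketch, not part of the project; the object name is made up), I can test plain TCP reachability to that address from the same machine:

    import java.net.{InetSocketAddress, Socket}

    object BrokerReachabilityCheck {
      def main(args: Array[String]): Unit = {
        val host = "192.168.59.3" // the broker address I pass to kafka-connector
        val port = 9092
        val socket = new Socket()
        try {
          // 5-second connect timeout; throws if the broker is unreachable
          socket.connect(new InetSocketAddress(host, port), 5000)
          println(s"TCP connection to $host:$port succeeded")
        } catch {
          case e: Exception => println(s"Cannot reach $host:$port -> $e")
        } finally {
          socket.close()
        }
      }
    }

If even this fails, the problem would be networking between my machine and the broker rather than anything in the Spark code; if it succeeds, the broker may still be advertising a hostname that the driver cannot resolve.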
Here is the Scala class that is supposed to initialize the Spark Context and connect to MongoDB:
package example.spark

import java.io.File
import java.util.Date

import com.google.gson.{Gson, GsonBuilder, JsonParser}

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

import com.mongodb.casbah.Imports._
import com.mongodb.QueryBuilder
import com.mongodb.casbah.MongoClient
import com.mongodb.casbah.commons.{MongoDBList, MongoDBObject}

import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

//import com.github.nscala_time.time.Imports._

/**
 * Collect at least the specified number of json tweets into cassandra, mongo...
 * on mongo shell:
 *   use alonsodb;
 *   db.tweets.find();
 */
object KafkaConnector {

  private var numTweetsCollected = 0L
  private var partNum = 0
  private val numTweetsToCollect = 10000000

  //this settings must be in reference.conf
  private val Database = "bigdata"
  private val Collection = "tweets"
  private val MongoHost = "127.0.0.1"
  private val MongoPort = 27017
  private val MongoProvider = "com.stratio.datasource.mongodb"

  private val jsonParser = new JsonParser()
  private val gson = new GsonBuilder().setPrettyPrinting().create()

  private def prepareMongoEnvironment(): MongoClient = {
    val mongoClient = MongoClient(MongoHost, MongoPort)
    mongoClient
  }

  private def closeMongoEnviroment(mongoClient : MongoClient) = {
    mongoClient.close()
    println("mongoclient closed!")
  }

  private def cleanMongoEnvironment(mongoClient: MongoClient) = {
    cleanMongoData(mongoClient)
    mongoClient.close()
  }

  private def cleanMongoData(client: MongoClient): Unit = {
    val collection = client(Database)(Collection)
    collection.dropCollection()
  }

  def main(args: Array[String]) {
    // Process program arguments and set properties
    if (args.length < 2) {
      System.err.println("Usage: " + this.getClass.getSimpleName +
        "<brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    println("Initializing Streaming Spark Context and kafka connector...")

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("KafkaConnector").setMaster("local[4]").set("spark.driver.allowMultipleContexts", "true")
    // val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

    println("Initialized Streaming Spark Context and kafka connector...")

    println("Initializing mongodb connector...")

    val mongoClient = prepareMongoEnvironment()
    val collection = mongoClient(Database)(Collection)

    println("Initialized mongodb connector...")

    try {
      /*val sqlContext = new SQLContext(sc)

      println("Creating temporary table in mongo instance...")
      sqlContext.sql(
        s"""|CREATE TEMPORARY TABLE $Collection
            |(id STRING, tweets STRING)
            |USING $MongoProvider
            |OPTIONS (
            |host '$MongoHost:$MongoPort',
            |database '$Database',
            |collection '$Collection'
            |)
         """.stripMargin.replaceAll("\n", " "))*/

      messages.foreachRDD(rdd => {
        val count = rdd.count()
        if (count > 0) {
          val topList = rdd.take(count.toInt)
          println("\nReading data from kafka broker... (%s total):".format(rdd.count()))
          topList.foreach(println)
          //println

          for (tweet <- topList) {
            collection.insert {MongoDBObject("id" -> new Date(), "tweets" -> tweet)}
          }//for (tweet <- topList)

          numTweetsCollected += count
          if (numTweetsCollected > numTweetsToCollect) {
            println
            println("numTweetsCollected > numTweetsToCollect condition is reached. Stopping..." + numTweetsCollected + " " + count)
            //cleanMongoEnvironment(mongoClient)
            closeMongoEnviroment(mongoClient)
            println("shutdown mongodb connector...")
            System.exit(0)
          }
        }//if(count>0)
      })//messages.foreachRDD(rdd =>

      //studentsDF.where(studentsDF("age") > 15).groupBy(studentsDF("enrolled")).agg(avg("age"), max("age")).show(5)
      //!val tweetsDF = sqlContext.read.format("com.stratio.datasource.mongodb").table(s"$Collection")
      //tweetsDF.show(numTweetsCollected.toInt)
      //! tweetsDF.show(5)

      println("tested a mongodb connection with stratio library...")
    } finally {
      //sc.stop()
      println("finished withSQLContext...")
    }

    ssc.start()
    ssc.awaitTermination()

    println("Finished!")
  }
}
To clarify: the line
// val sc = new SparkContext(sparkConf)
is commented out because earlier I got an error saying that only one SparkContext can run in this JVM (see the sketch below for how I think that could be avoided).
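If I later need a plain SparkContext (for example, for the commented-out SQLContext / CREATE TEMPORARY TABLE block), my understanding is that the StreamingContext already owns one that can be reused instead of constructing a second context in the same JVM. A minimal sketch of what I mean (not tested in this project):

    // reuse the context that the StreamingContext already owns,
    // instead of calling new SparkContext(sparkConf) a second time
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val sc = ssc.sparkContext
    val sqlContext = new SQLContext(sc) // would back the commented-out temporary-table code

As far as I understand, that would also make the spark.driver.allowMultipleContexts setting unnecessary.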
I have no idea what could be causing this. Any pointers would be greatly appreciated.