Как и ожидалось в названии, у меня есть некоторые проблемы при отправке задания на спарк-кластер, работающий на докере.
Я написал очень простое задание на спарк в scala, подпишитесь на сервер kafka и расположите некоторые данныеи сохраните их в базе данных elastichsearch.Кафка иasticsearch уже запущены в докере.
Все работает отлично, если я запускаю работу по спекуляции из моего Ide в моей среде разработки (Windows / IntelliJ).
Тогда (и я не Java-паренья добавил кластер искр, следуя этим инструкциям: https://github.com/big-data-europe/docker-spark
Кластер выглядит здоровым, если обратиться к его панели управления.Я создал кластер, состоящий из мастера и рабочего.
Теперь моя работа написана на языке scala:
import java.io.Serializable
import org.apache.commons.codec.StringDecoder
import org.apache.hadoop.fs.LocalFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark
import org.apache.spark.SparkConf
import org.elasticsearch.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.parsing.json.JSON
object KafkaConsumer {
def main(args: Array[String]): Unit = {
val sc = new SparkConf()
.setMaster("local[*]")
.setAppName("Elastic Search Indexer App")
sc.set("es.index.auto.create", "true")
val elasticResource = "iot/demo"
val ssc = new StreamingContext(sc, Seconds(10))
//ssc.checkpoint("./checkpoint")
val kafkaParams = Map(
"bootstrap.servers" -> "kafka:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "earliest",
"group.id" -> "group0"
)
val topics = List("test")
val stream = KafkaUtils.createDirectStream(
ssc,
PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
)
case class message(key: String, timestamp: Long, payload: Object)
val rdds = stream.map(record => message(record.key, record.timestamp, record.value))
val es_config: scala.collection.mutable.Map[String, String] =
scala.collection.mutable.Map(
"pushdown" -> "true",
"es.nodes" -> "http://docker-host",
"es.nodes.wan.only" -> "true",
"es.resource" -> elasticResource,
"es.ingest.pipeline" -> "iot-test-pipeline"
)
rdds.foreachRDD { rdd =>
rdd.saveToEs(es_config)
rdd.collect().foreach(println)
}
ssc.start()
ssc.awaitTermination()
}
}
Чтобы передать это кластеру, я сделал:
- С помощью плагина "sbt-assembly" я создал толстый файл jar со всеми зависимостями.
- Определите стратегию сборки в build.sbt, чтобы избежать дедуплицирующих ошибок при объединении ...
Затем отправьте с:
. / Spark-submit.cmd --class KafkaConsumer --master spark: // docker-host: 7077 / c / Users / shams / Documents /Appunti / iot-demo-app / spark-streaming / target / scala-2.11 / spark-streaming-assembly-1.0.jar
НО У меня есть эта ошибка:
19/02/27 11:18:12 WARN NativeCodeLoader: невозможно загрузить библиотеку native-hadoop для вашей платформы ... с использованием встроенных классов java, где это применимо Исключение в потоке "main" java.io.IOException: НетФайловая система для схемы: C в org.apache.hadoop.fs.FileSystem.getFileSystemClass (FileSystem.java:2660) в org.apache.hadoop.fs.FileSystem.createFileSystem (FileSystem.java:2667) в org.apache.hadoop.fs.FileSystem.access $ 200 (FileSystem.java:94) в org.apache.hadoop.fs.FileSystem $ Cache.getInternal (FileSystem.java:2703).) в org.apache.hadoop.fs.FileSystem $ Cache.get (FileSystem.java:2685) в org.apache.hadoop.fs.FileSystem.get (FileSystem.java:373) в org.apache.spark.util.Использует $ .getHadoopFileSystem (Utils.scala: 1897) в org.apache.spark.util.Utils $ .doFetchFile (Utils.scala: 694) в org.apache.spark.deploy.DependencyUtils $ .downloadFile (DependencyUtils.scala: 135) в org.apache.spark.deploy.SparkSubmit $$ anonfun $ doPrepareSubmitEnvironment $ 7.apply (SparkSubmit.scala: 416) в org.apache.spark.deploy.SparkSubmit $$ anonfun $ doPrepareSubmitEnvironment Spal $ 7.apply (7. $)) в scala.Option.map (Option.scala: 146) в org.apache.spark.deploy.SparkSubmit $ .doPrepareSubmitEnvironment (SparkSubmit.scala: 415) в org.apache.spark.deploy.SparkSubmit $ .prepareSubmitSubmitEubvironment.scala: 250) в org.apache.spark.deploy.SparkSubmit $ .submit (SparkSubmit.scala: 171) в org.apache.spark.deploy.SparkSubmit $ .main (SparkSubmit.scala: 137) в org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala)
После дня попыток я так и не решил и не могу понять, где в моей работе хочет получить доступ к определенному тому, как это говорит ошибка
Может быть связано спредупреждающее сообщение?Тогда как мне отредактировать мой скрипт, чтобы избежать этой проблемы?
Заранее спасибо.
ОБНОВЛЕНИЕ:
Проблема, похоже, не связана с моим кодомпотому что я попытался отправить простое приложение hello world, скомпилированное таким же образом, но у меня та же проблема.