Невозможно отправить искровое задание на искровом кластере в докере - PullRequest
0 голосов
/ 27 февраля 2019

Как и ожидалось в названии, у меня есть некоторые проблемы при отправке задания на спарк-кластер, работающий на докере.

Я написал очень простое задание на спарк в scala, подпишитесь на сервер kafka и расположите некоторые данныеи сохраните их в базе данных elastichsearch.Кафка иasticsearch уже запущены в докере.

Все работает отлично, если я запускаю работу по спекуляции из моего Ide в моей среде разработки (Windows / IntelliJ).

Тогда (и я не Java-паренья добавил кластер искр, следуя этим инструкциям: https://github.com/big-data-europe/docker-spark

Кластер выглядит здоровым, если обратиться к его панели управления.Я создал кластер, состоящий из мастера и рабочего.

Теперь моя работа написана на языке scala:

import java.io.Serializable

import org.apache.commons.codec.StringDecoder
import org.apache.hadoop.fs.LocalFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark
import org.apache.spark.SparkConf
import org.elasticsearch.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.parsing.json.JSON

object KafkaConsumer {
  def main(args: Array[String]): Unit = {

    val sc = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Elastic Search Indexer App")

    sc.set("es.index.auto.create", "true")

    val elasticResource = "iot/demo"
    val ssc = new StreamingContext(sc, Seconds(10))

    //ssc.checkpoint("./checkpoint")

    val kafkaParams = Map(
      "bootstrap.servers" -> "kafka:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "earliest",
      "group.id" -> "group0"
    )

    val topics = List("test")
    val stream = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
    )

    case class message(key: String, timestamp: Long, payload: Object)
    val rdds = stream.map(record => message(record.key, record.timestamp, record.value))

    val es_config: scala.collection.mutable.Map[String, String] =
      scala.collection.mutable.Map(
        "pushdown" -> "true",
        "es.nodes" -> "http://docker-host",
        "es.nodes.wan.only" -> "true",
        "es.resource" -> elasticResource,
        "es.ingest.pipeline" -> "iot-test-pipeline"
      )


    rdds.foreachRDD { rdd =>
      rdd.saveToEs(es_config)
      rdd.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Чтобы передать это кластеру, я сделал:

  • С помощью плагина "sbt-assembly" я создал толстый файл jar со всеми зависимостями.
  • Определите стратегию сборки в build.sbt, чтобы избежать дедуплицирующих ошибок при объединении ...

Затем отправьте с:

. / Spark-submit.cmd --class KafkaConsumer --master spark: // docker-host: 7077 / c / Users / shams / Documents /Appunti / iot-demo-app / spark-streaming / target / scala-2.11 / spark-streaming-assembly-1.0.jar

НО У меня есть эта ошибка:

19/02/27 11:18:12 WARN NativeCodeLoader: невозможно загрузить библиотеку native-hadoop для вашей платформы ... с использованием встроенных классов java, где это применимо Исключение в потоке "main" java.io.IOException: НетФайловая система для схемы: C в org.apache.hadoop.fs.FileSystem.getFileSystemClass (FileSystem.java:2660) в org.apache.hadoop.fs.FileSystem.createFileSystem (FileSystem.java:2667) в org.apache.hadoop.fs.FileSystem.access $ 200 (FileSystem.java:94) в org.apache.hadoop.fs.FileSystem $ Cache.getInternal (FileSystem.java:2703).) в org.apache.hadoop.fs.FileSystem $ Cache.get (FileSystem.java:2685) в org.apache.hadoop.fs.FileSystem.get (FileSystem.java:373) в org.apache.spark.util.Использует $ .getHadoopFileSystem (Utils.scala: 1897) в org.apache.spark.util.Utils $ .doFetchFile (Utils.scala: 694) в org.apache.spark.deploy.DependencyUtils $ .downloadFile (DependencyUtils.scala: 135) в org.apache.spark.deploy.SparkSubmit $$ anonfun $ doPrepareSubmitEnvironment $ 7.apply (SparkSubmit.scala: 416) в org.apache.spark.deploy.SparkSubmit $$ anonfun $ doPrepareSubmitEnvironment Spal $ 7.apply (7. $)) в scala.Option.map (Option.scala: 146) в org.apache.spark.deploy.SparkSubmit $ .doPrepareSubmitEnvironment (SparkSubmit.scala: 415) в org.apache.spark.deploy.SparkSubmit $ .prepareSubmitSubmitEubvironment.scala: 250) в org.apache.spark.deploy.SparkSubmit $ .submit (SparkSubmit.scala: 171) в org.apache.spark.deploy.SparkSubmit $ .main (SparkSubmit.scala: 137) в org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala)

После дня попыток я так и не решил и не могу понять, где в моей работе хочет получить доступ к определенному тому, как это говорит ошибка

Может быть связано спредупреждающее сообщение?Тогда как мне отредактировать мой скрипт, чтобы избежать этой проблемы?

Заранее спасибо.

ОБНОВЛЕНИЕ:

Проблема, похоже, не связана с моим кодомпотому что я попытался отправить простое приложение hello world, скомпилированное таким же образом, но у меня та же проблема.

1 Ответ

0 голосов
/ 27 февраля 2019

После многих попыток и исследований я пришел к выводу, что проблема может заключаться в том, что я использую версию Windows spark-submit с моего компьютера для отправки задания.

Я не мог полностью понятьно сейчас, перемещая файл непосредственно на главный и рабочий узел, я смог отправить его оттуда.

Первая копия в контейнере:

docker cp spark-streaming-assembly-1.0.jar 21b43cb2e698:/spark/bin

Затем я выполняю (в папке / spark / bin) :

./spark-submit --class KafkaConsumer --deploy-mode cluster --master spark://spark-master:7077 spark-streaming-assembly-1.0.jar

Это обходной путь, который я нашел на данный момент.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...