Docker-Compose Spark Elasticsearch Соединение NetworkClient Ошибка - PullRequest
0 голосов
/ 28 декабря 2018

Я не могу найти решение для сообщения об ошибке, которое я получаю при запуске spark-submit в контейнере Docker.

Таким образом, общая идея состоит в том, чтобы генерировать данные через kafka, которые имеют такую ​​структуру:

{'source': 'JFdyGil9YYHU', 'target': 'M4iCWTNB7P9E', 'amount': 5425.76, 'currency': 'EUR'}

Затем получаю эти данные в Spark через Scala-скрипт, а именно:

package com.example.spark

import kafka.serializer.StringDecoder
import org.apache.spark.{TaskContext, SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.kafka.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.parsing.json.JSON

import org.elasticsearch.spark._

object Receiver {
  def main(args: Array[String]): Unit = {
    /** when starting the receiver, broker and topics must be passed.*/
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectReceiver <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args


    /** Create context:
    *   The --master option specifies the master URL for a distributed cluster,
    *   or local to run locally with one thread,
    *   or local[N] to run locally with N threads,
    *   or local[*] to run locally with as many worker threads as logical cores on your machine.
    *   You should start by using local for testing.
    */
    val sparkConf = new SparkConf().setAppName("Receiver").setMaster("local[*]")

    /** Whether elasticsearch-hadoop should create an index (if its missing)
    *   when writing data to Elasticsearch or fail.
    *   (default: yes, but specifying anyway for the sake of completeness)
    */
    sparkConf.set("es.index.auto.create", "true")

    /** Define that the context batch interval should take 2 seconds.*/
    //val ssc = new StreamingContext(sparkConf, Seconds(2)) // testing alternatives
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet // if there are many
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    /** Get the lines.
    *   messages are of format:
    *   (null, {"key": "value", "key": "value, ...})
    *   .map(_._2) takes the second tuple argument
    */
    val lines = messages.map(_._2)

    /** pairs are now: [Ljava.lang.String;@5922fbe4
    *   it is what "toString" function in scala actually returns:
    *   def toString(): String = this.getClass.getName + "@" + this.hashCode.toHexString
    *   [ means it’s an array
    *   L means it can contain references to objects
    *   java.lang.String means all those objects should be instances of java.lang.String
    *   ; is just because Java loves its semicolons
    *
    *   Get rid of all the unneccessary charecters and split the string by comma for further usage.
    */
    val pairs = lines.map(_.stripPrefix("{").stripSuffix("}").replaceAll("\"|\\s", "").split(","))

    /** Getting key-value from the pairs, which are:
    *   key: value
    *    key: value
    *    key: value
    *    ...
    */
    pairs.foreach(arr =>
        arr.map(
            x => Map( x(0).split(":")(0) -> x(0).split(":")(1) )
        ).saveToEs("spark/json-test")
    )
    /* testing
    pairs.foreach(
        arr => arr.foreach( x =>
            //val source = Map(x.map(_.1) -> x.map(_.2))
            //source.foreach(println)
            x => x.foreach(println)
        )
    )*/

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

И мой файл docker-compose.yml имеет следующий вид:

version: '3.7'
services:


# kafka (zookeeper integrated)
  kafka:
    container_name: kafka
    build: ./kafka
    environment:
      - KAFKA=localhost:9092
      - ZOOKEEPER=localhost:2181
    expose:
      - 2181
      - 9092
    networks:
      - kaspelki-net

# spark (contains all daemons)
  spark:
    container_name: spark
    build: ./spark
    command: bash
    links:
      - "kafka"
    ports:
      - 8080:8080
      - 7077:7077
      - 6066:6066
      - 4040:4040
    environment:
      - SPARK_MASTER_HOST=spark://localhost:7077
    env_file:
      - ./hadoop/hadoop.env
    tty: true
    expose:
      - 7077
      - 8080
      - 6066
      - 4040
    volumes:
      - ./scripts/spark:/app
    networks:
      - kaspelki-net


# ELK
  elasticsearch:
    container_name: elasticsearch
    build: ./ELK/elasticsearch
    ports:
      - 9200:9200
    expose:
      - 9200
    networks:
      - kaspelki-net


  kibana:
    container_name: kibana
    build: ./ELK/kibana
    ports:
      - 5601:5601
    expose:
      - 5601
    networks:
      - kaspelki-net
    depends_on:
      - elasticsearch



### --- volumes --- ###
volumes:
  data:
networks:
  kaspelki-net:
    name: kaspelki-net

Итак, я запускаю «sudo docker-compose up -d» и могу проверить «localhost: 9200» и «localhost: 5601» в моем браузере, которые работают нормально, но когда я запускаю контейнер через «sudo»docker exec -it spark bash "и попробуйте отправить мой receiver.jar через:

spark-submit --master yarn-client --driver-java-options" -Dlog4j.configuration = file:///app/receiver/log4j.properties "/app/receiver/building_jar/target/scala-2.10/receiver.jar kafka: 9092 test

Затем я получаю это сообщение об ошибке:

18/12/28 09:05:18 ОШИБКА NetworkClient: Ошибка узла [127.0.0.1:9200] (соединение отклонено);других узлов не осталось - прерывание ...

с некоторыми другими сообщениями, в которых завершается процесс.Итак, я понимаю, что каким-то образом соединение не удается, но я не понимаю, ПОЧЕМУ: /

Может кто-нибудь помочь?

1 Ответ

0 голосов
/ 28 декабря 2018

Я не знаком со Spark, но где-то в вашей конфигурации вы пытаетесь подключиться к localhost:9200 из одного контейнера в другой, который не будет работать (это работает вне докера, так как localhost - это ваша машина, но когда каждый сервисзапускается в своем собственном контейнере. localhost ссылается на localhost каждого контейнера, а не на хост-машину.локального хоста, и все должно работать - вам нужно добавить эластичный поиск как link в файл compose под вызывающим сервисом, чтобы ссылаться на него по имени сервиса (как вы это делали для kafka как ссылка под spark).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...