Я не могу найти решение для сообщения об ошибке, которое я получаю при запуске spark-submit в контейнере Docker.
Таким образом, общая идея состоит в том, чтобы генерировать данные через kafka, которые имеют такую структуру:
{'source': 'JFdyGil9YYHU', 'target': 'M4iCWTNB7P9E', 'amount': 5425.76, 'currency': 'EUR'}
Затем получаю эти данные в Spark через Scala-скрипт, а именно:
package com.example.spark
import kafka.serializer.StringDecoder
import org.apache.spark.{TaskContext, SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.kafka.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.parsing.json.JSON
import org.elasticsearch.spark._
object Receiver {
def main(args: Array[String]): Unit = {
/** when starting the receiver, broker and topics must be passed.*/
if (args.length < 2) {
System.err.println(s"""
|Usage: DirectReceiver <brokers> <topics>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(brokers, topics) = args
/** Create context:
* The --master option specifies the master URL for a distributed cluster,
* or local to run locally with one thread,
* or local[N] to run locally with N threads,
* or local[*] to run locally with as many worker threads as logical cores on your machine.
* You should start by using local for testing.
*/
val sparkConf = new SparkConf().setAppName("Receiver").setMaster("local[*]")
/** Whether elasticsearch-hadoop should create an index (if its missing)
* when writing data to Elasticsearch or fail.
* (default: yes, but specifying anyway for the sake of completeness)
*/
sparkConf.set("es.index.auto.create", "true")
/** Define that the context batch interval should take 2 seconds.*/
//val ssc = new StreamingContext(sparkConf, Seconds(2)) // testing alternatives
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet // if there are many
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
/** Get the lines.
* messages are of format:
* (null, {"key": "value", "key": "value, ...})
* .map(_._2) takes the second tuple argument
*/
val lines = messages.map(_._2)
/** pairs are now: [Ljava.lang.String;@5922fbe4
* it is what "toString" function in scala actually returns:
* def toString(): String = this.getClass.getName + "@" + this.hashCode.toHexString
* [ means it’s an array
* L means it can contain references to objects
* java.lang.String means all those objects should be instances of java.lang.String
* ; is just because Java loves its semicolons
*
* Get rid of all the unneccessary charecters and split the string by comma for further usage.
*/
val pairs = lines.map(_.stripPrefix("{").stripSuffix("}").replaceAll("\"|\\s", "").split(","))
/** Getting key-value from the pairs, which are:
* key: value
* key: value
* key: value
* ...
*/
pairs.foreach(arr =>
arr.map(
x => Map( x(0).split(":")(0) -> x(0).split(":")(1) )
).saveToEs("spark/json-test")
)
/* testing
pairs.foreach(
arr => arr.foreach( x =>
//val source = Map(x.map(_.1) -> x.map(_.2))
//source.foreach(println)
x => x.foreach(println)
)
)*/
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
И мой файл docker-compose.yml имеет следующий вид:
version: '3.7'
services:
# kafka (zookeeper integrated)
kafka:
container_name: kafka
build: ./kafka
environment:
- KAFKA=localhost:9092
- ZOOKEEPER=localhost:2181
expose:
- 2181
- 9092
networks:
- kaspelki-net
# spark (contains all daemons)
spark:
container_name: spark
build: ./spark
command: bash
links:
- "kafka"
ports:
- 8080:8080
- 7077:7077
- 6066:6066
- 4040:4040
environment:
- SPARK_MASTER_HOST=spark://localhost:7077
env_file:
- ./hadoop/hadoop.env
tty: true
expose:
- 7077
- 8080
- 6066
- 4040
volumes:
- ./scripts/spark:/app
networks:
- kaspelki-net
# ELK
elasticsearch:
container_name: elasticsearch
build: ./ELK/elasticsearch
ports:
- 9200:9200
expose:
- 9200
networks:
- kaspelki-net
kibana:
container_name: kibana
build: ./ELK/kibana
ports:
- 5601:5601
expose:
- 5601
networks:
- kaspelki-net
depends_on:
- elasticsearch
### --- volumes --- ###
volumes:
data:
networks:
kaspelki-net:
name: kaspelki-net
Итак, я запускаю «sudo docker-compose up -d» и могу проверить «localhost: 9200» и «localhost: 5601» в моем браузере, которые работают нормально, но когда я запускаю контейнер через «sudo»docker exec -it spark bash "и попробуйте отправить мой receiver.jar через:
spark-submit --master yarn-client --driver-java-options" -Dlog4j.configuration = file:///app/receiver/log4j.properties "/app/receiver/building_jar/target/scala-2.10/receiver.jar kafka: 9092 test
Затем я получаю это сообщение об ошибке:
18/12/28 09:05:18 ОШИБКА NetworkClient: Ошибка узла [127.0.0.1:9200] (соединение отклонено);других узлов не осталось - прерывание ...
с некоторыми другими сообщениями, в которых завершается процесс.Итак, я понимаю, что каким-то образом соединение не удается, но я не понимаю, ПОЧЕМУ: /
Может кто-нибудь помочь?