How do I send data from Spark to Elasticsearch (YARN and Docker-Compose)?
Asked on 13 January 2019

I am trying to submit a simple application (written in Scala) with spark-submit on YARN (cluster or client mode does not matter for now) that writes to Elasticsearch. For this I adapted the basic example from the Elasticsearch website:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._
import org.apache.spark.SparkConf

object Receiver {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("yarn")
    // es.* settings must be applied before the SparkContext is created,
    // otherwise they have no effect
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)

    val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
    val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

    val rdd = sc.makeRDD(Seq(numbers, airports))
    rdd.saveAsTextFile("hdfs://namenode:9000/user/root/input/test")
    rdd.saveToEs("spark/docs")

    // read the documents back to verify the write
    val esRdd = sc.esRDD("spark/docs")
    esRdd.first()
  }
}

I tried .saveToEs with a local master ("local[*]") on my machine and it worked fine. Unfortunately I was too quick and moved on to Docker (and Docker-Compose) without trying this snippet with YARN first, so I don't know whether it behaves the same there...
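
For the Docker setup I suspect the connector also needs to be told where Elasticsearch is, because by default it talks to localhost:9200, which is not the elasticsearch container. A minimal sketch of the configuration I think is needed, assuming the driver and executors can resolve the Compose service name elasticsearch (es.nodes.wan.only is my own assumption and may not be required inside a single Compose network):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._

object ReceiverDockerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Simple Application")
      // es.* settings have to be in place before the SparkContext exists
      .set("es.index.auto.create", "true")
      // Compose service name of the Elasticsearch container, not localhost
      .set("es.nodes", "elasticsearch")
      .set("es.port", "9200")
      // assumption: avoid the connector rewriting the address to a node IP
      // that executors inside Docker cannot reach
      .set("es.nodes.wan.only", "true")
    // no setMaster here, so --master / --deploy-mode from spark-submit apply

    val sc = new SparkContext(conf)
    sc.makeRDD(Seq(Map("one" -> 1, "two" -> 2))).saveToEs("spark/docs")
    sc.stop()
  }
}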

In any case, I created the following docker-compose.yml:

version: '3.7'
services:


# kafka (zookeeper integrated)
  kafka:
    container_name: kafka
    build: ./kafka
    environment:
      - KAFKA=localhost:9092
      - ZOOKEEPER=localhost:2181
    expose:
      - 2181
      - 9092
    networks:
      - kaspelki-net

# HDFS (all bde2020 with tail "2.0.0-hadoop2.7.4-java8")
# (e.g. bde2020/hadoop-datanode:2.0.0-hadoop2.7.4-java8)
  namenode:
    container_name: namenode
    build: ./hadoop/namenode
    volumes:
      - ./data/namenode:/hadoop/dfs/name
    environment:
      - CLUSTER_NAME=cloud1
    env_file:
      - ./hadoop/hadoop.env
    ports:
      - 9870:9870
    expose:
      - 9870
    networks:
      - kaspelki-net

  nodemanager:
    container_name: nodemanager
    build: ./hadoop/nodemanager
    depends_on:
      - namenode
      - datanode
    env_file:
      - ./hadoop/hadoop.env
    networks:
      - kaspelki-net

  resourcemanager:
    container_name: resourcemanager
    build: ./hadoop/resourcemanager
    depends_on:
      - namenode
      - datanode
    env_file:
      - ./hadoop/hadoop.env
    networks:
      - kaspelki-net

  historyserver:
    container_name: historyserver
    build: ./hadoop/historyserver
    depends_on:
      - namenode
      - datanode
    volumes:
      - ./data/historyserver:/hadoop/yarn/timeline
    env_file:
      - ./hadoop/hadoop.env
    networks:
      - kaspelki-net

  datanode:
    container_name: datanode
    build: ./hadoop/datanode
    depends_on: 
      - namenode
    volumes:
      - ./data/datanode:/hadoop/dfs/data
    env_file:
      - ./hadoop/hadoop.env
    ports:
      - 9864:9864
    expose:
      - 9864
    tty: true
    networks:
      - kaspelki-net


# spark (also bde2020 with tail "2.2.0-hadoop2.7")
# (e.g. bde2020/spark-master:2.2.0-hadoop2.7)
  spark_master:
    container_name: spark_master
    build: ./spark/master
    command: bash
    links:
      - "kafka"
      - "elasticsearch"
    ports:
      - 8080:8080
      - 7077:7077
      - 6066:6066
      - 4040:4040
    expose:
      - 7077
      - 8080
      - 6066
      - 4040
    environment:
      - SPARK_MASTER_HOST=spark://localhost:7077
      - YARN_CONF_DIR=/spark/yarn/
      - HADOOP_CONF_DIR=/spark/jars/
      - INIT_DAEMON_STEP=setup_spark
    env_file:
      - ./hadoop/hadoop.env
    tty: true
    volumes:
      - ./scripts/spark:/app
    networks:
      - kaspelki-net


  spark_worker:
    container_name: spark_worker
    build: ./spark/worker
    command: bash
    ports:
      - 8081:8081
    depends_on:
      - spark_master
    environment:
      - SPARK_MASTER=spark://spark_master:7077
    env_file:
      - ./hadoop/hadoop.env
    tty: true
    networks:
      - kaspelki-net


# ELK
  elasticsearch:
    container_name: elasticsearch
    build: ./ELK/elasticsearch
    ports:
      - 9200:9200
    expose:
      - 9200
    cap_add:
      - IPC_LOCK
    environment:
      - cluster.name=cp-es-cluster
      - node.name=cloud1
      - node.master=true
      - http.cors.enabled=true
      - http.cors.allow-origin="*"
      - bootstrap.memory_lock=true
      - discovery.zen.minimum_master_nodes=1
      - xpack.security.enabled=false
      - xpack.monitoring.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
      - network.host=0.0.0.0  # or publish host?
    ulimits:
      memlock:
        soft: -1
        hard: -1
      nofile:
        soft: 65536
        hard: 65536

    networks:
      - kaspelki-net


  kibana:
    container_name: kibana
    build: ./ELK/kibana
    ports:
      - 5601:5601
    expose:
      - 5601
    environment:
      - ELASTICSEARCH_URL=http://elasticsearch:9200
    depends_on:
      - elasticsearch
    networks:
      - kaspelki-net


### --- volumes --- ###
volumes:
  data:
networks:
  kaspelki-net:
    name: kaspelki-net

It defines all the services I need.

What I don't understand, and where the problem might lie, is this: when Spark and all the Hadoop services (namenode, datanode, etc.) ran locally on my machine, spark-submit worked and Hadoop handled the data via hdfs dfs ...

How does this actually work with Docker?
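
My mental model of the networking part is that all containers sit on kaspelki-net and therefore resolve each other by Compose service name. A small sketch of what I assume should work from inside the Spark container (names taken from my docker-compose.yml, not verified against YARN yet):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsCheck {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Compose service name of the namenode container, same URI as in receiver.scala
    conf.set("fs.defaultFS", "hdfs://namenode:9000")

    val fs = FileSystem.get(conf)
    // list the directory the job writes to, just to prove HDFS is reachable
    fs.listStatus(new Path("/user/root/input")).foreach(status => println(status.getPath))
  }
}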

When I now submit the application with this command:

/spark/bin/spark-submit --class Receiver --master yarn --deploy-mode client /app/receiver/building_jar/target/scala-2.11/receiver.jar kafka:9092 test

I get this error message and I don't understand why:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/01/13 13:00:17 INFO SparkContext: Running Spark version 2.2.0
19/01/13 13:00:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/01/13 13:00:17 WARN SparkConf: spark.master yarn-client is deprecated in Spark 2.0+, please instead use "yarn" with specified deploy mode.
19/01/13 13:00:17 INFO SparkContext: Submitted application: Simple Application
19/01/13 13:00:17 INFO SecurityManager: Changing view acls to: user
19/01/13 13:00:17 INFO SecurityManager: Changing modify acls to: user
19/01/13 13:00:17 INFO SecurityManager: Changing view acls groups to: 
19/01/13 13:00:17 INFO SecurityManager: Changing modify acls groups to: 
19/01/13 13:00:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(user); groups with view permissions: Set(); users  with modify permissions: Set(user); groups with modify permissions: Set()
19/01/13 13:00:18 INFO Utils: Successfully started service 'sparkDriver' on port 39545.
19/01/13 13:00:18 INFO SparkEnv: Registering MapOutputTracker
19/01/13 13:00:18 INFO SparkEnv: Registering BlockManagerMaster
19/01/13 13:00:18 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/01/13 13:00:18 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/01/13 13:00:18 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-2b8e90e1-d987-49ff-a546-5add22bb3631
19/01/13 13:00:18 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
19/01/13 13:00:18 INFO SparkEnv: Registering OutputCommitCoordinator
19/01/13 13:00:18 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/01/13 13:00:18 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.21.0.10:4040
19/01/13 13:00:18 INFO SparkContext: Added JAR file:/app/receiver/building_jar/target/scala-2.11/receiver.jar at spark://172.21.0.10:39545/jars/receiver.jar with timestamp 1547384418299
19/01/13 13:00:18 INFO RMProxy: Connecting to ResourceManager at resourcemanager/172.21.0.8:8032
19/01/13 13:00:18 INFO Client: Requesting a new application from cluster with 1 NodeManagers
19/01/13 13:00:18 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
19/01/13 13:00:18 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
19/01/13 13:00:18 INFO Client: Setting up container launch context for our AM
19/01/13 13:00:18 INFO Client: Setting up the launch environment for our AM container
19/01/13 13:00:18 INFO Client: Preparing resources for our AM container
19/01/13 13:00:19 INFO Client: Uploading resource file:/tmp/spark-44077237-6112-4c8e-870d-7fd0c28875d0/__spark_conf__2212863065927281423.zip -> file:/home/user/.sparkStaging/application_1547384324312_0001/__spark_conf__.zip
19/01/13 13:00:19 INFO SecurityManager: Changing view acls to: user
19/01/13 13:00:19 INFO SecurityManager: Changing modify acls to: user
19/01/13 13:00:19 INFO SecurityManager: Changing view acls groups to: 
19/01/13 13:00:19 INFO SecurityManager: Changing modify acls groups to: 
19/01/13 13:00:19 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(user); groups with view permissions: Set(); users  with modify permissions: Set(user); groups with modify permissions: Set()
19/01/13 13:00:19 INFO Client: Submitting application application_1547384324312_0001 to ResourceManager
19/01/13 13:00:20 INFO YarnClientImpl: Submitted application application_1547384324312_0001
19/01/13 13:00:20 INFO SchedulerExtensionServices: Starting Yarn extension services with app application_1547384324312_0001 and attemptId None
19/01/13 13:00:21 INFO Client: Application report for application_1547384324312_0001 (state: ACCEPTED)
19/01/13 13:00:21 INFO Client: 
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1547384419935
 final status: UNDEFINED
 tracking URL: http://resourcemanager:8088/proxy/application_1547384324312_0001/
 user: user
19/01/13 13:00:22 INFO Client: Application report for application_1547384324312_0001 (state: ACCEPTED)
19/01/13 13:00:23 INFO Client: Application report for application_1547384324312_0001 (state: FAILED)
19/01/13 13:00:23 INFO Client: 
 client token: N/A
 diagnostics: Application application_1547384324312_0001 failed 2 times due to AM Container for appattempt_1547384324312_0001_000002 exited with  exitCode: -1000
For more detailed output, check application tracking page:http://historyserver:8188/applicationhistory/app/application_1547384324312_0001Then, click on links to logs of each attempt.
Diagnostics: File file:/home/user/.sparkStaging/application_1547384324312_0001/__spark_conf__.zip does not exist
java.io.FileNotFoundException: File file:/home/user/.sparkStaging/application_1547384324312_0001/__spark_conf__.zip does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:428)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:253)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1746)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:62)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

Failing this attempt. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1547384419935
 final status: FAILED
 tracking URL: http://historyserver:8188/applicationhistory/app/application_1547384324312_0001
 user: user
19/01/13 13:00:23 INFO Client: Deleted staging directory file:/home/user/.sparkStaging/application_1547384324312_0001
19/01/13 13:00:23 ERROR SparkContext: Error initializing SparkContext.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:173)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
at Receiver$.main(receiver.scala:21)
at Receiver.main(receiver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/01/13 13:00:23 INFO SparkUI: Stopped Spark web UI at http://172.21.0.10:4040
19/01/13 13:00:23 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!
19/01/13 13:00:23 INFO YarnClientSchedulerBackend: Shutting down all executors
19/01/13 13:00:23 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
19/01/13 13:00:23 INFO SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
 services=List(),
 started=false)
19/01/13 13:00:23 INFO YarnClientSchedulerBackend: Stopped
19/01/13 13:00:23 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/01/13 13:00:23 INFO MemoryStore: MemoryStore cleared
19/01/13 13:00:23 INFO BlockManager: BlockManager stopped
19/01/13 13:00:23 INFO BlockManagerMaster: BlockManagerMaster stopped
19/01/13 13:00:23 WARN MetricsSystem: Stopping a MetricsSystem that is not running
19/01/13 13:00:23 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/01/13 13:00:23 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master.
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:85)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:62)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:173)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
at Receiver$.main(receiver.scala:21)
at Receiver.main(receiver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/01/13 13:00:23 INFO ShutdownHookManager: Shutdown hook called
19/01/13 13:00:23 INFO ShutdownHookManager: Deleting directory /tmp/spark-44077237-6112-4c8e-870d-7fd0c28875d0

Do I need to add some extra parameters now? The error message is misleading, since it says the .zip file does not exist even though it was created just before. I did a bit of research and tried the following:

  • set HADOOP_CONF_DIR / YARN_CONF_DIR, but maybe they point to the wrong place? The directories inside the containers do not match the local ones, which makes everything harder. I pointed HADOOP_CONF_DIR at /spark/jars/ because that is the only directory that contains anything "hadoop" (a workaround sketch follows after this list):

(listing of /spark/jars showing the hadoop-* jars)

  • I tried adding .saveAsTextFile("hdfs..") to my script, but again I don't understand how it is supposed to reach HDFS when it is not running in the same container?

  • I set the master to "yarn" in the script, and I also pass it as an argument when submitting.
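
Looking at the log again, the staging file goes to file:/home/user/.sparkStaging/..., which suggests the client falls back to the local filesystem, presumably because my HADOOP_CONF_DIR points at jars rather than at a directory containing core-site.xml / yarn-site.xml. A workaround sketch I considered but have not verified, forcing the relevant Hadoop settings through spark.hadoop.* properties (service names taken from my docker-compose.yml):

import org.apache.spark.{SparkConf, SparkContext}

object YarnConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Simple Application")
      // spark.hadoop.* properties are copied into the Hadoop Configuration,
      // so staging should go to HDFS instead of file:/home/user/.sparkStaging
      .set("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000")
      // ResourceManager service name and port, as seen in the submit log
      .set("spark.hadoop.yarn.resourcemanager.hostname", "resourcemanager")
      .set("spark.hadoop.yarn.resourcemanager.address", "resourcemanager:8032")

    val sc = new SparkContext(conf)
    // ... same job body as in receiver.scala
    sc.stop()
  }
}

The cleaner fix is probably to point HADOOP_CONF_DIR / YARN_CONF_DIR at a directory that really contains the cluster's core-site.xml and yarn-site.xml, but I have not managed to do that with the bde2020 images yet.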

Maybe I am misunderstanding the structure of the bde2020 containers, because I don't see any spark-submit examples anywhere. Every example either runs YARN on a single machine, or runs inside a container but executes locally, or someone asks something similar without sharing their code, so I can't check whether I have misunderstood something...

So can someone explain HOW I can submit my Spark application with YARN to Elasticsearch (with Docker)?
