Spark на Fargate не может найти локальный IP - PullRequest
0 голосов
/ 31 мая 2018

У меня есть задание на сборку, которое я пытаюсь настроить в кластере AWS Fargate из 1 узла.Когда я пытаюсь запустить Spark для построения моих данных, я получаю сообщение об ошибке, в котором говорится, что Java не может найти «localHost».

Я настроил конфигурацию, запустив скрипт, который добавляет spark-env.sh file, обновляет файл /etc/hosts и обновляет файл spark-defaults.conf.

В файле $SPARK_HOME/conf/spark-env.sh я добавляю:

  • SPARK_LOCAL_IP
  • SPARK_MASTER_HOST

В $SPARK_HOME/conf/spark-defaults.conf

  • spark.jars.packages <comma separated jars>
  • spark.master <ip or URL>
  • spark.driver.bindAddress <IP or URL>
  • spark.driver.host <IP or URL>

В файле /etc/hosts я добавляю:

  • <IP I get from http://169.254.170.2/v2/metadata> master

Вызов *Сценарий 1044 * путем передачи аргумента -master <IP or URL> с IP-адресом или URL-адресом, похоже, не помогает.

Я пытался использовать варианты local[*], spark://<ip from metadata>:<port from metadata>, <ip> и <ip>:<port>, но безрезультатно.Использование 127.0.0.1 и localhost, похоже, не имеет значения, по сравнению с использованием таких вещей, как master и IP-адрес, возвращаемый из метаданных.

На стороне AWS кластер Fargate работает вчастная подсеть с присоединенным NatGateway, поэтому, насколько я могу судить, она имеет исходящие и входящие сетевые маршруты.Я попытался использовать общедоступную сеть и ENABLED настроить для ECS автоматическое присоединение общедоступного IP-адреса к контейнеру.Все стандартные порты из документов Spark также открываются в контейнере.

Кажется, что он работает нормально до точки, в которой он пытается собрать свой собственный IP.

Ошибка, котораяЯ получил обратно, в стеке:

spark.jars.packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
spark.master spark://10.0.41.190:7077
Spark Command: /docker-java-home/bin/java -cp /usr/spark/conf/:/usr/spark/jars/* -Xmx1gg org.apache.spark.deploy.SparkSubmit --master spark://10.0.41.190:7077 --verbose --jars lib/RedshiftJDBC42-1.2.12.1017.jar --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk:1.7.4,com.upplication:s3fs:2.2.1 ./build_phase.py
========================================
Using properties file: /usr/spark/conf/spark-defaults.conf
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.spark.util.Utils$.redact(Utils.scala:2653)
at org.apache.spark.deploy.SparkSubmitArguments$$anonfun$defaultSparkProperties$1.apply(SparkSubmitArguments.scala:93)
at org.apache.spark.deploy.SparkSubmitArguments$$anonfun$defaultSparkProperties$1.apply(SparkSubmitArguments.scala:86)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.deploy.SparkSubmitArguments.defaultSparkProperties$lzycompute(SparkSubmitArguments.scala:86)
at org.apache.spark.deploy.SparkSubmitArguments.defaultSparkProperties(SparkSubmitArguments.scala:82)
at org.apache.spark.deploy.SparkSubmitArguments.mergeDefaultSparkProperties(SparkSubmitArguments.scala:126)
at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.UnknownHostException: d4771b650361: d4771b650361: Name or service not known
at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
at org.apache.spark.util.Utils$.findLocalInetAddress(Utils.scala:891)
at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress$lzycompute(Utils.scala:884)
at org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$localIpAddress(Utils.scala:884)
at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:941)
at org.apache.spark.util.Utils$$anonfun$localHostName$1.apply(Utils.scala:941)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.util.Utils$.localHostName(Utils.scala:941)
at org.apache.spark.internal.config.package$.<init>(package.scala:204)
at org.apache.spark.internal.config.package$.<clinit>(package.scala)
... 10 more

Контейнер не имеет проблем при попытке запустить локально, поэтому я думаю, что это как-то связано с природой Fargate.

ЛюбойПомощь или указатели будут очень признательны!

Редактировать

С момента поста я пробовал несколько разных вещей.Я использую образы, которые работают с Spark 2.3, Hadoop 2.7 и Python 3, и я попытался добавить пакеты ОС и различные варианты конфигурации, о которых я уже упоминал.

Все это пахнет, как будто я делаю spark-defaults.conf и друзья не правы, но я настолько новичок в этом, что это может быть просто плохое выравнивание Юпитера и Марса ...

Текущая трассировка стека:

    Getting Spark Context...
    2018-06-08 22:39:40 INFO  SparkContext:54 - Running Spark version 2.3.0
    2018-06-08 22:39:40 INFO  SparkContext:54 - Submitted application: SmashPlanner
    2018-06-08 22:39:41 INFO  SecurityManager:54 - Changing view acls to: root
    2018-06-08 22:39:41 INFO  SecurityManager:54 - Changing modify acls to: root
    2018-06-08 22:39:41 INFO  SecurityManager:54 - Changing view acls groups to:
    2018-06-08 22:39:41 INFO  SecurityManager:54 - Changing modify acls groups to:
    2018-06-08 22:39:41 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
    2018-06-08 22:39:41 ERROR SparkContext:91 - Error initializing SparkContext.
    java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:101)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:218)
        at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
        at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
        at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
        at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
        at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
        at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:364)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
    2018-06-08 22:39:41 INFO  SparkContext:54 - Successfully stopped SparkContext
    Traceback (most recent call last):
      File "/usr/local/smash_planner/build_phase.py", line 13, in <module>
        main()
      File "/usr/local/smash_planner/build_phase.py", line 9, in main
        build_all_data(pred_date)
      File "/usr/local/smash_planner/DataPiping/build_data.py", line 25, in build_all_data
        save_keyword(pred_date)
      File "/usr/local/smash_planner/DataPiping/build_data.py", line 52, in save_keyword
        df = get_dataframe(query)
      File "/usr/local/smash_planner/SparkUtil/data_piping.py", line 15, in get_dataframe
        sc = SparkCtx.get_sparkCtx()
      File "/usr/local/smash_planner/SparkUtil/context.py", line 20, in get_sparkCtx
        sc = SparkContext(conf=conf).getOrCreate()
      File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 118, in __init__
      File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 180, in _do_init
      File "/usr/spark-2.3.0/python/lib/pyspark.zip/pyspark/context.py", line 270, in _initialize_context
      File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.6-py3.4.egg/py4j/java_gateway.py", line 1428, in __call__
        answer, self._gateway_client, None, self._fqn)
      File "/usr/local/lib/python3.4/dist-packages/py4j-0.10.6-py3.4.egg/py4j/protocol.py", line 320, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
    : java.nio.channels.UnresolvedAddressException
        at sun.nio.ch.Net.checkAddress(Net.java:101)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:218)
        at io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:128)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:558)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1283)
        at io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:501)
        at io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:486)
        at io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:989)
        at io.netty.channel.AbstractChannel.bind(AbstractChannel.java:254)
        at io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:364)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)

    2018-06-08 22:39:41 INFO  ShutdownHookManager:54 - Shutdown hook called
    2018-06-08 22:39:41 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-80488ba8-2367-4fa6-8bb7-194b5ebf08cc
    Traceback (most recent call last):
      File "bin/smash_planner.py", line 76, in <module>
        raise RuntimeError("Spark hated your config and/or invocation...")
    RuntimeError: Spark hated your config and/or invocation...

SparkConf runtimeконфигурация:

def get_dataframe(query):
    ...
    sc = SparkCtx.get_sparkCtx()
    sql_context = SQLContext(sc)

    df = sql_context.read \
        .format("jdbc") \
        .option("driver", "com.amazon.redshift.jdbc42.Driver") \
        .option("url", os.getenv('JDBC_URL')) \
        .option("user", os.getenv('REDSHIFT_USER')) \
        .option("password", os.getenv('REDSHIFT_PASSWORD')) \
        .option("dbtable", "( " + query + " ) tmp ") \
        .load()

    return df

Редактировать 2

Используя только конфигурацию spark-env и работать со значениями по умолчанию от gettyimages / docker-spark Изображение выдает эту ошибку в браузере.

java.util.NoSuchElementException
at java.util.Collections$EmptyIterator.next(Collections.java:4189)
at org.apache.spark.util.kvstore.InMemoryStore$InMemoryIterator.next(InMemoryStore.java:281)
at org.apache.spark.status.AppStatusStore.applicationInfo(AppStatusStore.scala:38)
at org.apache.spark.ui.jobs.AllJobsPage.render(AllJobsPage.scala:273)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:82)
at org.apache.spark.ui.JettyUtils$$anon$3.doGet(JettyUtils.scala:90)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.spark_project.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
at org.spark_project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:584)
at org.spark_project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1180)
at org.spark_project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:512)
at org.spark_project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1112)
at org.spark_project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.spark_project.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:493)
at org.spark_project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:213)
at org.spark_project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
at org.spark_project.jetty.server.Server.handle(Server.java:534)
at org.spark_project.jetty.server.HttpChannel.handle(HttpChannel.java:320)
at org.spark_project.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at org.spark_project.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
at org.spark_project.jetty.io.FillInterest.fillable(FillInterest.java:108)
at org.spark_project.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
at org.spark_project.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
at org.spark_project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
at org.spark_project.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
at java.lang.Thread.run(Thread.java:748)

Ответы [ 2 ]

0 голосов
/ 24 июня 2018

Решение состоит в том, чтобы избежать пользовательской ошибки ...

Это была полная ситуация с лицом-ладонью, но я надеюсь, что мое неправильное понимание системы Spark может помочь какому-то бедному дураку, такому как я, который потратил слишком многоВремя, затраченное на проблему того же типа.

Ответ на последнюю итерацию (gettyimages/docker-spark Образ Docker) был в том, что я пытался запустить команду spark-submit без запуска мастера или рабочих (ий).,В репозитории gettyimages/docker-spark вы можете найти файл docker-compose, который показывает, что он создает узлы master и worker до того, как какая-либо работа с искрой будет выполнена.То, как образ создает мастера или работника, заключается в использовании сценария spark-class и передаче класса org.apache.spark.deploy.<master|worker>.<Master|Worker> соответственно.

Итак, собрав все вместе, я могу использовать конфигурацию, которую использовално я должен сначала создать master и worker(s), а затем выполнить команду spark-submit так же, как я уже делал.

Это быстрая и грязная реализация, хотя я гарантирую, что естьлучше, если это делают люди, которые действительно знают, что делают:

Первые 3 шага выполняются в сценарии загрузки кластера.Я делаю это в AWS Lambda, запускаемом APIGateway

  1. , создавая кластер и очередь или какую-то систему брокерских сообщений, например, zookeeper / kafka.(Для этого я использую API-шлюз -> лямбда)
  2. выберите главный узел (логика в лямбде)
  3. создайте сообщение с некоторой базовой информацией, такой как IP-адрес или домен мастераи поместите его в очередь из шага 1 (происходит в лямбде)

Все, что ниже, происходит в сценарии запуска на узлах Spark

создайте шаг в сценарии запуска, в котором узлы проверяют очередь на наличие сообщения из шага 3 , добавьте SPARK_MASTER_HOST и SPARK_LOCAL_IP в файл $SPARK_HOME/conf/spark-env.sh, используя информацию из сообщениявы подобрали на шаге 4 добавление spark.driver.bindAddress к файлу $SPARK_HOME/conf/spark-defaults.conf, используя информацию из сообщения, которое вы подобрали на шаге 4 , используйте некоторую логику в скрипте запуска, чтобы принять решение«Этот» узел является мастером или рабочим Запустите мастер или работника.в образе gettyimages/docker-spark вы можете запустить мастер с $SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master -h <the master's IP or domain> и запустить работника с $SPARK_HOME/bin/spark-class org.apache.spark.deploy.worker.Worker -h spark://<master's domain or IP>:7077 Теперь вы можете запустить команду spark-submit, которая развернет работу в кластере.

Редактировать: (некоторый код для справки) Это дополнение к лямбде

def handler(event, context):
    config = BuildConfig(event)
    res = create_job(config)
    return build_response(res)

и после редактирования

def handler(event, context):
    config = BuildConfig(event)
    coordination_queue = config.cluster + '-coordination'

    sqs = boto3.client('sqs')
    message_for_master_node = {'type': 'master', 'count': config.count}
    queue_urls = sqs.list_queues(QueueNamePrefix=coordination_queue)['QueueUrls']

    if not queue_urls:
        queue_url = sqs.create_queue(QueueName=coordination_queue)['QueueUrl']
    else:
        queue_url = queue_urls[0]

     sqs.send_message(QueueUrl=queue_url,
                 MessageBody=message_for_master_node)

    res = create_job(config)
    return build_response(res)

, а затем янемного добавил в скрипт, который запускаются узлы в кластере Spark, при запуске:

# addition to the "main" in the Spark node's startup script
sqs = boto3.client('sqs')
boot_info_message = sqs.receive_message(
    QueueUrl=os.getenv('COORDINATIN_QUEUE_URL'),
    MaxNumberOfMessages=1)['Messages'][0]
boot_info = boot_info_message['Body']
message_for_worker = {'type': 'worker', 'master': self_url}

if boot_info['type'] == 'master':
    for i in range(int(boot_info['count'])):
        sqs.send_message(QueueUrl=os.getenv('COORDINATIN_QUEUE_URL'),
                         MessageBody=message_for_worker)
sqs.delete_message(QueueUrl=os.getenv('COORDINATIN_QUEUE_URL'),
                   ReceiptHandle=boot_info_message['ReceiptHandle'])

...
# starts a master or worker node
startup_command = "org.apache.spark.deploy.{}.{}".format(
    boot_info['type'], boot_info['type'].title())
subprocess.call(startup_command)
0 голосов
/ 09 июня 2018

Перейдите в консоль AWS и в конфигурации вашей группы безопасности разрешите весь входящий трафик в экземпляр.

...