Как получить подробную информацию об этапах и задачах Spark - PullRequest
0 голосов
/ 22 января 2019

Я установил кластер Apache Spark с мастером и одним рабочим, и я использую Python с Spyder в качестве IDE. Пока все работает нормально, но мне нужна подробная информация о распределении задач в кластере. Я знаю, что есть Spark Web UI, но я хотел бы, чтобы информация была непосредственно в моей консоли Spyder. Таким образом, я имею в виду, какая часть моего кода / сценария выполняется каким Worker / Master. Я думаю, что с пакетом python "socket" и socket.gethostname () должно быть возможно получить больше информации. Я действительно с нетерпением жду помощи. Вот мой код:

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import matplotlib.pyplot as plt
from datetime import datetime
from pyspark.sql.functions import udf
from datetime import datetime
import pyspark.sql.functions as F

#spark = SparkSession \
#    .builder \
#    .appName('weather_data') \
#    .getOrCreate()


spark = SparkSession \
   .builder \
   .appName("weather_data_u") \
   .master('master_ip@...')\
   .getOrCreate()

data.show()
data.printSchema()

data_selected = data\
        .select(data['Date'],
                data['TemperatureHighC'],
                data['TemperatureAvgC'],
                data['TemperatureLowC'],
                data['DewpointHighC'],
                data['DewpointAvgC'],
                data['DewpointLowC'],
                data['HumidityAvg'],
                data['WindSpeedMaxKMH'],
                data['WindSpeedAvgKMH'],
                data['GustSpeedMaxKMH'],
                data['PrecipitationSumCM'])

data_selected.printSchema()
data_selected.show()


f = udf(lambda row: datetime.strptime(row, '%Y-%m-%d'), TimestampType())

data_selected = data_selected\
        .withColumn('date', f(data['Date'].cast(StringType())))\
        .withColumn('t_max', data['TemperatureHighC'].cast(DoubleType()))\
        .withColumn('t_mean', data['TemperatureAvgC'].cast(DoubleType()))\
        .withColumn('t_min', data['TemperatureLowC'].cast(DoubleType()))\
        .withColumn('dew_max', data['DewpointHighC'].cast(DoubleType()))\
        .withColumn('dew_mean', data['DewpointAvgC'].cast(DoubleType()))\
        .withColumn('dew_min', data['DewpointLowC'].cast(DoubleType()))\
        .cache()

 data_selected.show()

t_mean_calculated = data_selected\
.groupBy(F.date_format(data_selected.date, 'M'))\
.agg(F.mean(data_selected.t_max))\
.orderBy('date_format(date, M)')

t_mean_calculated = t_mean_calculated\
.withColumn('month', t_mean_calculated['date_format(date, M)'].cast(IntegerType()))\
.withColumnRenamed('avg(t_max)', 't_max_month')\
.orderBy('month')\
.drop(t_mean_calculated['date_format(date, M)'])\
.select('month', 't_max_month')

t_mean_calculated = t_mean_calculated.collect()

1 Ответ

0 голосов
/ 22 января 2019

Как сообщил @ Jacek Laskowski , вы можете использовать Spark - Core локальные свойства для изменения имя работы в веб-интерфейс

  • callSite.short
  • callSite.long

ДляНапример, мое Spark -приложение синхронизирует несколько MySQL таблиц с S3, и я установил

spark.sparkContext.setLocalProperty("callSite.short", currentTableName)

, чтобы отобразить текущее имя таблицы в web-ui

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...