Как правильно получить доступ к SparkSession от работника? - PullRequest
1 голос
/ 06 июля 2019

Я только что понял, что много раз вызываю следующий код, и это кажется неправильным:

spark = SparkSession.builder.getOrCreate()

Некоторые шаги моего кода выполняются в рабочем контексте. Таким образом, сеанс искры, созданный в то время как в драйвере, не доступен для рабочего.

Я знаю, что метод getOrCreate () проверяет, есть ли какой-либо глобальный сеанс, доступный для использования, поэтому он не всегда может создавать новый, но это заставляет меня снова и снова запрашивать сеанс spark.

Я проверил и увидел людей, отправляющих сеанс в качестве аргумента функций UDF или foreach, но не смог найти много об этом.

Итак, как правильно получить доступ к искре, находясь внутри рабочего?

РЕДАКТИРОВАТЬ: Добавлен мой вариант использования ниже / Изменены детали шагов

Может быть, мой вариант использования станет понятнее из приведенного ниже списка:

 1. Get data from eventhub. 
 2. Save data to delta table
 3. Query distinct IDs
 4. Foreach ID
  4.1. Query other database to get info about the body based on the ID
  4.2. For each row using UDF function (CSV)
   4.2.1. Transform csv into dataframe and return list of tuples
  4.3. Merge all dataframes using flatMap on the rows
  4.4. Write to somewhere

Я получаю сообщения от концентратора событий, и каждое сообщение имеет тело CSV и идентификатор.

Каждое сообщение может полностью отличаться от другого, и если да, то, в конце концов, я собираюсь сохранить каждое сообщение в другой таблице DW.

Итак, для этого я выбрал следующую стратегию:

Во-первых, сохраните все тело и идентификаторы CSV в общей таблице Delta, точно так же, как они пришли (я делю по идентификатору)

Теперь я могу запрашивать все данные, связанные с каждым идентификатором, по одному, и это позволяет обрабатывать все данные, связанные с этим идентификатором, в одном пакете.

Когда я запрашиваю все данные тела определенного идентификатора, у меня есть X строк, и мне нужно перебирать их, преобразовывая тело CSV каждой строки в Dataframe.

После этого я объединяю все кадры данных в один и сохраняю их в нужную таблицу в DW.

Для каждого идентификатора dinstinct я использую spark для получения информации о теле, и каждое чтение CSV или запись в DW уже выполняется изнутри рабочего.

РЕДАКТИРОВАТЬ: Добавлен код для людей

4 Foreach ID

# dfSdIds is a dataframe containing all distinct ids that I want to iterate over
dfSdIds.rdd.foreach(SaveAggregatedBodyRows)

4.2 Для каждой строки с использованием функции UDF (CSV)

# mapping: is a json structure that is going to generate the dataframe schema of the CSV inside the udf function
# argSchema: is the expected udf returning structure ArrayType(StructType(...))
def SaveAggregatedBodyRows(row): 

...

spark = SparkSession.builder.getOrCreate()
dfCsvBody = spark.sql('select body from delta.`/dbfs/' + allInOneLocation + '` where SdIds = {}'.format(sdid))

UdfConvertCSVToDF = udf(lambda body, mapping: ConvertCSVToDF(body, mapping), argSchema)
dfConvertedBody = dfCsvBody.withColumn('body', UdfConvertCSVToDF(dfCsvBody.body, lit(mapping)))

4.2.1 Преобразовать CSV в dataframe и вернуть список кортежей

def ConvertCSVToDF(body, mapping): 

...

spark = SparkSession.builder.getOrCreate()           
csvData = spark.sparkContext.parallelize(splittedBody)

df = (spark.read
.option("header", True)
.option("delimiter", delimiter)
.option("quote", quote)
.option("nullValue", nullValue)
.schema(schema)
.csv(csvData))

return list(map(tuple, df.select('*').collect()))

4.3 Объединение всех фреймов данных с помощью flatMap в строках

# mapSchema is the same as argSchema but without ArrayType
flatRdd = dfConvertedBody.rdd.flatMap(lambda x: x).flatMap(lambda x: x)      
dfMerged = flatRdd.toDF(mapSchema)

4.4 Пишите куда-нибудь

(dfMerged.write
   .format(savingFileFormat)
   .mode("append")
   .option("checkpointLocation", checkpointLocation)
   .save(tableLocation)) 

Я знаю, что в этом коде есть что улучшить, но я делаю так, как изучаю pyspark.

Этот вопрос стал намного больше, чем я ожидал, но суть в том, что я позвонил

spark = SparkSession.builder.getOrCreate() 

в драйвере, внутри метода SaveAggregatedBodyRows И внутри метода ConvertCSVToDF.

Люди говорили, что это не сработает, но это так.

1 Ответ

0 голосов
/ 06 июля 2019

Интересны первые 3 отзыва, но это не парадигма Искры. Похоже, вам нужно разделить. Понятно, но это параллельное программирование.

http://www.informit.com/articles/article.aspx?p=2928186 можно проконсультироваться.

...