Только один поток выполняет параллельный SQL-запрос с PySpark, используя многопроцессорный пул - PullRequest
0 голосов
/ 21 ноября 2018

У меня есть случай, когда я использую PySpark (или Spark, если я не могу сделать это с Python и вместо этого мне нужно использовать Scala или Java) для извлечения данных из нескольких сотен таблиц базы данных, в которых отсутствуют первичные ключи.(Почему Oracle когда-либо будет создавать продукт ERP, содержащий таблицы с первичными ключами, это другая тема ... но независимо от того, нам нужно иметь возможность извлекать данные и сохранять данные из каждой таблицы базы данных в файл Parquet.) Я изначальнопопытался использовать Sqoop вместо PySpark, но из-за ряда проблем, с которыми мы столкнулись, было больше смысла использовать вместо этого PySpark / Spark.

В идеале мне бы хотелось, чтобы каждый узел задачи был в моем вычислительном кластере: взять имя таблицы, запросить эту таблицу из базы данных и сохранить эту таблицу в виде файла Parquet (или набора файлов Parquet).) в S3.Мой первый шаг - заставить его работать локально в автономном режиме.(Если бы у меня был первичный ключ для каждой данной таблицы, то я мог бы разделить процесс сохранения запросов и файлов по различным наборам строк для данной таблицы и распределить разделы строк по узлам задачи в вычислительном кластере, чтобы выполнить операцию сохранения файлапараллельно, но из-за того, что продукт Oracle ERP не имеет первичных ключей для соответствующих таблиц, это не вариант.)

Я могу успешно запрашивать целевую базу данных с помощью PySpark, и я могу успешно сохранитьданные в файл паркета с многопоточностью, но по какой-то причине только один поток что-то делает . Итак, что происходит, так это то, что только один поток принимает tableName, запрашивает базу данных и сохраняет файл в желаемом каталоге как файл Parquet. Затем задание заканчивается, как будто никакие другие потоки не выполнялись.Я предполагаю, что может иметь место какая-то проблема блокировки.Если я правильно понял комментарии здесь: Как запустить несколько заданий в одном Sparkcontext из отдельных потоков в PySpark? , тогда то, что я пытаюсь сделать, должно быть возможным, если нет особых проблем, связанных с выполнением параллельного JDBC SQLзапросы.

Редактировать : Я специально ищу способ, позволяющий мне использовать пул потоков какого-либо типа, чтобы мне не нужно было вручную создавать поток для каждого изтаблицы, которые мне нужно обработать и вручную распределить их между узлами задач в моем кластере.

Даже когда я попытался установить:

--master local[*]

и

--conf 'spark.scheduler.mode=FAIR'

проблема осталась.

Кроме того, чтобы кратко объяснить мой код, мне нужно было использовать собственный драйвер JDBC, и я запускаю код в записной книжке Jupyter в Windows, поэтому я использую обходной путь, чтобы PySpark запускался справильные параметры.(Кстати, я ничего не имею против других операционных систем, но моя машина с Windows - моя самая быстрая рабочая станция, поэтому я ее и использую.)

Вот мои настройки:

driverPath = r'C:\src\NetSuiteJDBC\NQjc.jar'
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--driver-class-path '{0}' --jars '{0}' --master local[*] --conf 'spark.scheduler.mode=FAIR' --conf 'spark.scheduler.allocation.file=C:\\src\\PySparkConfigs\\fairscheduler.xml' pyspark-shell".format(driverPath)
)

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Column, Row, SQLContext
from pyspark.sql.functions import col, split, regexp_replace, when
from pyspark.sql.types import ArrayType, IntegerType, StringType

spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
sc = SparkContext.getOrCreate()

Затем, чтобы протестировать многопроцессорность, я создал файл sparkMethods.py в каталоге, где я запускаю свой блокнот Jupyter, и поместил в него этот метод:

def testMe(x):
    return x*x

Когда я запускаю:

from multiprocessing import Pool
import sparkMethods

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print(pool.map(sparkMethods.testMe, range(10)))

в моем ноутбуке Jupyter я получаю ожидаемый результат:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Теперь, перед тем, как кто-то оскорбляет то, как я написал следующий метод, пожалуйста, знайте, что я сначала попытался передать контекст искры через замыкание, а затем столкнулся с ошибкой Pickling, как описано здесь: Я могу "«Выбирать локальные объекты», если я использую производный класс? Итак, я включил весь контекст Spark в следующий метод, который я поместил в файл sparkMethods.py (по крайней мере, пока не найду лучший способ).Причина, по которой я поместил методы во внешний файл (вместо того, чтобы включать их только в блокнот Jupyter), заключался в том, чтобы решить эту проблему: https://bugs.python.org/issue25053, как обсуждалось здесь: Пример многопроцессорной обработки, выдающий AttributeError издесь: многопроцессорная обработка python: AttributeError: Не удается получить атрибут "abc"

Это тот метод, который содержит логику для установления соединения JDBC:

# In sparkMethods.py file:
def getAndSaveTableInPySpark(tableName):
    import os
    import os.path
    from pyspark.sql import SparkSession, SQLContext
    spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
    spark.sparkContext.setLogLevel("INFO")
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")

    jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "OURCONNECTIONURL;") \
        .option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver") \
        .option("dbtable", tableName) \
        .option("user", "USERNAME") \
        .option("password", "PASSWORD") \
        .load()

    filePath = "C:\\src\\NetsuiteSparkProject\\" + tableName + "\\" + tableName + ".parquet"
    jdbcDF.write.parquet(filePath)
    fileExists = os.path.exists(filePath)
    if(fileExists):
        return (filePath + " exists!")
    else:
        return (filePath + " could not be written!")

Затем, возвращаясь в свой блокнот Jupyter, я запускаю:

import sparkMethods
from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(5) as p:
        p.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)

проблема в том, что кажется, что выполняется только один поток.

Когда я выполняю его, в выводе консоли я вижу, что он включает это изначально:

Процесс не может получить доступфайл, потому что он используется другим процессом.Системе не удается найти файл C: \ Users \ DEVIN ~ 1.BOS \ AppData \ Local \ Temp \ spark-class-launcher-output-3662.txt.,,,

, что заставляет меня заподозрить, что, возможно, происходит какой-то тип блокировки.

Независимо от того, один из потоков всегда будет успешно завершен, успешно запросит соответствующую таблицу и сохранит ее в файле Parquet по желанию.Существует некоторый недетерминизм в процессе, потому что разные исполнения приводят к тому, что другой поток выигрывает гонку и, следовательно, обрабатывает другую таблицу.Интересно, что выполняется только одно задание, как показано в интерфейсе Spark: Spark UI picture that shows that only one Spark Job was executed Тем не менее, статья здесь: https://medium.com/@rbahaguejr/threaded-tasks-in-pyspark-jobs-d5279844dac0 подразумевает, что я должен ожидать увидеть несколько заданий вSpark UI, если они были успешно запущены.

Теперь, если проблема в том, что PySpark на самом деле не способен выполнять несколько запросов JDBC параллельно для разных узлов задач, то, возможно, мое решение будет состоять в том, чтобы использовать пул соединений JDBC или даже просто открыть соединение для каждогостол (пока я закрываю соединение в конце потока).Когда я получал список таблиц для обработки, мне удалось подключиться к базе данных через библиотеку jaydebeapi, например:

import jaydebeapi
conn = jaydebeapi.connect("com.netsuite.jdbc.openaccess.OpenAccessDriver",  
                          "OURCONNECTIONURL;", 
                          ["USERNAME", "PASSWORD"], 
                          r"C:\src\NetSuiteJDBC\NQjc.jar")

top5Tables = list(pd.read_sql("SELECT TOP 5 TABLE_NAME FROM OA_TABLES WHERE TABLE_OWNER != 'SYSTEM';", conn)["TABLE_NAME"].values)
conn.close()
top5Tables

Вывод:

['SALES_TERRITORY_PLAN_PARTNER',
 'WORK_ORDER_SCHOOLS_TO_INSTALL_MAP',
 'ITEM_ACCOUNT_MAP',
 'PRODUCT_TRIAL_STATUS',
 'ACCOUNT_PERIOD_ACTIVITY']

Так что, возможно, еслипроблема в том, что PySpark нельзя использовать для распределения нескольких запросов по узлам задач, как это, тогда, возможно, я могу использовать библиотеку jaydebeapi, чтобы установить соединение.Однако в этом случае мне все еще нужен был бы способ записать выходные данные SQL-запроса JDBC в файл Parquet (который в идеале использовал бы возможность вывода схемы Spark), но я готов принять этот подход, еслиэто выполнимо.

Итак, как мне успешно выполнить запрос к базе данных и сохранить выходные данные в файлы Parquet параллельно (т. е. распределенные по узлам задачи) без того, чтобы мастер-узел выполнял все запросы последовательно?

1 Ответ

0 голосов
/ 21 ноября 2018

С некоторыми подсказками, предоставленными комментариями в ответ на мой вопрос, а также ответом здесь: Как параллельно запускать независимые преобразования с использованием PySpark? Я исследовал использование многопоточности вместо многопроцессорной обработки.Я более внимательно посмотрел на один из ответов здесь: Как запустить несколько заданий в одном Sparkcontext из отдельных потоков в PySpark? и заметил использование:

from multiprocessing.pool import ThreadPool

Я былв состоянии заставить это работать, как это:

from multiprocessing.pool import ThreadPool
pool = ThreadPool(5)
results = pool.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)
pool.close() 
pool.join() 
print(*results, sep='\n')

, который печатает:

C:\src\NetsuiteSparkProject\SALES_TERRITORY_PLAN_PARTNER\SALES_TERRITORY_PLAN_PARTNER.parquet exists!
C:\src\NetsuiteSparkProject\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP.parquet exists!
C:\src\NetsuiteSparkProject\ITEM_ACCOUNT_MAP\ITEM_ACCOUNT_MAP.parquet exists!
C:\src\NetsuiteSparkProject\PRODUCT_TRIAL_STATUS\PRODUCT_TRIAL_STATUS.parquet exists!
C:\src\NetsuiteSparkProject\ACCOUNT_PERIOD_ACTIVITY\ACCOUNT_PERIOD_ACTIVITY.parquet exists!
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...