У меня есть случай, когда я использую PySpark (или Spark, если я не могу сделать это с Python и вместо этого мне нужно использовать Scala или Java) для извлечения данных из нескольких сотен таблиц базы данных, в которых отсутствуют первичные ключи.(Почему Oracle когда-либо будет создавать продукт ERP, содержащий таблицы с первичными ключами, это другая тема ... но независимо от того, нам нужно иметь возможность извлекать данные и сохранять данные из каждой таблицы базы данных в файл Parquet.) Я изначальнопопытался использовать Sqoop вместо PySpark, но из-за ряда проблем, с которыми мы столкнулись, было больше смысла использовать вместо этого PySpark / Spark.
В идеале мне бы хотелось, чтобы каждый узел задачи был в моем вычислительном кластере: взять имя таблицы, запросить эту таблицу из базы данных и сохранить эту таблицу в виде файла Parquet (или набора файлов Parquet).) в S3.Мой первый шаг - заставить его работать локально в автономном режиме.(Если бы у меня был первичный ключ для каждой данной таблицы, то я мог бы разделить процесс сохранения запросов и файлов по различным наборам строк для данной таблицы и распределить разделы строк по узлам задачи в вычислительном кластере, чтобы выполнить операцию сохранения файлапараллельно, но из-за того, что продукт Oracle ERP не имеет первичных ключей для соответствующих таблиц, это не вариант.)
Я могу успешно запрашивать целевую базу данных с помощью PySpark, и я могу успешно сохранитьданные в файл паркета с многопоточностью, но по какой-то причине только один поток что-то делает . Итак, что происходит, так это то, что только один поток принимает tableName, запрашивает базу данных и сохраняет файл в желаемом каталоге как файл Parquet. Затем задание заканчивается, как будто никакие другие потоки не выполнялись.Я предполагаю, что может иметь место какая-то проблема блокировки.Если я правильно понял комментарии здесь: Как запустить несколько заданий в одном Sparkcontext из отдельных потоков в PySpark? , тогда то, что я пытаюсь сделать, должно быть возможным, если нет особых проблем, связанных с выполнением параллельного JDBC SQLзапросы.
Редактировать : Я специально ищу способ, позволяющий мне использовать пул потоков какого-либо типа, чтобы мне не нужно было вручную создавать поток для каждого изтаблицы, которые мне нужно обработать и вручную распределить их между узлами задач в моем кластере.
Даже когда я попытался установить:
--master local[*]
и
--conf 'spark.scheduler.mode=FAIR'
проблема осталась.
Кроме того, чтобы кратко объяснить мой код, мне нужно было использовать собственный драйвер JDBC, и я запускаю код в записной книжке Jupyter в Windows, поэтому я использую обходной путь, чтобы PySpark запускался справильные параметры.(Кстати, я ничего не имею против других операционных систем, но моя машина с Windows - моя самая быстрая рабочая станция, поэтому я ее и использую.)
Вот мои настройки:
driverPath = r'C:\src\NetSuiteJDBC\NQjc.jar'
os.environ["PYSPARK_SUBMIT_ARGS"] = (
"--driver-class-path '{0}' --jars '{0}' --master local[*] --conf 'spark.scheduler.mode=FAIR' --conf 'spark.scheduler.allocation.file=C:\\src\\PySparkConfigs\\fairscheduler.xml' pyspark-shell".format(driverPath)
)
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Column, Row, SQLContext
from pyspark.sql.functions import col, split, regexp_replace, when
from pyspark.sql.types import ArrayType, IntegerType, StringType
spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
sc = SparkContext.getOrCreate()
Затем, чтобы протестировать многопроцессорность, я создал файл sparkMethods.py в каталоге, где я запускаю свой блокнот Jupyter, и поместил в него этот метод:
def testMe(x):
return x*x
Когда я запускаю:
from multiprocessing import Pool
import sparkMethods
if __name__ == '__main__':
pool = Pool(processes=4) # start 4 worker processes
# print "[0, 1, 4,..., 81]"
print(pool.map(sparkMethods.testMe, range(10)))
в моем ноутбуке Jupyter я получаю ожидаемый результат:
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Теперь, перед тем, как кто-то оскорбляет то, как я написал следующий метод, пожалуйста, знайте, что я сначала попытался передать контекст искры через замыкание, а затем столкнулся с ошибкой Pickling, как описано здесь: Я могу "«Выбирать локальные объекты», если я использую производный класс? Итак, я включил весь контекст Spark в следующий метод, который я поместил в файл sparkMethods.py
(по крайней мере, пока не найду лучший способ).Причина, по которой я поместил методы во внешний файл (вместо того, чтобы включать их только в блокнот Jupyter), заключался в том, чтобы решить эту проблему: https://bugs.python.org/issue25053, как обсуждалось здесь: Пример многопроцессорной обработки, выдающий AttributeError издесь: многопроцессорная обработка python: AttributeError: Не удается получить атрибут "abc"
Это тот метод, который содержит логику для установления соединения JDBC:
# In sparkMethods.py file:
def getAndSaveTableInPySpark(tableName):
import os
import os.path
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "OURCONNECTIONURL;") \
.option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver") \
.option("dbtable", tableName) \
.option("user", "USERNAME") \
.option("password", "PASSWORD") \
.load()
filePath = "C:\\src\\NetsuiteSparkProject\\" + tableName + "\\" + tableName + ".parquet"
jdbcDF.write.parquet(filePath)
fileExists = os.path.exists(filePath)
if(fileExists):
return (filePath + " exists!")
else:
return (filePath + " could not be written!")
Затем, возвращаясь в свой блокнот Jupyter, я запускаю:
import sparkMethods
from multiprocessing import Pool
if __name__ == '__main__':
with Pool(5) as p:
p.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)
проблема в том, что кажется, что выполняется только один поток.
Когда я выполняю его, в выводе консоли я вижу, что он включает это изначально:
Процесс не может получить доступфайл, потому что он используется другим процессом.Системе не удается найти файл C: \ Users \ DEVIN ~ 1.BOS \ AppData \ Local \ Temp \ spark-class-launcher-output-3662.txt.,,,
, что заставляет меня заподозрить, что, возможно, происходит какой-то тип блокировки.
Независимо от того, один из потоков всегда будет успешно завершен, успешно запросит соответствующую таблицу и сохранит ее в файле Parquet по желанию.Существует некоторый недетерминизм в процессе, потому что разные исполнения приводят к тому, что другой поток выигрывает гонку и, следовательно, обрабатывает другую таблицу.Интересно, что выполняется только одно задание, как показано в интерфейсе Spark: Тем не менее, статья здесь: https://medium.com/@rbahaguejr/threaded-tasks-in-pyspark-jobs-d5279844dac0 подразумевает, что я должен ожидать увидеть несколько заданий вSpark UI, если они были успешно запущены.
Теперь, если проблема в том, что PySpark на самом деле не способен выполнять несколько запросов JDBC параллельно для разных узлов задач, то, возможно, мое решение будет состоять в том, чтобы использовать пул соединений JDBC или даже просто открыть соединение для каждогостол (пока я закрываю соединение в конце потока).Когда я получал список таблиц для обработки, мне удалось подключиться к базе данных через библиотеку jaydebeapi, например:
import jaydebeapi
conn = jaydebeapi.connect("com.netsuite.jdbc.openaccess.OpenAccessDriver",
"OURCONNECTIONURL;",
["USERNAME", "PASSWORD"],
r"C:\src\NetSuiteJDBC\NQjc.jar")
top5Tables = list(pd.read_sql("SELECT TOP 5 TABLE_NAME FROM OA_TABLES WHERE TABLE_OWNER != 'SYSTEM';", conn)["TABLE_NAME"].values)
conn.close()
top5Tables
Вывод:
['SALES_TERRITORY_PLAN_PARTNER',
'WORK_ORDER_SCHOOLS_TO_INSTALL_MAP',
'ITEM_ACCOUNT_MAP',
'PRODUCT_TRIAL_STATUS',
'ACCOUNT_PERIOD_ACTIVITY']
Так что, возможно, еслипроблема в том, что PySpark нельзя использовать для распределения нескольких запросов по узлам задач, как это, тогда, возможно, я могу использовать библиотеку jaydebeapi, чтобы установить соединение.Однако в этом случае мне все еще нужен был бы способ записать выходные данные SQL-запроса JDBC в файл Parquet (который в идеале использовал бы возможность вывода схемы Spark), но я готов принять этот подход, еслиэто выполнимо.
Итак, как мне успешно выполнить запрос к базе данных и сохранить выходные данные в файлы Parquet параллельно (т. е. распределенные по узлам задачи) без того, чтобы мастер-узел выполнял все запросы последовательно?