Как обработать большую коллекцию Монго, которая имеет много необязательных полей (динамическая схема) в Pyspark - PullRequest
0 голосов
/ 18 апреля 2019

Я обрабатываю коллекцию Mongo, которая содержит тысячи элементов, каждый из которых является необязательным.Элементы являются необязательными, потому что они отражают исходные данные, доступные для каждого идентификатора.Я также мог бы добавить, что есть несколько уровней вложенных структур и массивов, которые я должен был бы угадать приблизительно из 8 уровней в некоторых частях коллекции из моего анализа до настоящего времени.

Проблема, с которой я сталкиваюсь в настоящее времяявляется то, что определение схемы является динамическим.Документация по соединителю mongo показывает только примеры pyspark, использующие данные через соединитель в кадре данных, где схема выводится, предоставляя интерфейс sql для использования для записи данных в hdfs.Предположительно, примеры проиллюстрированы из-за оптимизаций, встроенных в использование фреймов данных.Однако вывод схемы, встроенный в этот подход, не выглядит так, как будто он отбирает достаточное количество строк, когда так много полей являются необязательными, и, следовательно, мои задания аварийно завершают работу, поскольку при выполнении операций разнесения возникают несовпадения типов во вложенных объектах.

Конфигурация среды:

mongo-spark-connector_2.11-2.1.5.jar

mongodb-driver-3.6.4.jar

mongodb-driver-core-3.6.4.jar

bson-3.6.4.jar

Python 3.3.2

Spark 2.1.0

Версия сервера MongoDB: 3.4.19

http://spark.apache.org/docs/2.1.0/api/python/

https://docs.mongodb.com/spark-connector/master/python-api/

У нас была встреча, чтобы обсудить подход, и я был готов принять на себя накладные расходы производительности при выполнении полного сканирования монго для определения схемы надмножества.

Итак, я попытался преобразовать кадр данных, возвращенный из примера коннектора mongo:

someDF = spark.read.format ("com.mongodb.spark.sql.DefaultSource"). Load ()

в RDD, где я мог бы использовать функцию API pyspark, созданнуюeDataFrame.Причина перехода от df к rdd и обратно заключалась в том, что я надеялся переопределить логический вывод схемы, использованный при создании оригинального df, где я мог явно выбрать большую часть населения, чтобы попытаться получить предполагаемую схему.

someNewDF = spark.createDataFrame (someDF.rdd, schema = None, samplingRatio = 1)

Что не работает - someNewDF возвращает не тип, который я считаю.Это как-то связано с переходом на RDD?По крайней мере, попытка выполнить показ на json_root вызывает исключение «не тип», не повторяемое исключение

import os
import sys
import platform

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as psf
from pyspark.sql.types import *
from py4j.protocol import Py4JError

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: myScript.py config.properties"
        sys.exit(-1)

arguments = sys.argv[1:]
print("Starting "+str(sys.argv[0])+" with "+str(sys.argv[1]))

conf = SparkConf()
conf.set("spark.mongodb.input.uri", "mongodb://hostname:27017/some_database.some_collection")
conf.set("spark.debug.maxToStringFields", 2048)

# Create a new SparkContext used for RDD's
sc = SparkContext("yarn")

# Set log level to ERROR to avoid distracting extra output ** changed to trace temporarily to capture more data
sc.setLogLevel("TRACE")

# Create a new SparkSession used for DataFrames
spark = SparkSession.builder \
                    .master("yarn") \
                    .appName(str(sys.argv[0])) \
                    .config(conf=conf) \
                    .getOrCreate()


someDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

**newDF = spark.createDataFrame(someDF.rdd, schema=None, samplingRatio=1)**

json_root = newDF.select(psf.reverse(newDF._id).alias("id"), \
            newDF.audit.modDate.alias("audit_modDate"), \                               
            newDF.audit.processedDate.alias("audit_processedDate"), \
            newDF.patch.alias("patch"), \                           
            newDF.report.acJurisdictions.alias("acJurisdictions"), \                                 
            newDF.report.cachemd5.alias("cachemd5"), \                               
            newDF.report.decode.baseLPrice.alias("baseLPrice"), \
            newDF.report.decode.baseMod.alias("baseMod"), \
           newDF.report.decode.baseShippingWeight.alias("baseShippingWeight"), \           
            newDF.report.decode.bodyTypeCode.alias("bodyTypeCode"), \
            psf.explode(newDF.some_col))

json_root.show()

# There are multiple steps of dataframe transformations after this where I explode out structs with * wildcard and explode more arrays (** you can only explode one array per dataframe unless you use the spark sql query interface). When the data is ready to be written to the parquet file in the final step is when evaluation of all the dataframes this is where the schema collision occurs with the inferred schema                     

##...more data frame operations to flatten nested structures here

json_lvl4.show()

json_lvl4.createOrReplaceTempView("vRecDF_tab")
spark.sql("DROP TABLE IF EXISTS database.my_table")
spark.sql("""
CREATE TABLE database.my_table
STORED AS PARQUET
as
SELECT * FROM vRecDF_tab
""")

Что частично работает, но вынудило меня предпринять попытку, упомянутую выше - Возвращение к простому использованиюоператор загрузки, а затем показ на операции фрейма данных дает данные.Данные успешно экспортируются в таблицу, если я экспортирую json_root через json_lvl3.Однако попытка еще больше расширить структуры во вложенных типах приводит к несоответствию типов при записи данных в таблицу при использовании оператора * в структурах.Это чисто моё предположение из-за экспериментов, но кажется, что это расширение дает разные определения таблиц при последовательных запусках из-за предполагаемого размера выборки схемы, встроенного в spark.read.format ("com.mongodb.spark.sql.DefaultSource"). Load () когда в документе слишком много необязательных полей.

import os
import sys
import platform

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as psf
from pyspark.sql.types import *
from py4j.protocol import Py4JError

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >> sys.stderr, "Usage: myScript.py config.properties"
        sys.exit(-1)

arguments = sys.argv[1:]
print("Starting "+str(sys.argv[0])+" with "+str(sys.argv[1]))

conf = SparkConf()
conf.set("spark.mongodb.input.uri", "mongodb://hostname:27017/some_database.some_collection")
conf.set("spark.debug.maxToStringFields", 2048)

# Create a new SparkContext used for RDD's
sc = SparkContext("yarn")

# Set log level to ERROR to avoid distracting extra output ** changed to trace temporarily to capture more data
sc.setLogLevel("TRACE")

# Create a new SparkSession used for DataFrames
spark = SparkSession.builder \
                    .master("yarn") \
                    .appName(str(sys.argv[0])) \
                    .config(conf=conf) \
                    .getOrCreate()


newDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

json_root = newDF.select(psf.reverse(newDF._id).alias("id"), \
            newDF.audit.modDate.alias("audit_modDate"), \                               
            newDF.audit.processedDate.alias("audit_processedDate"), \
            newDF.patch.alias("patch"), \                           
            newDF.report.acJurisdictions.alias("acJurisdictions"), \                                 
            newDF.report.cachemd5.alias("cachemd5"), \                               
            newDF.report.decode.baseLPrice.alias("baseLPrice"), \
            newDF.report.decode.baseMod.alias("baseMod"), \
           newDF.report.decode.baseShippingWeight.alias("baseShippingWeight"), \           
            newDF.report.decode.bodyTypeCode.alias("bodyTypeCode"), \
            psf.explode(newDF.some_col))

json_root.show()

##...more data frame operations to flatten nested structures here

json_lvl4.show()

json_lvl4.createOrReplaceTempView("vRecDF_tab")
spark.sql("DROP TABLE IF EXISTS database.my_table")
spark.sql("""
CREATE TABLE database.my_table
STORED AS PARQUET
as
SELECT * FROM vRecDF_tab
""")

Мне трудно привести пример кода по нескольким причинам: 1. У меня есть обязательства по защите данных и NDA.2. Код, необходимый для репликации проблемы, довольно велик.

Когда я говорил ранее, что есть много необязательных полей, базовый документ json имеет примерно 113 тыс. Уникальных полей.Я полагаю, что из полмиллиарда записей, хранящихся в осколках монго, около 75% полей имеют покрытие для каждого идентификатора.

Чтобы еще больше усложнить фрагмент кода: psf.explode (newDF.some_col)) - один из немногих array<struct<struct,struct,struct,array<struct>.......

Выполнение операции схемы печати при многократном запуске подряд приводит к различным определениям для этих вложенных объектов, поскольку, как указывалось ранее, логический вывод схемы по умолчанию не отбирает достаточно документов.

Различия в неструктурированном формате документов достаточно распространены, поэтому я не могу написать общее решение для выборки из 100 тыс. Записей. Здесь важно отметить, что при конкретном жестком выборе элементов кодирования, когда известно, что элементы имеют охват всего набора данных, я могу успешно сохранить данные в файл партера в HDFS. Как только я начинаю расширять вложенные типы в документах, я начинаю сталкиваться со схемами.

Есть ли лучший способ сделать это, что я неправильно понимаю, когда существует такой сложный набор данных?

Возможно ли, что более подходящим может быть загрузка библиотеки json и помещение записей монго в rdd, где мы отображаем их в коллекции python и реструктурируем данные, чтобы сделать их более легко усваиваемыми?

любые идеи будут великолепны! ПОЖАЛУЙСТА, ПОМОГИТЕ!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...