Фильтр данных через Spark читает из Монго - PullRequest
0 голосов
/ 25 сентября 2019

Я извлекаю данные из монго в кирпичи данных, используя блокнот Python.Однако я не уверен, как фильтровать данные с помощью комбинации одного регулярного поля и одного из массива.Может кто-нибудь, пожалуйста, помогите мне?

Step1: подключен к Mongo

Step2: определен ниже структуры

Input_Schema= StructType([StructField("lot",StringType(),True),
              StructField("lot_status",StringType(),True),
            StructField("recipients",ArrayType(
                StructType([
                StructField("account",StringType(),True),
                StructField("id",IntegerType(),True),
                StructField("type",StringType(),True),
                StructField("status",StringType(),True)             
                ])))]) 

Step3: создан конвейер для фильтрации данных

pipeline = [{'$match':{'lot_status':'fetch'}}]

Используя вышеописанный конвейер, мы выбираем только те лоты, которые имеют статус = 'fetch'

Шаг 4: Выборка данных с использованием следующего оператора

Data = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri",connectionstring).option("pipeline",pipeline).schema(input_schema).load()

display(Data)

Получение вывода всех лотов, имеющих lot_status= 'Fetch'

Однако мне нужно включить еще один фильтр из массива, который будет

, где status = 'Fetch'.Прямо сейчас, используя конвейер, я могу отфильтровать данные, используя lot_status, однако я не уверен, как фильтровать, используя два условия.

Если я попытаюсь использовать ниже, он получает всю коллекцию из receients.status (Fetch иnon-Fetch)

 pipeline = [{'$match':{ '$and': [ { 'lot_status':'Fetch' }, {'recipients.status':'Fetch'} ] }}]
...