Соединитель PySpark и MongoDB: невозможно выполнить запрос с датами внутри конвейера - PullRequest
0 голосов
/ 20 декабря 2018

Я использую PySpark и настроил соединитель MongoDB для получения доступа к моей базе данных.Я пытаюсь построить конвейер, где этап $ match включает фильтрацию по датам.Я не могу заставить это работать, и мне может понадобиться что-то вроде ISODATE ("") здесь.

Я уже пробовал объекты datetime с правильными форматами, и я также пытался передать строку isoformat на этапе $ match.Ничего из этого не сработало.

mongoURI = "mongodb://localhost/databaseName.collection"
my_spark = SparkSession.builder.master("local").appName("test").config("spark.mongodb.input.uri", mongoURI).getOrCreate()
from_date = datetime.datetime(2018, 1, 1)
to_date = datetime.datetime(2018, 1, 1)
products =['P1', 'P2']
clients = ['C1', 'C2']
pipeline = [
 {'$match': {'$and': [ { '_created_at': { '$gt': from_date } }, { '_created_at': { '$lt': to_date } } ],'product': {'$in': products},'client': {'$in':clients }}},
 {'$project': {'product': 1,'client': 1}}]
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline).load()
print(df.printSchema())

Я ожидаю получить некоторые документы обратно, но вместо этого получаю эту ошибку

 Py4JJavaError: An error occurred while calling o36.load.: java.lang.IllegalArgumentException: requirement failed: Invalid Aggregation map Map ... 
 It should be a list of pipeline stages (Documents) or a single pipeline stage (Document)

Если я передаю даты в виде строки, я получаю Noneсхема, и когда я исключаю основанную на дате часть запроса, я получаю документы обратно.Итак, именно эта часть имеет проблему.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...