Я использую PySpark и настроил соединитель MongoDB для получения доступа к моей базе данных.Я пытаюсь построить конвейер, где этап $ match включает фильтрацию по датам.Я не могу заставить это работать, и мне может понадобиться что-то вроде ISODATE ("") здесь.
Я уже пробовал объекты datetime с правильными форматами, и я также пытался передать строку isoformat на этапе $ match.Ничего из этого не сработало.
mongoURI = "mongodb://localhost/databaseName.collection"
my_spark = SparkSession.builder.master("local").appName("test").config("spark.mongodb.input.uri", mongoURI).getOrCreate()
from_date = datetime.datetime(2018, 1, 1)
to_date = datetime.datetime(2018, 1, 1)
products =['P1', 'P2']
clients = ['C1', 'C2']
pipeline = [
{'$match': {'$and': [ { '_created_at': { '$gt': from_date } }, { '_created_at': { '$lt': to_date } } ],'product': {'$in': products},'client': {'$in':clients }}},
{'$project': {'product': 1,'client': 1}}]
df = my_spark.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline).load()
print(df.printSchema())
Я ожидаю получить некоторые документы обратно, но вместо этого получаю эту ошибку
Py4JJavaError: An error occurred while calling o36.load.: java.lang.IllegalArgumentException: requirement failed: Invalid Aggregation map Map ...
It should be a list of pipeline stages (Documents) or a single pipeline stage (Document)
Если я передаю даты в виде строки, я получаю Noneсхема, и когда я исключаю основанную на дате часть запроса, я получаю документы обратно.Итак, именно эта часть имеет проблему.