Pyspark - фильтр RDD с датами в словаре вещания - PullRequest
0 голосов
/ 22 апреля 2019

У меня есть транслируемый словарь Python, который содержит фильтры даты по пользователю.

nested_filter = {"user1":"2018-02-15"}
b_filter = sc.broadcast(nested_filter)

Я хочу использовать эту широковещательную переменную для фильтрации больших RDD со строками, меньшими, чем дата фильтра.

rdd_set = sc.parallelize([("user1","2018-02-05"), ("user1","2018-02-20")])

rdd_set.filter(lambda fields: fields <= b_filter.value.items()).collect()

Но возвращается пустой RDD.

Может кто-нибудь указать, что я делаю неправильно?Кроме того, мне нужно преобразовать строковые даты в объект даты?

Правильный результат должен быть:

[("user1","2018-02-05")]

1 Ответ

1 голос
/ 22 апреля 2019

Обратите внимание, что значение, возвращаемое b_filter.value.items() внутри вашего filter вызова, совпадает с:

nested_filter.items()
#[('user1', '2018-02-15')]

Итак, ваше сравнение становится:

("user1","2018-02-05") < [('user1', '2018-02-15')]
#False

Что составляет False. Предполагая, что nested_filter - это словарь, содержащий всего 1 элемент (как показано здесь), вы, вероятно, намеревались сделать сравнение с первым элементом списка:

("user1","2018-02-05") < nested_filter.items()[0]
#True

Таким образом, чтобы «исправить» свой код, вы могли бы сделать следующее:

rdd_set.filter(lambda fields: fields <= b_filter.value.items()[0]).collect()
#[('user1', '2018-02-05')]

Но вместо этого я думаю, что вы действительно хотите следующее:

rdd_set.filter(lambda fields: fields[1] <= b_filter.value.get(fields[0])).collect()
#[('user1', '2018-02-05')]

Используется fields[0] для получения даты из nested_filter (или возврата None, если она не существует) и сравнение значения с fields[1].

Как вы заметили, это сравнение будет происходить лексикографически на строках. Это не будет проблемой для вас, если ваши даты останутся в формате YYYY-MM-DD, но для других форматов дат вам может потребоваться преобразовать в объект datetime.

...