Подобно сжатию журналов Kafka, существует довольно много случаев, когда требуется сохранять только последнее обновление для данного ключа и использовать результат, например, для объединения данных.
Как это может быть заархивировано при потоковой структуризации (предпочтительно с использованием PySpark)?
Например, предположим, у меня есть таблица
key | time | value
----------------------------
A | 1 | foo
B | 2 | foobar
A | 2 | bar
A | 15 | foobeedoo
Теперь я хотел бы сохранить последнююзначения для каждого ключа в качестве состояния (с водяными знаками), т. е. иметь доступ к фрейму данных
key | time | value
----------------------------
B | 2 | foobar
A | 15 | foobeedoo
, к которому я мог бы присоединиться к другому потоку.
Желательно, чтобы это было сделано безтратить один поддерживаемый шаг агрегации.Я полагаю, мне понадобится функция dropDuplicates()
с обратным порядком.
Обратите внимание, что этот вопрос явно о структурированной потоковой передаче и о том, как решить проблему без конструкций, которые тратят впустую шаг агрегации (следовательно, все с оконными функциями или максимальной агрегацией не является хорошим ответом).(Если вы не знаете: агрегирование цепочек прямо сейчас не поддерживается в структурированной потоковой передаче.)