Рад, что вы нашли Кибу полезным! Существуют различные решения для этого варианта использования.
Я делаю некоторые предположения здесь (если они неправильные, решения будут существовать, но будут другими, например, определение границ и внешнее хранилище):
- Вы работаете с конечными пакетами (а не с непрерывным потоком обновлений).
- Горстка кратких записей, на которые вы ссылаетесь, может храниться в памяти.
Мой совет здесь заключается в том, чтобы использовать способность Kiba v3 для получения записи в методе close
преобразования (более подробно описанном в этой статье ):
class InMemoryReduceTransform
attr_reader :buffer, :summarize_cb
def initialize(summarize_cb:)
@buffer = []
@summarize_cb = summarize_cb
end
def process(row)
buffer << row
nil # do not forward the row to the rest of the pipeline
end
def close
summarize_cb(buffer).each do |row|
yield row
end
end
end
По сути, вы просто складывать входные строки до тех пор, пока в источнике не будет данных, после чего будет вызван метод close
, а затем вы суммируете имеющиеся у вас данные и получите итоговые строки N.
Примечание: это Упрощенная c реализация, чтобы поставить вас на правильный путь. Следующая итерация Kiba Pro будет включать в себя более масштабируемую и универсальную c версию этой версии (с поддержкой поставщиков). Пожалуйста, обратитесь, если вы заинтересованы в этом!
Дайте мне знать, если это правильно отвечает на ваш вопрос!