инкрементальная агрегация кафки - PullRequest
0 голосов
/ 28 октября 2019

У меня есть поток delta чисел в теме кафки, который должен быть агрегирован особым образом, например:

aggregate[0] = 0
aggregate[N] = aggregate[N-1] * (N - 1) / N + delta[N - 1] / N

(точная формула не имеет значения, обратите внимание на зависимость от предыдущего элементав aggregate хотя)

По сути, мне нужно подписаться на две темы кафки одновременно, где я продвигаюсь в обеих темах одновременно: когда я читаю элемент в теме delta, мне нужнопрочитайте также соответствующий элемент из темы aggregate и запишите результат в тему aggregate до того, как будет использован следующий элемент в теме delta.

Возможно ли это вообще? в кафке? Может ли ksql с помощью умного соединения помочь?

1 Ответ

0 голосов
/ 29 октября 2019

Интересно, может ли помочь мой псевдокод. Предположим, что есть две темы: «дельта» и «совокупность». И раздел обеих тем равен 1, чтобы упростить ситуацию (чтобы мы получили глобальный порядок)

# this is just pseudocode to show my thoughts
def demo():
    delta_consumer = Consumer("delta")
    aggregate_consumer = Consumer("aggregate")
    aggregate_producer = Producer("aggregate")

    is_pre_aggregate_result_exists = aggregate_consumer.get_offset() != 0 # check whether it's first running 
    for delta_data in delta_consumer.poll():
        if not is_pre_aggregate_result_exists:
            last_aggregate_result = 0
        else:
            last_aggregate_result = aggregate_consumer.get_last_record()
        new_aggregate_result = user_define_function(delta_data, last_aggregate_result)
        aggregate_producer.producer(new_aggregate_result)
        is_pre_aggregate_result_exists = True

Между тем, я предполагаю, что kafka + structurd-steaming может решить вашу проблему, потому что внутренняя потребность вашего вопроса состоит в том, чтобыполучить агрегатный_результат в таблице потоков и затем вывести результат в раздел kafka, где идеальным решением является kafka + структурированный стераминг.

...