Kafka Streams: ktable в качестве поиска и место назначения для объединения потоков - PullRequest
1 голос
/ 21 мая 2019

Привет, у меня есть 3 потока событий, которые я хочу объединить, используя потоки Kafka.

Я не могу найти простое решение для решения проблемы параллелизма:

    // merged values Ktable
    KTable<String, ProdForecastPowerPlantAgg> mergedTable = builder.table(
            getMergedValuesTopic(),
            [...]);


    // records A

    // stream
    KStream<String, RecordA> recordsAStream = builder.stream(
            getRecordATopic(),
            [...]);

    // rekeyed stream
    KStream<String, ProductionRecordValue> recordsABySomeId = recordsAStream
            .selectKey((k, v) -> getKey(v);


    // records B

    // stream
    KStream<String, RecordB> recordsBStream = builder.stream(
            getRecordBTopic(),
            [...]);

    // rekeyed stream
    KStream<String, RecordB> recordsBBySomeId = recordsBStream
            .selectKey((k, v) -> getKey(v);


    // records C

    // stream
    KStream<String, RecordA> recordsCStream = builder.stream(
            getRecordCTopic(),
            [...]);

    // rekeyed stream
    KStream<String, ProductionRecordValue> recordsCBySomeId = recordsCStream
            .selectKey((k, v) -> getKey(v);


    // when a recordA arrives
    KStream<String, RecordA> aggFromA = recordsABySomeId
            .filter((k, v) -> v != null)
            // join recordA and current join result together
            .leftJoin(mergedTable, (recA, oldMerge) -> {
                        if (oldMerge != null) {
                            return new Merge(recA, oldMerge.B, oldMerge.C);
                        }
                        return new Merge(recA, null, null)
                    },
                    [...]
            );

    // when a recordB arrives
    KStream<String, RecordB> aggFromB = recordsBBySomeId
            .filter((k, v) -> v != null)
            // join recordB and current join result together
            .leftJoin(mergedTable, (recB, oldMerge) -> {
                        if (oldMerge != null) {
                            return new Merge(oldMerge.A, recB, oldMerge.C);
                        }
                        return new Merge(null, recB, null)
                    },
                    [...]
            );


    // when a recordC arrives
    KStream<String, RecordB> aggFromC = recordsCBySomeId
            .filter((k, v) -> v != null)
            // join recordB and current join result together
            .leftJoin(mergedTable, (recC, oldMerge) -> {
                        if (oldMerge != null) {
                            return new Merge(oldMerge.A, oldMerge.B, recC);
                        }
                        return new Merge(null, null, recC)
                    },
                    [...]
            );


    // save aggreagtion
aggFromA.merge(aggFromB).merge(aggFromC)
            .to(getMergedValuesTopic(), Produced.with(Serdes.String(), aggSerdes));



    return builder.build();

Действительно, этот фрагмент недействителен: таблица KTable на основе getMergedValuesTopic не отражает последнее состояние слияния , когда поиск выполнен: когда две разные записи приходят одновременно, одно обновление может отменить другое (так как поиск устарел).

У кого-нибудь есть простое решение этой проблемы с использованием потоков Кафки?

1 Ответ

2 голосов
/ 21 мая 2019

Я думаю, что простой агрегат должен добиться цели. Агрегация выполняет описанную вами операцию: «KTable как поиск и назначение».

В каждой поступающей записи таблица агрегации проверяется на совпадения. Если совпадений не найдено, инициализатор, определенный в агрегации, используется для создания новой начальной записи: документация доступна здесь

пример кода:

public class KTableMerge {

protected Topology buildTopology() {
    final StreamsBuilder builder = new StreamsBuilder();

    //Streams
    KStream<String, RecordA> recordAKStream = builder.stream("test-record-a");
    KStream<String, RecordB> recordBKStream = builder.stream("test-record-b");
    KStream<String, RecordC> recordCKStream = builder.stream("test-record-c");

    //Re-key and Merge Streams in parent 'Record' container
    KStream<String, Record> mergedStream =
        recordAKStream
            .selectKey((key, value) -> value.getForeignKey())
            .mapValues(value -> (Record) value)
            .merge(recordBKStream
                .selectKey((key, value) -> value.getForeignKey())
                .mapValues(value -> (Record) value))
            .merge(recordCKStream
                .selectKey((key, value) -> value.getForeignKey())
                .mapValues(value -> (Record) value));

    //Aggregate
    mergedStream
        .groupByKey()
        .aggregate(
            Merge::new,
            (key, value, aggregate) -> {
                if (value instanceof RecordA) {
                    aggregate.recordA = (RecordA) value;
                } else if (value instanceof RecordB) {
                    aggregate.recordB = (RecordB) value;
                } else if (value instanceof RecordC) {
                    aggregate.recordC = (RecordC) value;
                }
                return aggregate;
            })
        .toStream()
        .to("merge-topic");

    return builder.build();
}

private static class Merge {
    RecordA recordA;
    RecordB recordB;
    RecordC recordC;
}

private interface Record {
    String getForeignKey();
}

private static class RecordA implements Record {
    String id;
    String foreignKey;

    public String getForeignKey() {
        return foreignKey;
    }
}

private static class RecordB implements Record {
    String id;
    String foreignKey;

    public String getForeignKey() {
        return foreignKey;
    }
}

private static class RecordC implements Record {
    String id;
    String foreignKey;

    public String getForeignKey() {
        return foreignKey;
    }
}

}

надеюсь, это поможет

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...