Как сделать соединение двух разных тем в кафке - PullRequest
0 голосов
/ 04 декабря 2018

У меня есть тема с именем topic1 и тема с именем topic2 .У меня есть следующий код:

- поток topic1 :

KStream<Long, byte[]> events = builder.stream("topic1", Consumed.with(Serdes.Long(), Serdes.ByteArray()));

- таблица topic2 :

KTable<Long, byte[]> table = builder.table("topic2",
        Consumed.with(Serdes.Long(), Serdes.ByteArray()));

Когда я работаю с продюсером из topic1 , все получается нормально.Значения topic1 и topic2 отличаются, я хочу использовать созданную запись от первого производителя тем с этим потоком "events", а после этого добавить эти события в таблицу тем секунд исохранить состояние значения второй темы для того же ключа (ключ длинный, это одно и то же число для первой и второй темы).Другими словами, я хочу объединить поток с таблицей на основе того же ключа и обновить таблицу.

Мой код:

StoreBuilder<KeyValueStore<Long, byte[]>> store = Stores
        .keyValueStoreBuilder(Stores.persistentKeyValueStore(STORE_TOPIC2), Serdes.Long(), Serdes.ByteArray())
        .withLoggingEnabled(new HashMap<>());

    builder.addStateStore(store);

    events.join(table, KeyValue::new, Joined.with(Serdes.Long(), Serdes.ByteArray(), Serdes.ByteArray()))
        .transform(Update::new, STORE_TOPIC2)
        .to("topic2", Produced.with(Serdes.Long(), Serdes.ByteArray()));

В последней строке я создаю присоединенные события кtopic2, но по этой теме ничего не производится, мой трансформер "Update" выглядит так:

private static class Update implements Transformer<Long, KeyValue<byte[], byte[]>, KeyValue<Long, byte[]>> {

    private KeyValueStore<Long, byte[]> store1;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        store1 = (KeyValueStore<Long, byte[]>) context.getStateStore(store);
    }

    @Override
    public KeyValue<Long, byte[]> transform(final Long key, final KeyValue<byte[], byte[]> updates) {

        System.out.println("Inside event transformer for key: " + key);
        System.out.println("Last produced graph: " + store.get(key));

        CustomClass c = null;
        try {
        c = deserializeModel(store.get(key));
        } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        }

        if (c == null) {
        c = new CustomClass(...);
        }

        try {
        return KeyValue.pair(companyKey, serializeModel(companyNetwork));
        } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        return null;
        }
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...