У меня есть тема с именем topic1 и тема с именем topic2 .У меня есть следующий код:
- поток topic1 :
KStream<Long, byte[]> events = builder.stream("topic1", Consumed.with(Serdes.Long(), Serdes.ByteArray()));
- таблица topic2 :
KTable<Long, byte[]> table = builder.table("topic2",
Consumed.with(Serdes.Long(), Serdes.ByteArray()));
Когда я работаю с продюсером из topic1 , все получается нормально.Значения topic1 и topic2 отличаются, я хочу использовать созданную запись от первого производителя тем с этим потоком "events", а после этого добавить эти события в таблицу тем секунд исохранить состояние значения второй темы для того же ключа (ключ длинный, это одно и то же число для первой и второй темы).Другими словами, я хочу объединить поток с таблицей на основе того же ключа и обновить таблицу.
Мой код:
StoreBuilder<KeyValueStore<Long, byte[]>> store = Stores
.keyValueStoreBuilder(Stores.persistentKeyValueStore(STORE_TOPIC2), Serdes.Long(), Serdes.ByteArray())
.withLoggingEnabled(new HashMap<>());
builder.addStateStore(store);
events.join(table, KeyValue::new, Joined.with(Serdes.Long(), Serdes.ByteArray(), Serdes.ByteArray()))
.transform(Update::new, STORE_TOPIC2)
.to("topic2", Produced.with(Serdes.Long(), Serdes.ByteArray()));
В последней строке я создаю присоединенные события кtopic2, но по этой теме ничего не производится, мой трансформер "Update" выглядит так:
private static class Update implements Transformer<Long, KeyValue<byte[], byte[]>, KeyValue<Long, byte[]>> {
private KeyValueStore<Long, byte[]> store1;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
store1 = (KeyValueStore<Long, byte[]>) context.getStateStore(store);
}
@Override
public KeyValue<Long, byte[]> transform(final Long key, final KeyValue<byte[], byte[]> updates) {
System.out.println("Inside event transformer for key: " + key);
System.out.println("Last produced graph: " + store.get(key));
CustomClass c = null;
try {
c = deserializeModel(store.get(key));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if (c == null) {
c = new CustomClass(...);
}
try {
return KeyValue.pair(companyKey, serializeModel(companyNetwork));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
}