Я пытаюсь запросить хранилище состояний, чтобы получить данные в окне за 5 минут.Для этого я использую tumbling window
.Добавили REST для запроса данных.У меня есть stream A
, который использует данные из topic1
, выполняет некоторые преобразования и выводит значение ключа в topic2
.Сейчас в stream B
я делаю операцию переворачивания окна на topic2
данных.Когда я запускаю код и запрашиваю с помощью REST, я вижу пустые данные в моем браузере.Я вижу, как текут данные в государственном магазине.
Я наблюдал, вместо того чтобы topic2
получать данные из stream A
, я использовал класс продюсера для ввода данных в topic2
и мог запрашивать данные из браузера.Но когда topic2
получает данные от stream A
, я получаю пустые данные.
Вот мой stream A
код:
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("topic1");
KStream<String, String> output = source
.map((k,v)->
{
Map<String, Object> Fields = new LinkedHashMap<>();
Fields.put("FNAME","ABC");
Fields.put("LNAME","XYZ");
Map<String, Object> nFields = new LinkedHashMap<>();
nFields.put("ADDRESS1","HY");
nFields.put("ADDRESS2","BA");
nFields.put("addF",Fields);
Map<String, Object> eve = new LinkedHashMap<>();
eve.put("nFields", nFields);
Map<String, Object> fevent = new LinkedHashMap<>();
fevent.put("eve", eve);
LinkedHashMap<String, Object> newMap = new LinkedHashMap<>(fevent);
return new KeyValue<>("JAY1234",newMap.toString());
});
output.to("topic2");
}
Вот мой stream B
код (где происходит переворачиваемое окно):
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> eventStream = builder.stream("topic2");
eventStream.groupByKey()
.windowedBy(TimeWindows.of(300000))
.reduce((v1, v2) -> v1 + ";" + v2, Materialized.as("TumblingWindowPoc"));
final Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}
REST-код:
@GET()
@Path("/{storeName}/{key}")
@Produces(MediaType.APPLICATION_JSON)
public List<KeyValue<String, String>> windowedByKey(@PathParam("storeName") final String storeName,
@PathParam("key") final String key) {
final ReadOnlyWindowStore<String, String> store = streams.store(storeName,
QueryableStoreTypes.<String, String>windowStore());
if (store == null) {
throw new NotFoundException(); }
long timeTo = System.currentTimeMillis();
long timeFrom = timeTo - 30000;
final WindowStoreIterator<String> results = store.fetch(key, timeFrom, timeTo);
final List<KeyValue<String,String>> windowResults = new ArrayList<>();
while (results.hasNext()) {
final KeyValue<Long, String> next = results.next();
windowResults.add(new KeyValue<String,String>(key + "@" + next.key, next.value));
}
return windowResults;
}
И вот так выглядят мои данные значения ключа:
JAY1234 {eve = {nFields = {ADDRESS1 = HY,ADDRESS2 = BA,Fields = {FNAME = ABC,LNAME = XYZ,}}}}
Я смогу получить данные при запросе с помощью REST.Любая помощь очень ценится.Спасибо!