KStream оставил Join KStream с тем же ключом - PullRequest
0 голосов
/ 25 августа 2018

У меня проблема с тем, что я пытаюсь покинуть 2 потока. MergedKey имеет более 100 списки с тем же ключом и DataStream имеет только 1 листинг с тем же ключом, что и mergedKey. Я хочу в endStream Значение из mergedKey Объединить с DataStream.

      //get DataStream 
        final KStream<String, GenericRecord> DataStream = builder.stream("Datastreams");
        // Transform merged to Equals Keys to DataStream.Iot
        final KStream<String, GenericRecord> mergedKey = mergedFoIObs
                .map((key, value) -> KeyValue.pair(value.get("Datastream").toString(), value)); 
        // Join the DataStream with MergedStream


        final KStream<String, String> mergedFoIObsData = mergedKey.leftJoin(
                DataStream,
            (value, data) -> {
                try {
                    if(data != null{
                        value.put("Datastream", data.toString());
                        JSONObject jo = (JSONObject) new JSONParser().parse(value.toString());
                        return jo.toJSONString();}
                      return null



                } catch (ParseException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                return null;

            }, JoinWindows.of(10000));

Но моя проблема в том, что в Endstream я получаю только 1 листинг с правильными значениями, а другие листинги имеют нулевые значения.

Taht означает, что данные являются нулевыми после первого "раунда".

Когда я преобразовываю DataStream в KTable, у меня возникает проблема с тем, что я получаю правильные списки, но только 37 списков, поэтому 60 ошибочно.

Надеюсь, вы мне поможете.

1 Ответ

0 голосов
/ 26 августа 2018

Для соединения KStream-KStream это зависит от отметок времени записи, если они присоединяются или нет.Посмотрите этот пост в блоге для получения более подробной информации: https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

Для соединения KStream-KTable это зависит от того, когда запись KTable будет загружена в KTable - Kafka Streams пытается синхронизировать загрузку на основе временных меток,но это наилучший подход.Таким образом, может случиться так, что некоторые записи KStream обрабатываются первыми, а таблица KTable все еще пуста.Только после того, как записи KTable были обработаны (т. Е. KTable обновлен и содержит эту запись), последовательные записи KStream будут успешно соединены.

Обратите внимание, что следующий выпуск Kafka 2.1 улучшит эту синхронизацию меток времени и обеспечит более надежные гарантиии пользователи даже смогут настроить, насколько строгими должны быть гарантии.

...