Неверный результат Kstream-Kstream Join с асимметричным временным окном - PullRequest
0 голосов
/ 18 мая 2018

У меня есть 2 потока с именами «тревога» и «вмешательство», которые содержат JSON.Если тревога и вмешательство связаны, они будут иметь одинаковый ключ.Я хочу связаться с ними, чтобы обнаружить все тревоги, которые не имели вмешательства 24 часа назад.Но эта программа не работает и в результате выдает мне все сигналы тревоги, как будто никакого вмешательства не было сделано 24 часа назад.Я перепроверил свой набор данных 5 раз, и есть сигналы тревоги, в которых вмешательства были сделаны менее чем за 24 часа до даты сигнала тревоги.Эта картина объясняет ситуацию: введите описание изображения здесь Поэтому мне нужно знать, есть ли вмешательство до тревоги.Код программы:

    final KStream<String, JsonNode> alarm = ...;

    final KStream<String, JsonNode> intervention = ...;

    final JoinWindows jw = JoinWindows.of(TimeUnit.HOURS.toMillis(24)).before(TimeUnit.HOURS.toMillis(24)).after(0);

    final KStream<String, JsonNode> joinedAI = alarm.filter((String key, JsonNode value) -> {
        return value != null;
    }).leftJoin(intervention, (JsonNode leftValue, JsonNode rightValue) -> {
        ObjectMapper mapper = new ObjectMapper();
        JsonNode actualObj = null;

        if (rightValue == null) {//No intervention before
            try {
                actualObj = mapper.readTree("{\"date\":\"" + leftValue.get("date").asText() + "\","
                        + "\"alarm\":" + leftValue.toString()
                        + "}");
            } catch (IOException ex) {
                Logger.getLogger(Main.class.getName()).log(Level.SEVERE, null, ex);
            }
            return actualObj;
        } else {
            return null;
        }
    }, jw, Joined.with(Serdes.String(), jsonSerde, jsonSerde));

    final KStream<String, JsonNode> fraude = joinedAI.filter((String key, JsonNode value) -> {
        return value != null;
    });

    fraude.foreach((key, value) -> {
        rl.println("Fraude=" + key + " => " + value);
        System.out.println("Fraude=" + key + " => " + value);
    });

    final KafkaStreams streams = new KafkaStreams(builder.build(), streamingConfig);

    streams.cleanUp();
    streams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
        @Override
        public void run() {
            streams.close();
            rl.close();
            el.close();
            nfl.close();
        }
    }));

Подводя итог, хочу обнаружить шаблон в красном прямоугольнике введите описание изображения здесь

PS: я уверенчто записи вмешательства отправляются до записи тревоги

1 Ответ

0 голосов
/ 31 мая 2018

M.Djx,

Я не думаю, что сейчас есть идеальное решение для этого варианта использования в Kafka Streams, но у меня есть несколько мыслей, чтобы приблизить вас.Я готовлюсь представить KIP для точного рассмотрения вариантов использования, подобных этому, в ближайшем будущем.

Один момент: в отличие от KTable, KStreams не являются журналами изменений, поэтому более новые события не перезаписывают более старые события с тем жеключ;они просто сосуществуют в одном потоке.Я думаю, именно поэтому ваш foreach делает так, что все предупреждения не имеют никакого вмешательства;вы видите промежуточные события соединения до вмешательств.

Например:

LEFT   RIGHT    JOIN
a:1             a:(1,null)
       a:X      a:(1,X)

foreach будет вызвано для обоих результатов соединения, что будет выглядеть как правильное значениеотсутствует, когда на самом деле немного поздно.

Если вы примените временное окно к потоку результатов, вы получите журнал изменений - более новые значения перезапишут более старые.Что-то вроде:

joinedAI
  .groupByKey()
  .windowedBy(
      TimeWindows
          .of(1000 * 60 * 60 * 24) // the window will be 24 hours in size
          .until(1000 * 60 * 60 * 48) // and we'll keep it in the state store for at least 48 hours
  ).reduce(
      new Reducer<JsonNode>() {
          @Override
          public Long apply(final JsonNode value1, final JsonNode value2) {
              return value2;
          }
      },
      Materialized.<String, JsonNode, WindowStore<Bytes, byte[]>>as("alerts-without-interventions")
  );

Облом - то, что это создаст поток журнала изменений с правильной семантикой, но вы все равно увидите промежуточные значения, поэтому вы не захотите запускать какие-либо действия прямо из этогоПоток либо (как foreach).

Одна вещь, которую вы могли бы сделать, это запланировать работу, один раз в день, для сканирования "alerts-without-interventions" для окон с вчера .Любой результат, который вы получите из хранилища окон, будет самым последним значением этого ключа.

KIP, который я готовлю, предложит вам способ отфильтровать промежуточные результаты из окна, что позволит вамприкрепить foreach к журналу изменений и запускать его только в конечном результате окна.

В качестве альтернативы, если данные для вашего приложения не слишком велики, и если вы не слишком беспокоитесь о крайних случаях, вы могли бы рассмотреть возможность реализации семантики «оконных событий» самостоятельно с помощью LinkedHashMap или кэша Guava.

Надеюсь, это поможет.

...