Kafka Streams KGroupedTable.count () возвращает отрицательное значение.Как это возможно? - PullRequest
0 голосов
/ 08 февраля 2019

KGroupedTable.count () возвращает отрицательные значения?

idAndJobTransaction
                .filter((k,v) -> v!=null)
                .mapValues(jobTransaction -> {
                    jobTransaction.setCount(0);
                    jobTransaction.setId(0L);
                    jobTransaction.setRunsheet_id(0L);
                    jobTransaction.setTimestamp(0L);
                    if(jobTransaction.getDelete_flag() == 1)
                        return null;
                    else
                        return jobTransaction;
                } )
                .groupBy((id,jobTransaction)->new KeyValue<>(jobTransaction,jobTransaction),Serialized.with(jobTransactionSerde,jobTransactionSerde))
                .count()
                .toStream()
                .mapValues((k,v)-> new JobSummary(k,v))
                .peek((k,v)->{
                    log.info(k.toString());
                    log.info(v.toString());
                }).selectKey((k,v)-> v.getCompany_id())  // So that the count is consumed in order for each company
                .to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde));

Метод подсчета иногда возвращает отрицательные значения.Около 1% процентов значений являются отрицательными.Как это возможно?

РЕДАКТИРОВАТЬ 1:

Я помещаю результаты этой агрегации в таблицу Postgres.Отрицательные значения не ограничены -1, но доходят до очень высоких значений.

Я использую 2 потребителя.Имеет ли это какое-то значение?

Может ли это быть проблемой с потоками Кафки?или я должен искать другие возможные причины?

enter image description here

РЕДАКТИРОВАТЬ 3: мне удалось захватить некоторые из доступных журналов, и я действительно увидел отрицательные значенияв кратчайшие сроки:

enter image description here

Что касается класса JobSummary, то это действительно очень простой класс POJO.Вот конструктор, вызываемый в приложении KStream.

  public JobSummary(JobTransaction j, Long count){
    this.setUser_id(j.getUser_id());
    this.setHub_id(j.getHub_id());
    this.setCity_id(j.getCity_id());
    this.setCompany_id(j.getCompany_id());
    this.setJob_master_id(j.getJob_master_id());
    this.setJob_status_id(j.getJob_status_id());
    this.setCount(count);
    this.setDate(j.getDate());
}

1 Ответ

0 голосов
/ 08 февраля 2019

Я думаю (это единственное объяснение, которое я могу придумать), что это особый случай.Сначала вы должны понять, как агрегация KTable работает внутри.Это объясняется на другом вопросе: TopologyTestDriver отправляет неверное сообщение на агрегаты KTable

С этим фоном может произойти отрицательное число, если текущее число в таблице результатов равно нулю, иисходная базовая таблица (т. е. idAndJobTransaction) получает идемпотентное обновление (т. е. запись в базовой таблице обновляется с <K,V> до <K,V>. Это приведет к одному вычитанию и одной записи сложения, которые идут вта же строка в таблице результатов (обратите внимание, что Kafka Streams не сравнивает старое и новое значение при обновлении таблицы и вслепую предполагает, что оба они различны) .Также, запись вычитания и сложения отправляется в нисходящем направлении независимо, а нисходящий count() обновляет свойрезультат в два этапа. Таким образом, число в таблице результатов изменяется от 0 до -1 при обработке записи вычитания и возвращается от -1 до 0 при обработке записи сложения.

...