Я немного играю с Kafka Streams и, исследуя WordCountProcessorDemo
, я понял, что, должно быть, была часть картины, которую мне не хватает.А именно, как библиотека гарантирует, что в следующем коде не может произойти грязного чтения:
@Override
public void process(final String dummy, final String line) {
final String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
for (final String word : words) {
final Integer oldValue = this.kvStore.get(word);
if (oldValue == null) {
this.kvStore.put(word, 1);
} else {
this.kvStore.put(word, oldValue + 1);
}
}
context.commit();
}
Насколько я понимаю, после срабатывания kvStore.get(..)
состояние может измениться другим экземпляром StreamProcessor,живя на другой машине, потребляющей другой раздел.Поэтому, поскольку мы выполнили грязное чтение, состояние станет несовместимым.
Кафка-стрим как-то справляется с такой ситуацией?