Kafka Streams Управление внутренними данными - PullRequest
0 голосов
/ 10 мая 2018

В моей компании мы широко используем Kafka, но мы используем реляционную базу данных для хранения результатов нескольких промежуточных преобразований и агрегаций по причинам отказоустойчивости. Сейчас мы исследуем Kafka Streams как более естественный способ сделать это. Часто наши потребности довольно просты - один такой случай

  • Прослушивание входной очереди <K1,V1>, <K2,V2>, <K1,V2>, <K1,V3>...
  • Для каждой записи выполнить некоторую операцию с высокой задержкой (вызвать удаленную службу)
  • Если к моменту обработки <K1,V1>, и оба <K1,V2>, <K1,V3> были произведены, то я должен обработать V3, так как V2 уже устарел

Для этого я читаю тему как KTable. Код выглядит ниже

KStreamBuilder builder = new KStreamBuilder();
KTable<String, String> kTable = builder.table("input-topic");
kTable.toStream().foreach((K,V) -> client.post(V));
return builder;

Это работает, как и ожидалось, но мне не ясно, как Кафка достигает этого автоматически. Я предполагал, что Кафка создает внутренние темы для достижения этой цели, но я не вижу созданных внутренних тем. Javadoc для метода , кажется, подтверждает это наблюдение. Но затем я наткнулся на эту официальную страницу , которая, кажется, предполагает, что Kafka использует отдельное хранилище данных, также называемое RocksDB, вместе с темой журнала изменений.

Теперь я запутался, так как при каких обстоятельствах создаются темы журнала изменений. Мои вопросы

  1. Если поведение хранилища состояний по умолчанию является отказоустойчивым, как предполагает официальная страница, то где хранится это состояние? В RocksDB? В теме журнала изменений или в обоих?
  2. Каковы последствия использования RocksDB в производстве? (Edited)
    1. Как я понял, зависимость от rocksdb прозрачна (просто файл jar), а rockdb хранит данные в локальной файловой системе. Но это также означает, что в нашем случае это приложение будет хранить копию защищенных данных в хранилище, где выполняется приложение. Когда мы заменяем удаленную базу данных на KTable, это влияет на хранилище, и это моя точка зрения.
    2. Будут ли релизы Kafka заботиться о том, чтобы RocksDB продолжал работать на различных платформах? (Поскольку это, кажется, зависит от платформы и не написано на Java)
  3. Имеет ли смысл сокращать журнал входных тем?

Я использую v. 0.11.0

1 Ответ

0 голосов
/ 10 мая 2018
  1. Kafka Streams хранит состояние локально. По умолчанию используется RocksDB. Однако местное государство эфемерно. Для обеспечения отказоустойчивости все обновления магазина также записываются в раздел журнала изменений. Это позволяет перестроить и / или перенести хранилище в случае сбоя или увеличения / уменьшения масштаба. Для вашего особого случая тема журнала изменений не создается, поскольку KTable не является результатом агрегации, а заполняется непосредственно из темы - это только оптимизация. Поскольку тема журнала изменений будет содержать те же данные, что и тема ввода, Kafka Streams избегает дублирования данных и использует тему ввода как тему журнала изменений в случае возникновения ошибки.

  2. Не совсем точно, что вы подразумеваете под этим вопросом. Обратите внимание, что RocksDB считается эфемерным магазином. Он используется по умолчанию по разным причинам, как описано здесь: Почему Apache Kafka Streams использует RocksDB и если как это можно изменить? (например, он позволяет удерживать состояние больше, чем основная память, так как он может разлиться на диск). Вы можете заменить RocksDB любым другим магазином. Kafka Streams также поставляется с хранилищем в памяти. (Edit)

    1. Это верно. Вам необходимо соответствующим образом подготовить свое приложение, чтобы иметь возможность хранить локальный фрагмент общего состояния. Для этого есть руководство по размерам: https://docs.confluent.io/current/streams/sizing.html

    2. RocksDB написан на C ++ и включен через привязку JNI. В Windows есть некоторые известные проблемы, поскольку RocksDB не предоставляет предварительно скомпилированные двоичные файлы для всех версий Windows. Пока вы остаетесь на платформе Linux, она должна работать. Сообщество Kafka проводит тесты обновления для RocksDB, чтобы убедиться в его совместимости.

  3. Да. Kafka Streams фактически предполагает, что тема ввода для операции table() сжата в журнале. В противном случае существует риск потери данных в случае сбоя. (Edit)

    1. Если вы включите сжатие журнала, настройка времени хранения игнорируется. Таким образом, да, последнее обновление будет сохраняться вечно (или до тех пор, пока не будет написано надгробное сообщение со значением = null). Обратите внимание, что когда сжатие выполняется на стороне посредника, старые данные собираются сборщиком мусора, и, таким образом, при восстановлении читается только новая версия для каждого ключа - старые версии удаляются в процессе сжатия. Если вас не интересуют какие-либо данные по прошествии некоторого времени, вам нужно написать надгробную плиту в исходной теме, чтобы она работала.
...