как мы можем заставить сливную кафку подключить s3 к флешу - PullRequest
0 голосов
/ 08 июня 2018

Я настраиваю приемник kafka connect s3, продолжительность устанавливается на 1 час, а также я устанавливаю довольно большой счет сброса, скажем, 10 000.Теперь, если в канале kafka не так много сообщений, приемник s3 попытается буферизовать их в памяти и подождать, пока он накопится до счетчика сброса, затем загрузить их вместе и зафиксировать смещение в своей собственной группе потребителей.

Но подумайте об этой ситуации.Если на канале, я отправляю только 5000 сообщений.Тогда нет смыва s3.Затем через долгое время сообщение 5000 будет в конечном итоге выселено из кафки из-за времени удержания.Но эти сообщения все еще находятся в памяти приемника s3, а не в s3.Это очень опасно, например, если мы перезапустили приемник s3 или машина, на которой запущен приемник s3, просто выходит из строя.Затем мы потеряли эти 5000 сообщений.Мы не можем найти их снова из kafka, потому что он уже удален.

Произойдет ли это с приемником s3?Или есть какие-то настройки, которые заставляют его сбрасывать через некоторое время?

1 Ответ

0 голосов
/ 13 июня 2018

Если ваш поток из Кафки в S3 не имеет постоянного потока записей, вы можете использовать свойство

rotate.schedule.interval.ms

для очисткизаписи в запланированные интервалы.

Обратите внимание, что в случае повторной обработки ваша нисходящая система должна быть в состоянии справиться с дубликатами, если используется эта опция.Это связано с тем, что очистка таких записей на основе настенных часов может привести к появлению дубликатов в разных файлах, если для соединителя запланирован повторный экспорт записей из Kafka.

В качестве идентификатора, если вы используете свойство:

rotate.interval.ms

с экстрактором отметок времени Wallclock (timestamp.extractor=Wallclock), ваши записи будут сброшены без установки rotate.schedule.interval.ms.Но это означает, что ваш разделитель зависит от настенных часов, и поэтому вы должны иметь возможность учитывать дубликаты записей.

Соединитель может предлагать точную однократную доставку в постоянном потоке записей с детерминированными разделителями и имеет различные экстракторы временных меток, например, такую, которая зависит от временной метки записи (Record) или временной метки поля(RecordField).

Свойства конфигурации для разбиения здесь

...