Kafka Connector HDFS Sink 5.3.1 не может произвести все записи JSON - PullRequest
0 голосов
/ 08 ноября 2019

USE CASE

Я читаю из уже созданной темы Kafka, в которой отдельный кластер создает некоторые ключи и значения. Моя конечная цель - записать HDFS в формате JSON, для чего я уже некоторое время экспериментирую с Kafka HDFS Sink 5.3. Проблема, с которой я сталкиваюсь, заключается в том, что я не могу принять и записать все записи из этой темы в HDFS. На данный момент, если моя тема содержит почасовые данные миллионов записей, я могу писать только в терминах только записей по 100 КБ.

Ниже приведены конфигурации, которые я использую для kafka-connect-standalone. .properties и мои HDFS quickstart-hdfs.properties

kafka-connect-standalone.properties

value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
schema.enable=false

offset.flush.interval.ms=10000

group.id=x-hdfs-consumer-group
consumer.session.timeout.ms=10000
consumer.heartbeat.interval.ms=3000
consumer.request.timeout.ms=1810000
consumer.max.poll.interval.ms=1800000

quickstart-hdfs.properties

name=hdfs-sink-mapr
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=10
topics=topic_name
hdfs.url=maprfs:///hive/x/poc_kafka_connect/
flush.size=20000
errors.tolerance=all 

format.class=io.confluent.connect.hdfs.json.JsonFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
timestamp.field=timestamp
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC

Если я не использую свойство errors.tolerance = all , тогда я просто создаю ~ 500 записей.

Что касается рабочих журналов, я не получаю никаких ошибок, поэтому я не уверен, что мне не хватает.

Поскольку я относительно новичок в Kafka Connector и имеюпытался что-то некоторое время, я был бы очень признателен, если бы кто-нибудь смог рассказать о том, что я делаю не так. То есть он работает нормально почти 2 дня, но через некоторое время перестал читать данные и ничего не выдает. Я запускаю его в автономном режиме, это может быть причиной? Я попытался описать группу потребителей, похоже, что все потребители умерли.

kafka/kafka_2.12-2.3.0/bin/kafka-consumer-groups.sh --bootstrap-server <server>:9092 --describe --group connect-ajay-hdfs-sink-mapr
GROUP                       TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
connect-ajay-hdfs-sink-mapr topic_name 21         1186755480      1187487551      732071          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 12         957021804       957736810       715006          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 17         957031965       957746941       714976          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 24         957496491       958212413       715922          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 0          956991807       957716202       724395          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 28         956940273       957668689       728416          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 5          957182822       957899308       716486          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 3          956974180       957695189       721009          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 19         956878365       957590196       711831          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 2          956968023       957685835       717812          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 16         957010175       957726139       715964          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 7          956900190       957624746       724556          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 8          957020325       957739604       719279          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 22         957064283       957788487       724204          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 29         957026931       957744496       717565          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 13         957400623       958129555       728932          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 6          956892063       957618485       726422          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 11         957117685       957841645       723960          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 1          957003873       957734649       730776          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 18         957007813       957734011       726198          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 27         957047658       957766131       718473          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 10         956975729       957689182       713453          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 15         957046441       957775251       728810          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 23         957011972       957727996       716024          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 14         957151628       957881644       730016          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 4          957118644       957845399       726755          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 9          957109152       957838497       729345          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 25         956923833       957646070       722237          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 26         957026885       957742112       715227          -               -               -
connect-ajay-hdfs-sink-mapr topic_name 20         957010071       957733605       723534          -               -               -

1 Ответ

0 голосов
/ 10 ноября 2019

Чтобы передать все существующие записи в теме в соединитель приемника, добавьте это в свойства рабочего и перезапустите соединение

consumer.auto.offset.reset=earliest

Если вы уже запустили соединитель, вам необходимолибо сбросьте свою группу потребителей, либо измените имя в конфигурации, чтобы создать новую группу

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...