Потребитель Kafka не опрашивает записи периодически - PullRequest
0 голосов
/ 03 марта 2020

Я написал простую утилиту в scala для чтения сообщения kafka как байтового массива.

Утилита работает на одной машине, но не на другой. Оба находятся на одной и той же ОС (centos 7) и на одном и том же сервере kafka (который находится на другом компьютере вместе).

Однако Kafka Tool (www.kafkatool.com) работает на компьютере, на котором нет утилиты - так что его маловероятная проблема доступности.

Ниже приведена сущность кода потребителя:

import java.io.BufferedOutputStream
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val outputFile = "output.txt"
val topic = "test_topic"
val server = "localhost:9092"
val id = "record-tool"

val props = new Properties()
props.put("bootstrap.servers", server)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
props.put("auto.offset.reset", "earliest")
props.put("enable.auto.commit", "false")
props.put("max.partition.fetch.bytes", "104857600")
props.put("group.id", id)

val bos = new BufferedOutputStream(new FileOutputStream(outputFile))
val consumer = new KafkaConsumer[String, Array[Byte]](props)
consumer.subscribe(Seq(topic).asJava)
Stream.continually(consumer.poll(5000).asScala.toList).takeWhile(_.nonEmpty).flatten.foreach(c => bos.write(c.value)))
consumer.close
bos.close

Я также не вижу никаких ошибок в журналах, ниже приведен журнал отладки

[root@vm util]# bin/record-tool consume --server=kafka-server:9092 --topic=test_topic --asBin --debug

16:44:02.548 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 104857600
    bootstrap.servers = [kafka-server:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = 
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = record-tool
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = earliest

16:44:02.550 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Starting the Kafka consumer
16:44:02.621 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [kafka-server:9092 (id: -1 rack: null)], partitions = [])
16:44:02.632 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
16:44:02.636 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:
16:44:02.637 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
16:44:02.637 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
16:44:02.638 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
16:44:02.638 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:
16:44:02.639 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:
16:44:02.649 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 104857600
    bootstrap.servers = [kafka-server:9092]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = consumer-1
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    group.id = record-tool
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = earliest

16:44:02.657 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
16:44:02.657 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name join-latency
16:44:02.657 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name sync-latency
16:44:02.659 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-latency
16:44:02.663 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
16:44:02.664 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-fetched
16:44:02.664 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
16:44:02.664 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-lag
16:44:02.664 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
16:44:02.666 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.1
16:44:02.666 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : a7a17cdec9eaa6c5
16:44:02.668 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created
16:44:02.680 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): test_topic
16:44:02.681 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending coordinator request for group record-tool to broker kafka-server:9092 (id: -1 rack: null)
16:44:02.695 [main] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at kafka-server:9092.
16:44:02.816 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
16:44:02.817 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
16:44:02.818 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
16:44:02.820 [main] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1
16:44:02.902 [main] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request {topics=[test_topic]} to node -1
16:44:02.981 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [kafka-server.mydomain.com:9092 (id: 0 rack: null)], partitions = [Partition(topic = test_topic, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
16:44:02.982 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received group coordinator response ClientResponse(receivedTimeMs=1583225042982, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@434a63ab, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=record-tool}), createdTimeMs=1583225042692, sendTimeMs=1583225042904), responseBody={error_code=0,coordinator={node_id=0,host=kafka-server.mydomain.com,port=9092}})
16:44:02.983 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator kafka-server.mydomain.com:9092 (id: 2147483647 rack: null) for group record-tool.
16:44:02.983 [main] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 2147483647 at kafka-server.mydomain.com:9092.
16:44:02.986 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group record-tool
16:44:02.986 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group record-tool
16:44:02.988 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup ({group_id=record-tool,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=25 cap=25]}]}) to coordinator kafka-server.mydomain.com:9092 (id: 2147483647 rack: null)
16:44:03.051 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-sent
16:44:03.051 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-received
16:44:03.052 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.latency
16:44:03.052 [main] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 2147483647
16:44:03.123 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful join group response for group record-tool: {error_code=0,generation_id=3,group_protocol=range,leader_id=consumer-1-f82633ab-a06e-4474-8ddb-1ec096d6c7f2,member_id=consumer-1-f82633ab-a06e-4474-8ddb-1ec096d6c7f2,members=[{member_id=consumer-1-f82633ab-a06e-4474-8ddb-1ec096d6c7f2,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=25 cap=25]}]}
16:44:03.123 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Performing assignment for group record-tool using strategy range with subscriptions {consumer-1-f82633ab-a06e-4474-8ddb-1ec096d6c7f2=Subscription(topics=[test_topic])}
16:44:03.124 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Finished assignment for group record-tool: {consumer-1-f82633ab-a06e-4474-8ddb-1ec096d6c7f2=Assignment(partitions=[test_topic-0])}
16:44:03.124 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending leader SyncGroup for group record-tool to coordinator kafka-server.mydomain.com:9092 (id: 2147483647 rack: null): {group_id=record-tool,generation_id=3,member_id=consumer-1-f82633ab-a06e-4474-8ddb-1ec096d6c7f2,group_assignment=[{member_id=consumer-1-f82633ab-a06e-4474-8ddb-1ec096d6c7f2,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=33 cap=33]}]}
16:44:03.198 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group record-tool with generation 3
16:44:03.199 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [test_topic-0] for group record-tool
16:44:03.200 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group record-tool fetching committed offsets for partitions: [test_topic-0]
16:44:03.268 [main] DEBUG org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Group record-tool has no committed offset for partition test_topic-0
16:44:03.269 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Resetting offset for partition test_topic-0 to earliest offset.
16:44:03.270 [main] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at kafka-server.mydomain.com:9092.
16:44:03.336 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
16:44:03.337 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received
16:44:03.337 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
16:44:03.338 [main] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 0
16:44:03.407 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - Fetched offset 0 for partition test_topic-0
16:44:06.288 [main] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Received successful heartbeat response for group record-tool
16:44:07.702 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
16:44:07.702 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
16:44:07.702 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
16:44:07.702 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
16:44:07.703 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
16:44:07.703 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
16:44:07.704 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
16:44:07.704 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent
16:44:07.705 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received
16:44:07.705 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency
16:44:07.705 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.bytes-sent
16:44:07.706 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.bytes-received
16:44:07.706 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-2147483647.latency
16:44:07.706 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-sent
16:44:07.707 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-received
16:44:07.707 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.latency
16:44:07.707 [main] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - The Kafka consumer has closed.

Что я заметил в takeWhile(_.nonEmpty), так это то, что список пуст.

Есть ли ошибка в коде? Спасибо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...