Kafka Producer вызывает org. apache .kafka.common.network.InvalidReceiveException: Неверное получение - PullRequest
1 голос
/ 14 апреля 2020

Я использую 3-zookeeper-cluster и 3-kafka-cluster на Kubernetes.
Кажется, что Kafka работает.
Однако, если я выдаю какое-то сообщение для topi c и проверяю topi c, сообщения вообще нет.

Вот мое высказывание брокера. Это говорит о недопустимом получении или о чем-то, что забавно - пытаться заставить темы работать хорошо, но производить.
также я мог смотреть темы или схемы, которые я создал ранее на Topics-ui, который является GUI инструментом для брокера.
Schema-registry, Connect, лог Реста в порядке, так что у брокера вроде бы все хорошо.

org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:863)
    at kafka.network.Processor.run(SocketServer.scala:762)
    at java.lang.Thread.run(Thread.java:748)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:863)
    at kafka.network.Processor.run(SocketServer.scala:762)
    at java.lang.Thread.run(Thread.java:748)

и вот мои конфигурации брокера с terraform Statefulset

          port {
            container_port = 9092
          }   

          env {
            name = "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR" 
            value = "3" 
          }   

          env {
            name = "KAFKA_DEFAULT_REPLICATION_FACTOR" 
            value = "3" 
          }   

          env {
            name = "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP" 
            value = "PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT"
          }   

          env {
            name = "KAFKA_ZOOKEEPER_CONNECT"
            value = "lucent-zookeeper-0.zookeeper-service.default:2181,lucent-zookeeper-1.zookeeper-service.default:2181,lucent-zookeeper-2.zookeeper-service.default:2181"
          }   

          env {
            name = "POD_IP"

            value_from {
              field_ref {
                field_path = "status.podIP"
              }   
            }   
          }   

          env {
            name = "HOST_IP"
            value_from {
              field_ref {
                field_path = "status.hostIP"
              }   
            }   
          }   

          env {
            name = "POD_NAME"

            value_from {
              field_ref {
                field_path = "metadata.name"
              }   
            }   
          }   

          env {
            name = "POD_NAMESPACE"

            value_from {
              field_ref {
                field_path = "metadata.namespace"
              }   
            }   
          }   

          command = [ 
            "sh",
            "-exec",
            "export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$${POD_NAME}.kafka-service.$${POD_NAMESPACE}:9092 && export KAFKA_BROKER_ID=$${HOSTNAME##*-} && exec /etc/confluent/docker/run"
          ]   

service

resource "kubernetes_service" "kafka-service" {
  metadata {
    name = "kafka-service"

    labels = {
      app = "broker" 
    }
  }

  spec {
    selector = {
      app = "broker"
    }

    port {
      port = 9092
    }

    cluster_ip = "None"
  }

код, чтобы попытаться произвести

kafka-console-producer --broker-list kafka-service:9092 --topic test

1 Ответ

2 голосов
/ 14 апреля 2020

Мое первоначальное предположение: вы, возможно, пытаетесь получить слишком большой запрос. Максимальный размер - это размер по умолчанию для socket.request.max.bytes, который составляет 100 МБ. Поэтому, если у вас есть сообщение размером более 100 МБ, попробуйте увеличить значение этой переменной до server.properties.

...