Кафка Производитель не пишет в кафку тему - PullRequest
0 голосов
/ 21 ноября 2018

извините за вопрос noob: я пишу в kafka, используя akka, но не вижу его в приемнике консоли kafka.

Конфигурация для записи в kafka:

kafka {
  bootstrap.servers = "localhost:9002"
  auto.offset.reset = "earliest"
}

У меня естькод для записи в тему kafka с помощью akka:

class ServiceKafkaProducer(topicName: String, actorSystem: ActorSystem, configuration: Configuration) {
  val bootstrapServers: String = configuration
    .getString("kafka.bootstrap.servers")
    .getOrElse(
      throw new Exception("No config element foe kafka.bootstrap.servers")
    )

  val producerSettings: ProducerSettings[String, String] = ProducerSettings(
    actorSystem,
    new StringSerializer,
    new StringSerializer
  ).withBootstrapServers(bootstrapServers)

  val producer: KafkaProducer[String, String] = producerSettings.createKafkaProducer()

  def send(logRecordStr: String): Unit = {
    Logger.debug(s"Inside ServiceKafkaProducer, writing to $topicName")
    Logger.debug(logRecordStr)
    producer.send(
      new ProducerRecord(topicName, logRecordStr)
    )
  }
}

 def createTag(text: String, createdBy: UUID): Unit = {
  Logger.debug("Inside TagEventProducer#createTag")
  val tagId = UUID.randomUUID()

  val event = TagCreated(tagId, text, createdBy)
  println(event)
  val record = createLogRecord(event)

  send(record.encode)
}

LOGS

```[debug] - application - Inside TagEventProducer#createTag
TagCreated(d393d223-9eb6-45e3-8610-56a3f65c84cc,scala,f5b61ca0-0ccc-4064-94c1-cba2a5a4087b)
[debug] - application - Inside ServiceKafkaProducer, writing to tags
[debug] - application - {"id":"ed27f0d1-6b6c-469b-af97-1929dc6a5cc7","action":"tag-created","data":{"id":"d393d223-9eb6-45e3-8610-56a3f65c84cc","text":"scala","createdBy":"f5b61ca0-0ccc-4064-94c1-cba2a5a4087b"},"timestamp":1542776716868}```

(отредактировано) Я запускаю kafka с использованием образа dotify spotify следующим образом:

version: '3.5'
services:
  kafka:
    image: 'spotify/kafka'
    hostname: kafka
    environment:
      - ADVERTISED_HOST=kafka
      - ADVERTISED_PORT=9092
    ports:
      - "9092:9092"
      - "2181:2181"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    networks:
      - kafka_net
  kafkaManager:
    image: 'sheepkiller/kafka-manager'
    environment:
      - ZK_HOSTS=kafka:2181
      - APPLICATION_SECRET=letmein
    ports:
      - "8000:8000"
    networks:
      - kafka_net
networks:
  kafka_net:
    name: my_network

Ответы [ 3 ]

0 голосов
/ 21 ноября 2018

Вы уверены, что ваша конфигурация для записи в Kafka выглядит следующим образом?

bootstrap.servers = "localhost:9002"
auto.offset.reset = "earliest"

Я думаю, что это должно быть localhost:9092.Возможно, вы совершили опечатку здесь.

0 голосов
/ 21 ноября 2018

Установите kafka.bootstrap.servers в {kafka_docker_host_api}:9092.И если ваша проблема не будет решена, попробуйте использовать producer.flush().Производитель Kafka не отправляет сообщения сразу.Поэтому, если вы хотите форсировать отправку сообщений, вам необходимо выполнить сброс.

0 голосов
/ 21 ноября 2018

Прежде всего, вы используете неправильный порт (9002 вместо 9092) в производителе.Попробуйте использовать bootstrap.servers = kafka:9092 вместо localhost:9002 в производителе, потому что ваш рекламируемый хост установлен как kafka

...