Как создать несколько разделов по Alpakka - PullRequest
0 голосов
/ 04 апреля 2019

Я пытаюсь создать простого продюсера, который создает тему с некоторыми разделами, предоставленными конфигурацией.

Согласно Документ настройки Alpakka Producer любое свойство из org.apache.kafka.clients.producer.ProducerConfig может быть установлено в секции kafka-clients. И есть свойство num.partitions, как прокомментировано в Документ API производителя .

Таким образом, я добавил это свойство в свой файл application.conf, как показано ниже:

topic = "topic"
topic = ${?TOPIC}

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  parallelism = 100
  parallelism = ${?PARALLELISM}

  # Duration to wait for `KafkaConsumer.close` to finish.
  close-timeout = 20s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
    bootstrap.servers = "my-kafka:9092"
    bootstrap.servers = ${?BOOTSTRAPSERVERS}
    num.partitions = "3"
    num.partitions = ${?NUM_PARTITIONS}
  }
}

Код приложения производителя также указан ниже:

object Main extends App {

  val config = ConfigFactory.load()

  implicit val system: ActorSystem = ActorSystem("producer")
  implicit val materializer: Materializer = ActorMaterializer()

  val producerConfigs = config.getConfig("akka.kafka.producer")
  val producerSettings = ProducerSettings(producerConfigs, new StringSerializer, new StringSerializer)

  val topic = config.getString("topic")

  val done: Future[Done] =
    Source(1 to 100000)
      .map(_.toString)
      .map(value => new ProducerRecord[String, String](topic, value))
      .runWith(Producer.plainSink(producerSettings))

  implicit val ec: ExecutionContextExecutor = system.dispatcher
  done onComplete  {
    case Success(_) => println("Done"); system.terminate()
    case Failure(err) => println(err.toString); system.terminate()
  }

}

Но это не работает. Producer создает тему с одним разделом вместо 3 разделов, как я установил в конфигурации:

num.partitions = "3"

Наконец, вывод Kafkacat приведен ниже:

~$ kafkacat -b my-kafka:9092 -L
Metadata for all topics (from broker -1: my-kafka:9092/bootstrap):
 3 brokers:
  broker 2 at my-kafka-2.my-kafka-headless.default:9092
  broker 1 at my-kafka-1.my-kafka-headless.default:9092
  broker 0 at my-kafka-0.my-kafka-headless.default:9092
 1 topics:
  topic "topic" with 1 partitions:
    partition 0, leader 2, replicas: 2, isrs: 2

Что не так? Можно ли установить свойства из API Kafka Producer в разделе kafka-clients с помощью Alpakka?

Ответы [ 2 ]

2 голосов
/ 04 апреля 2019

# Свойства, определенные org.apache.kafka.clients.producer.ProducerConfig

# можно определить в этом разделе конфигурации.

Как говорится, ProducerConfig для настроек производителя, а не настроек брокера, что и есть num.partitions (я думаю, что вы потеряли, в какой таблице свойство было показано в документах Apache Kafka ... прокрутите до сверху, чтобы увидеть правильный заголовок).

Невозможно установить разделы темы от производителя ... Вам нужно будет использовать класс AdminClient для создания темы, и количество разделов является параметром, а не свойством конфигурации.

Пример кода

val props = new Properties()
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val adminClient = AdminClient.create(props)

val numPartitions = 3
val replicationFactor = 3.toShort
val newTopic = new NewTopic("new-topic-name", numPartitions, replicationFactor)
val configs = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "gzip")
// settings some configs
newTopic.configs(configs.asJava)

adminClient.createTopics(List(newTopic).asJavaCollection)

И тогда вы можете запустить продюсера

1 голос
/ 04 апреля 2019

Похоже, что тема создается по умолчанию, что является поведением по умолчанию для Кафки. Если это так, вам нужно определить количество разделов по умолчанию в файле server.properties для вашего брокера.

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...