Соответствует ли идентификатор раздела Kafka Topi c, которому было отправлено сообщение, идентификатору раздела открытого метода в ForeachWriter? - PullRequest
1 голос
/ 02 апреля 2020

У меня есть требование, где мне нужно создать файл, указывающий c для каждого раздела. Я увидел, что раздел, на который отправляются сообщения, не соответствует точному файлу в формате hdf. Как назначаются разделы у потребителя?

Производитель:

def send(key: String, value: String): Unit = {
    val recordMetadataFuture = kafkaProducer.send(new ProducerRecord[String, String](topic, key, value))
    try {
      val recordMetadata = recordMetadataFuture.get
      println("Topic " + recordMetadata.topic + " Offset " + recordMetadata.offset + "Partition " + recordMetadata.partition + " timeStamp" + recordMetadata.timestamp)
    } catch {
      case ex: Exception => {
        ex.printStackTrace
      }
    }
  }

Потребитель:

streamedEvents.writeStream.foreach(new ForeachWriter[Message] {
    var fSDataOutputStream: FSDataOutputStream = _
    var partition: Long = _
    def open(partitionId: Long, version: Long): Boolean = {
      println("Partition id ::" + partitionId + " version :: " + version)
      val configuration = new Configuration();
      val hdfs = FileSystem.get(new URI("URI"), configuration);
      val path = new Path("Path" + partitionId + ".txt");
      val fs = path.getFileSystem(configuration)
      if (fs.exists(path)) {
        fSDataOutputStream = fs.append(path)
      } else {
        fSDataOutputStream = fs.create(path)
      }
      partition = partitionId
      true
    }

    def process(e: Message) = {
      val message = e.message
      val messageKey = e.messageKey
      val partition = e.partition
      val offset = e.offset
      val eventData = e.getMessage();

      var eventMessage = getObjectMapper.readValue(eventData, classOf[EventMessage])

      fSDataOutputStream.write(getObjectMapper.writeValueAsString(eventMessage.getBytes)
    }

    def close(errorOrNull: Throwable): Unit = {
      fSDataOutputStream.close()
    }
  }

1 Ответ

0 голосов
/ 02 апреля 2020

Как разделы назначаются в потребителе?

В Kafka у вас есть полный контроль над тем, как создавать и принимать сообщения от разделов topi c.

Для производителя стратегия разделения основана на ключе сообщения. По умолчанию он вычисляет hash(key) % number_of_partitions и распределяет сообщения по разделам. Если ключ не указан, сообщения отправляются в разделы на основе циклического перебора. Кроме того, вы также можете написать и предоставить свой пользовательский разделитель класс.

Для потребителя вы можете настроить partition.assignment.strategy в Consumer Configuration . По умолчанию это класс org.apache.kafka.clients.consumer.RangeAssignor

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...