У меня есть требование, где мне нужно создать файл, указывающий c для каждого раздела. Я увидел, что раздел, на который отправляются сообщения, не соответствует точному файлу в формате hdf. Как назначаются разделы у потребителя?
Производитель:
def send(key: String, value: String): Unit = {
val recordMetadataFuture = kafkaProducer.send(new ProducerRecord[String, String](topic, key, value))
try {
val recordMetadata = recordMetadataFuture.get
println("Topic " + recordMetadata.topic + " Offset " + recordMetadata.offset + "Partition " + recordMetadata.partition + " timeStamp" + recordMetadata.timestamp)
} catch {
case ex: Exception => {
ex.printStackTrace
}
}
}
Потребитель:
streamedEvents.writeStream.foreach(new ForeachWriter[Message] {
var fSDataOutputStream: FSDataOutputStream = _
var partition: Long = _
def open(partitionId: Long, version: Long): Boolean = {
println("Partition id ::" + partitionId + " version :: " + version)
val configuration = new Configuration();
val hdfs = FileSystem.get(new URI("URI"), configuration);
val path = new Path("Path" + partitionId + ".txt");
val fs = path.getFileSystem(configuration)
if (fs.exists(path)) {
fSDataOutputStream = fs.append(path)
} else {
fSDataOutputStream = fs.create(path)
}
partition = partitionId
true
}
def process(e: Message) = {
val message = e.message
val messageKey = e.messageKey
val partition = e.partition
val offset = e.offset
val eventData = e.getMessage();
var eventMessage = getObjectMapper.readValue(eventData, classOf[EventMessage])
fSDataOutputStream.write(getObjectMapper.writeValueAsString(eventMessage.getBytes)
}
def close(errorOrNull: Throwable): Unit = {
fSDataOutputStream.close()
}
}