У меня есть работа по обогащению данных, и я обогащаю свои данные из источника данных (одна тема kafka), а затем публикую в другой приемник (тема kafka) после обработки.Само задание связано с вводом-выводом, и скорость обработки не увеличивается линейно, когда я добавляю больше ЦП / памяти.Должен ли я горизонтально масштабировать задания с двумя или тремя экземплярами и позволить им использовать одну и ту же тему Kafka с одинаковым groupId?
Сложность в том, что в настоящее время мои коды поддерживают смещение обработки в Kafka Broker.Поэтому ниже будут коды, добавляющие дополнительные параметры, чтобы определить, какие разделы загружать в RDD, и позволить высокому уровню решить, какое смещение им нужно прочитать.И это выглядит не так хорошо.
private def loadRdd[T:ClassTag](maxMessages: Long = 0, messageFormatter: ((String, String)) => T)
(implicit inputConfig: Config): (RDD[T], Unit => Unit, Boolean) = {
val brokersConnectionString = Try(inputConfig.getString("brokersConnectionString")).getOrElse(throw new RuntimeException("Fail to retrieve the broker connection string."))
val topic = inputConfig.getString("topic")
val groupId = inputConfig.getString("groupId")
val retriesAttempts = Try(inputConfig.getInt("retries.attempts")).getOrElse(SparkKafkaProviderUtilsFunctions.DEFAULT_RETRY_ATTEMPTS)
val retriesDelay = Try(inputConfig.getInt("retries.delay")).getOrElse(SparkKafkaProviderUtilsFunctions.DEFAULT_RETRY_DELAY) * 1000
val topicOffsetRanges = KafkaClusterUtils.getTopicOffsetRanges(inputConfig, topic, SparkKafkaProviderUtilsFunctions.getDebugLogger(inputConfig)).toList
.map { case (partitionId, (minOffset, maxOffset)) => OffsetRange(topic, partitionId, minOffset, maxOffset) }.toArray
val (offsetRanges, readAllAvailableMessages) = restrictOffsetRanges(topicOffsetRanges, maxMessages)
val rdd: RDD[ConsumerRecord[String, String]] = RetryUtils.retryOrDie(retriesAttempts, retryDelay = retriesDelay, loopFn = {SparkLogger.warn("Failed to create Spark RDD, retrying...")},
failureFn = { SparkLogger.warn("Failed to create Spark RDD, giving up...")})(
KafkaUtils.createRDD(sc, KafkaClusterUtils.getKafkaConsumerParameters(brokersConnectionString, groupId), offsetRanges, LocationStrategies.PreferConsistent))
(rdd.map(pair => messageFormatter(pair.key(), pair.value())), Unit => commitOffsets(offsetRanges, inputConfig), readAllAvailableMessages)
}
Должен ли я продолжить в этом направлении и позволить высокому уровню решить, какие разделы принадлежат какому экземпляру задания?Или я должен просто увеличить масштаб с добавлением дополнительных ресурсов процессора и памяти?
Спасибо!