Возможно ли иметь два экземпляра одного и того же искрового задания для обработки темы Кафки? - PullRequest
0 голосов
/ 23 мая 2018

У меня есть работа по обогащению данных, и я обогащаю свои данные из источника данных (одна тема kafka), а затем публикую в другой приемник (тема kafka) после обработки.Само задание связано с вводом-выводом, и скорость обработки не увеличивается линейно, когда я добавляю больше ЦП / памяти.Должен ли я горизонтально масштабировать задания с двумя или тремя экземплярами и позволить им использовать одну и ту же тему Kafka с одинаковым groupId?

Сложность в том, что в настоящее время мои коды поддерживают смещение обработки в Kafka Broker.Поэтому ниже будут коды, добавляющие дополнительные параметры, чтобы определить, какие разделы загружать в RDD, и позволить высокому уровню решить, какое смещение им нужно прочитать.И это выглядит не так хорошо.

  private def loadRdd[T:ClassTag](maxMessages: Long = 0, messageFormatter: ((String, String)) => T)
                             (implicit inputConfig: Config): (RDD[T], Unit => Unit, Boolean) = {
val brokersConnectionString = Try(inputConfig.getString("brokersConnectionString")).getOrElse(throw new RuntimeException("Fail to retrieve the broker connection string."))
val topic                   = inputConfig.getString("topic")
val groupId                 = inputConfig.getString("groupId")
val retriesAttempts         = Try(inputConfig.getInt("retries.attempts")).getOrElse(SparkKafkaProviderUtilsFunctions.DEFAULT_RETRY_ATTEMPTS)
val retriesDelay            = Try(inputConfig.getInt("retries.delay")).getOrElse(SparkKafkaProviderUtilsFunctions.DEFAULT_RETRY_DELAY) * 1000

val topicOffsetRanges = KafkaClusterUtils.getTopicOffsetRanges(inputConfig, topic, SparkKafkaProviderUtilsFunctions.getDebugLogger(inputConfig)).toList
                                         .map { case (partitionId, (minOffset, maxOffset)) => OffsetRange(topic, partitionId, minOffset, maxOffset) }.toArray

val (offsetRanges, readAllAvailableMessages) = restrictOffsetRanges(topicOffsetRanges, maxMessages)

val rdd: RDD[ConsumerRecord[String, String]] = RetryUtils.retryOrDie(retriesAttempts, retryDelay = retriesDelay, loopFn = {SparkLogger.warn("Failed to create Spark RDD, retrying...")},
                                failureFn = { SparkLogger.warn("Failed to create Spark RDD, giving up...")})(
  KafkaUtils.createRDD(sc, KafkaClusterUtils.getKafkaConsumerParameters(brokersConnectionString, groupId), offsetRanges, LocationStrategies.PreferConsistent))

(rdd.map(pair => messageFormatter(pair.key(), pair.value())), Unit => commitOffsets(offsetRanges, inputConfig), readAllAvailableMessages)

}

Должен ли я продолжить в этом направлении и позволить высокому уровню решить, какие разделы принадлежат какому экземпляру задания?Или я должен просто увеличить масштаб с добавлением дополнительных ресурсов процессора и памяти?

Спасибо!

...