Как включить Kafka Producer Metrics в Spark? - PullRequest
0 голосов
/ 17 мая 2018

Мы используем Kafka 0.10 со Spark 2.1, и я обнаружил, что публикация нашего производителя всегда выполнялась медленно. Я могу достичь скорости около 1 к / с после того, как отдаю 8 ядер исполнителям Spark, в то время как в других постах говорилось, что машина легко достигает миллионов в секунду. Я попытался настроить linger.ms и batch.size, чтобы выяснить это. Однако я обнаружил, что linger.ms = 0 выглядит оптимально для меня, и batch.size не дает особого эффекта. И я отправлял 160 тыс. Событий за итерацию. Похоже, я должен позволить метрикам производителя Kafka точно знать, что именно происходит. Но, похоже, включить его в Spark Executor нелегко.

Может ли кто-нибудь поделиться со мной светом?

Мои коды такие:

private def publishMessagesAttempt(producer: KafkaProducer[String, String], topic: String, messages: Iterable[(String, String)], producerMaxDelay: Long,
                                 individualMessageMaxDelay: Long, logger: (String, Boolean) => Unit = KafkaClusterUtils.DEFAULT_LOGGER): Iterable[(String, String)] = {
val futureMessages = messages.map(message => (message, producer.send(new ProducerRecord[String, String](topic, message._1, message._2))))
val messageSentTime = System.currentTimeMillis
val awaitedResults = futureMessages.map { case (message, future) =>
  val waitFor = Math.max(producerMaxDelay - (System.currentTimeMillis - messageSentTime), individualMessageMaxDelay)
  val failed = Try(future.get(waitFor, TimeUnit.MILLISECONDS)) match {
    case Success(_) => false
    case Failure(f) =>
      logger(s"Error happened when publish to Kafka: ${f.getStackTraceString}", true)
      true
  }
  (message, failed)
}
awaitedResults.filter(_._2).map(_._1)
}

1 Ответ

0 голосов
/ 23 мая 2018

Я наконец-то нашел ответ. 1. KafkaProducer имеет функцию metrics (), которая может получать метрики производителя. Достаточно просто напечатать, этого должно быть достаточно.

Некоторые коды, как это должно работать:

public class MetricsProducerReporter implements Runnable {
private final Producer<String, StockPrice> producer;
private final Logger logger =
        LoggerFactory.getLogger(MetricsProducerReporter.class);

//Used to Filter just the metrics we want
private final Set<String> metricsNameFilter = Sets.set(
        "record-queue-time-avg", "record-send-rate", "records-per-request-avg",
        "request-size-max", "network-io-rate", "record-queue-time-avg",
        "incoming-byte-rate", "batch-size-avg", "response-rate", "requests-in-flight"
);

public MetricsProducerReporter(
        final Producer<String, StockPrice> producer) {
    this.producer = producer;
}

@Override
public void run() {
    while (true) {
        final Map<MetricName, ? extends Metric> metrics
                = producer.metrics();

        displayMetrics(metrics);
        try {
            Thread.sleep(3_000);
        } catch (InterruptedException e) {
            logger.warn("metrics interrupted");
            Thread.interrupted();
            break;
        }
    }
}
  1. Мои коды медленные, потому что на карте scala по умолчанию параллель не включена. Мне придется использовать messages.par.map () для достижения параллелизма.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...