Spark Streaming не настроил количество записей на размер пакета? - PullRequest
1 голос
/ 08 июля 2019

Мое приложение для потоковой передачи в режиме искры читает из kafka с использованием подхода DStream, и я пытаюсь получить размер пакета для обработки 60 000 сообщений за 10 секунд.

Что я наделал,

  • Создана тема с 3 разделами
  • spark.streaming.kafka.maxRatePerPartition = 60000
  • spark.streaming.backpressure.enabled = true
  • установить длительность пакета до 10 секунд при создании StreamingContext
  • работает в режиме пряжи с 2 исполнителями (всего 4 ядра для 3 Перегородки)

Теперь, как я проверяю, что это работает.

У меня есть продюсер, который сразу отправляет 60 000 сообщений в тему. Когда я проверяю интерфейс искры, я получаю следующее:

  • время партии | Размер ввода | время обработки
  • 10: 54: 30 | 17610 | 5s
  • 10: 54: 20 | 32790 | 8s
  • 10: 54: 10 | 9600 | 3s

Таким образом, время каждой партии составляет 10 с. Я ожидаю, что 1 партия с 60000 записей. Есть ли какой-то другой параметр, который я не устанавливаю? Из того, что я прочитал о том, что я сейчас установил, я должен получить 10 * 60 000 * 3 = 1800000 в одной партии.

spark.app.id = application_1551747423133_0677

spark.app.name = KafkaCallDEV

spark.driver.cores = 2

spark.driver.extraJavaOptions = -XX: + UseG1GC -XX: ConcGCThreads = 2 -XX: InitiatingHeapOccupancyPercent = 35 -Dlog4j.configuration = log4j.properties -verbose: gc

spark.driver.memory = 3g

spark.driver.port = 33917

spark.executor.cores = 2

spark.executor.extraJavaOptions = -XX: + UseG1GC -XX: ConcGCThreads = 2 -XX: InitiatingHeapOccupancyPercent = 35 -Dlog4j.configuration = log4j.properties -verbose: gc

spark.executor.id = драйвер

spark.executor.instances = 2

spark.executor.memory = 2g

spark.master = пряжа

spark.scheduler.mode = FIFO

spark.streaming.backpressure.enabled = true

spark.streaming.kafka.maxRatePerPartition = 60000

spark.submit.deployMode = cluster

spark.ui.filters = org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter

spark.ui.port = 0

spark.yarn.app.container.log.dir = / data0 / пряжа / контейнеры-журналы / application_1551747423133_0677 / container_1551747423133_0677_01_000002

Ниже приведено то, что я распечатал, используя

logger.info (sparkSession.sparkContext.getConf.getAll.mkString ( "\ п"))

Я удалил некоторые ненужные журналы, например адрес сервера, имя приложения и т. Д.

(spark.executor.extraJavaOptions, -XX: + UseG1GC -XX: ConcGCThreads = 2

-XX: InitiatingHeapOccupancyPercent = 35 -Dlog4j.configuration = log4j.properties -verbose: gc) (spark.yarn.app.id, application_1551747423133_0681)

(spark.submit.deployMode, кластер)

(spark.streaming.backpressure.enabled, правда)

(spark.yarn.credentials.renewalTime, 1562764821939ms)

(spark.ui.filters, org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)

(spark.executor.memory, 2g)

(spark.yarn.credentials.updateTime, 1562769141873ms)

(spark.driver.cores, 2)

(spark.executor.id, водитель)

(spark.executor.cores, 2)

(spark.master, пряжа)

(spark.driver.memory, 3g)

(spark.sql.warehouse.dir, / user / hive / warehouse)

(spark.ui.port, 0)

(spark.driver.extraJavaOptions, -XX: + UseG1GC -XX: ConcGCThreads = 2 -XX: InitiatingHeapOccupancyPercent = 35 -Dlog4j.configuration = log4j.properties -verbose: gc)

(spark.executor.instances, 2)

(spark.driver.port, 37375)

У меня также есть некоторые конфигурации Kafka, которые печатаются, поэтому я также опубликую их ниже.

org.apache.kafka.clients.consumer.ConsumerConfig:178 - ConsumerConfig values: 
    metric.reporters = []
    metadata.max.age.ms = 300000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = 
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 60000
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    retry.backoff.ms = 100
    ssl.secure.random.implementation = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest

Ответы [ 2 ]

3 голосов
/ 08 июля 2019

spark.streaming.kafka.maxRatePerPartition = 60000 означает, что

максимальная скорость (в сообщениях в секунду), при которой каждый кафка раздел будет читаться этим прямым API, который будет включен свойством spark.streaming.backpressure.enabled = true

17610 + 32790 + 9600 = 60000 достигнут размер вашей партии.


см. это enter image description here

ваши 3 раздела кафки (с 60k сообщений) считываются искрой в чанках / искровых перегородках, в вашем случае 3 раздела от искры. но оригинальное количество сообщений в 3 разделах кафки составляет 60000 (17610 + 32790 + 9600). Даже если входящий поток сообщений с высокой скоростью возвращается, давление будет поддерживать одинаковую скорость сообщений, используя RateLimiter и PIDRateEstimator

Итак, вы сделали здесь ....

Далее расскажу мою публикацию - Краткое примечание о противодавлении искрового потока для лучшего понимания

Вывод: Если вы включите обратное давление, независимо от того, с какой скоростью вы отправляете сообщения. это позволит постоянной скорости сообщений

как в этом иллюстративном генеральном примере ... где свойства противодавления подобны управлению притоком - регулировочный винт давления для поддержания равномерной скорости потока сообщений.

enter image description here

0 голосов
/ 19 июля 2019

Итак, я нашел причину, по которой Spark разбивает пакет записей, которые я отправляю на несколько пакетов.У меня есть spark.streaming.backpressure.enabled = true.При этом используется петля обратной связи из предыдущих пакетов для управления скоростью приема, которая ограничена сверху максимальной скоростью на раздел, которую я установил в spark.streaming.kafka.maxRatePerPartition.Так что искра настраивает скорость приема для меня.

...