Мое приложение для потоковой передачи в режиме искры читает из kafka с использованием подхода DStream, и я пытаюсь получить размер пакета для обработки 60 000 сообщений за 10 секунд.
Что я наделал,
- Создана тема с 3 разделами
- spark.streaming.kafka.maxRatePerPartition = 60000
- spark.streaming.backpressure.enabled = true
- установить длительность пакета до 10 секунд при создании
StreamingContext
- работает в режиме пряжи с 2 исполнителями (всего 4 ядра для 3
Перегородки)
Теперь, как я проверяю, что это работает.
У меня есть продюсер, который сразу отправляет 60 000 сообщений в тему. Когда я проверяю интерфейс искры, я получаю следующее:
- время партии | Размер ввода | время обработки
- 10: 54: 30 | 17610 | 5s
- 10: 54: 20 | 32790 | 8s
- 10: 54: 10 | 9600 | 3s
Таким образом, время каждой партии составляет 10 с. Я ожидаю, что 1 партия с 60000 записей. Есть ли какой-то другой параметр, который я не устанавливаю? Из того, что я прочитал о том, что я сейчас установил, я должен получить 10 * 60 000 * 3 = 1800000 в одной партии.
spark.app.id = application_1551747423133_0677
spark.app.name = KafkaCallDEV
spark.driver.cores = 2
spark.driver.extraJavaOptions = -XX: + UseG1GC -XX: ConcGCThreads = 2
-XX: InitiatingHeapOccupancyPercent = 35 -Dlog4j.configuration = log4j.properties -verbose: gc
spark.driver.memory = 3g
spark.driver.port = 33917
spark.executor.cores = 2
spark.executor.extraJavaOptions = -XX: + UseG1GC -XX: ConcGCThreads = 2
-XX: InitiatingHeapOccupancyPercent = 35 -Dlog4j.configuration = log4j.properties -verbose: gc
spark.executor.id = драйвер
spark.executor.instances = 2
spark.executor.memory = 2g
spark.master = пряжа
spark.scheduler.mode = FIFO
spark.streaming.backpressure.enabled = true
spark.streaming.kafka.maxRatePerPartition = 60000
spark.submit.deployMode = cluster
spark.ui.filters = org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.ui.port = 0
spark.yarn.app.container.log.dir = / data0 / пряжа / контейнеры-журналы / application_1551747423133_0677 / container_1551747423133_0677_01_000002
Ниже приведено то, что я распечатал, используя
logger.info (sparkSession.sparkContext.getConf.getAll.mkString ( "\ п"))
Я удалил некоторые ненужные журналы, например адрес сервера, имя приложения и т. Д.
(spark.executor.extraJavaOptions, -XX: + UseG1GC -XX: ConcGCThreads = 2
-XX: InitiatingHeapOccupancyPercent = 35 -Dlog4j.configuration = log4j.properties -verbose: gc) (spark.yarn.app.id, application_1551747423133_0681)
(spark.submit.deployMode, кластер)
(spark.streaming.backpressure.enabled, правда)
(spark.yarn.credentials.renewalTime, 1562764821939ms)
(spark.ui.filters, org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)
(spark.executor.memory, 2g)
(spark.yarn.credentials.updateTime, 1562769141873ms)
(spark.driver.cores, 2)
(spark.executor.id, водитель)
(spark.executor.cores, 2)
(spark.master, пряжа)
(spark.driver.memory, 3g)
(spark.sql.warehouse.dir, / user / hive / warehouse)
(spark.ui.port, 0)
(spark.driver.extraJavaOptions, -XX: + UseG1GC -XX: ConcGCThreads = 2
-XX: InitiatingHeapOccupancyPercent = 35 -Dlog4j.configuration = log4j.properties -verbose: gc)
(spark.executor.instances, 2)
(spark.driver.port, 37375)
У меня также есть некоторые конфигурации Kafka, которые печатаются, поэтому я также опубликую их ниже.
org.apache.kafka.clients.consumer.ConsumerConfig:178 - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 60000
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
retry.backoff.ms = 100
ssl.secure.random.implementation = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest