Spark структурированное потоковое приложение для чтения из нескольких тем Kafka - PullRequest
2 голосов
/ 01 мая 2019

У меня есть приложение структурированного потокового вещания Spark (v2.3.2), которое нуждается в чтении из ряда тем Kafka, делает некоторую относительно простую обработку (в основном агрегации и несколько объединений) и публикует результаты в ряде других тем Kafka,Таким образом, несколько потоков обрабатываются в одном приложении.

Мне было интересно, имеет ли это значение с точки зрения ресурсов (память, исполнители, потоки, слушатели Кафки и т. Д.), Если я установил только 1 прямой readStreamкоторый подписывается на несколько тем, а затем разделяет потоки с помощью выбора, вместо 1 readStream на тему.

Что-то вроде

df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")
...
t1df = df.select(...).where("topic = 't1'")...
t2df = df.select(...).where("topic = 't2'")...

против.

t1df = spark.readStream.format("kafka").option("subscribe", "t1")
t2df = spark.readStream.format("kafka").option("subscribe", "t2")

Является ли один из них "более эффективным", чем другой?Я не смог найти документацию о том, имеет ли это значение.

Спасибо!

Ответы [ 2 ]

2 голосов
/ 01 мая 2019

Каждое действие требует полного исполнения линии. Вам лучше разделить это на три отдельных чтения кафки. В противном случае вы прочитаете каждую тему N раз, где N - количество записей.

Я бы действительно рекомендовал против этого, но если вы хотите поместить все темы в одно и то же чтение, сделайте следующее:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.filter().write.format(...).save(...)  // location 1
  batchDF.filter().write.format(...).save(...)  // location 2
  batchDF.unpersist()
}
0 голосов
/ 01 мая 2019

С точки зрения ресурса (памяти и ядер) будет различие, если вы запускаете его как несколько потоков (несколько приводов-исполнителей) в кластере.

В первом случае вы упомянули -

df = spark.readStream.format("kafka").option("subscribe", "t1,t2,t3")... t1df = df.select(...).where("topic = 't1'")... t2df = df.select(...).where("topic = 't2'")...

Учитывая наличие драйвера и 2 исполнителей, которые вы указали выше.

Во втором случае -

t1df = spark.readStream.format("kafka").option("subscribe", "t1") t2df = spark.readStream.format("kafka").option("subscribe", "t2")

Вы можете запускать их как разные потоки - 2 драйвера и 2 исполнителя (по 1 исполнителю каждый). Во втором случае потребуется больше памяти и ядер для дополнительного драйвера.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...