Как spark читает из jdb c и распространяет данные - PullRequest
1 голос
/ 19 апреля 2020

Мне нужна ясность о том, как работает спарк, когда дело доходит до извлечения данных из внешних баз данных. Из документации spark я понял, что, если я не упомяну такие атрибуты, как "numPartitons", "lowerBound" и "upperBound", тогда чтение через jdb c не будет параллельным. В таком случае, что происходит? Данные читаются одним конкретным исполнителем, который выбирает все данные? Как тогда достигается параллелизм? Разве этот исполнитель позже передает данные другим исполнителям? Но я считаю, что исполнители не могут делиться такими данными.

Пожалуйста, дайте мне знать, если кто-нибудь из вас исследовал это.

Изменить на мой вопрос - Привет, Амит, спасибо за твой ответ, но это не то, что я ищу. Позвольте мне уточнить: - Ссылка это - https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

См. Ниже фрагмент кода -

val MultiJoin_vw = db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000)
println(MultiJoin_vw.explain(true))
println("Number of executors")
ss.sparkContext.statusTracker.getExecutorInfos.foreach(x => println(x.host(),x.numRunningTasks()))
println("Number of partitons:" ,MultiJoin_vw.rdd.getNumPartitions)
println("Number of records in each partiton:")
MultiJoin_vw.groupBy(spark_partition_id).count().show(10)

Выход:

Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=5] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev18,0)
(ddlhdcdev41,0)
(Number of partitons:,5)
Number of records in each partition:
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
|                   1|212267|
|                   3| 56714|
|                   4|124824|
|                   2|232193|
|                   0|627712|
+--------------------+------+

Здесь я читать таблицу с помощью пользовательской функции db.getDataFromGreenplum_Parallel (ss, MultiJoin, bs, 5, "bu_id", 10,9000), которая указывает на создание 5 разделов на основе поля bu_id, нижнее значение которого равно 10, а верхнее значение равно 9000. Посмотрите, как данные считывания с помощью spark в 5 разделах с 5 параллельными соединениями (как указано в spark do c). Теперь давайте прочитаем эту таблицу, не упоминая ни один из параметров выше -

Я просто получаю данные, используя другую функцию - val MultiJoin_vw = db.getDataFromGreenplum(ss, MultiJoin, bs)

Здесь я передаю только сеанс spark (ss), запрос для получения данных (MultiJoin) и другой параметр для обработки исключений (bs). O / p как показано ниже - выборка начинается

== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=1] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev31,0)
(ddlhdcdev27,0)
(Number of partitons:1)
Number of records in each partiton:
+--------------------+-------+
|SPARK_PARTITION_ID()|  count|
+--------------------+-------+
|                   0|1253710|

Посмотрите, как данные считываются в один раздел, означает порождение только 1 соединения. Вопрос остается: этот раздел будет только на одной машине, и ему будет назначено 1 задание. Таким образом, здесь нет параллелизма. Как данные распределяются между другими исполнителями?

Кстати, это команда spark-submit, которую я использовал для обоих сценариев ios -

spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --num-executors 1 --executor-cores 1 --executor-memory 1g --class jobs.memConnTest $home_directory/target/mem_con_test_v1-jar-with-dependencies.jar

1 Ответ

1 голос
/ 20 апреля 2020

Re: «извлекать данные из внешних баз данных». В вашем приложении spark это обычно часть кода, которая будет выполняться исполнителями. Количество исполнителей можно контролировать, передавая искровую конфигурацию «num-executors». Если вы работали со Spark и RDD / Dataframe, то одним из примеров подключения к базе данных являются функции преобразования, такие как map, flatmap, filter et c. Эти функции при выполнении на исполнителях (настроенных num-executors) устанавливают sh соединение с базой данных и используют его.

Здесь важно отметить одну важную вещь: если вы работаете со слишком многими исполнителями, ваш сервер баз данных может работать все медленнее и медленнее и в конечном итоге не отвечать на запросы. Если вы дадите слишком мало исполнителей, это может привести к тому, что ваша работа по поиску займет больше времени до конца sh. Следовательно, вы должны найти оптимальное число в зависимости от емкости вашего сервера БД.

Re: «Как тогда достигается параллелизм? Делит ли этот исполнитель данные позже другим исполнителям?»

Параллелизм как упомянутое выше достигается путем настройки количества исполнителей. Настройка количества исполнителей - это только один из способов увеличения параллелизма и не единственный способ. Рассмотрим случай, когда у вас есть данные меньшего размера, приводящие к меньшему количеству разделов, тогда вы увидите меньший параллелизм. Таким образом, вам нужно иметь достаточное количество разделов (соответствующих задачам) и затем соответствующее (определенное количество зависит от варианта использования) количество исполнителей для параллельного выполнения этих задач. Пока вы можете обрабатывать каждую запись по отдельности, она масштабируется, однако, как только у вас будет действие, которое вызовет случайное перемешивание, вы увидите статистику, касающуюся задач и исполнителей в действии. Spark постарается наилучшим образом распространить данные, чтобы они могли работать на оптимальном уровне.

Пожалуйста, обратитесь к https://blog.cloudera.com/how-to-tune-your-apache-spark-jobs-part-1/ и последующим частям, чтобы узнать больше о внутренних деталях.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...