Мне нужна ясность о том, как работает спарк, когда дело доходит до извлечения данных из внешних баз данных. Из документации spark я понял, что, если я не упомяну такие атрибуты, как "numPartitons", "lowerBound" и "upperBound", тогда чтение через jdb c не будет параллельным. В таком случае, что происходит? Данные читаются одним конкретным исполнителем, который выбирает все данные? Как тогда достигается параллелизм? Разве этот исполнитель позже передает данные другим исполнителям? Но я считаю, что исполнители не могут делиться такими данными.
Пожалуйста, дайте мне знать, если кто-нибудь из вас исследовал это.
Изменить на мой вопрос - Привет, Амит, спасибо за твой ответ, но это не то, что я ищу. Позвольте мне уточнить: - Ссылка это - https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
См. Ниже фрагмент кода -
val MultiJoin_vw = db.getDataFromGreenplum_Parallel(ss, MultiJoin, bs,5,"bu_id",10,9000)
println(MultiJoin_vw.explain(true))
println("Number of executors")
ss.sparkContext.statusTracker.getExecutorInfos.foreach(x => println(x.host(),x.numRunningTasks()))
println("Number of partitons:" ,MultiJoin_vw.rdd.getNumPartitions)
println("Number of records in each partiton:")
MultiJoin_vw.groupBy(spark_partition_id).count().show(10)
Выход:
Fetch Starts
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=5] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev18,0)
(ddlhdcdev41,0)
(Number of partitons:,5)
Number of records in each partition:
+--------------------+------+
|SPARK_PARTITION_ID()| count|
+--------------------+------+
| 1|212267|
| 3| 56714|
| 4|124824|
| 2|232193|
| 0|627712|
+--------------------+------+
Здесь я читать таблицу с помощью пользовательской функции db.getDataFromGreenplum_Parallel (ss, MultiJoin, bs, 5, "bu_id", 10,9000), которая указывает на создание 5 разделов на основе поля bu_id, нижнее значение которого равно 10, а верхнее значение равно 9000. Посмотрите, как данные считывания с помощью spark в 5 разделах с 5 параллельными соединениями (как указано в spark do c). Теперь давайте прочитаем эту таблицу, не упоминая ни один из параметров выше -
Я просто получаю данные, используя другую функцию - val MultiJoin_vw = db.getDataFromGreenplum(ss, MultiJoin, bs)
Здесь я передаю только сеанс spark (ss), запрос для получения данных (MultiJoin) и другой параметр для обработки исключений (bs). O / p как показано ниже - выборка начинается
== Physical Plan ==
*(1) Scan JDBCRelation((select * from mstrdata_rdl.cmmt_sku_bu_vw)as mytab) [numPartitions=1] [sku_nbr#0,bu_id#1,modfd_dts#2] PushedFilters: [], ReadSchema: struct<sku_nbr:string,bu_id:int,modfd_dts:timestamp>
()
Number of executors
(ddlhdcdev31,0)
(ddlhdcdev27,0)
(Number of partitons:1)
Number of records in each partiton:
+--------------------+-------+
|SPARK_PARTITION_ID()| count|
+--------------------+-------+
| 0|1253710|
Посмотрите, как данные считываются в один раздел, означает порождение только 1 соединения. Вопрос остается: этот раздел будет только на одной машине, и ему будет назначено 1 задание. Таким образом, здесь нет параллелизма. Как данные распределяются между другими исполнителями?
Кстати, это команда spark-submit, которую я использовал для обоих сценариев ios -
spark2-submit --master yarn --deploy-mode cluster --driver-memory 1g --num-executors 1 --executor-cores 1 --executor-memory 1g --class jobs.memConnTest $home_directory/target/mem_con_test_v1-jar-with-dependencies.jar