Apache Spark: использование простых запросов SQL против методов Spark SQL - PullRequest
0 голосов
/ 02 октября 2019

Я очень новичок в Apache Spark. У меня есть очень простой вопрос: что лучше с точки зрения производительности между двумя нижеприведенными синтаксисами: использование простых SQL-запросов или использование Spark SQL-методов, таких как select, filter и т. Д. Вот короткий пример на Java, который поможет вам лучше понять мой вопрос.

    private static void queryVsSparkSQL() throws AnalysisException {
        SparkConf conf = new SparkConf();

        SparkSession spark = SparkSession
                .builder()
                .master("local[4]")
                .config(conf)
                .appName("queryVsSparkSQL")
                .getOrCreate();

        //using predefined query
        Dataset<Row> ds1 = spark
                .read()
                .format("jdbc")
                .option("url", "jdbc:oracle:thin:hr/hr@localhost:1521/orcl")
                .option("user", "hr")
                .option("password", "hr")
                .option("query","select * from hr.employees t where t.last_name = 'King'")
                .load();
        ds1.show();

        //using spark sql methods: select, filter
        Dataset<Row> ds2 = spark
                .read()
                .format("jdbc")
                .option("url", "jdbc:oracle:thin:hr/hr@localhost:1521/orcl")
                .option("user", "hr")
                .option("password", "hr")
                .option("dbtable", "hr.employees")
                .load()
                .select("*")
                .filter(col("last_name").equalTo("King"));

        ds2.show();
    }

1 Ответ

1 голос
/ 02 октября 2019

Попробуйте .explain и проверьте, используется ли предикат pushdown для вашего второго запроса.

Это должно быть во втором случае. Если это так, то технически это эквивалентно передаче явного запроса с pushdown, уже содержащегося в опции запроса.

См. Смоделированную версию для mySQL, основанную на вашем подходе.

CASE 1: оператор выбора через переданный запрос, содержащий фильтр

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://mysql-rfam-public.ebi.ac.uk:4497/Rfam").option("driver", "org.mariadb.jdbc.Driver").option("query","select * from family where rfam_acc = 'RF01527'").option("user", "rfamro").load().explain()

== Physical Plan ==
*(1) Scan JDBCRelation((select * from family where rfam_acc = 'RF01527') SPARK_GEN_SUBQ_4) [numPartitions=1] #[rfam_acc#867,rfam_id#868,auto_wiki#869L,description#870,author#871,seed_source#872,gathering_cutoff#873,trusted_cutoff#874,noise_cutoff#875,comment#876,previous_id#877,cmbuild#878,cmcalibrate#879,cmsearch#880,num_seed#881L,num_full#882L,num_genome_seq#883L,num_refseq#884L,type#885,structure_source#886,number_of_species#887L,number_3d_structures888,num_pseudonokts#889,tax_seed#890,... 11 more fields] PushedFilters: [], ReadSchema: struct<rfam_acc:string,rfam_id:string,auto_wiki:bigint,description:string,author:string,seed_sour...

ЗдесьPressedFilters не используется, поскольку используется только запрос;он содержит фильтр в фактическом передаваемом в db запрос.

СЛУЧАЙ 2: Нет оператора select, вместо этого используются API-интерфейсы Spark SQL, ссылающиеся на фильтр

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://mysql-rfam-public.ebi.ac.uk:4497/Rfam").option("driver", "org.mariadb.jdbc.Driver").option("dbtable", "family").option("user", "rfamro").load().select("*").filter(col("rfam_acc").equalTo("RF01527")).explain()

== Physical Plan ==
*(1) Scan JDBCRelation(family) [numPartitions=1] [rfam_acc#1149,rfam_id#1150,auto_wiki#1151L,description#1152,author#1153,seed_source#1154,gathering_cutoff#1155,trusted_cutoff#1156,noise_cutoff#1157,comment#1158,previous_id#1159,cmbuild#1160,cmcalibrate#1161,cmsearch#1162,num_seed#1163L,num_full#1164L,num_genome_seq#1165L,num_refseq#1166L,type#1167,structure_source#1168,number_of_species#1169L,number_3d_structures#1170,num_pseudonokts#1171,tax_seed#1172,... 11 more fields] PushedFilters: [*IsNotNull(rfam_acc), *EqualTo(rfam_acc,RF01527)], ReadSchema: struct<rfam_acc:string,rfam_id:string,auto_wiki:bigint,description:string,author:string,seed_sour...

PhedFilter isустановите критерии, чтобы фильтрация применялась в самой базе данных перед возвратом данных в Spark. Обратите внимание на символ * на PhedFilters, который указывает на фильтрацию в источнике данных = базе данных.

Сводка

Я запустил оба варианта, и время было быстрым. Они эквивалентны с точки зрения того, что выполняется обработка БД, в Spark возвращаются только отфильтрованные результаты, но через два разных механизма, которые дают одинаковую производительность и физические результаты.

...