Подталкивание предикатов всегда происходит на уровне источника данных. Это происходит таким образом, что источник данных выборочно сканирует те фрагменты данных, которые основаны на. Spark - это просто механизм обработки, который передает запрос источнику данных для окончательного выполнения. Источник данных, с другой стороны, будет выполнять запрос по своему усмотрению. Соединители Spark- sql осведомлены о поведении источников данных (основанных на схеме), поэтому они могут предсказать физический план с помощью предикатов pushdown, но не могут гарантировать, что он будет выполняться, поэтому звездочка.
Я выполнил запрос к локальному паркетному файлу. Физический план выдвинул предикат и без звездочки. Это локальный файл паркета, который Spark считывает сам, поэтому физический план является точным на 100%.
val df = spark.read.parquet("/Users/Documents/temp/temp1")
df.filter($"income" >= 30).explain(true)
== Physical Plan ==
*(1) Project [client#0, type#1, address#2, type_2#3, income#4]
+- *(1) Filter (isnotnull(income#4) && (income#4 >= 30))
+- *(1) FileScan parquet [client#0,type#1,address#2,type_2#3,income#4] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/User/Documents/temp/temp1], PartitionFilters: [], PushedFilters: [IsNotNull(income), GreaterThanOrEqual(income,30)], ReadSchema: struct<client:string,type:string,address:string,type_2:string,income:int>
Здесь таблица считывается из Oracle с использованием Spark- SQL. БД Oracle использует предикат pu sh down и индексный доступ, но Spark об этом не знает.
== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand InsertIntoHadoopFsRelationCommand file:/data/.., false, Parquet, Map(codec -> org.apache.hadoop.io.compress.snappyCodec, path -> /data/...), Overwrite, [COLUMN_01, COLUMN_02, COLUMN_03, COLUMN_04, COLUMN_05, COLUMN_06, COLUMN_07, COLUMN_08, COLUMN_09, COLUMN_10, COLUMN_11, COLUMN_12, COLUMN_13, COLUMN_14, COLUMN_15, COLUMN_16, COLUMN_17, COLUMN_18, ... 255 more fields]
+- Project [COLUMN_01#1246, COLUMN_02#1247, COLUMN_03#1248, COLUMN_04#1249, COLUMN_05#1250, COLUMN_06#1251, COLUMN_07#1252, COLUMN_08#1253, COLUMN_09#1254, COLUMN_10#1255, COLUMN_11#1256, COLUMN_12#1257, COLUMN_13#1258, COLUMN_14#1259, COLUMN_15#1260, COLUMN_16#1261, COLUMN_17#1262, COLUMN_18#1263, ... 255 more fields]
+- Scan JDBCRelation((select cu.*, ROWIDTONCHAR(t.rowid) as ROW_ID from table t where (column1 in (786567473,786567520,786567670,786567570,...........)) and column2 in (10,11, ...) and t.result is null)t) [numPartitions=20] [COLUMN_87#1332,COLUMN_182#1427,COLUMN_128#1373,COLUMN_104#1349,COLUMN_189#1434,COLUMN_108#1353,COLUMN_116#1361,COLUMN_154#1399,COLUMN_125#1370,COLUMN_120#1365,COLUMN_267#1512,COLUMN_54#1299,COLUMN_100#1345,COLUMN_230#1475,COLUMN_68#1313,COLUMN_44#1289,COLUMN_53#1298,COLUMN_97#1342,COLUMN_03#1248,COLUMN_16#1261,COLUMN_43#1288,COLUMN_50#1295,COLUMN_174#1419,COLUMN_20#1265,... 254 more fields] PushedFilters: [], ReadSchema: struct<COLUMN_87:string,COLUMN_182:string,COLUMN_128:string,COLUMN_104:string,COLUMN_189:string,C...