Искра Предсказание Pushdown не работает, как ожидалось - PullRequest
4 голосов
/ 22 марта 2020

У меня проблема с предикатным поведением Spark. Кажется, что-то не так. Я использую Spark версии 2.4.5 на MacOS

Ниже приведен мой пример CSV-данных results2.csv

enter image description here

val df = spark.read.option("header", "true").csv("/Users/apple/kaggle-data/results2.csv")

раздел на 2 столбца: страна и город

df.repartition($"country",$"city").write.option("header", "true").partitionBy("country","city").parquet("/Users/apple/kaggle-data/part2/")

раздел на 1 столбец: страна

val df2 = spark.read.option("header", "true").csv("/Users/apple/kaggle-data/results2.csv")
df2.repartition($"country").write.option("header", "true").partitionBy("country").parquet("/Users/apple/kaggle-data/part1/")

Я читаю данные только с разделом на страна и запрос по предикату страна и город, но фильтр выпадающего показывает город, который не ожидается, я ожидал, что страна будет здесь

val kaggleDf1 = spark.read.option("header", "true").parquet("/Users/apple/kaggle-data/part1/") 
kaggleDf1.where($"country" === "England" && $"city" === "London").explain(true)

план

== Parsed Logical Plan ==
'Filter (('country = England) && ('city = London))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Analyzed Logical Plan ==
date: string, home_team: string, away_team: string, home_score: string, away_score: string, tournament: string, city: string, neutral: string, country: string
Filter ((country#146 = England) && (city#144 = London))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Optimized Logical Plan ==
Filter (((isnotnull(country#146) && isnotnull(city#144)) && (country#146 = England)) && (city#144 = London))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Physical Plan ==
*(1) Project [date#138, home_team#139, away_team#140, home_score#141, away_score#142, tournament#143, city#144, neutral#145, country#146]
+- *(1) Filter (isnotnull(city#144) && (city#144 = London))
   +- *(1) FileScan parquet [date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] Batched: true, Format: Parquet, Location: InMemoryFileIndex[/Users/apple/kaggle-data/part1], PartitionCount: 1, PartitionFilters: [isnotnull(country#146), (country#146 = England)], ***PushedFilters: [IsNotNull(city), EqualTo(city,London)]***, ReadSchema: struct<date:string,home_team:string,away_team:string,home_score:string,away_score:string,tourname...

Я читаю данные с разделением только по стране и запросом по стране-предикату, но в раскрывающемся списке отображается пустое значение, чего не ожидается, я ожидал, что страна будет здесь

kaggleDf1.where($"country" === "England").explain(true)

план:

== Parsed Logical Plan ==
'Filter ('country = England)
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Analyzed Logical Plan ==
date: string, home_team: string, away_team: string, home_score: string, away_score: string, tournament: string, city: string, neutral: string, country: string
Filter (country#146 = England)
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Optimized Logical Plan ==
Filter (isnotnull(country#146) && (country#146 = England))
+- Relation[date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] parquet

== Physical Plan ==
*(1) FileScan parquet [date#138,home_team#139,away_team#140,home_score#141,away_score#142,tournament#143,city#144,neutral#145,country#146] Batched: true, Format: Parquet, Location: InMemoryFileIndex[/Users/apple/kaggle-data/part1], PartitionCount: 1, PartitionFilters: [isnotnull(country#146), (country#146 = England)], ***PushedFilters: []***, ReadSchema: struct<date:string,home_team:string,away_team:string,home_score:string,away_score:string,tourname...

Я читаю данные с разделением по стране и городу и запрашиваю страну и город предиката, но в раскрывающемся списке отображается пустое значение, чего не ожидается, я ожидал, что страна и город будут здесь

val kaggleDf2 = spark.read.option("header", "true").parquet("/Users/apple/kaggle-data/part2/")
kaggleDf2.where($"country" === "England" && $"city" === "London").explain(true)

план:

== Parsed Logical Plan ==
'Filter (('country = England) && ('city = London))
+- Relation[date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] parquet

== Analyzed Logical Plan ==
date: string, home_team: string, away_team: string, home_score: string, away_score: string, tournament: string, neutral: string, country: string, city: string
Filter ((country#165 = England) && (city#166 = London))
+- Relation[date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] parquet

== Optimized Logical Plan ==
Filter (((isnotnull(country#165) && isnotnull(city#166)) && (country#165 = England)) && (city#166 = London))
+- Relation[date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] parquet

== Physical Plan ==
*(1) FileScan parquet [date#158,home_team#159,away_team#160,home_score#161,away_score#162,tournament#163,neutral#164,country#165,city#166] Batched: true, Format: Parquet, Location: InMemoryFileIndex[/Users/apple/kaggle-data/part2], PartitionCount: 1, PartitionFilters: [isnotnull(country#165), isnotnull(city#166), (country#165 = England), (city#166 = London)], ***PushedFilters: []***, ReadSchema: struct<date:string,home_team:string,away_team:string,home_score:string,away_score:string,tourname...

Может кто-нибудь помочь мне, что здесь не так. Я что-то упустил?

Ответы [ 2 ]

2 голосов
/ 22 марта 2020

Это из-за PartitionFilters и ожидаемого поведения.

Когда данные в файле паркета сохраняются с использованием partition by и если запрос соответствует определенному разделу filter criteria, Spark считывает только те подкаталоги, которые соответствуют фильтрам разделов, поэтому ему не нужно снова применять этот фильтр к данным, чтобы вообще не было никакого фильтра для этих столбцов.

Теперь в вашем случае:

kaggleDf1.where($"country" === "England" && $"city" === "London")
PartitionFilters: [isnotnull(country#146), (country#146 = England)]
PushedFilters: [IsNotNull(city), EqualTo(city,London)]

Spark считывает только те файлы, которые содержат country === "England" (потому что ваши данные были разделены на country во время сохранения), поэтому не нужно примените этот фильтр к данным снова. И вы не найдете этот фильтр нигде, кроме PartitionFilters.

1 голос
/ 22 марта 2020

Я думаю, что вы неверно истолковываете.

Я читаю данные с разделом только по стране и запрашиваю по стране и городу предиката, но фильтр с понижением показывает город, который не ожидается, я ожидал, что страна Быть здесь 1012 * вместо того, чтобы быть введенным в Spark - хотя вы можете отключить это. Это из соображений производительности.

Pu sh вниз имеет 2 аспекта. Фильтр разделов позволяет считывать только те разделы, это экономит при сканировании, а затем внутри этого раздела или разделов впоследствии применяется фильтр города. PARQUET также является столбчатым.

...PartitionFilters: [isnotnull(country#146), (country#146 = England)], ***PushedFilters: [IsNotNull(city), EqualTo(city,London)]***...

Так что никаких проблем, ожидания должны быть согласованы, вот и все. Во втором случае вы должны быть в состоянии работать сейчас.

...