У меня есть искровой фрейм данных с именем flightData2015 в следующем формате:
+--------------------------+---------------------+-------+
| Destination_country_name | Origin_country_name | count |
+--------------------------+---------------------+-------+
| United States | Romania | 15 |
| United States | Croatia | 1 |
| United States | Ireland | 15 |
| Egypt | United States | 10 |
+--------------------------+---------------------+-------+
Я хочу получить все строки с максимальным количеством.Таким образом, в приведенном выше примере я получу результат:
+--------------------------+---------------------+-------+
| Destination_country_name | Origin_country_name | count |
+--------------------------+---------------------+-------+
| United States | Romania | 15 |
| United States | Ireland | 15 |
+--------------------------+---------------------+-------+
Я могу сделать это через SparkSQL следующим образом:
spark.sql("select * from flight_data_2015 where count = (select max(count) from flight_data_2015)")
Однако, как и ожидалось, когда я проверяю план выполнения, я нахожучто в наборе данных есть несколько проходов.
== Physical Plan ==
*(1) Project [DEST_COUNTRY_NAME#10, ORIGIN_COUNTRY_NAME#11, count#12]
+- *(1) Filter (isnotnull(count#12) && (count#12 = Subquery subquery209))
: +- Subquery subquery209
: +- *(2) HashAggregate(keys=[], functions=[max(count#12)])
: +- Exchange SinglePartition
: +- *(1) HashAggregate(keys=[], functions=[partial_max(count#12)])
: +- *(1) FileScan csv [count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/utk/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<count:int>
+- *(1) FileScan csv [DEST_COUNTRY_NAME#10,ORIGIN_COUNTRY_NAME#11,count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/utk/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2..., PartitionFilters: [], PushedFilters: [IsNotNull(count)], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
+- Subquery subquery209
+- *(2) HashAggregate(keys=[], functions=[max(count#12)])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_max(count#12)])
+- *(1) FileScan csv [count#12] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/utk/Documents/Spark-The-Definitive-Guide/data/flight-data/csv/2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<count:int>
Мне было интересно, есть ли способ сделать это за один проход.Если нет, то каков наиболее оптимальный способ сделать это с SparkSQL и без него.
Также имейте в виду, что на самом деле в фрейме данных содержится более 2 миллиардов строк, поэтому перенести все в один раздел будет невозможно.