Я занимаюсь разработкой своего собственного считывателя источника данных и внедрил фильтры с понижением.Я борюсь со случаем, когда я не получаю вызов pushFilters (), когда у меня нет фильтра для сброса ранее определенного фильтра.Вот сеанс оболочки Spark, чтобы продемонстрировать проблему, с инструкциями отладки, чтобы показать вызов вызова.
1) Начальная нефильтрованная загрузка / показ
scala> val df = spark.read.format("MyDataSource").
option("function", "testSpartan").
option("loglevel", "debug").load
18/08/26 07:42:56.580 DEBUG dr: MyDataSourceReader()
18/08/26 07:42:56.580 DEBUG dr: function: testSpartan
18/08/26 07:42:56.580 DEBUG dr: loglevel: debug
df: org.apache.spark.sql.DataFrame = [jcolumn: bigint, ccolumn: string]
scala> df.show
18/08/26 07:43:33.659 DEBUG dr: pruneColums()
18/08/26 07:43:33.659 DEBUG dr: StructField(jcolumn,LongType,false)
18/08/26 07:43:33.659 DEBUG dr: StructField(ccolumn,StringType,false)
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.659 DEBUG dr: pushedFilters()
18/08/26 07:43:33.678 DEBUG dr: createBatchDataReaderFactories()
18/08/26 07:43:33.699 DEBUG dr: next()
18/08/26 07:43:33.701 DEBUG dr: get()
18/08/26 07:43:33.701 DEBUG dr: next()
+-------+-------+
|jcolumn|ccolumn|
+-------+-------+
| 0| a|
| 1| b|
| 2| c|
| 3| a|
| 4| b|
| 5| c|
| 6| a|
| 7| b|
| 8| c|
| 9| a|
+-------+-------+
2) Вызов с простым фильтром.Обратите внимание на вызов pushFilters ()
scala> df.filter("jcolumn<2").show
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.500 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushFilters()
18/08/26 07:45:42.501 DEBUG dr: LessThan(jcolumn,2)
18/08/26 07:45:42.501 DEBUG dr: pruneColums()
18/08/26 07:45:42.501 DEBUG dr: StructField(jcolumn,LongType,false)
18/08/26 07:45:42.501 DEBUG dr: StructField(ccolumn,StringType,false)
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.501 DEBUG dr: pushedFilters()
18/08/26 07:45:42.512 DEBUG dr: createBatchDataReaderFactories()
18/08/26 07:45:42.529 DEBUG dr: next()
18/08/26 07:45:42.532 DEBUG dr: get()
18/08/26 07:45:42.532 DEBUG dr: next()
+-------+-------+
|jcolumn|ccolumn|
+-------+-------+
| 0| a|
| 1| b|
+-------+-------+
3) Последующий вызов без фильтра.Вы увидите, что я не получаю вызов pushFilters () с пустым массивом Filter.Я не уверен, какой «сигнал» я должен получить для сброса поддерживаемых фильтров
scala> df.show
18/08/26 07:46:21.442 DEBUG dr: pruneColums()
18/08/26 07:46:21.442 DEBUG dr: StructField(jcolumn,LongType,false)
18/08/26 07:46:21.442 DEBUG dr: StructField(ccolumn,StringType,false)
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.443 DEBUG dr: pushedFilters()
18/08/26 07:46:21.452 DEBUG dr: createBatchDataReaderFactories()
18/08/26 07:46:21.468 DEBUG dr: next()
18/08/26 07:46:21.470 DEBUG dr: get()
18/08/26 07:46:21.471 DEBUG dr: next()
+-------+-------+
|jcolumn|ccolumn|
+-------+-------+
| 0| a|
| 1| b|
+-------+-------+