У меня есть следующая таблица:
DEST_COUNTRY_NAME ORIGIN_COUNTRY_NAME count
United States Romania 15
United States Croatia 1
United States Ireland 344
Egypt United States 15
Таблица представлена в виде набора данных.
scala> dataDS
res187: org.apache.spark.sql.Dataset[FlightData] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
Я могу отсортировать записи как пакетный процесс.
scala> dataDS.sort(col("count")).show(100);
Теперь я хочу попробовать, могу ли я сделать то же самое с использованием потоковой передачи.Для этого, я полагаю, мне придется читать файл как поток.
scala> val staticSchema = dataDS.schema;
staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,IntegerType,true))
scala> val dataStream = spark.
| readStream.
| schema(staticSchema).
| option("header","true").
| csv("data/flight-data/csv/2015-summary.csv");
dataStream: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
scala> dataStream.isStreaming;
res245: Boolean = true
Но я не могу продвинуться дальше в том, как читать данные как поток.
Я выполнил sort
процесс преобразования
scala> dataStream.sort(col("count"));
res246: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]
Полагаю, теперь я должен использовать метод Dataset
writeStream
.Я выполнил следующие две команды, но обе вернули ошибки.
scala> dataStream.sort(col("count")).writeStream.
| format("memory").
| queryName("sorted_data").
| outputMode("complete").
| start();
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;
, а эта
scala> dataStream.sort(col("count")).writeStream.
| format("memory").
| queryName("sorted_data").
| outputMode("append").
| start();
org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;
Из ошибок кажется, что я должен агрегировать (группировать) данные, но я думал, что ямне не нужно это делать, поскольку я могу запустить любую пакетную операцию в виде потока.
Как понять, как сортировать данные, поступающие в виде потока?