Как читать данные из CSV-файла в виде потока - PullRequest
0 голосов
/ 10 февраля 2019

У меня есть следующая таблица:

DEST_COUNTRY_NAME   ORIGIN_COUNTRY_NAME count
United States       Romania             15
United States       Croatia             1
United States       Ireland             344
Egypt               United States       15  

Таблица представлена ​​в виде набора данных.

scala> dataDS
res187: org.apache.spark.sql.Dataset[FlightData] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

Я могу отсортировать записи как пакетный процесс.

scala> dataDS.sort(col("count")).show(100);

Теперь я хочу попробовать, могу ли я сделать то же самое с использованием потоковой передачи.Для этого, я полагаю, мне придется читать файл как поток.

scala> val staticSchema = dataDS.schema;
staticSchema: org.apache.spark.sql.types.StructType = StructType(StructField(DEST_COUNTRY_NAME,StringType,true), StructField(ORIGIN_COUNTRY_NAME,StringType,true), StructField(count,IntegerType,true))

scala> val dataStream = spark.
     | readStream.
     | schema(staticSchema).
     | option("header","true").
     | csv("data/flight-data/csv/2015-summary.csv");
dataStream: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> dataStream.isStreaming;
res245: Boolean = true

Но я не могу продвинуться дальше в том, как читать данные как поток.

Я выполнил sort процесс преобразования

scala> dataStream.sort(col("count"));
res246: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string ... 1 more field]

Полагаю, теперь я должен использовать метод Dataset writeStream.Я выполнил следующие две команды, но обе вернули ошибки.

scala> dataStream.sort(col("count")).writeStream.
     | format("memory").
     | queryName("sorted_data").
     | outputMode("complete").
     | start();
org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;;

, а эта

scala> dataStream.sort(col("count")).writeStream.
     | format("memory").
     | queryName("sorted_data").
     | outputMode("append").
     | start();
org.apache.spark.sql.AnalysisException: Sorting is not supported on streaming DataFrames/Datasets, unless it is on aggregated DataFrame/Dataset in Complete output mode;;

Из ошибок кажется, что я должен агрегировать (группировать) данные, но я думал, что ямне не нужно это делать, поскольку я могу запустить любую пакетную операцию в виде потока.

Как понять, как сортировать данные, поступающие в виде потока?

1 Ответ

0 голосов
/ 10 февраля 2019

К сожалению, то, что сообщают об ошибках, является точным

Смысл, который вы указали:

, но я подумал, что мне не нужно это делать, поскольку я могу запустить любую пакетную операциюкак поток.

не без достоинств, но он упускает фундаментальный момент, что структурированная потоковая передача не тесно связана с микропакетированием.

Можно легко придумать какой-нибудь не поддающийся проверке взлом

import org.apache.spark.sql.functions._

dataStream
  .withColumn("time", window(current_timestamp, "5 minute"))  // Some time window
  .withWatermark("time", "0 seconds")  // Immediate watermark
  .groupBy("time")
  .agg(sort_array(collect_list(struct($"count", $"DEST_COUNTRY_NAME", $"ORIGIN_COUNTRY_NAME"))).as("data"))
  .withColumn("data", explode($"data"))
  .select($"data.*")
  .select(df.columns.map(col): _*)
  .writeStream
  .outputMode("append")
   ...
  .start()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...