Исходя из различных критериев, я должен сравнить временную метку текущей строки с предыдущей строкой и проверить разницу.Это большой массив данных, содержащий более миллиарда строк.Вот формулировка проблемы: ![enter image description here](https://i.stack.imgur.com/CuFMa.png)
Для каждой комбинации errorId
и containerId
- , если есть только один
logId
, этоэто новая запись в журнале ошибок - в случае более чем одного refId, если разница меток времени между предыдущей строкой и текущей строкой превышает 1 минуту, это новая запись в журнале ошибок.
Вот реализация:
//Scala Sequence of the columns
List<Column> columns = Arrays.asList(rawData.col("errorId"), rawData.col("containerid"),
rawData.col("logId"));
Seq<Column> columnSeq = JavaConverters.asScalaIteratorConverter(columns.iterator()).asScala().toSeq();
//repartitioning raw data
Dataset<Row> sortedData = rawData
.repartition(columnSeq)
.sort("timestamp");
//Windowspec
WindowSpec w = Window.partitionBy(columnSeq).orderBy("timestamp");
//lag function
sortedData.withColumn("new_log_entry", functions.lag("timestamp", 60).over(w), "false");
//Write sorted data to db
Это хорошая идея, чтобы перераспределить данные дважды?сначала создайте sortedData
, а затем WindowSpec
.
Хорошо ли выглядит эта реализация?Как мне проверить null
logIds и т. Д.?