Spark Structured Streaming: агрегирование строк в столбцы без водяных знаков - PullRequest
0 голосов
/ 21 января 2019

У меня есть две следующие схемы паркета:

users:
--------------------
|user_id | username|
|--------|---------|
|  1     |  micky  |
|  2     |  minnie |
|__3_____|__donald_|

orders:
--------------------
|user_id | order_id|
|--------|---------|
|  1     |  411    |
|  2     |  412    |
|  3     |  413    |
|  1     |  414    |
|  2     |  415    |
|  3     |  416    |
|  3     |  471    |
|  3     |  482    |
|__3_____|__500____|

Я пытаюсь собрать все заказы для клиента в один столбец:

users_and_orders
-------------------------------------------
|user_id | username| orders               |
|--------|---------|-------- --------------
|  1     |  micky  | [411,414]            |
|  2     |  minnie | [412,415]            |
|__3_____|__donald_| [413,416,471,482,500]|

Код выглядит следующим образом:

val users = sparkSession.readStream.option("checkpointLocation", "somelocation")
  .schema(userSchema.asInstanceOf[StructType])
  .format("parquet")
  .load(commandLineArguments.userPath)
  .distinct()

val orders = sparkSession
  .read
  .option("checkpointLocation", "someOtherLocation")
  .schema(orderSchema.asInstanceOf[StructType])
  .format("parquet")
  .load(commandLineArguments.orderPath)



val userJoinOrders = users.join(orders, Seq("user_id"))

val currentDate = new SimpleDateFormat("ddMM").format(new Date())
val currentTimeFormat = new SimpleDateFormat("hhmm")
val dateFormat = new SimpleDateFormat("YYMMDD")
val systemDateTimeFormat = new SimpleDateFormat("YYYYMMDDHHmmss")



//Original data does not have watermark/timestamp
val usersWithOrders = userJoinOrders
  .withColumn("timestamp", lit(unix_timestamp(date_format(current_timestamp, "yyyy-MM-dd HH-mm"),"yyyy-MM-dd HH-mm").cast("timestamp")))
  .withWatermark("timestamp","10 minutes")
  .groupBy("user_id","username","timestamp")
  .agg(collect_set(struct($"order_id")) as "orders")
  .map(account => {
    //Do whatever
})

Как я понимаю, поскольку я собираю потоковые данные, для этого требуется водяной знак. Исходные данные не имеют водяных знаков. Поэтому я должен был сделать один. Но вышесказанное не работает. Когда я запускаю потоковый код, в выводе я вижу:

Streaming query made progress: {
  "id" : "5f31d025-712e-4577-b6d7-8e8d450ac926",
  "runId" : "643fe17a-979a-4599-ac94-45be60136350",
  "name" : null,
  "timestamp" : "2019-01-21T11:07:22.922Z",
  "batchId" : 0,
  "numInputRows" : 856,
  "processedRowsPerSecond" : 52.42207116173679,
  "durationMs" : {
    "addBatch" : 15222,
    "getBatch" : 44,
    "getOffset" : 45,
    "queryPlanning" : 888,
    "triggerExecution" : 16329,
    "walCommit" : 51
  },
  "eventTime" : {
    "avg" : "2019-01-21T11:07:00.000Z",
    "max" : "2019-01-21T11:07:00.000Z",
    "min" : "2019-01-21T11:07:00.000Z",
    "watermark" : "1970-01-01T00:00:00.000Z"
  },
  "stateOperators" : [ {
    "numRowsTotal" : 856,
    "numRowsUpdated" : 856,
    "memoryUsedBytes" : 805831,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 0,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 777031
    }
  }, {
    "numRowsTotal" : 856,
    "numRowsUpdated" : 856,
    "memoryUsedBytes" : 659463,
    "customMetrics" : {
      "loadedMapCacheHitCount" : 0,
      "loadedMapCacheMissCount" : 0,
      "stateOnCurrentVersionSizeBytes" : 630663
    }
  } ],
  "sources" : [ {
    "description" : "FileStreamSource[file:/blah/blah/blah]",
    "startOffset" : null,
    "endOffset" : {
      "logOffset" : 0
    },
    "numInputRows" : 856,
    "processedRowsPerSecond" : 52.42207116173679
  } ],
  "sink" : {
    "description" : "FileSink[/blah/blah/blah/user-orders]"
  }
}

Мой вопрос:

  1. Что я делаю не так?
  2. Есть ли лучший способ конвертировать связанные строки в значениях столбцов и без необходимости использовать агрегат / группу по?

Любая помощь будет принята с благодарностью.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...