У меня есть две следующие схемы паркета:
users:
--------------------
|user_id | username|
|--------|---------|
| 1 | micky |
| 2 | minnie |
|__3_____|__donald_|
orders:
--------------------
|user_id | order_id|
|--------|---------|
| 1 | 411 |
| 2 | 412 |
| 3 | 413 |
| 1 | 414 |
| 2 | 415 |
| 3 | 416 |
| 3 | 471 |
| 3 | 482 |
|__3_____|__500____|
Я пытаюсь собрать все заказы для клиента в один столбец:
users_and_orders
-------------------------------------------
|user_id | username| orders |
|--------|---------|-------- --------------
| 1 | micky | [411,414] |
| 2 | minnie | [412,415] |
|__3_____|__donald_| [413,416,471,482,500]|
Код выглядит следующим образом:
val users = sparkSession.readStream.option("checkpointLocation", "somelocation")
.schema(userSchema.asInstanceOf[StructType])
.format("parquet")
.load(commandLineArguments.userPath)
.distinct()
val orders = sparkSession
.read
.option("checkpointLocation", "someOtherLocation")
.schema(orderSchema.asInstanceOf[StructType])
.format("parquet")
.load(commandLineArguments.orderPath)
val userJoinOrders = users.join(orders, Seq("user_id"))
val currentDate = new SimpleDateFormat("ddMM").format(new Date())
val currentTimeFormat = new SimpleDateFormat("hhmm")
val dateFormat = new SimpleDateFormat("YYMMDD")
val systemDateTimeFormat = new SimpleDateFormat("YYYYMMDDHHmmss")
//Original data does not have watermark/timestamp
val usersWithOrders = userJoinOrders
.withColumn("timestamp", lit(unix_timestamp(date_format(current_timestamp, "yyyy-MM-dd HH-mm"),"yyyy-MM-dd HH-mm").cast("timestamp")))
.withWatermark("timestamp","10 minutes")
.groupBy("user_id","username","timestamp")
.agg(collect_set(struct($"order_id")) as "orders")
.map(account => {
//Do whatever
})
Как я понимаю, поскольку я собираю потоковые данные, для этого требуется водяной знак. Исходные данные не имеют водяных знаков. Поэтому я должен был сделать один. Но вышесказанное не работает. Когда я запускаю потоковый код, в выводе я вижу:
Streaming query made progress: {
"id" : "5f31d025-712e-4577-b6d7-8e8d450ac926",
"runId" : "643fe17a-979a-4599-ac94-45be60136350",
"name" : null,
"timestamp" : "2019-01-21T11:07:22.922Z",
"batchId" : 0,
"numInputRows" : 856,
"processedRowsPerSecond" : 52.42207116173679,
"durationMs" : {
"addBatch" : 15222,
"getBatch" : 44,
"getOffset" : 45,
"queryPlanning" : 888,
"triggerExecution" : 16329,
"walCommit" : 51
},
"eventTime" : {
"avg" : "2019-01-21T11:07:00.000Z",
"max" : "2019-01-21T11:07:00.000Z",
"min" : "2019-01-21T11:07:00.000Z",
"watermark" : "1970-01-01T00:00:00.000Z"
},
"stateOperators" : [ {
"numRowsTotal" : 856,
"numRowsUpdated" : 856,
"memoryUsedBytes" : 805831,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 777031
}
}, {
"numRowsTotal" : 856,
"numRowsUpdated" : 856,
"memoryUsedBytes" : 659463,
"customMetrics" : {
"loadedMapCacheHitCount" : 0,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 630663
}
} ],
"sources" : [ {
"description" : "FileStreamSource[file:/blah/blah/blah]",
"startOffset" : null,
"endOffset" : {
"logOffset" : 0
},
"numInputRows" : 856,
"processedRowsPerSecond" : 52.42207116173679
} ],
"sink" : {
"description" : "FileSink[/blah/blah/blah/user-orders]"
}
}
Мой вопрос:
- Что я делаю не так?
- Есть ли лучший способ конвертировать связанные
строки в значениях столбцов и без необходимости использовать агрегат / группу по?
Любая помощь будет принята с благодарностью.