Корпус:
Я знаю, что этот вопрос довольно длинный (извините за это). Я думаю, что подробное описание объясняет мой случай лучше.
Я получаю данные устройства от конечной точки потоковой передачи. Каждая полезная нагрузка в потоке включает в себя несколько объектов данных. Я создаю следующую структуру для хранения в Dynamo
db_pk = str(msg.get('key'))
db_sk = str(msg.get('sortKey'))
area_id = str(msg.get('area'))
## other entities in following mapped_values...
mapped_values = {
startTime: {
'alert_msg': alert,
'area ': area_id,
'speed': speed,
'state': state,
'startTime': startTime,
'duration': duration,
'inspector': inspector_id
}
}
# Data item
lt_item = {
'id': db_pk,
'serialNo': db_sk,
'value_map': [mapped_values]
}
Я использую следующий оператор обновления для создания сопоставления входящих данных в каждой полезной нагрузке.
table = dynamo_db.Table('device')
table.update_item(
Key = {
'id': db_pk,
'serialNo': db_sk
},
UpdateExpression = 'SET #value_map = list_append(#value_map, :mapped_values)',
ConditionExpression = 'attribute_exists(#value_map)',
ExpressionAttributeValues = {
':mapped_values': [mapped_values]
},
ExpressionAttributeNames = {
'#value_map': 'value_map'
}
)
Текущий результат
Вышеупомянутое выражение обновления создает карту сообщений под ключом value_map
в динамо, и всякий раз, когда новое сообщение получено, оно добавляется в карту следующим образом:
{
"id": "KD_125",
"serialNo": "KDCRT-231"
"value_map": [
{
"2019-05-29 14:36:03": { #first msg in payload
"alert_msg": "0x12 LOGICAL ERROR AT DIRECTION SELECT SIGNAL",
"area": "TX-112",
"speed": "65",
"startTime": "2019-05-29 14:36:03",
"state": "ACTIVE",
"duration": "None"
"inspector": "None"
}
},
{
"2019-05-29 14:36:03": { # second msg, same timestamp with first one
"alert_msg": "0x12 LOGICAL ERROR AT DIRECTION SELECT SIGNAL",
"area": "None",
"speed": "None",
"startTime": "2019-05-29 14:36:03",
"state": "PASSIVE",
"duration": "1200"
"inspector" "422TX19"
}
}
#another message with different timestamp and data values
]
}
Проблема и требование
Некоторые сообщения отправляются с одинаковой отметкой времени (startTime
). Если в сообщении указано state='ACTIVE'
, оно включает все данные, кроме duration
и inspector
; и если state='PASSIVE'
, он включает duration, inspector
и другие, кроме area
и speed
(см. образец выше).
Если в полезной нагрузке отправлено такое же timestamp
, мне нужно заменить значения None
(duration
и inspector
) и state
из 'ACTIVE'
сообщения о состоянии на сообщение о состоянии 'PASSIVE'
вместо создания двух объектов с одинаковым timestamp
. Таким образом, окончательная форма должна быть такой:
{
"id": "KD_125",
"serialNo": "KDCRT-231"
"value_map": [
{
"2019-05-29 14:36:03": {
"alert_msg": "0x12 LOGICAL ERROR AT DIRECTION SELECT SIGNAL",
"area": "TX-112",
"speed": "65",
"startTime": "2019-05-29 14:36:03",
"state": "PASSIVE", # updated from PASSIVE state msg
"duration": "1200" # updated values from PASSIVE state msg
"inspector": "422TX19" # updated values from PASSIVE state msg
}
}
# other messages with different timestamps
]
}
Я пробовал разные выражения обновления , такие как if_not_exists
. Кроме того, протестировано с
ExpressionAttributeNames = {
'#value_map': 'value_map.startTime'
}
вариант .. Но пока безуспешно. Я надеюсь, что кто-то может предложить мне способ сделать это обновление ..