Я использую kafka для получения данных на потоковый процессор WSO2 в формате Json, но я получаю сообщение об ошибке «содержит отсутствующие атрибуты». Поэтому выкидывает сообщение «ошибка» в каждый файл json, который я отправляю в тему wso2 kafka. Я скачал последний разъем JSON siddhi "https://store.wso2.com/store/assets/analyticsextension/details/0e6a6b38-f1d1-49f5-a685-e8c16741494d" и заменил его в своем каталоге wso2 / lib.
Я не получаю никакой ошибки, если запускаю это на симуляторе, но только когда события публикуются в kafkaТема вручную.
Ниже приведен код процессора WSO2 Stream:
@App:name('Transaction json')
/*
TransactionStream definition. It receives events from "kafka_topic" in json format.
*/
@source(type = 'kafka', topic.list = 'kafka_topic', partition.no.list = '0', threading.option = 'single.thread', group.id = 'group', bootstrap.servers = 'localhost:9092',
@map(type = 'json',enclosing.element='$',
@attributes(code = "code", name = "name",desc = "desc",transRefId ="transRefId",origAmount ="origAmount",amount = "amount",currency = "currency",requestId = "requestId",redeemedCashcode = "redeemedCashcode", sender_id ="sender.id",sender_name = "sender.name", sender_phone = "sender.phone",sender_pocket = "sender_pocket",sender_client = "sender.client",receiver_id = "receiver.id",receiver_name = "receiver.name",receiver_phone = "receiver.phone",receiver_pocket = "receiver.pocket",receiver_client = "receiver.client",beneficiary_phone = "beneficiary.phone",receiver_client = "receiver.client",beneficiary_phone = "beneficiary.phone",beneficiary_name = "beneficiary.name",beneficiary_nric = "beneficiary.nric",depositor_phone = "depositor.phone",depositor_name = "depositor.name",depositor_nric = "depositor.nric",offer = "offer",service = "service" , message = "message",status = "status",processedBy_id = "processedBy.id",processedBy_name = "processedBy.name",processedBy_phone = "processedBy.phone",processedBy_client = "processedBy.client",processedBy_owner = "processedBy.owner",processedAt = "processedAt",fees_debitFee = "fees.debitFee",fees_creditFee = "fees.creditFee",deviceId = "deviceId",isRefund ="isRefund",oldTransRefId = "oldTransRefId",linkBankTrans_err = "linkBankTrans.err",linkBankTrans_message = "linkBankTrans.message",linkBankTrans_data = "linkBankTrans.data",linkBankTrans_status = "linkBankTrans.status",linkBankTrans_request_url = "linkBankTrans.request.url",linkBankTrans_request_requestParams = "linkBankTrans.request.requestParams",linkBankTrans_action = "linkBankTrans.action",bankAccountNo = "bankAccountNo",transType = "transType",devGrp = "devGrp",createdAt = "createdAt",updatedAt = "updatedAt")))
define stream TransactioninputStream (code string, name string, desc string, transRefId string, origAmount float, amount float, currency string, requestId string, redeemedCashcode string, sender_id string, sender_name string, sender_phone string, sender_pocket string, sender_client string, receiver_id string, receiver_name string, receiver_phone string, receiver_client string, beneficiary_phone string, beneficiary_name string, beneficiary_nric string, depositor_phone string, depositor_name string,depositor_nric string, offer string, service string, message string, status string, processedBy_id string, processedBy_name string, processedBy_phone string, processedBy_client string, processedBy_owner string,processedAt string, fees_debitFee float, fees_creditFee float, deviceId string, isRefund string, oldTransRefId string, linkBankTrans_err string, linkBankTrans_message string, linkBankTrans_data string, linkBankTrans_status string, linkBankTrans_request_url string, linkBankTrans_request_requestParams string, linkBankTrans_action string, bankAccountNo string, transType string, devGrp string, createdAt string);
Ссылка Json:
{
"cd": "acb235dd",
"name": "Newtest",
"desc": "testing env",
"ref": "3232d3dew3",
"ora": 500000,
"amount": 500000,
"curr": "INR",
"sen": {
"id": "fdgdfgv",
"name": "dao",
"phone": "8268826483",
"pocket": "bde4gvfdgd3fd",
"cl": "try"
},
"rec": {
"id": "fsfsgfs3322",
"name": "mmv",
"phone": "76288373",
"pocket": "null",
"cl": "test"
},
"bef": {
"phone": "null",
"name": "null",
"ic": "null"
},
"dep": {
"phone": "null",
"name": "null",
"ic": "null"
},
"offer": "htgdte44",
"service": "gdrgdrgdv34",
"status": "done",
"prb": {
"id": "fdgdgd",
"name": "test",
"phone": "frgvrd",
"cl": "test",
"owner": "null"
},
"processedAt": {
"$date": "2019-09-19T10:17:05.377+0000"
},
"fees": {
"debitFee": 0,
"creditFee": 0,
},
"dId": "vdsvdd433",
"anumb": "xxxx6452",
"ttype": "normal",
"devGrp": 0,
"createdAt": {
"$date": "2019-09-19T10:17:05.381+0000"
},
"updatedAt": {
"$date": "2019-09-19T10:17:05.381+0000"
},
"_id": {
"$oid": "5d8355a1a3b8053cb768eea8"
},
"bankTrans": {
"err": "0",
"message": "successfully!",
"data": "fbsvbsgfiyshiu39",
"status": 0,
"request": {
"url": "http://localhost/testing",
"requestParams": "89743874023804832084093278327082384-329-4932-r-98-384-83-24"
},
"action": "testing"
}
}