WSO2 потоковый процессор, ошибка json "содержит недостающие атрибуты" с kafka - PullRequest
0 голосов
/ 08 ноября 2019

Я использую kafka для получения данных на потоковый процессор WSO2 в формате Json, но я получаю сообщение об ошибке «содержит отсутствующие атрибуты». Поэтому выкидывает сообщение «ошибка» в каждый файл json, который я отправляю в тему wso2 kafka. Я скачал последний разъем JSON siddhi "https://store.wso2.com/store/assets/analyticsextension/details/0e6a6b38-f1d1-49f5-a685-e8c16741494d" и заменил его в своем каталоге wso2 / lib.

Я не получаю никакой ошибки, если запускаю это на симуляторе, но только когда события публикуются в kafkaТема вручную.

Ниже приведен код процессора WSO2 Stream:

@App:name('Transaction json')
/*

TransactionStream definition. It receives events from "kafka_topic" in json format.

*/

@source(type = 'kafka', topic.list = 'kafka_topic', partition.no.list = '0', threading.option = 'single.thread', group.id = 'group', bootstrap.servers = 'localhost:9092', 
    @map(type = 'json',enclosing.element='$',
    @attributes(code = "code", name = "name",desc = "desc",transRefId ="transRefId",origAmount ="origAmount",amount = "amount",currency = "currency",requestId = "requestId",redeemedCashcode = "redeemedCashcode", sender_id ="sender.id",sender_name = "sender.name", sender_phone = "sender.phone",sender_pocket = "sender_pocket",sender_client = "sender.client",receiver_id = "receiver.id",receiver_name = "receiver.name",receiver_phone = "receiver.phone",receiver_pocket = "receiver.pocket",receiver_client = "receiver.client",beneficiary_phone = "beneficiary.phone",receiver_client = "receiver.client",beneficiary_phone = "beneficiary.phone",beneficiary_name = "beneficiary.name",beneficiary_nric = "beneficiary.nric",depositor_phone = "depositor.phone",depositor_name = "depositor.name",depositor_nric = "depositor.nric",offer = "offer",service = "service" , message = "message",status = "status",processedBy_id = "processedBy.id",processedBy_name = "processedBy.name",processedBy_phone = "processedBy.phone",processedBy_client = "processedBy.client",processedBy_owner = "processedBy.owner",processedAt = "processedAt",fees_debitFee = "fees.debitFee",fees_creditFee = "fees.creditFee",deviceId = "deviceId",isRefund ="isRefund",oldTransRefId = "oldTransRefId",linkBankTrans_err = "linkBankTrans.err",linkBankTrans_message = "linkBankTrans.message",linkBankTrans_data = "linkBankTrans.data",linkBankTrans_status = "linkBankTrans.status",linkBankTrans_request_url = "linkBankTrans.request.url",linkBankTrans_request_requestParams = "linkBankTrans.request.requestParams",linkBankTrans_action = "linkBankTrans.action",bankAccountNo = "bankAccountNo",transType = "transType",devGrp = "devGrp",createdAt = "createdAt",updatedAt = "updatedAt")))
define stream TransactioninputStream (code string, name string, desc string, transRefId string, origAmount float, amount float, currency string, requestId string, redeemedCashcode string, sender_id string, sender_name string, sender_phone string, sender_pocket string, sender_client string, receiver_id string, receiver_name string, receiver_phone string, receiver_client string, beneficiary_phone string, beneficiary_name string, beneficiary_nric string, depositor_phone string, depositor_name string,depositor_nric  string, offer string, service string, message string, status string, processedBy_id string, processedBy_name string, processedBy_phone string, processedBy_client string, processedBy_owner string,processedAt string, fees_debitFee float, fees_creditFee float, deviceId string, isRefund string, oldTransRefId string, linkBankTrans_err string, linkBankTrans_message string, linkBankTrans_data string, linkBankTrans_status string, linkBankTrans_request_url string, linkBankTrans_request_requestParams string, linkBankTrans_action string, bankAccountNo string, transType string, devGrp string, createdAt string);

Ссылка Json:

{
  "cd": "acb235dd",
  "name": "Newtest",
  "desc": "testing env",
  "ref": "3232d3dew3",
  "ora": 500000,
  "amount": 500000,
  "curr": "INR",
  "sen": {
    "id": "fdgdfgv",
    "name": "dao",
    "phone": "8268826483",
    "pocket": "bde4gvfdgd3fd",
    "cl": "try"
  },
  "rec": {
    "id": "fsfsgfs3322",
    "name": "mmv",
    "phone": "76288373",
    "pocket": "null",
    "cl": "test"
  },
  "bef": {
    "phone": "null",
    "name": "null",
    "ic": "null"
  },
  "dep": {
    "phone": "null",
    "name": "null",
    "ic": "null"
  },
  "offer": "htgdte44",
  "service": "gdrgdrgdv34",
  "status": "done",
  "prb": {
    "id": "fdgdgd",
    "name": "test",
    "phone": "frgvrd",
    "cl": "test",
    "owner": "null"
  },
  "processedAt": {
    "$date": "2019-09-19T10:17:05.377+0000"
  },
  "fees": {
    "debitFee": 0,
    "creditFee": 0,
  },
  "dId": "vdsvdd433",
  "anumb": "xxxx6452",
  "ttype": "normal",
  "devGrp": 0,
  "createdAt": {
    "$date": "2019-09-19T10:17:05.381+0000"
  },
  "updatedAt": {
    "$date": "2019-09-19T10:17:05.381+0000"
  },
  "_id": {
    "$oid": "5d8355a1a3b8053cb768eea8"
  },
  "bankTrans": {
    "err": "0",
    "message": "successfully!",
    "data": "fbsvbsgfiyshiu39",
    "status": 0,
    "request": {
      "url": "http://localhost/testing",
      "requestParams": "89743874023804832084093278327082384-329-4932-r-98-384-83-24"
    },
    "action": "testing"
  }
}

1 Ответ

0 голосов
/ 11 ноября 2019

Это происходит, когда в сообщении отсутствуют некоторые атрибуты. Здесь в примере сообщения отсутствует атрибут кода. Вот почему сообщения отбрасываются. Тем не менее, вы можете попросить siddhi JSON mapper обрабатывать сообщения, даже если атрибуты отсутствуют, используя fail.on.missing.attributes=false. Пожалуйста, ознакомьтесь с документами API JSON mapper https://siddhi -io.github.io / siddhi-map-json / api / 5.0.5 /

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...