Странная проблема при обработке событий, исходящих от кинезиса - PullRequest
0 голосов
/ 18 апреля 2019

Я устанавливаю amazon connect на aws, и если я сделаю тестовый вызов, он поместит этот вызов в поток aws kinesis. Я пытаюсь написать лямбду, которая обрабатывает эти записи и сохраняет их в базе данных. Если я делаю простой звонок (позвоните по номеру - asnwer - Hangup), он работает просто отлично. Однако, если я сделаю многочастный вызов (вызову номер - ответ - перевод на другой номер - зависание), это переходит в kinesis как две отдельные записи (CTR). Моя лямбда обрабатывает CTR (Contact Trace Records) один за другим. Сначала он сохраняет CTR в таблицу с именем call_segments, а затем запрашивает эту таблицу, чтобы увидеть, есть ли другая часть этого вызова уже там. Если это так, он объединяет данные и сохраняет в таблицу с именем complete_calls, в противном случае он пропускается. Если вызов имеет более одного сегмента (если он был переведен на другой номер), он будет представлен вам как два события. Моя проблема в том, что, хотя я обрабатываю события одно за другим, кажется, что при обработке второго события (технически сегмент вызова из первого события уже находится в базе данных) он не может видеть первый сегмент вызова.

вот мой код:

const callRecordService = require("./call-records-service");
exports.handler = async (event) => {
await Promise.all(
    event.Records.map(async (record) => {
        return processRecord(record);
    })
);
};
const processRecord = async function(record)        {
    try{
        const payloadStr = new Buffer(record.kinesis.data, "base64").toString("ascii");
        let payload = JSON.parse(payloadStr);
    await     callRecordService.processCTR(payload);
}
catch(err){
    // console.error(err);
}
};

и вот служебный файл:

async function processCTR(ctr) {
let userId = "12"
let result = await saveCtrToContactDetails(ctr, userId);
    let paramsForCallSegments = [ctr.InstanceARN.split("instance/").pop(), ctr.ContactId]
    let currentCallSegements = await dbHelper.getAll(dbQueries.getAllCallSegmentsQuery, paramsForCallSegments)
    let completedCall = checkIfCallIsComplete(currentCallSegements);
    if (completedCall) {
        console.log('call is complete')
       let results = await saveCallToCompletedCalls(completedCall);
    }
}

//------------- Private functions --------------------
const saveCtrToContactDetails = async (ctr, userId) => {
let params = [ctr.ContactId,userId,ctr.callDuration];
let results = await dbHelper.executeQuery(dbQueries.getInsertCallDetailsRecordsQuery, params);
return results;
}

const checkIfCallIsComplete = (currentCallSegements) => {
   //This function checks if all callSegments are already in call_segments table.

}

const saveCallToCompletedCalls = async (completedCall) => {
    let contact_id = completedCall[0].contact_id;
    let user_id = completedCall[0].user_id;
    let call_duration = completedCall[0] + completedCall[1]
    completedCall.forEach(callSegment => {
        call_duration += callSegment.call_duration;
});
let params = [contact_id, user_id, call_duration];
let results = await dbHelper.executeQuery(dbQueries.getInsertToCompletedCallQuery, params);
};
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...