У меня есть производитель кафки с 4 темами. Когда я загружаю CSV в соответствии с типом CSV, я отправляю его потребителю в зависимости от его темы. В первый раз, если я загружу файл, он будет отлично работать, НО, когда я загружу другой файл CSV, он также загрузит предыдущие данные, а затем добавит новые данные CSV.
Я новичок в kafka, поэтому я не смог найти правильного решения этой проблемы. Я пытался найти смещение, но не смог его реализовать. Я попытался сбросить массив обратно в ноль, но после сброса в ноль, когда приходят новые файлы, в них есть как предыдущие, так и новые данные.
Код производителя.js
var kafka = require('kafka-node'),
HighLevelProducer = kafka.HighLevelProducer,
HighLevelConsumer = kafka.HighLevelConsumer,
client = new kafka.Client(),
producer = new HighLevelProducer(client),
fs = require('fs'),
consumer = new HighLevelConsumer(client, [{ topic: 'csvDealData', partition: 0 }, { topic: 'csvAssetData', partition: 0 }, { topic: 'csvPricingData', partition: 0 }, { topic: 'csvRedeemData', partition: 0 }], { autoCommit: false });
var payloads;
var async = require('async');
console.log("STARTING PRODUCER");
var config = require("./config.json")
var http = require('http');
var express = require('express');
var app = express();
var port = '9094';
let tempCSVArray = [];
var server = http.createServer(app).listen(port);
server.timeout = 24000;
var totalDataLength = 0;
var tempIndex;
// var offset = new kafka.Offset(client)
// offset.fetchLatestOffsets([topic], (err, offsets) => {
// if (err) {
// console.log(`error fetching latest offsets ${err}`)
// return
// }
// var latest = 1
// Object.keys(offsets[topic]).forEach( o => {
// latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
// })
// console.log(" topic ::: "+topic);
// console.log(" offsets ::: "+offsets);
// console.log(" latest ::: "+latest);
// consumer.setOffset(topic, 0, latest-1)
// });
var io = require('socket.io').listen(server, function () {
console.log("Connected To Invoke Server.... ");
});
io.on('connection', function (socket) {
socket.on('csvDataFromUI', function (data) {
producer.on('error', function (err) { });
tempCSVArray.push(data.dataArr);
// here we are getting all the rows from CSV and we wait for the end line to come once we recived it we create and array and then send it tp and async function
if (data.isEnd) {
totalDataLength = tempCSVArray.length
console.log(" \n length of data send to invoke function is ::::: " + totalDataLength + " \n dataArray value :::: " + JSON.stringify(tempCSVArray));
csvInvoke(tempCSVArray);
}
});
})
function csvInvoke(tempCSVArray) {
async.eachOfSeries(tempCSVArray, (a, index, asyncCallback) => {
a = a[0];
tempIndex = index;
let csvType = a.CsvType;
if (csvType === "DealCaptureUpload") {
var message = a;
var originator = a.RepoData[0].Party[0].ParticipantID.trim();
var collection = a.Collection;
console.log(" originator :::: " + originator);
console.log("\nCollection: " + collection);
if (a.RepoData[0].Trade[0].TransactionStatus == "NEW") {
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DealPrivateCC", "fcn": "invoke", "Invokeargs": ["Capture", collection, message, "NEW", "DealPrivateCC", "globalchannel"], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvDealData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
} else if (a.RepoData[0].Trade[0].TransactionStatus == "CANCEL") {
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DealPrivateCC", "fcn": "invoke", "Invokeargs": ["Capture", collection, message, "CANCEL", "PartyPrivateCC", "globalchannel"], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvDealData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
}
setTimeout(() => {
producer.send(payloads, function (err, data) {
if (err != null)
console.log("Error sending payload to consumer - " + err)
else
console.log("\n index ::: " + tempIndex + "\n Payloads :::::::: " + JSON.stringify(payloads));
});
asyncCallback();
}, 6000);
} else if (csvType === "AssetIssuanceUpload") {
var message = a.Record[0];
var originator = a.Party.trim();
var collection = a.Collection;
console.log(" \n originator :::: " + originator);
console.log("\n Collection: " + collection);
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "OwnershipPrivateCC", "fcn": "invokeInternal", "Invokeargs": ["Creation", collection, message], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvAssetData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
setTimeout(() => {
producer.send(payloads, function (err, data) {
if (err != null)
console.log("Error sending payload to consumer - " + err)
else
console.log("\n index ::: " + tempIndex + "\n Payloads :::::::: " + JSON.stringify(payloads));
});
asyncCallback();
}, 500);
} else if (csvType === "PricingDataUpload") {
var message = a.Record[0];
var originator = a.Party.trim();
console.log(" \n originator :::: " + originator);
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "DataCC", "fcn": "invoke", "Invokeargs": ["DataSetup", message], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvPricingData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
setTimeout(() => {
producer.send(payloads, function (err, data) {
if (err != null)
console.log("Error sending payload to consumer - " + err)
else
console.log("\n index ::: " + index + "\n Payloads :::::::: " + JSON.stringify(payloads));
});
asyncCallback();
}, 500);
} else if (csvType === "RedeemDataUpload") {
var message = a.Record[0];
var originator = a.Party.trim();
var collection = a.Collection;
console.log(" \n originator :::: " + originator);
console.log("\n Collection: " + collection);
var newdata = { "originator": originator, "peers": ["0.0.0.0:" + config.peerPort[0][originator]], "channelName": "globalchannel", "chaincodeName": "PrivateCC", "fcn": "invoke", "Invokeargs": ["invokeWithdrawal", collection, message, "OwnershipPrivateCC"], "username": "adminY", "orgName": config.orgList[0][originator] };
payloads = [{ topic: 'csvRedeemData', messages: JSON.stringify(newdata), partition: 0, originator: originator }];
setTimeout(() => {
producer.send(payloads, function (err, data) {
if (err != null)
console.log("Error sending payload to consumer - " + err)
else
console.log("\n index ::: " + index + "\n Payloads :::::::: " + JSON.stringify(payloads));
});
asyncCallback();
}, 2000);
}
if (index === totalDataLength - 1) {
tempCSVArray = [];
//a = [];
payloads = [];
console.log(" We are flushing the tempCSVArray ::::");
console.log("\n final tempCSVArray ::: " + JSON.stringify(tempCSVArray) + " final a :::::: final payloads ::::: " + JSON.stringify(payloads))
}
client.refreshMetadata(['csvDealData', 'csvAssetData', 'csvPricingData', 'csvRedeemData'], (err) => {
if (err) {
console.warn('Error refreshing kafka metadata', err);
}
});
}, function (err) {
if (err) console.error(err.message);
console.warn('Error refreshing kafka metadata', err);
});
}
Код для потребителя
var kafka = require('kafka-node');
var HighLevelConsumer = kafka.HighLevelConsumer;
var Client = kafka.Client;
var argv = require('optimist').argv;
var client = new Client('localhost:2181');
var topics = [{ topic: 'csvDealData' }, { topic: 'csvAssetData' }, { topic: 'csvPricingData' }, { topic: 'csvRedeemData' }];
var options = { autoCommit: true, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024 };
var consumer = new HighLevelConsumer(client, topics, options);
console.log("STARTING CONSUMER");
var hfcSocket = require("socket.io-client");
var invoke1 = hfcSocket.connect('http://10.21.134.17:5001');
var invoke2 = hfcSocket.connect('http://10.21.134.17:5004');
var invoke3 = hfcSocket.connect('http://10.21.134.17:5005');
var invoke4 = hfcSocket.connect('http://10.21.134.17:5002');
var invoke5 = hfcSocket.connect('http://10.21.134.17:5006');
var invoke6 = hfcSocket.connect('http://10.21.134.17:5003');
var invoke7 = hfcSocket.connect('http://10.21.134.17:5007');
var invoke8 = hfcSocket.connect('http://10.21.134.17:5008');
consumer.on('message', function (message) {
console.log(" message in consumer :::: " + JSON.stringify(message));
if (message.topic == "csvDealData") {
console.log(" Message ::: " + JSON.stringify(message));
var originator = JSON.parse(message.value).originator;
if (originator == "Org1")
invoke1.emit('csvDealData', message)
else if (originator == "Org2")
invoke2.emit('csvDealData', message);
else if (originator == "Org3")
invoke6.emit('csvDealData', message);
else if (originator == "Org4")
invoke5.emit('csvDealData', message);
else if (originator == "Org5")
invoke3.emit('csvDealData', message);
else if (originator == "Org6")
invoke4.emit('csvDealData', message);
else if (originator == "Org7")
invoke7.emit('csvDealData', message);
else if (originator == "Org8")
invoke8.emit('csvDealData', message);
} else if (message.topic == "csvAssetData") {
var originator = JSON.parse(message.value).originator;
if (originator == "Org1")
invoke1.emit('csvAssetData', message)
else if (originator == "Org2")
invoke2.emit('csvAssetData', message);
else if (originator == "Org3")
invoke6.emit('csvAssetData', message);
else if (originator == "Org4")
invoke5.emit('csvAssetData', message);
else if (originator == "Org5")
invoke3.emit('csvAssetData', message);
else if (originator == "Org6")
invoke4.emit('csvAssetData', message);
else if (originator == "Org7")
invoke7.emit('csvAssetData', message);
else if (originator == "Org8")
invoke8.emit('csvAssetData', message);
} else if (message.topic == "csvPricingData") {
var originator = JSON.parse(message.value).originator;
if (originator == "Org1")
invoke1.emit('csvPricingData', message)
else if (originator == "Org2")
invoke2.emit('csvPricingData', message);
else if (originator == "Org3")
invoke6.emit('csvPricingData', message);
else if (originator == "Org4")
invoke5.emit('csvPricingData', message);
else if (originator == "Org5")
invoke3.emit('csvPricingData', message);
else if (originator == "Org6")
invoke4.emit('csvPricingData', message);
else if (originator == "Org7")
invoke7.emit('csvPricingData', message);
else if (originator == "Org8")
invoke8.emit('csvPricingData', message);
} else if (message.topic == "csvRedeemData") {
var originator = JSON.parse(message.value).originator;
if (originator == "Org1")
invoke1.emit('csvRedeemData', message)
else if (originator == "Org2")
invoke2.emit('csvRedeemData', message);
else if (originator == "Org3")
invoke6.emit('csvRedeemData', message);
else if (originator == "Org4")
invoke5.emit('csvRedeemData', message);
else if (originator == "Org5")
invoke3.emit('csvRedeemData', message);
else if (originator == "Org6")
invoke4.emit('csvRedeemData', message);
else if (originator == "Org7")
invoke7.emit('csvRedeemData', message);
else if (originator == "Org8")
invoke8.emit('csvRedeemData', message);
}
});
consumer.on('error', function (err) {
console.log('error', err);
});
Я ожидал, что если пользователь уже загрузил CSV и загрузит другой тип CSV, он не должен препятствовать данным предыдущего CSV, а затем он должен также правильно отправлять новый CSV без вызова предыдущих данных.
моя скриптовая ссылка для продюсера.js
ссылка для потребителя