Попробуйте проверить, публикуется ли в один топи c go через полный рабочий процесс. рабочий процесс: Publi sh to Topi c A -> облачная функция -> publi sh to TopicB
с использованием ts, @ google-cloud / pubsub
при запуске ниже код он проходит сразу без каких-либо ошибок или проверок сообщений подписки. Нужна помощь в исправлении этого кода, чтобы он работал, чтобы он проходил только в том случае, если сообщение подписки совпадает с shouldBeMessage, но в настоящее время оно проходит без ожидания подписки для запуска функции onMessage.
код:
const filePath = path.join (__ dirname, "file. json"); const file = JSON .parse (fs.readFileSyn c (filePath) .toString ());
// Publish to pub/sub topic
const topic = pubSubClient.topic(TopicA);
const publishMessageID = await topic.publish(Buffer.from(JSON.stringify(file)));
console.log(`Publish Message Id is : ${publishMessageID}`);
// Subscribe to the topic
await pubSubClient.subscription(TopicBSubscriptionName, function (err, subscription) {
if(err){
throw err;
}
function onError(err) {
console.log(`Error is : ${err}`);
throw err;
}
function onMessage(message) {
console.log(`Received message ${message.id}:`);
const parsedMessage = JSON.parse(message.data);
console.log(`Parsed message.data : ${parsedMessage}`);
console.log(`${shouldBeMessage.mappings}`)
message.ack();
return message;
}
subscription.on("error", onError);
subscription.on("message", onMessage).then((value) =>{
if(JSON.parse(value.data.mappings) === shouldBeMessage.mappings ){
return console.log(`Mapping object successfully matched`);
}
else {throw "Mapping object didn't match";}
});
//Remove listers to stop pulling for messages
setTimeout(() => {
subscription.removeListener('message', onMessage);
subscription.removeListener('error', onError);
}, timeout * 1000);
});