Скажите, у меня есть это:
const register = function(cb){
process.on('message', m => {
cb(m, (err, v) => {
process.send({error: err, value: v});
});
});
};
мы используем вышеперечисленное как:
register((m, cb) => {
// do something with m
setTimeout(() => {
cb('some error', m);
}, 100);
});
мой вопрос - есть ли способ использовать Rx.Observable
с этим - большинство примеровRx.Observable.fromCallback
обернет обратный вызов с ошибкой.Но в этом случае обратный вызов не является ошибкой.Как упражнение, как заставить это работать для безошибочных первых обратных вызовов?Вот начало:
const {Observable, bindNodeCallback} = require('rxjs');
const obs = bindNodeCallback(register);
const subscription = obs().subscribe(v => {
}, e => {
// this gets hit since the first arg is not an error
});
кто-нибудь имел дело с этим раньше?
Обновление , я немного ближе, в следующем примере вместо процесса используется setInterval.(«сообщение»), чтобы с ним было проще работать:
https://gist.github.com/ORESoftware/ea5d754d6ca6c92ca7e3bddc7eb318c2