У меня есть конечная точка Websocket, на которую я подписываюсь. Я хочу получить эти Данные, а затем оперировать ими.
КОД:
// Simple HTTP POST Request. Works Perfectly. I am Logged In to the API
const authenticationRequest = () => axios.post(authenticationUrl, {
user: username, password
})
.then((response) => response.data)
.catch((error) => console.error(console.error('Error Response', error)));
// WS Request. I need to wait for this to return my Data and then operate on them
const wsRequest = async () => {
// Getting the Auth Token. Working Perfectly.
const reqToken = await authenticationRequest();
// Hitting the ws Endplint. Working Perfectly.
const webSocketRequest = new WebSocket(topicDataUrl);
// Setting the Data for the First Message. Works Perfectly.
const firstMessage = {
token: reqToken,
stats: 2,
sql: "SELECT * FROM cc_payments LIMIT 100",
live: false
};
// Initialising an Empty Array. Works.
let websocketData = [];
// Opening the Endpoint
webSocketRequest.onopen = () => {
// Sending the first Message
webSocketRequest.send(JSON.stringify(firstMessage));
// On Each Message
webSocketRequest.onmessage = (streamEvent) => {
of(streamEvent).pipe(
map(event => JSON.parse(event.data)), // Parse the Data
filter(message => message.type === 'RECORD') // Filter the Data
).subscribe(
message => websocketData.push(message.data.value)// Adding each Value from each message to the Array.
);
};
};
console.log(JSON.stringify(websocketData), 'Websocket DATA'); // Empty Array
return websocketData;
};
Здесь я называю это несколькими строками вниз, но все еще без Результаты. Я получаю пустой массив.
(async function () {
const data = await wsRequest();
console.log(JSON.stringify(data), 'Data'); // Still Empty
}());
Итак, что я делаю не так? Может кто-нибудь, объясните мне проблему? Я имею в виду, я получаю асинхронность вещей, но я жду. Я даже попытался установить время ожидания, но не сработало.
Правильно ли отображается мой поток? Может быть, там есть проблема ??
Итак, действия RX JS являются асинхронными. Итак, мне нужно 2 вещи. - Закройте поток после завершения операции. (Пробовал takeUntil
, takeWhile
, но, очевидно, что-то делал не так) - Подождите, чтобы вернуть фактические данные (WebsocketData
).
ОБНОВЛЕНИЕ:
async function authenticationRequest() {
const AuthenticateWith = await axios.post(authenticationUrl, {
user: username,
password
})
.then(response => response.data)
.catch((error) => console.error('Error:', error));
return AuthenticateWith;
}
const webSocketRequest = new WebSocket(topicDataUrl);
const websocketData = new Array;
const subject = new Subject();
async function requestToWSEndpoint() {
const reqToken = await authenticationRequest();
const firstMessage = {
token: reqToken,
stats: 2,
sql: "SELECT * FROM cc_payments LIMIT 100",
live: false
};
webSocketRequest.onopen = () => {
webSocketRequest.send(JSON.stringify(firstMessage));
webSocketRequest.onmessage = (streamEvent) => {
JSON.parse(streamEvent.data).type === 'RECORD' && websocketData.push(JSON.parse(streamEvent.data).data.value);
subject.next(websocketData);
JSON.parse(streamEvent.data).type === 'END' && subject.complete();
};
};
};
(async function () {
requestToWSEndpoint();
const chartData = subject.subscribe((event) => console.log(event, 'Event')); // Event Adds to the Array and at the End I have all my Items(Filtered). It printed 100 Times.
console.log('ARRAY', chartData); // This returns [Subscriber {closed: false, _parentOrParents: null, _subscriptions: Array(1), syncErrorValue: null, syncErrorThrown: false, …}]. This is what I want. The Array.
}());