RX JS Stream - Как дождаться окончания моего STREAM и вернуть данные, а затем выполнить мои ОПЕРАЦИИ - PullRequest
0 голосов
/ 09 апреля 2020

У меня есть конечная точка Websocket, на которую я подписываюсь. Я хочу получить эти Данные, а затем оперировать ими.

КОД:

// Simple HTTP POST Request. Works Perfectly. I am Logged In to the API
const authenticationRequest = () => axios.post(authenticationUrl, {
  user: username, password
})
  .then((response) => response.data)
  .catch((error) => console.error(console.error('Error Response', error)));

// WS Request. I need to wait for this to return my Data and then operate on them
const wsRequest = async () => {
  // Getting the Auth Token. Working Perfectly.
  const reqToken = await authenticationRequest();

  // Hitting the ws Endplint. Working Perfectly.
  const webSocketRequest = new WebSocket(topicDataUrl);

  // Setting the Data for the First Message. Works Perfectly.
  const firstMessage = {
    token: reqToken,
    stats: 2,
    sql: "SELECT * FROM cc_payments LIMIT 100",
    live: false
  };

  // Initialising an Empty Array. Works.
  let websocketData = [];

  // Opening the Endpoint
  webSocketRequest.onopen = () => {

    // Sending the first Message
    webSocketRequest.send(JSON.stringify(firstMessage));

    // On Each Message
    webSocketRequest.onmessage = (streamEvent) => {
      of(streamEvent).pipe(
        map(event => JSON.parse(event.data)), // Parse the Data
        filter(message => message.type === 'RECORD') // Filter the Data
      ).subscribe(
        message => websocketData.push(message.data.value)// Adding each Value from each message to the Array.
      );
    };
  };
  console.log(JSON.stringify(websocketData), 'Websocket DATA'); // Empty Array
  return websocketData;
};

Здесь я называю это несколькими строками вниз, но все еще без Результаты. Я получаю пустой массив.

(async function () {
  const data = await wsRequest();
  console.log(JSON.stringify(data), 'Data');  // Still Empty
}());

Итак, что я делаю не так? Может кто-нибудь, объясните мне проблему? Я имею в виду, я получаю асинхронность вещей, но я жду. Я даже попытался установить время ожидания, но не сработало.

Правильно ли отображается мой поток? Может быть, там есть проблема ??

Итак, действия RX JS являются асинхронными. Итак, мне нужно 2 вещи. - Закройте поток после завершения операции. (Пробовал takeUntil, takeWhile, но, очевидно, что-то делал не так) - Подождите, чтобы вернуть фактические данные (WebsocketData).

ОБНОВЛЕНИЕ:

async function authenticationRequest() {
  const AuthenticateWith = await axios.post(authenticationUrl, {
    user: username,
    password
  })
    .then(response => response.data)
    .catch((error) => console.error('Error:', error));

  return AuthenticateWith;
}
const webSocketRequest = new WebSocket(topicDataUrl);
const websocketData = new Array;
const subject = new Subject();

async function requestToWSEndpoint() {
  const reqToken = await authenticationRequest();

  const firstMessage = {
    token: reqToken,
    stats: 2,
    sql: "SELECT * FROM cc_payments LIMIT 100",
    live: false
  };

  webSocketRequest.onopen = () => {
    webSocketRequest.send(JSON.stringify(firstMessage));

    webSocketRequest.onmessage = (streamEvent) => {
      JSON.parse(streamEvent.data).type === 'RECORD' && websocketData.push(JSON.parse(streamEvent.data).data.value);
      subject.next(websocketData);
      JSON.parse(streamEvent.data).type === 'END' && subject.complete();
    };
  };
};

(async function () {
  requestToWSEndpoint();

  const chartData = subject.subscribe((event) => console.log(event, 'Event')); // Event Adds to the Array and at the End I have all my Items(Filtered). It printed 100 Times.
  console.log('ARRAY', chartData); // This returns [Subscriber {closed: false, _parentOrParents: null, _subscriptions: Array(1), syncErrorValue: null, syncErrorThrown: false, …}]. This is what I want. The Array.
}());

1 Ответ

1 голос
/ 09 апреля 2020

Мое предложение, изложенное в моем комментарии:

const subject = new Subject();
...
}
(async function () {
   wsRequest();
   subject.pipe(finalize(()=> {console.log('ARRAY', websocketData);})).subscribe();
 }());

На самом деле вам даже не нужна функция для того, что вы делаете в wsRequest, за исключением const reqToken = await authenticationRequest();. Остальные могут быть легко распределены по всему миру.

Вы должны взглянуть на документацию для rxjs operators.

...