Используйте API Fetch Streams для асинхронного потребления фрагментированных данных без использования рекурсии - PullRequest
0 голосов
/ 12 октября 2018

Я использую JavaScript fetch streams API для асинхронного потребления фрагментированного JSON, как в этот ответ .

Мое приложение может получать до 25небольшие объекты JSON в секунду (по одному на каждый кадр в видео) в течение часа.

Когда входящие блоки большие (1000+ объектов JSON на блок), мой код работает хорошо - быстро, минимальноиспользование памяти - он может легко получать 1 000 000 объектов JSON.

Когда входящие порции меньше (5 объектов JSON на порцию), мой код работает плохо - медленно, много потребления памяти.Браузер умирает примерно с 50 000 объектов JSON.

После большой отладки в инструментах разработчика, похоже, проблема заключается в рекурсивной природе кода.

Я пытался удалитьрекурсия, но кажется необходимой, потому что API зависит от моего кода, возвращающего обещание цепочки?!

Как удалить эту рекурсию, или я должен использовать что-то отличное от fetch?


Код с рекурсией (работает)

String.prototype.replaceAll = function(search, replacement) {
    var target = this;
    return target.replace(new RegExp(search, 'g'), replacement);
};

results = []

fetch('http://localhost:9999/').then(response => {
    const reader = response.body.getReader();
    td = new TextDecoder("utf-8");
    buffer = "";

    reader.read().then(function processText({ done, value }) {
        if (done) {
          console.log("Stream done.");
          return;
        }

        try {
            decoded = td.decode(value);
            buffer += decoded;
            if (decoded.length != 65536){
                toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
                result = JSON.parse(toParse);
                results.push(...result);
                console.log("Received " + results.length.toString() + " objects")
                buffer = "";
            }
        }
        catch(e){
            // Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
            //console.log("EXCEPTION:"+e);
        }

        return reader.read().then(processText);
    })
});

Код без рекурсии (не работает)

String.prototype.replaceAll = function(search, replacement) {
    var target = this;
    return target.replace(new RegExp(search, 'g'), replacement);
};

results = []
finished = false

fetch('http://localhost:9999/').then(response => {
    const reader = response.body.getReader();
    td = new TextDecoder("utf-8");
    buffer = "";
    lastResultSize = -1

    while (!finished)
        if (lastResultSize < results.length)
        {
            lastResultSize = results.length;
            reader.read().then(function processText({ done, value }) {

                if (done) {
                  console.log("Stream done.");
                  finished = true;
                  return;
                }
                else
                    try {
                        decoded = td.decode(value);
                        //console.log("Received chunk " + decoded.length.toString() + " in length");
                        buffer += decoded;
                        if (decoded.length != 65536){
                            toParse = "["+buffer.trim().replaceAll("\n",",")+"]";
                            result = JSON.parse(toParse);
                            results.push(...result);
                            console.log("Received " + results.length.toString() + " objects")
                            buffer = "";
                            //console.log("Parsed chunk " + toParse.length.toString() + " in length");
                        }
                    }
                    catch(e) {
                        // Doesn't need to be reported, because partial JSON result will be parsed next time around (from buffer).
                        //console.log("EXCEPTION:"+e);
                    }
            })
        }
});

Для полноты вот код Python, который я использую на тестовом сервере.Обратите внимание на строку, содержащую sleep , которая изменяет поведение чанкинга:

import io
import urllib
import inspect
from http.server import HTTPServer,BaseHTTPRequestHandler
from time import sleep


class TestServer(BaseHTTPRequestHandler):

    def do_GET(self):
        args = urllib.parse.parse_qs(self.path[2:])
        args = {i:args[i][0] for i in args}
        response = ''

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Transfer-Encoding', 'chunked')
        self.end_headers()

        for i in range (1000000):
            self.wfile.write(bytes(f'{{"x":{i}, "text":"fred!"}}\n','utf-8'))
            sleep(0.001)  # Comment this out for bigger chunks sent to the client!

def main(server_port:"Port to serve on."=9999,server_address:"Local server name."=''):
    httpd = HTTPServer((server_address, server_port), TestServer)
    print(f'Serving on http://{httpd.server_name}:{httpd.server_port} ...')
    httpd.serve_forever()


if __name__ == '__main__':
    main()

1 Ответ

0 голосов
/ 15 октября 2018

Часть, которую вы пропустили, состоит в том, что функция, переданная в .then(), всегда вызывается асинхронно, то есть с пустым стеком.Так что здесь нет настоящей рекурсии.Именно поэтому ваша версия «без рекурсии» не работает.

Простое решение - использовать асинхронные функции и оператор await.Если вы вызываете read () следующим образом:

const {value, done} = await reader.read();

... тогда вы можете вызвать его в цикле, и он будет работать так, как вы ожидаете.

Я не знаю конкретногде ваша утечка памяти, но использование глобальных переменных выглядит как проблема.Я рекомендую вам всегда ставить 'use strict'; в начале вашего кода, чтобы компилятор поймал эти проблемы за вас.Затем используйте let или const всякий раз, когда вы объявляете переменную.

Я рекомендую использовать TextDecoderStream , чтобы избежать проблем при разделении символа между несколькими частями.У вас также будут проблемы, когда объект JSON будет разбит на несколько фрагментов.

См. Добавление демоверсии дочернего записываемого потока , чтобы узнать, как это сделать безопасно (но учтите, что вам нужен TextDecoderStream там, где эта демонстрация имеет"TextDecoder").

Обратите внимание также на использование WritableStream в этой демонстрации.Firefox пока не поддерживает AFAIK, но WritableStream предоставляет гораздо более простой синтаксис для использования кусков без явного зацикливания или рекурсии.Вы можете найти веб-потоки polyfill здесь .

...