Обработка огромных записей дает OutOfMemoryException - Kafka REST прокси - PullRequest
2 голосов
/ 03 апреля 2019

Я звоню Кафке, используя доверенный прокси REST API.Я читаю CSV-файл, создаю объект из всех имеющихся там записей (около 4 миллионов записей) и отправляю запрос в REST-прокси.Я продолжаю получать OutOfMemory исключение.

Точное сообщение об исключении:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-81"

У меня только один экземпляр прокси-сервера REST, размещенныйв качестве док-контейнера.Переменная среды имеет значение:

JAVA_OPTIONS=-Xmx1g

Другие конфиги:

CPU - 1 Memory - 1024

Обрабатывает около 1 000 000 до сбоя.Я попытался увеличить его до 4 экземпляров, увеличив процессор до 3 и память до 2046 МБ.Затем он обрабатывает около 5 000 000 записей.

После прочтения CSV я вызываю конечную точку Kafka в пакетах по 5 000 записей.Это написано в узле.Вот код узла

fs.createReadStream(inputFile)
  .pipe(parser({skip_lines_with_error: true}))
  .on('data', (records) => {
        country.push({ 'value' : {
            country: records[0],
            capital: records[1]
            }
        });

        if (country.length > 5000) {
            batch++;
            callKafkaProxy(country).then((rec) => {
                console.log(`'Batch done!'`);
            }).catch((reason) => {
                console.log(reason);
            });
            country = [];
        }
    })
    .on('end', () => {
        console.log('All done!');
    });
function callKafkaProxy(records) {
    const urlAndRequestOptions = {
        url: 'http://kafka-rest-proxy.com/topics/test-topic',
        headers: {
            'content-type' : 'application/vnd.kafka.json.v2+json',
            'Accept' : 'application/vnd.kafka.v2+json'
        }
    };
let recordsObject = {records: records};
//request here is a wrapper on the http package. 
return request.post(urlAndRequestOptions, recordsObject);

Я чувствую, что мне не хватает некоторых конфигураций, которые должны помочь решить эту проблему без увеличения числа экземпляров> 1.

Любая помощь будет оценена.

Ответы [ 2 ]

1 голос
/ 03 апреля 2019
.on('data', () => {}); ... 

1.Он не справляется с противодавлением.Создайте доступный для записи поток, который будет обрабатывать ваш пакетный процесс.Затем просто используйте pipe.

inputStream
    .pipe(parser)
    .pipe(kafka)

Затем проанализируйте эти строки:

if (country.length > 5000) {
        batch++;
        callKafkaProxy(country).then((rec) => {
            console.log(`'Batch done!'`);
        ).catch((reason) => {
            console.log(reason);
        });
        country = [];
     }
Ваш callKafkaProxy является асинхронным, поэтому массив вашей страны всегда заполняется, независимо от результата функции callKafkaProxy.Массив страны продолжает заполняться и продолжает делать запросы.Вы можете убедиться, войдя в консоль после пакета ++Вы увидите, что вы инициируете множество запросов, и Kafka будет отвечать намного медленнее, чем вы.

Решение:

  1. Создание потока с возможностью записи.
  2. передает данные от вашего парсера.input.pipe (parser) .pipe (yourJustCreatedKafkaWritableStream)
  3. Позвольте вашему доступному для записи потоку подтолкнуть страны к массиву и обратному вызову, когда вы будете готовы получить другую запись.Когда вы достигнете своего края (если country.length> 5000), тогда сделайте запрос к kafka и дождитесь ответа и только потом отзовитесь.Таким образом, ваш поток будет адаптивным.Вы должны прочитать больше о потоках узлов и их мощности.Но помните, что с большой силой приходит большая ответственность, и в этом случае вы должны тщательно разрабатывать код, чтобы избежать таких утечек памяти.
0 голосов
/ 18 мая 2019

С помощью ответа Зильвинаса я понял, как можно использовать потоки для отправки данных партиями.Вот решение:

var stream = fs.createReadStream(file)
                        .pipe(es.split())
                        .pipe(es.mapSync(function (line) {

                            if (line.length) {
                                //read your line and create a record message
                            }

                            //put 5000 in a config constant
                            if (records.length === 5000) {
                                stream.pause();
                                logger.debug(`Got ${records.length} messages. Pushing to Kafka...`);
                                postChunkToKafka(records).then((response) => {     
                                  records = [];
                                  stream.resume();
                                });
                            }
...