Я звоню Кафке, используя доверенный прокси REST API.Я читаю CSV-файл, создаю объект из всех имеющихся там записей (около 4 миллионов записей) и отправляю запрос в REST-прокси.Я продолжаю получать OutOfMemory
исключение.
Точное сообщение об исключении:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-81"
У меня только один экземпляр прокси-сервера REST, размещенныйв качестве док-контейнера.Переменная среды имеет значение:
JAVA_OPTIONS=-Xmx1g
Другие конфиги:
CPU - 1
Memory - 1024
Обрабатывает около 1 000 000 до сбоя.Я попытался увеличить его до 4 экземпляров, увеличив процессор до 3 и память до 2046 МБ.Затем он обрабатывает около 5 000 000 записей.
После прочтения CSV я вызываю конечную точку Kafka в пакетах по 5 000 записей.Это написано в узле.Вот код узла
fs.createReadStream(inputFile)
.pipe(parser({skip_lines_with_error: true}))
.on('data', (records) => {
country.push({ 'value' : {
country: records[0],
capital: records[1]
}
});
if (country.length > 5000) {
batch++;
callKafkaProxy(country).then((rec) => {
console.log(`'Batch done!'`);
}).catch((reason) => {
console.log(reason);
});
country = [];
}
})
.on('end', () => {
console.log('All done!');
});
function callKafkaProxy(records) {
const urlAndRequestOptions = {
url: 'http://kafka-rest-proxy.com/topics/test-topic',
headers: {
'content-type' : 'application/vnd.kafka.json.v2+json',
'Accept' : 'application/vnd.kafka.v2+json'
}
};
let recordsObject = {records: records};
//request here is a wrapper on the http package.
return request.post(urlAndRequestOptions, recordsObject);
Я чувствую, что мне не хватает некоторых конфигураций, которые должны помочь решить эту проблему без увеличения числа экземпляров> 1.
Любая помощь будет оценена.