我的 Node.js 代码遇到了内存泄漏问题。我试图以流的方式读取一个包含 10 万行的 CSV 文件(示例文件见链接),并处理文件中的每个条目。进程运行一段时间后会出现内存分配错误:
"FATAL ERROR: Ineffective mark-compacts near heap limit Allocation failed - JavaScript heap out of memory"
示例csv:
我的示例代码:
const fs = require('fs');
const config = require('../config/config');
const csv = require('csv-parser');
const tls = require('../services/tls');
// Kick off the import. processCSV() returns a promise, so report a rejection
// here instead of leaving it unhandled.
processCSV('crawler', 'sample-csv.csv').catch((err) => {
  console.error('CSV processing failed:', err);
  process.exitCode = 1;
});
以流的方式处理包含 10 万条记录的 CSV 文件:
/**
 * Streams a CSV file and starts a createJob() call for every parsed row.
 *
 * NOTE: createJob() is neither awaited nor throttled here, so a job is started
 * for each row as fast as rows are parsed, and the returned promise settles on
 * the stream's 'end' event rather than when those jobs have completed.
 *
 * @param {string} jobName - Forwarded unchanged to createJob() for every row.
 * @param {string} fileName - File name appended to config.api.basePath.
 * @returns {Promise<string>} Resolves with the full file path once the parser
 *   emits 'end'; rejects with the read stream's error if the file cannot be read.
 */
async function processCSV (jobName, fileName) {
  return new Promise((resolve, reject) => {
    const filePath = config.api.basePath + fileName;
    fs.createReadStream(filePath)
      // This handler is attached to the file read stream (before .pipe()).
      .on('error', (err) => {
        console.log('error processing csv');
        // Forward the underlying error so callers can see why the read failed.
        reject(err);
      })
      .pipe(csv())
      .on('data', (row) => {
        // Fire-and-forget: the result of createJob() is not observed here.
        createJob(jobName, row);
      })
      .on('end', () => {
        // The parser has emitted every row.
        console.log('Finished processing csv');
        resolve(filePath);
      });
  });
}
验证 CSV 文件中的每个 URL:
/**
 * Builds the URL for one CSV row and asks tls.getHostData() for its details.
 *
 * @param {string} name - Job name supplied by the caller; not used in the body.
 * @param {{hostname: string, port: (string|number)}} data - One parsed CSV row.
 * @returns {Promise<string|Error>} Resolves with the URL on success. If the
 *   lookup fails, the promise still resolves, with the caught error as its
 *   value, so callers never see a rejection from the lookup itself.
 */
async function createJob (name, data) {
  const { hostname, port } = data;
  // Values parsed from CSV text arrive as strings (e.g. '80'), so normalise
  // before comparing; a strict comparison against the number 80 never matches.
  const protocol = Number(port) === 80 ? 'http' : 'https';
  const url = `${protocol}://${hostname}`;
  try {
    await tls.getHostData(url); // call an external api to get details of hostname
    return url;
  } catch (error) {
    // Best-effort: hand the error back as the resolved value.
    return error;
  }
}
我不知道哪个函数导致了内存泄漏。
在我看来,您正在为 CSV 文件中的每一行调用 createJob(),却没有等待它完成,
因此这些作业可能全部同时处于运行状态并占用内存。这可能会耗尽系统资源,特别是在文件行数很多的情况下。
解决这个问题的一个方法是调整代码,使同一时刻只有 N 个 createJob()
操作处于“运行中”。一种做法是:当并发请求数达到上限时暂停流,等有空位时再恢复流:
/**
 * Streams a CSV file and runs createJob() for every row, keeping at most
 * `maxConcurrent` jobs in flight by pausing the parser stream while the limit
 * is reached and resuming it when a job settles.
 *
 * A job that rejects is counted as settled and processing continues with the
 * remaining rows.
 *
 * @param {string} jobName - Forwarded unchanged to createJob() for every row.
 * @param {string} fileName - File name appended to config.api.basePath.
 * @returns {Promise<string>} Resolves with the full file path once the stream
 *   has ended AND every started job has settled; rejects with the stream error
 *   if reading or parsing fails.
 */
function processCSV (jobName, fileName) {
  return new Promise((resolve, reject) => {
    const filePath = `${config.api.basePath}${fileName}`;
    const maxConcurrent = 10;
    let numConcurrent = 0;
    let paused = false;
    let ended = false;

    const fail = (err) => {
      console.log('error processing csv');
      reject(err);
    };

    // Resolve only when no more rows will arrive and nothing is still running;
    // 'end' alone can fire while the last jobs are in flight.
    const finishIfDone = () => {
      if (ended && numConcurrent === 0) {
        console.log('Finished processing csv');
        resolve(filePath);
      }
    };

    // Runs when a job fulfils or rejects: frees its slot and restarts the
    // stream if it was paused for lack of room.
    const onJobSettled = () => {
      --numConcurrent;
      if (paused && numConcurrent < maxConcurrent) {
        paused = false;
        stream.resume();
      }
      finishIfDone();
    };

    const stream = fs.createReadStream(filePath)
      .on('error', fail) // file read errors
      .pipe(csv())
      .on('error', fail) // errors emitted by the parser stream itself
      .on('data', (row) => {
        ++numConcurrent;
        createJob(jobName, row).then(onJobSettled, onJobSettled);
        if (numConcurrent >= maxConcurrent) {
          // pause the stream because we have max number of operations going
          stream.pause();
          paused = true;
        }
      })
      .on('end', () => {
        ended = true;
        finishIfDone();
      });
  });
}
/**
 * Builds the URL for one CSV row and asks tls.getHostData() for its details.
 *
 * @param {string} name - Job name supplied by the caller; not used in the body.
 * @param {{hostname: string, port: (string|number)}} data - One parsed CSV row.
 * @returns {Promise<string>} Resolves with the URL once the lookup succeeds.
 * @throws Rejects with whatever error tls.getHostData() rejects with.
 */
async function createJob (name, data) {
  const { hostname, port } = data;
  // Values parsed from CSV text arrive as strings (e.g. '80'), so normalise
  // before comparing; a strict comparison against the number 80 never matches.
  const protocol = Number(port) === 80 ? 'http' : 'https';
  const url = `${protocol}://${hostname}`;
  // No try/catch on purpose: a failed lookup must reject the returned promise
  // so the caller can tell success from failure.
  await tls.getHostData(url); // call an external api to get details of hostname
  return url;
}
注意:与您问题中的代码一样,如果处理某一行时出错,这个实现会继续处理后续的行。可以根据需要更改这种行为。