My co-worker is an expert in backend Node.js, and we built a program to parse very large CSV files and process each line concurrently. I wanted to share it.
Add csv-parse:
yarn add csv-parse
Add the node async lib:
yarn add async
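One note before the code: the script below uses ES module import syntax, so Node needs to treat the file as an ES module. A minimal package.json sketch that makes this work (the "type": "module" line is the important part; the version ranges are just illustrative):

{
  "type": "module",
  "dependencies": {
    "async": "^3.2.0",
    "csv-parse": "^5.0.0"
  }
}

Alternatively, rename the file to async-csv-parse.mjs and skip the "type" field.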
Now you can use these two libraries together:
// File: async-csv-parse.js
import { parse } from 'csv-parse'
import eachLimit from 'async/eachLimit.js'
import fs from 'fs'

const file = process.argv[2]
const DEFAULT_CONCURRENCY = 25
const concurrency = parseInt(process.argv[3], 10) || DEFAULT_CONCURRENCY

console.log('Parsing CSV file', file)

const parseCsvAsync = async (file, concurrency, doWork) => {
  // Stream the file through the CSV parser so large files are never
  // loaded into memory all at once
  const parser = parse()
  const stream = fs.createReadStream(file).pipe(parser)

  const finalize = () => {
    console.log('all done with async parsing')
  }

  // eachLimit pulls rows from the stream and keeps at most
  // `concurrency` iteratee calls in flight at a time
  eachLimit(stream, concurrency, async (row) => {
    const record = { uuid: row[1], amount: row[2], concurrency: row[3] }
    // await matters here: without it the iteratee resolves immediately
    // and the concurrency limit no longer throttles the heavy work
    await doWork(record)
  }, finalize)
}

const doHeavyOperationOnRecord = async (record) => {
  // TODO: Your heavy operation code goes here
  console.log('HEAVY OP', record)
}

parseCsvAsync(file, concurrency, doHeavyOperationOnRecord)
With this you can run:
node async-csv-parse.js transactions-small.csv 30
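As a reference for the row indices used above: csv-parse with default options hands each row to the iteratee as an array of strings, and the script reads columns 1 through 3. A hypothetical file matching those indices (header and values made up purely for illustration) could look like:

id,uuid,amount,concurrency
1,aaaa-1111,42.50,25
2,bbbb-2222,19.99,25

With default options csv-parse also emits the header line as a regular row, so in practice you may want to skip it, for example with the parser's from_line option.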
The above JavaScript code will parse a CSV (Comma-Separated Values) file asynchronously and perform a heavy operation on each row of the CSV file using a specified concurrency level. Here’s how it works step by step:
- Import the necessary libraries: parse from csv-parse is used for parsing CSV data; eachLimit from async processes elements in parallel with a specified concurrency limit; fs, the Node.js File System module, is used for reading the CSV file and creating a read stream.
- Pass the CSV file path as a command line argument: const file = process.argv[2]. The optional second argument, process.argv[3], sets the concurrency level and falls back to DEFAULT_CONCURRENCY.
- The parseCsvAsync function: async (file, concurrency, doWork) takes three parameters: the path to the CSV file, the concurrency level (maximum number of concurrent operations), and a function doWork that performs the heavy operation on each CSV row.
- Create a parser and stream for the CSV file: const parser = parse() creates a CSV parser, and const stream = fs.createReadStream(file).pipe(parser) reads the CSV file as a stream and pipes it into the parser.
- Define a finalize function to pass to the async lib's eachLimit: const finalize = () => { ... } is called once all asynchronous parsing and heavy operations are completed.
- Use the eachLimit function to process CSV rows in parallel: eachLimit(stream, concurrency, async (row) => { ... }, finalize) iterates through the rows of the CSV file with a maximum concurrency of concurrency. For each row it builds a record object with specific fields extracted from the row, then awaits doWork(record), which performs the heavy operation on that record. (A standalone sketch of how eachLimit caps concurrency follows this list.)
- Define the doHeavyOperationOnRecord function: async (record) represents the heavy operation to be performed on each CSV row. In the code above it is a placeholder and should be replaced with your actual heavy operation logic; a hypothetical example also follows this list.
- Finally, call parseCsvAsync with the specified parameters: parseCsvAsync(file, concurrency, doHeavyOperationOnRecord) kicks off the CSV parsing with the concurrency limit and passes doHeavyOperationOnRecord as the operation to run on each row.
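To make the concurrency behaviour concrete, here is a small standalone sketch (separate from the CSV script) of how eachLimit caps the number of iteratee calls in flight. The item values and the 100 ms delay are arbitrary; the point is that the in-flight count never exceeds the limit of 2, and that eachLimit returns a promise when the final callback is omitted:

// File: each-limit-demo.js (standalone example, not part of the CSV script)
import eachLimit from 'async/eachLimit.js'

const items = [1, 2, 3, 4, 5, 6]
let inFlight = 0

await eachLimit(items, 2, async (n) => {
  inFlight++
  console.log(`start ${n} (in flight: ${inFlight})`)
  // Simulate slow work; in the CSV script this is where doWork(record) runs
  await new Promise((resolve) => setTimeout(resolve, 100))
  inFlight--
  console.log(`done ${n}`)
})

console.log('all items processed')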
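And here is a hypothetical version of doHeavyOperationOnRecord, just to show what the placeholder might become. The endpoint and payload shape are made up, and it assumes Node 18+ where fetch is built in; the important part is that the function does its work before resolving, so eachLimit's concurrency cap actually applies:

const doHeavyOperationOnRecord = async (record) => {
  // Hypothetical downstream call; replace with your real heavy operation
  const response = await fetch('https://example.com/api/transactions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(record),
  })
  if (!response.ok) {
    throw new Error(`Failed to process ${record.uuid}: HTTP ${response.status}`)
  }
}

If doWork throws, eachLimit stops starting new rows and passes the error to the final callback, so in a real script you may want to handle per-record failures inside doWork.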
When I googled for a solution like this, I couldn't find one, so I hope this helps future noders.