Using Node Async eachLimit With csv-parse

My co-worker is an expert in backend Node.js, and together we built a program to parse very large CSV files and process each line concurrently. I wanted to share it.

Add csv-parse:

yarn add csv-parse

Add the node async lib:

yarn add async
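For reference, a minimal package.json for this setup might look something like the following. The package name and version ranges are illustrative rather than taken from the original post, and "type": "module" is needed because the script below uses import syntax:

{
  "name": "async-csv-parse-demo",
  "type": "module",
  "dependencies": {
    "async": "^3.2.0",
    "csv-parse": "^5.0.0"
  }
}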

Now you can use these two libraries together:

// File: async-csv-parse.js

import { parse } from 'csv-parse'
import eachLimit from 'async/eachLimit.js'
import fs from 'fs'

const file = process.argv[2]
const DEFAULT_CONCURRENCY = 25
const concurrency = parseInt(process.argv[3] || DEFAULT_CONCURRENCY, 10)

console.log('Parsing csv file ', file)

const parseCsvAsync = async (file, concurrency, doWork) => {
  const parser = parse()
  const stream = fs.createReadStream(file).pipe(parser)
  // Called once every row has been processed, or with an error as soon as one fails
  const finalize = (err) => {
    if (err) {
      console.error('async parsing failed', err)
      return
    }
    console.log('all done with async parsing')
  }
  eachLimit(stream, concurrency, async (row) => {
    const record = { uuid: row[1], amount: row[2], concurrency: row[3] }
    // Await the work so at most `concurrency` rows are in flight at once
    await doWork(record)
  }, finalize)
}

const doHeavyOperationOnRecord = async (record) => {
  // TODO: Your heavy operation code goes here 
  console.log('HEAVY OP', record)
}

parseCsvAsync(file, concurrency, doHeavyOperationOnRecord)
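To see the concurrency limit in action before plugging in real work, you could temporarily swap the placeholder for something like this sketch, which fakes a slow operation with a one-second delay:

const doHeavyOperationOnRecord = async (record) => {
  console.log('START', record.uuid)
  // Simulate slow I/O, e.g. an API call or database write
  await new Promise((resolve) => setTimeout(resolve, 1000))
  console.log('DONE', record.uuid)
}

With the default concurrency of 25 you should see roughly 25 START lines at a time, each followed about a second later by its matching DONE line.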

With this you can run:

node async-csv-parse.js transactions-small.csv 30
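The original transactions-small.csv isn't shown here, but since the code reads row[1], row[2], and row[3], it expects at least four columns. A hypothetical file without a header row (with the default options, csv-parse would hand a header line to doWork like any other row) could look like:

1,0e8dcb1a-8a9e-4f7b-9a56-2f1d3c4b5a6f,19.99,5
2,5b2c9d7e-1f3a-4c8b-a2d4-6e7f8a9b0c1d,42.50,3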

The above JavaScript code parses a CSV (Comma-Separated Values) file asynchronously and performs a heavy operation on each row, with a configurable number of rows processed at a time. Here’s how it works step by step:

  1. Import necessary libraries:

    • parse from csv-parse: This library parses CSV data; with the default options, each row is delivered as an array of column strings.
    • eachLimit from async: This library provides an asynchronous iterator that processes elements in parallel with a specified concurrency limit.
    • fs from ‘fs’: The Node.js File System module is used for reading the CSV file and creating a read stream.
  2. Read the CSV file path and concurrency from the command line arguments:

    • const file = process.argv[2]: the second positional argument is the path to the CSV file, and the optional third argument (process.argv[3]) overrides the default concurrency of 25.
  3. The parseCsvAsync function:

    • async (file, concurrency, doWork): This function takes three parameters: the file path to the CSV file, the concurrency level (maximum number of concurrent operations), and a function doWork that will perform heavy operations on each CSV row.
  4. Create a parser and stream for the CSV file:

    • const parser = parse(): Creates a CSV parser.
    • const stream = fs.createReadStream(file).pipe(parser): Reads the CSV file as a stream and pipes it into the parser.
  5. Define a finalize function to pass to the Async lib’s eachLimit:

    • const finalize = (err) => { ... }: This function is called once all asynchronous parsing and heavy operations have completed, or with an error as soon as one of them fails.
  6. Use the eachLimit function to process CSV rows in parallel:

    • eachLimit(stream, concurrency, async (row) => { ... }, finalize): It iterates over the rows streamed out of the parser, running at most concurrency iteratee calls at a time (a promise-based variant is sketched after this list).
    • For each row, it creates a record object with specific fields extracted from the row.
    • The doWork function is awaited for each record, so the concurrency limit applies to the heavy operations themselves rather than just to handing out rows.
  7. Define the doHeavyOperationOnRecord function:

    • async (record): This function represents the heavy operation that needs to be performed on each CSV row.
    • In the code provided, it’s a placeholder and should be replaced with your actual heavy operation logic.
  8. Finally, call parseCsvAsync with the specified parameters:

    • parseCsvAsync(file, concurrency, doHeavyOperationOnRecord): This initiates the CSV parsing process with a concurrency limit and passes the doHeavyOperationOnRecord function as the operation to be performed on each row.
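If you would rather await the whole run than pass a finalize callback, async v3's collection functions return a promise when the final callback is omitted, so the same idea can be written roughly like this. It is a sketch under that assumption, not part of the original program:

// File: async-csv-parse-await.js (hypothetical variant)

import { parse } from 'csv-parse'
import eachLimit from 'async/eachLimit.js'
import fs from 'fs'

const file = process.argv[2]
const concurrency = parseInt(process.argv[3] || 25, 10)

const parseCsvAsync = (file, concurrency, doWork) => {
  const stream = fs.createReadStream(file).pipe(parse())
  // With no final callback, eachLimit returns a promise we can await
  return eachLimit(stream, concurrency, async (row) => {
    await doWork({ uuid: row[1], amount: row[2], concurrency: row[3] })
  })
}

const doHeavyOperationOnRecord = async (record) => {
  console.log('HEAVY OP', record)
}

try {
  // Top-level await works here because the file is an ES module
  await parseCsvAsync(file, concurrency, doHeavyOperationOnRecord)
  console.log('all done with async parsing')
} catch (err) {
  console.error('parsing failed', err)
}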

When I googled for a solution like this I couldn’t find one, so I hope this helps future noders.