
Building a Lead Enrichment Pipeline with Node.js

Published February 4, 2026

Why Build an Enrichment Pipeline?

When you have a list of business websites but need their email addresses, a simple loop might work for 10 leads. But when you are processing hundreds or thousands, you need a proper pipeline with concurrency control, error handling, and retry logic. In this guide, we will build one using Node.js and the Easy Email Finder API.

Project Setup

mkdir lead-pipeline && cd lead-pipeline
npm init -y
npm install p-queue csv-parse csv-stringify dotenv
npm install -D tsx

The pipeline below is written in TypeScript; tsx runs it directly (e.g. npx tsx pipeline.ts) without a separate compile step. Create a .env file:

EEF_API_KEY=eef_live_your_api_key_here

The Pipeline Architecture

Our pipeline reads a CSV of business websites, processes them through a rate-limited queue using the /enrich-batch endpoint, and writes enriched results to an output CSV. The key components are:

  • CSV Reader: Streams input data to avoid loading everything into memory
  • Batch Processor: Groups websites into batches of 20 for the /enrich-batch endpoint
  • Rate-Limited Queue: Uses p-queue to control concurrency and respect API limits
  • Retry Handler: Automatically retries failed batches with exponential backoff
  • CSV Writer: Streams enriched results to the output file
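Most of these components are only a few lines each. The batch processor, for instance, is just a slicing helper; here it is standalone (the full implementation below inlines the same logic):

```typescript
// Split a flat list of websites into batches of up to `size` items.
// Standalone version of the batching logic used in the full pipeline below.
function toBatches<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// 45 websites at a batch size of 20 yields batches of 20, 20, and 5
const sites = Array.from({ length: 45 }, (_, i) => `site-${i}.com`);
console.log(toBatches(sites, 20).map(b => b.length)); // [ 20, 20, 5 ]
```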

Full Implementation

import 'dotenv/config';
import { createReadStream, createWriteStream } from 'fs';
import { parse } from 'csv-parse';
import { stringify } from 'csv-stringify';
import PQueue from 'p-queue';

const API_KEY = process.env.EEF_API_KEY;
const BASE = "https://easyemailfinder.com/api/v1";
const BATCH_SIZE = 20;
const MAX_RETRIES = 3;

// Rate-limited queue: at most 1 concurrent request, 1 request per second
const queue = new PQueue({ concurrency: 1, interval: 1000, intervalCap: 1 });

interface Lead {
  website: string;
  name?: string;
  email?: string;
  phone?: string;
  address?: string;
  [key: string]: string | undefined;
}

async function enrichBatch(websites: string[], retries = 0): Promise<Lead[]> {
  try {
    const resp = await fetch(`${BASE}/enrich-batch`, {
      method: "POST",
      headers: {
        "Authorization": `Bearer ${API_KEY}`,
        "Content-Type": "application/json"
      },
      body: JSON.stringify({ websites })
    });

    if (resp.status === 429) {
      // Honor Retry-After; rate-limit waits don't count against the retry budget
      const retryAfter = parseInt(resp.headers.get("Retry-After") || "60", 10);
      console.log(`Rate limited. Waiting ${retryAfter}s...`);
      await new Promise(r => setTimeout(r, retryAfter * 1000));
      return enrichBatch(websites, retries);
    }

    if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
    const data = await resp.json();
    return data.results || [];
  } catch (err) {
    if (retries < MAX_RETRIES) {
      const delay = Math.pow(2, retries) * 1000;
      console.log(`Retry ${retries + 1}/${MAX_RETRIES} in ${delay}ms...`);
      await new Promise(r => setTimeout(r, delay));
      return enrichBatch(websites, retries + 1);
    }
    console.error(`Failed after ${MAX_RETRIES} retries:`, err);
    return [];
  }
}

async function runPipeline(inputFile: string, outputFile: string) {
  console.log(`Reading from ${inputFile}...`);

  // Read all websites from CSV
  const websites: string[] = [];
  const parser = createReadStream(inputFile).pipe(
    parse({ columns: true, trim: true })
  );

  for await (const row of parser) {
    if (row.website) websites.push(row.website);
  }

  console.log(`Loaded ${websites.length} websites`);

  // Check balance first
  const balanceResp = await fetch(`${BASE}/balance`, {
    headers: { "Authorization": `Bearer ${API_KEY}` }
  });
  if (!balanceResp.ok) throw new Error(`Balance check failed: HTTP ${balanceResp.status}`);
  const { credits } = await balanceResp.json();
  console.log(`Credits available: ${credits}`);
  console.log(`Credits needed: ~${websites.length}`);

  if (credits < websites.length) {
    console.warn(`Warning: may not have enough credits for all websites`);
  }

  // Set up output CSV
  const stringifier = stringify({ header: true });
  const output = createWriteStream(outputFile);
  stringifier.pipe(output);

  let processed = 0;
  let found = 0;

  // Process in batches of 20
  const batches: string[][] = [];
  for (let i = 0; i < websites.length; i += BATCH_SIZE) {
    batches.push(websites.slice(i, i + BATCH_SIZE));
  }

  for (const batch of batches) {
    // Enqueue every batch up front; p-queue paces execution at the configured rate
    queue.add(async () => {
      const results = await enrichBatch(batch);
      for (const result of results) {
        stringifier.write({
          website: result.website || "",
          email: result.email || "",
          name: result.name || "",
          phone: result.phone || "",
          address: result.address || ""
        });
        if (result.email) found++;
      }
      processed += batch.length;
      console.log(`Progress: ${processed}/${websites.length} (${found} emails found)`);
    });
  }

  await queue.onIdle();
  stringifier.end();
  // Wait for the output file to flush before reporting
  await new Promise(r => output.on("finish", r));
  console.log(`\nDone! ${found} emails saved to ${outputFile}`);
}

runPipeline("input_leads.csv", "enriched_leads.csv").catch((err) => {
  console.error("Pipeline failed:", err);
  process.exit(1);
});

Input CSV Format

Your input CSV needs a website column. Other columns may be present; the sample pipeline reads only website and writes a fresh output file containing the enriched fields:

website,name,city
https://example-plumbing.com,Example Plumbing,Austin
https://bestdentist.com,Best Dentist,Dallas
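Real-world lead lists are messy: missing URL schemes, duplicate domains, stray whitespace. A small normalization pass before enqueueing can save credits. This helper is not part of the pipeline above, just an illustrative pre-processing step (note it keeps only the hostname, discarding paths):

```typescript
// Hypothetical pre-processing helper: normalize and deduplicate website URLs.
function normalizeWebsites(raw: string[]): string[] {
  const seen = new Set<string>();
  const out: string[] = [];
  for (const entry of raw) {
    const trimmed = entry.trim();
    if (!trimmed) continue;
    // Assume https:// when the scheme is missing
    const withScheme = /^https?:\/\//i.test(trimmed) ? trimmed : `https://${trimmed}`;
    let url: URL;
    try {
      url = new URL(withScheme);
    } catch {
      continue; // skip rows that are not valid URLs
    }
    const host = url.hostname.toLowerCase();
    if (seen.has(host)) continue; // deduplicate by domain
    seen.add(host);
    out.push(`https://${host}`);
  }
  return out;
}
```

Run your websites array through this before batching, and the pipeline never spends a credit on a duplicate or malformed row.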

Performance Characteristics

With the default settings (at most one request per second, batches of 20), throughput is bounded mainly by how long each batch takes to enrich; in practice this pipeline processes approximately 1,200 websites per hour, which is plenty fast for most use cases. If you need higher throughput, the Easy Email Finder API supports up to 60 requests per minute on the standard rate limit, so you can raise the queue's concurrency to keep more batches in flight. Refer to our rate limits guide for details.

Extending the Pipeline

This pipeline is designed to be composable. You can plug in additional processing steps like pushing results to a CRM, sending Slack notifications, or scoring leads based on quality signals. For CRM integration patterns, see our CRM integration guide. For the complete API reference, visit the developer documentation.
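As a sketch of what a scoring step might look like (the field weights here are arbitrary, purely for illustration):

```typescript
// Hypothetical lead-scoring step; the weights are illustrative, not prescribed.
interface EnrichedLead {
  website: string;
  email?: string;
  phone?: string;
  address?: string;
}

function scoreLead(lead: EnrichedLead): number {
  let score = 0;
  if (lead.email) score += 50;   // a found email is the main signal
  if (lead.phone) score += 30;
  if (lead.address) score += 20;
  return score;
}

// A lead with email and phone but no address scores 80
console.log(scoreLead({ website: "https://example.com", email: "a@example.com", phone: "555-0100" })); // 80
```

You could call scoreLead inside the queue callback and write the score as an extra CSV column, or use it to filter which leads get pushed to your CRM.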

Ready to find business emails?

Try Easy Email Finder free — get 5 credits to start.

Start Finding Emails
