Home » Как использовать многопоточность в Node.js
Как использовать многопоточность в Node.js

Как использовать многопоточность в Node.js

Если ты админ сервера под нагрузкой, то знаешь, что в определенный момент твой Node.js-сервис начинает кашлять. Однопоточность — это боль, когда у тебя тысячи запросов в минуту. Хорошая новость: Node.js давно уже не живёт в каменном веке, и многопоточность там работает вполне достойно. Эта статья покажет, как правильно использовать Worker Threads, настроить кластеризацию и не наступить на те же грабли, что и тысячи других разработчиков. Мы разберём практические примеры, покажем реальные команды и объясним, почему иногда лучше взять VPS с большим количеством ядер, чем пытаться оптимизировать код.

Как это работает: архитектура многопоточности в Node.js

Node.js использует event loop в основном потоке, но это не значит, что он не может использовать другие потоки. Есть несколько способов добиться многопоточности:

  • Worker Threads — встроенный модуль для создания отдельных потоков
  • Cluster Module — форкает процессы, а не потоки
  • Child Process — запускает отдельные процессы
  • Libuv Thread Pool — используется автоматически для I/O операций

Основное отличие Worker Threads от других решений — они делят память, что делает их более эффективными для CPU-интенсивных задач.

Настройка Worker Threads: пошаговое руководство

Начнём с простого примера. Создаём основной файл main.js:

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

if (isMainThread) {
    // Основной поток
    console.log('Создаём воркер...');
    
    const worker = new Worker(__filename, {
        workerData: { num: 42 }
    });
    
    worker.on('message', (result) => {
        console.log('Результат:', result);
    });
    
    worker.on('error', (error) => {
        console.error('Ошибка воркера:', error);
    });
    
    worker.on('exit', (code) => {
        console.log(`Воркер завершился с кодом ${code}`);
    });
    
} else {
    // Код воркера
    const { num } = workerData;
    
    // Имитация тяжёлой задачи
    function heavyTask(n) {
        let result = 0;
        for (let i = 0; i < n * 1000000; i++) {
            result += Math.sqrt(i);
        }
        return result;
    }
    
    const result = heavyTask(num);
    parentPort.postMessage(result);
}

Запускаем и проверяем:

node main.js

Практические примеры: где и как использовать

Разберём несколько кейсов, где многопоточность реально помогает:

Кейс 1: Обработка изображений

Создаём файл image-processor.js:

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const path = require('path');

if (isMainThread) {
    const express = require('express');
    const app = express();
    
    app.use(express.json());
    
    app.post('/process-image', async (req, res) => {
        const { imagePath, operation } = req.body;
        
        try {
            const worker = new Worker(__filename, {
                workerData: { imagePath, operation }
            });
            
            worker.on('message', (result) => {
                res.json({ success: true, result });
            });
            
            worker.on('error', (error) => {
                res.status(500).json({ error: error.message });
            });
            
        } catch (error) {
            res.status(500).json({ error: error.message });
        }
    });
    
    app.listen(3000, () => {
        console.log('Сервер запущен на порту 3000');
    });
    
} else {
    const { imagePath, operation } = workerData;
    
    // Здесь была бы реальная обработка изображения
    function processImage(path, op) {
        // Имитация обработки
        const operations = {
            'resize': `Изменение размера ${path}`,
            'filter': `Применение фильтра к ${path}`,
            'compress': `Сжатие ${path}`
        };
        
        // Имитация времени обработки
        const delay = Math.random() * 2000 + 1000;
        setTimeout(() => {
            parentPort.postMessage(operations[op] || 'Неизвестная операция');
        }, delay);
    }
    
    processImage(imagePath, operation);
}

Кейс 2: Пул воркеров для высоконагруженных задач

Создаём файл worker-pool.js:

const { Worker } = require('worker_threads');
const os = require('os');

class WorkerPool {
    constructor(workerScript, poolSize = os.cpus().length) {
        this.workerScript = workerScript;
        this.poolSize = poolSize;
        this.workers = [];
        this.queue = [];
        this.init();
    }
    
    init() {
        for (let i = 0; i < this.poolSize; i++) {
            this.createWorker();
        }
    }
    
    createWorker() {
        const worker = new Worker(this.workerScript);
        worker.isIdle = true;
        
        worker.on('message', (result) => {
            const { resolve } = worker.currentTask;
            resolve(result);
            worker.isIdle = true;
            this.processQueue();
        });
        
        worker.on('error', (error) => {
            const { reject } = worker.currentTask;
            reject(error);
            worker.isIdle = true;
            this.processQueue();
        });
        
        this.workers.push(worker);
    }
    
    execute(data) {
        return new Promise((resolve, reject) => {
            const task = { data, resolve, reject };
            
            const idleWorker = this.workers.find(w => w.isIdle);
            if (idleWorker) {
                this.assignTask(idleWorker, task);
            } else {
                this.queue.push(task);
            }
        });
    }
    
    assignTask(worker, task) {
        worker.isIdle = false;
        worker.currentTask = task;
        worker.postMessage(task.data);
    }
    
    processQueue() {
        if (this.queue.length === 0) return;
        
        const idleWorker = this.workers.find(w => w.isIdle);
        if (idleWorker) {
            const task = this.queue.shift();
            this.assignTask(idleWorker, task);
        }
    }
    
    terminate() {
        this.workers.forEach(worker => worker.terminate());
    }
}

module.exports = WorkerPool;

И воркер-скрипт heavy-task-worker.js:

const { parentPort } = require('worker_threads');

parentPort.on('message', (data) => {
    const { operation, payload } = data;
    
    let result;
    switch (operation) {
        case 'fibonacci':
            result = fibonacci(payload);
            break;
        case 'prime':
            result = isPrime(payload);
            break;
        case 'hash':
            result = computeHash(payload);
            break;
        default:
            result = 'Unknown operation';
    }
    
    parentPort.postMessage(result);
});

function fibonacci(n) {
    if (n < 2) return n;
    return fibonacci(n - 1) + fibonacci(n - 2);
}

function isPrime(num) {
    if (num <= 1) return false;
    if (num <= 3) return true;
    if (num % 2 === 0 || num % 3 === 0) return false;
    
    for (let i = 5; i * i <= num; i += 6) {
        if (num % i === 0 || num % (i + 2) === 0) return false;
    }
    return true;
}

function computeHash(str) {
    let hash = 0;
    for (let i = 0; i < str.length; i++) {
        const char = str.charCodeAt(i);
        hash = ((hash << 5) - hash) + char;
        hash = hash & hash; // Convert to 32-bit integer
    }
    return hash;
}

Cluster Module: форкинг процессов

Cluster Module создаёт отдельные процессы, а не потоки. Это полезно для масштабирования HTTP-серверов:

const cluster = require('cluster');
const os = require('os');
const express = require('express');

if (cluster.isMaster) {
    const numCPUs = os.cpus().length;
    
    console.log(`Мастер-процесс ${process.pid} запущен`);
    console.log(`Создаём ${numCPUs} воркеров`);
    
    // Создаём воркеры
    for (let i = 0; i < numCPUs; i++) {
        cluster.fork();
    }
    
    cluster.on('exit', (worker, code, signal) => {
        console.log(`Воркер ${worker.process.pid} завершился`);
        console.log('Создаём нового воркера');
        cluster.fork();
    });
    
} else {
    const app = express();
    
    app.get('/', (req, res) => {
        res.json({
            message: 'Привет из воркера!',
            pid: process.pid,
            worker: cluster.worker.id
        });
    });
    
    app.get('/heavy', (req, res) => {
        // Имитация тяжёлой задачи
        const start = Date.now();
        let counter = 0;
        while (Date.now() - start < 5000) {
            counter++;
        }
        
        res.json({
            result: counter,
            pid: process.pid,
            worker: cluster.worker.id
        });
    });
    
    const server = app.listen(3000, () => {
        console.log(`Воркер ${process.pid} слушает порт 3000`);
    });
    
    process.on('SIGTERM', () => {
        console.log(`Воркер ${process.pid} получил SIGTERM`);
        server.close(() => {
            process.exit(0);
        });
    });
}

Сравнение подходов: таблица выбора

Подход Использование памяти Скорость создания Общение между потоками Лучший случай использования
Worker Threads Низкое (общая память) Быстрое SharedArrayBuffer, postMessage CPU-интенсивные задачи
Cluster Module Высокое (отдельные процессы) Медленное IPC, TCP/UDP Масштабирование HTTP-серверов
Child Process Высокое Медленное stdio, IPC Запуск внешних программ
Libuv Thread Pool Низкое Автоматически Callback I/O операции (автоматически)

Мониторинг и отладка

Для мониторинга многопоточных приложений используй эти инструменты:

# Установка пакетов для мониторинга
npm install --save-dev clinic autocannon

# Профилирование приложения
npx clinic doctor -- node app.js

# Нагрузочное тестирование
npx autocannon -c 100 -d 30s http://localhost:3000

# Мониторинг процессов
htop
# или
top -p $(pgrep -f "node")

Также полезно добавить метрики прямо в код:

const { performance } = require('perf_hooks');

function measureWorkerPerformance() {
    const start = performance.now();
    
    // Твоя логика
    
    const end = performance.now();
    console.log(`Выполнение заняло ${end - start} миллисекунд`);
}

// Мониторинг памяти
setInterval(() => {
    const memUsage = process.memoryUsage();
    console.log('Использование памяти:', {
        rss: Math.round(memUsage.rss / 1024 / 1024) + 'MB',
        heapUsed: Math.round(memUsage.heapUsed / 1024 / 1024) + 'MB',
        heapTotal: Math.round(memUsage.heapTotal / 1024 / 1024) + 'MB'
    });
}, 10000);

Типичные ошибки и как их избежать

Ошибка 1: Блокировка основного потока

Плохо:

// Не делай так!
app.get('/bad', (req, res) => {
    let result = 0;
    for (let i = 0; i < 10000000; i++) {
        result += Math.sqrt(i);
    }
    res.json({ result });
});

Хорошо:

app.get('/good', async (req, res) => {
    try {
        const result = await workerPool.execute({
            operation: 'heavy-calculation',
            iterations: 10000000
        });
        res.json({ result });
    } catch (error) {
        res.status(500).json({ error: error.message });
    }
});

Ошибка 2: Создание слишком многих воркеров

Не создавай воркеры на каждый запрос. Используй пул:

// Инициализация пула при старте приложения
const workerPool = new WorkerPool('./heavy-task-worker.js', os.cpus().length);

// Graceful shutdown
process.on('SIGTERM', () => {
    workerPool.terminate();
    process.exit(0);
});

Интеграция с другими инструментами

Многопоточность отлично работает с популярными пакетами:

  • Redis — для кэширования результатов между воркерами
  • PM2 — для управления кластерами в продакшене
  • Bull — для очередей задач
  • Prometheus — для мониторинга метрик

Пример интеграции с Redis:

const redis = require('redis');
const client = redis.createClient();

// В основном потоке
app.get('/cached-heavy-task/:id', async (req, res) => {
    const { id } = req.params;
    const cacheKey = `heavy-task:${id}`;
    
    try {
        // Проверяем кэш
        const cached = await client.get(cacheKey);
        if (cached) {
            return res.json({ result: JSON.parse(cached), cached: true });
        }
        
        // Выполняем в воркере
        const result = await workerPool.execute({
            operation: 'heavy-task',
            id: id
        });
        
        // Кэшируем результат
        await client.setex(cacheKey, 3600, JSON.stringify(result));
        
        res.json({ result, cached: false });
    } catch (error) {
        res.status(500).json({ error: error.message });
    }
});

Настройка продакшен-окружения

Для продакшена рекомендую использовать PM2 с конфигурацией:

# ecosystem.config.js
module.exports = {
    apps: [{
        name: 'my-app',
        script: './app.js',
        instances: 'max',
        exec_mode: 'cluster',
        env: {
            NODE_ENV: 'production',
            PORT: 3000
        },
        error_file: './logs/err.log',
        out_file: './logs/out.log',
        log_file: './logs/combined.log',
        max_memory_restart: '1G'
    }]
};

Запуск:

# Установка PM2
npm install -g pm2

# Запуск приложения
pm2 start ecosystem.config.js

# Мониторинг
pm2 monit

# Логи
pm2 logs

# Рестарт
pm2 restart my-app

Если планируешь серьёзную нагрузку, стоит рассмотреть выделенный сервер с достаточным количеством ядер.

Автоматизация и скрипты

Многопоточность открывает новые возможности для автоматизации:

Параллельная обработка файлов

const fs = require('fs').promises;
const path = require('path');

async function processFilesInParallel(directory) {
    const files = await fs.readdir(directory);
    
    const chunks = [];
    const chunkSize = Math.ceil(files.length / os.cpus().length);
    
    for (let i = 0; i < files.length; i += chunkSize) {
        chunks.push(files.slice(i, i + chunkSize));
    }
    
    const workers = chunks.map(chunk => {
        return workerPool.execute({
            operation: 'process-files',
            files: chunk.map(f => path.join(directory, f))
        });
    });
    
    const results = await Promise.all(workers);
    return results.flat();
}

Скрипт для бэкапа с распараллеливанием

#!/usr/bin/env node

const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const path = require('path');
const fs = require('fs').promises;

if (isMainThread) {
    async function backup() {
        const directories = [
            '/var/www/site1',
            '/var/www/site2',
            '/var/www/site3'
        ];
        
        const workers = directories.map(dir => {
            return new Promise((resolve, reject) => {
                const worker = new Worker(__filename, {
                    workerData: { directory: dir }
                });
                
                worker.on('message', resolve);
                worker.on('error', reject);
            });
        });
        
        try {
            const results = await Promise.all(workers);
            console.log('Бэкап завершён:', results);
        } catch (error) {
            console.error('Ошибка бэкапа:', error);
        }
    }
    
    backup();
    
} else {
    const { directory } = workerData;
    
    async function createBackup(dir) {
        const backupName = `backup-${path.basename(dir)}-${Date.now()}.tar.gz`;
        const { exec } = require('child_process');
        
        return new Promise((resolve, reject) => {
            exec(`tar -czf /backups/${backupName} ${dir}`, (error, stdout, stderr) => {
                if (error) {
                    reject(error);
                } else {
                    resolve({ directory: dir, backup: backupName });
                }
            });
        });
    }
    
    createBackup(directory)
        .then(result => parentPort.postMessage(result))
        .catch(error => parentPort.postMessage({ error: error.message }));
}

Интересные факты и нестандартные применения

  • SharedArrayBuffer — позволяет воркерам работать с общей памятью, что открывает возможности для реализации низкоуровневых алгоритмов
  • Atomics — для синхронизации между потоками без блокировок
  • Transferable Objects — можно передавать ArrayBuffer между потоками без копирования

Пример с SharedArrayBuffer:

// Создание общей памяти
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);

// В основном потоке
const worker = new Worker('./shared-worker.js', {
    transferList: [sharedBuffer]
});

// shared-worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', (sharedBuffer) => {
    const sharedArray = new Int32Array(sharedBuffer);
    
    // Атомарные операции
    Atomics.add(sharedArray, 0, 1);
    const value = Atomics.load(sharedArray, 0);
    
    parentPort.postMessage({ value });
});

Заключение и рекомендации

Многопоточность в Node.js — это не магия, а инструмент, который нужно использовать с умом. Вот основные рекомендации:

  • Используй Worker Threads для CPU-интенсивных задач — парсинг больших JSON, обработка изображений, сложные вычисления
  • Cluster Module подходит для масштабирования HTTP-серверов — один процесс на ядро
  • Не создавай воркеры на каждый запрос — используй пулы
  • Мониторь потребление памяти — воркеры могут привести к утечкам
  • Graceful shutdown — всегда правильно завершай воркеры

Для high-load проектов это критично. Правильная настройка многопоточности может увеличить производительность в разы, но неправильная — убить сервер. Тестируй нагрузку, мониторь метрики и помни: иногда проще добавить ещё одно ядро на сервере, чем оптимизировать код до смерти.

Если серьёзно подходишь к вопросу производительности, стоит рассмотреть серверы с хорошими характеристиками — многопоточность раскрывается только на многоядерных системах.

Полезные ссылки:


В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.

Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.

Leave a reply

Your email address will not be published. Required fields are marked