- Home »

Как использовать многопоточность в Node.js
Если ты админ сервера под нагрузкой, то знаешь, что в определенный момент твой Node.js-сервис начинает кашлять. Однопоточность — это боль, когда у тебя тысячи запросов в минуту. Хорошая новость: Node.js давно уже не живёт в каменном веке, и многопоточность там работает вполне достойно. Эта статья покажет, как правильно использовать Worker Threads, настроить кластеризацию и не наступить на те же грабли, что и тысячи других разработчиков. Мы разберём практические примеры, покажем реальные команды и объясним, почему иногда лучше взять VPS с большим количеством ядер, чем пытаться оптимизировать код.
Как это работает: архитектура многопоточности в Node.js
Node.js использует event loop в основном потоке, но это не значит, что он не может использовать другие потоки. Есть несколько способов добиться многопоточности:
- Worker Threads — встроенный модуль для создания отдельных потоков
- Cluster Module — форкает процессы, а не потоки
- Child Process — запускает отдельные процессы
- Libuv Thread Pool — используется автоматически для I/O операций
Основное отличие Worker Threads от других решений — они делят память, что делает их более эффективными для CPU-интенсивных задач.
Настройка Worker Threads: пошаговое руководство
Начнём с простого примера. Создаём основной файл main.js
:
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
if (isMainThread) {
// Основной поток
console.log('Создаём воркер...');
const worker = new Worker(__filename, {
workerData: { num: 42 }
});
worker.on('message', (result) => {
console.log('Результат:', result);
});
worker.on('error', (error) => {
console.error('Ошибка воркера:', error);
});
worker.on('exit', (code) => {
console.log(`Воркер завершился с кодом ${code}`);
});
} else {
// Код воркера
const { num } = workerData;
// Имитация тяжёлой задачи
function heavyTask(n) {
let result = 0;
for (let i = 0; i < n * 1000000; i++) {
result += Math.sqrt(i);
}
return result;
}
const result = heavyTask(num);
parentPort.postMessage(result);
}
Запускаем и проверяем:
node main.js
Практические примеры: где и как использовать
Разберём несколько кейсов, где многопоточность реально помогает:
Кейс 1: Обработка изображений
Создаём файл image-processor.js
:
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const path = require('path');
if (isMainThread) {
const express = require('express');
const app = express();
app.use(express.json());
app.post('/process-image', async (req, res) => {
const { imagePath, operation } = req.body;
try {
const worker = new Worker(__filename, {
workerData: { imagePath, operation }
});
worker.on('message', (result) => {
res.json({ success: true, result });
});
worker.on('error', (error) => {
res.status(500).json({ error: error.message });
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.listen(3000, () => {
console.log('Сервер запущен на порту 3000');
});
} else {
const { imagePath, operation } = workerData;
// Здесь была бы реальная обработка изображения
function processImage(path, op) {
// Имитация обработки
const operations = {
'resize': `Изменение размера ${path}`,
'filter': `Применение фильтра к ${path}`,
'compress': `Сжатие ${path}`
};
// Имитация времени обработки
const delay = Math.random() * 2000 + 1000;
setTimeout(() => {
parentPort.postMessage(operations[op] || 'Неизвестная операция');
}, delay);
}
processImage(imagePath, operation);
}
Кейс 2: Пул воркеров для высоконагруженных задач
Создаём файл worker-pool.js
:
const { Worker } = require('worker_threads');
const os = require('os');
class WorkerPool {
constructor(workerScript, poolSize = os.cpus().length) {
this.workerScript = workerScript;
this.poolSize = poolSize;
this.workers = [];
this.queue = [];
this.init();
}
init() {
for (let i = 0; i < this.poolSize; i++) {
this.createWorker();
}
}
createWorker() {
const worker = new Worker(this.workerScript);
worker.isIdle = true;
worker.on('message', (result) => {
const { resolve } = worker.currentTask;
resolve(result);
worker.isIdle = true;
this.processQueue();
});
worker.on('error', (error) => {
const { reject } = worker.currentTask;
reject(error);
worker.isIdle = true;
this.processQueue();
});
this.workers.push(worker);
}
execute(data) {
return new Promise((resolve, reject) => {
const task = { data, resolve, reject };
const idleWorker = this.workers.find(w => w.isIdle);
if (idleWorker) {
this.assignTask(idleWorker, task);
} else {
this.queue.push(task);
}
});
}
assignTask(worker, task) {
worker.isIdle = false;
worker.currentTask = task;
worker.postMessage(task.data);
}
processQueue() {
if (this.queue.length === 0) return;
const idleWorker = this.workers.find(w => w.isIdle);
if (idleWorker) {
const task = this.queue.shift();
this.assignTask(idleWorker, task);
}
}
terminate() {
this.workers.forEach(worker => worker.terminate());
}
}
module.exports = WorkerPool;
И воркер-скрипт heavy-task-worker.js
:
const { parentPort } = require('worker_threads');
parentPort.on('message', (data) => {
const { operation, payload } = data;
let result;
switch (operation) {
case 'fibonacci':
result = fibonacci(payload);
break;
case 'prime':
result = isPrime(payload);
break;
case 'hash':
result = computeHash(payload);
break;
default:
result = 'Unknown operation';
}
parentPort.postMessage(result);
});
function fibonacci(n) {
if (n < 2) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
function isPrime(num) {
if (num <= 1) return false;
if (num <= 3) return true;
if (num % 2 === 0 || num % 3 === 0) return false;
for (let i = 5; i * i <= num; i += 6) {
if (num % i === 0 || num % (i + 2) === 0) return false;
}
return true;
}
function computeHash(str) {
let hash = 0;
for (let i = 0; i < str.length; i++) {
const char = str.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash = hash & hash; // Convert to 32-bit integer
}
return hash;
}
Cluster Module: форкинг процессов
Cluster Module создаёт отдельные процессы, а не потоки. Это полезно для масштабирования HTTP-серверов:
const cluster = require('cluster');
const os = require('os');
const express = require('express');
if (cluster.isMaster) {
const numCPUs = os.cpus().length;
console.log(`Мастер-процесс ${process.pid} запущен`);
console.log(`Создаём ${numCPUs} воркеров`);
// Создаём воркеры
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`Воркер ${worker.process.pid} завершился`);
console.log('Создаём нового воркера');
cluster.fork();
});
} else {
const app = express();
app.get('/', (req, res) => {
res.json({
message: 'Привет из воркера!',
pid: process.pid,
worker: cluster.worker.id
});
});
app.get('/heavy', (req, res) => {
// Имитация тяжёлой задачи
const start = Date.now();
let counter = 0;
while (Date.now() - start < 5000) {
counter++;
}
res.json({
result: counter,
pid: process.pid,
worker: cluster.worker.id
});
});
const server = app.listen(3000, () => {
console.log(`Воркер ${process.pid} слушает порт 3000`);
});
process.on('SIGTERM', () => {
console.log(`Воркер ${process.pid} получил SIGTERM`);
server.close(() => {
process.exit(0);
});
});
}
Сравнение подходов: таблица выбора
Подход | Использование памяти | Скорость создания | Общение между потоками | Лучший случай использования |
---|---|---|---|---|
Worker Threads | Низкое (общая память) | Быстрое | SharedArrayBuffer, postMessage | CPU-интенсивные задачи |
Cluster Module | Высокое (отдельные процессы) | Медленное | IPC, TCP/UDP | Масштабирование HTTP-серверов |
Child Process | Высокое | Медленное | stdio, IPC | Запуск внешних программ |
Libuv Thread Pool | Низкое | Автоматически | Callback | I/O операции (автоматически) |
Мониторинг и отладка
Для мониторинга многопоточных приложений используй эти инструменты:
# Установка пакетов для мониторинга
npm install --save-dev clinic autocannon
# Профилирование приложения
npx clinic doctor -- node app.js
# Нагрузочное тестирование
npx autocannon -c 100 -d 30s http://localhost:3000
# Мониторинг процессов
htop
# или
top -p $(pgrep -f "node")
Также полезно добавить метрики прямо в код:
const { performance } = require('perf_hooks');
function measureWorkerPerformance() {
const start = performance.now();
// Твоя логика
const end = performance.now();
console.log(`Выполнение заняло ${end - start} миллисекунд`);
}
// Мониторинг памяти
setInterval(() => {
const memUsage = process.memoryUsage();
console.log('Использование памяти:', {
rss: Math.round(memUsage.rss / 1024 / 1024) + 'MB',
heapUsed: Math.round(memUsage.heapUsed / 1024 / 1024) + 'MB',
heapTotal: Math.round(memUsage.heapTotal / 1024 / 1024) + 'MB'
});
}, 10000);
Типичные ошибки и как их избежать
Ошибка 1: Блокировка основного потока
Плохо:
// Не делай так!
app.get('/bad', (req, res) => {
let result = 0;
for (let i = 0; i < 10000000; i++) {
result += Math.sqrt(i);
}
res.json({ result });
});
Хорошо:
app.get('/good', async (req, res) => {
try {
const result = await workerPool.execute({
operation: 'heavy-calculation',
iterations: 10000000
});
res.json({ result });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
Ошибка 2: Создание слишком многих воркеров
Не создавай воркеры на каждый запрос. Используй пул:
// Инициализация пула при старте приложения
const workerPool = new WorkerPool('./heavy-task-worker.js', os.cpus().length);
// Graceful shutdown
process.on('SIGTERM', () => {
workerPool.terminate();
process.exit(0);
});
Интеграция с другими инструментами
Многопоточность отлично работает с популярными пакетами:
- Redis — для кэширования результатов между воркерами
- PM2 — для управления кластерами в продакшене
- Bull — для очередей задач
- Prometheus — для мониторинга метрик
Пример интеграции с Redis:
const redis = require('redis');
const client = redis.createClient();
// В основном потоке
app.get('/cached-heavy-task/:id', async (req, res) => {
const { id } = req.params;
const cacheKey = `heavy-task:${id}`;
try {
// Проверяем кэш
const cached = await client.get(cacheKey);
if (cached) {
return res.json({ result: JSON.parse(cached), cached: true });
}
// Выполняем в воркере
const result = await workerPool.execute({
operation: 'heavy-task',
id: id
});
// Кэшируем результат
await client.setex(cacheKey, 3600, JSON.stringify(result));
res.json({ result, cached: false });
} catch (error) {
res.status(500).json({ error: error.message });
}
});
Настройка продакшен-окружения
Для продакшена рекомендую использовать PM2 с конфигурацией:
# ecosystem.config.js
module.exports = {
apps: [{
name: 'my-app',
script: './app.js',
instances: 'max',
exec_mode: 'cluster',
env: {
NODE_ENV: 'production',
PORT: 3000
},
error_file: './logs/err.log',
out_file: './logs/out.log',
log_file: './logs/combined.log',
max_memory_restart: '1G'
}]
};
Запуск:
# Установка PM2
npm install -g pm2
# Запуск приложения
pm2 start ecosystem.config.js
# Мониторинг
pm2 monit
# Логи
pm2 logs
# Рестарт
pm2 restart my-app
Если планируешь серьёзную нагрузку, стоит рассмотреть выделенный сервер с достаточным количеством ядер.
Автоматизация и скрипты
Многопоточность открывает новые возможности для автоматизации:
Параллельная обработка файлов
const fs = require('fs').promises;
const path = require('path');
async function processFilesInParallel(directory) {
const files = await fs.readdir(directory);
const chunks = [];
const chunkSize = Math.ceil(files.length / os.cpus().length);
for (let i = 0; i < files.length; i += chunkSize) {
chunks.push(files.slice(i, i + chunkSize));
}
const workers = chunks.map(chunk => {
return workerPool.execute({
operation: 'process-files',
files: chunk.map(f => path.join(directory, f))
});
});
const results = await Promise.all(workers);
return results.flat();
}
Скрипт для бэкапа с распараллеливанием
#!/usr/bin/env node
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
const path = require('path');
const fs = require('fs').promises;
if (isMainThread) {
async function backup() {
const directories = [
'/var/www/site1',
'/var/www/site2',
'/var/www/site3'
];
const workers = directories.map(dir => {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename, {
workerData: { directory: dir }
});
worker.on('message', resolve);
worker.on('error', reject);
});
});
try {
const results = await Promise.all(workers);
console.log('Бэкап завершён:', results);
} catch (error) {
console.error('Ошибка бэкапа:', error);
}
}
backup();
} else {
const { directory } = workerData;
async function createBackup(dir) {
const backupName = `backup-${path.basename(dir)}-${Date.now()}.tar.gz`;
const { exec } = require('child_process');
return new Promise((resolve, reject) => {
exec(`tar -czf /backups/${backupName} ${dir}`, (error, stdout, stderr) => {
if (error) {
reject(error);
} else {
resolve({ directory: dir, backup: backupName });
}
});
});
}
createBackup(directory)
.then(result => parentPort.postMessage(result))
.catch(error => parentPort.postMessage({ error: error.message }));
}
Интересные факты и нестандартные применения
- SharedArrayBuffer — позволяет воркерам работать с общей памятью, что открывает возможности для реализации низкоуровневых алгоритмов
- Atomics — для синхронизации между потоками без блокировок
- Transferable Objects — можно передавать ArrayBuffer между потоками без копирования
Пример с SharedArrayBuffer:
// Создание общей памяти
const sharedBuffer = new SharedArrayBuffer(1024);
const sharedArray = new Int32Array(sharedBuffer);
// В основном потоке
const worker = new Worker('./shared-worker.js', {
transferList: [sharedBuffer]
});
// shared-worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', (sharedBuffer) => {
const sharedArray = new Int32Array(sharedBuffer);
// Атомарные операции
Atomics.add(sharedArray, 0, 1);
const value = Atomics.load(sharedArray, 0);
parentPort.postMessage({ value });
});
Заключение и рекомендации
Многопоточность в Node.js — это не магия, а инструмент, который нужно использовать с умом. Вот основные рекомендации:
- Используй Worker Threads для CPU-интенсивных задач — парсинг больших JSON, обработка изображений, сложные вычисления
- Cluster Module подходит для масштабирования HTTP-серверов — один процесс на ядро
- Не создавай воркеры на каждый запрос — используй пулы
- Мониторь потребление памяти — воркеры могут привести к утечкам
- Graceful shutdown — всегда правильно завершай воркеры
Для high-load проектов это критично. Правильная настройка многопоточности может увеличить производительность в разы, но неправильная — убить сервер. Тестируй нагрузку, мониторь метрики и помни: иногда проще добавить ещё одно ядро на сервере, чем оптимизировать код до смерти.
Если серьёзно подходишь к вопросу производительности, стоит рассмотреть серверы с хорошими характеристиками — многопоточность раскрывается только на многоядерных системах.
Полезные ссылки:
- Worker Threads API
- Cluster Module
- p-queue — продвинутая библиотека для очередей
В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.
Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.