- Home »

Построение пайплайна обработки данных с Luigi на Python в Ubuntu 24
Если ты занимаешься обработкой данных на серверах, то наверняка сталкивался с задачей создания надежных пайплайнов. Ручное управление цепочками задач быстро превращается в головную боль, особенно когда нужно обрабатывать терабайты данных ежедневно. Luigi от Spotify — это именно то решение, которое сделает твою жизнь проще. Этот фреймворк позволяет создавать сложные pipeline’ы с зависимостями между задачами, автоматической обработкой ошибок и удобным мониторингом. Покажу, как быстро развернуть и настроить Luigi на Ubuntu 24, чтобы ты мог сразу начать автоматизировать свои процессы обработки данных.
Как работает Luigi и зачем он нужен
Luigi — это Python-фреймворк для создания пайплайнов обработки данных, который решает три основные проблемы:
- Зависимости задач — автоматически определяет порядок выполнения
- Отказоустойчивость — умеет перезапускать только провалившиеся задачи
- Мониторинг — предоставляет веб-интерфейс для отслеживания прогресса
Основная фишка Luigi в том, что он работает по принципу “цели” (targets). Каждая задача производит определенный результат (файл, запись в БД), и Luigi автоматически строит граф зависимостей, выполняя только те задачи, которые действительно нужны.
Быстрая установка и настройка на Ubuntu 24
Начнем с подготовки окружения. Для полноценной работы понадобится VPS или выделенный сервер с Ubuntu 24:
# Обновляем систему
sudo apt update && sudo apt upgrade -y
# Устанавливаем Python и pip
sudo apt install python3 python3-pip python3-venv -y
# Создаем виртуальное окружение
python3 -m venv luigi_env
source luigi_env/bin/activate
# Устанавливаем Luigi
pip install luigi
# Дополнительные пакеты для работы с данными
pip install pandas numpy requests boto3 psycopg2-binary
Теперь создаем рабочую директорию и базовую конфигурацию:
# Создаем структуру проекта
mkdir -p ~/luigi_pipeline/{tasks,data,logs,config}
cd ~/luigi_pipeline
# Создаем конфигурационный файл
cat > config/luigi.cfg << 'EOF'
[core]
log_level=INFO
logging_conf_file=
[scheduler]
record_task_history=True
state_path=/tmp/luigi-state.pickle
[task_history]
db_connection=sqlite:///luigi_history.db
[worker]
keep_alive=True
count=1
EOF
Создание первого пайплайна
Давайте создадим простой, но реалистичный пример — пайплайн для обработки логов веб-сервера:
cat > tasks/log_pipeline.py << 'EOF'
import luigi
import pandas as pd
from datetime import datetime, timedelta
import os
import requests
from luigi.util import requires
class DownloadLogs(luigi.Task):
"""Скачивание логов с удаленного сервера"""
date = luigi.DateParameter(default=datetime.now().date())
def output(self):
return luigi.LocalTarget(f'data/raw_logs_{self.date}.txt')
def run(self):
# Симулируем скачивание логов
log_data = f"""
192.168.1.1 - - [{self.date} 10:30:45] "GET /api/users HTTP/1.1" 200 1234
192.168.1.2 - - [{self.date} 10:31:15] "POST /api/orders HTTP/1.1" 201 567
192.168.1.3 - - [{self.date} 10:32:30] "GET /api/products HTTP/1.1" 404 89
"""
with self.output().open('w') as f:
f.write(log_data.strip())
@requires(DownloadLogs)
class ParseLogs(luigi.Task):
"""Парсинг логов в структурированный формат"""
def output(self):
return luigi.LocalTarget(f'data/parsed_logs_{self.date}.csv')
def run(self):
# Простой парсер логов
logs = []
with self.input().open('r') as f:
for line in f:
if line.strip():
parts = line.split(' ')
logs.append({
'ip': parts[0],
'method': parts[5].strip('"'),
'url': parts[6],
'status': parts[8],
'size': parts[9]
})
df = pd.DataFrame(logs)
df.to_csv(self.output().path, index=False)
@requires(ParseLogs)
class GenerateReport(luigi.Task):
"""Генерация отчета по логам"""
def output(self):
return luigi.LocalTarget(f'data/report_{self.date}.html')
def run(self):
df = pd.read_csv(self.input().path)
# Базовая аналитика
status_counts = df['status'].value_counts()
method_counts = df['method'].value_counts()
html_report = f"""
Log Report for {self.date}
Log Analysis Report
Status Codes
Status Count
{''.join(f'{status} {count} '
for status, count in status_counts.items())}
HTTP Methods
Method Count
{''.join(f'{method} {count} '
for method, count in method_counts.items())}
"""
with self.output().open('w') as f:
f.write(html_report)
# Главная задача, которая запускает весь пайплайн
class LogPipeline(luigi.WrapperTask):
date = luigi.DateParameter(default=datetime.now().date())
def requires(self):
return GenerateReport(date=self.date)
EOF
Запуск и мониторинг
Теперь запускаем наш пайплайн. Luigi можно запускать в двух режимах:
# Запуск с локальным планировщиком (для тестирования)
cd ~/luigi_pipeline
python -m luigi --module tasks.log_pipeline LogPipeline --local-scheduler
# Запуск центрального планировщика (для продакшена)
# В одном терминале:
luigid --background --pidfile /tmp/luigi.pid --logdir logs/
# В другом терминале:
python -m luigi --module tasks.log_pipeline LogPipeline
Веб-интерфейс Luigi доступен по адресу http://localhost:8082
. Здесь можно отслеживать выполнение задач, просматривать логи и анализировать зависимости.
Продвинутые возможности
Luigi становится по-настоящему мощным при работе с внешними системами. Вот несколько полезных паттернов:
# Работа с PostgreSQL
class DatabaseTask(luigi.Task):
def output(self):
return luigi.postgres.PostgresTarget(
host='localhost',
database='analytics',
user='luigi',
password='password',
table='processed_data',
update_id=f'task_{self.date}'
)
# Работа с S3
class S3Task(luigi.Task):
def output(self):
return luigi.contrib.s3.S3Target(
's3://my-bucket/data/processed_data.csv',
aws_access_key_id='your_key',
aws_secret_access_key='your_secret'
)
# Параллельное выполнение задач
class ParallelProcessing(luigi.Task):
def requires(self):
return [ProcessChunk(chunk_id=i) for i in range(10)]
def run(self):
# Объединяем результаты всех чанков
pass
Сравнение с альтернативами
Решение | Плюсы | Минусы | Лучше для |
---|---|---|---|
Luigi | Простота, веб-интерфейс, отказоустойчивость | Нет динамических DAG | Статичные пайплайны |
Airflow | Динамические DAG, мощные возможности | Сложность настройки | Комплексные системы |
Prefect | Современный API, облачная интеграция | Относительно новый | Cloud-native решения |
Dagster | Типизация, тестирование | Крутая кривая обучения | Data engineering |
Интеграция с другими инструментами
Luigi отлично интегрируется с популярными инструментами data science:
# Интеграция с Apache Spark
class SparkTask(luigi.Task):
def run(self):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("LuigiSpark").getOrCreate()
df = spark.read.csv(self.input().path, header=True)
# Обработка данных
df.write.csv(self.output().path)
# Интеграция с Docker
class DockerTask(luigi.Task):
def run(self):
import subprocess
subprocess.run([
'docker', 'run', '--rm',
'-v', f'{os.getcwd()}:/data',
'python:3.9',
'python', '/data/processing_script.py'
])
# Интеграция с Kubernetes Jobs
class KubernetesTask(luigi.Task):
def run(self):
import yaml
job_spec = {
'apiVersion': 'batch/v1',
'kind': 'Job',
'metadata': {'name': f'luigi-job-{self.date}'},
'spec': {
'template': {
'spec': {
'containers': [{
'name': 'data-processor',
'image': 'my-processor:latest'
}],
'restartPolicy': 'Never'
}
}
}
}
# Запуск job в Kubernetes
Автоматизация и продакшен
Для продакшена создаем systemd service:
sudo tee /etc/systemd/system/luigi-scheduler.service << 'EOF'
[Unit]
Description=Luigi Scheduler
After=network.target
[Service]
Type=simple
User=luigi
Group=luigi
WorkingDirectory=/home/luigi/luigi_pipeline
Environment=PATH=/home/luigi/luigi_pipeline/luigi_env/bin
ExecStart=/home/luigi/luigi_pipeline/luigi_env/bin/luigid --port 8082 --background false
Restart=always
[Install]
WantedBy=multi-user.target
EOF
sudo systemctl daemon-reload
sudo systemctl enable luigi-scheduler
sudo systemctl start luigi-scheduler
Настраиваем cron для автоматического запуска:
# Добавляем в crontab
crontab -e
# Запуск каждый день в 2:00
0 2 * * * cd /home/luigi/luigi_pipeline && ./luigi_env/bin/python -m luigi --module tasks.log_pipeline LogPipeline
Мониторинг и алерты
Для мониторинга можно использовать Prometheus метрики:
# Добавляем в пайплайн
from prometheus_client import Counter, Histogram, start_http_server
task_counter = Counter('luigi_tasks_total', 'Total Luigi tasks', ['status'])
task_duration = Histogram('luigi_task_duration_seconds', 'Task duration')
class MonitoredTask(luigi.Task):
def run(self):
start_time = time.time()
try:
# Основная логика
task_counter.labels(status='success').inc()
except Exception as e:
task_counter.labels(status='failure').inc()
raise
finally:
task_duration.observe(time.time() - start_time)
Полезные ссылки
Заключение и рекомендации
Luigi — это отличный выбор для создания надежных пайплайнов обработки данных, особенно если тебе нужно быстро начать работать без сложных настроек. Он идеально подходит для:
- ETL процессов — извлечение, трансформация и загрузка данных
- Batch обработки — регулярная обработка больших объемов данных
- Аналитических пайплайнов — создание отчетов и дашбордов
- ML пайплайнов — обучение и инференс моделей
Главные преимущества Luigi — простота освоения и надежность. Если твои задачи не требуют динамических DAG и сложной логики, Luigi справится на отлично. Для более комплексных сценариев стоит рассмотреть Airflow или Prefect.
Помни про мониторинг — всегда настраивай алерты на критические задачи и следи за производительностью через веб-интерфейс. Luigi хорошо масштабируется, поэтому не бойся использовать его даже для серьезных продакшен нагрузок.
В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.
Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.