Home » Построение пайплайна обработки данных с Luigi на Python в Ubuntu 24
Построение пайплайна обработки данных с Luigi на Python в Ubuntu 24

Построение пайплайна обработки данных с Luigi на Python в Ubuntu 24

Если ты занимаешься обработкой данных на серверах, то наверняка сталкивался с задачей создания надежных пайплайнов. Ручное управление цепочками задач быстро превращается в головную боль, особенно когда нужно обрабатывать терабайты данных ежедневно. Luigi от Spotify — это именно то решение, которое сделает твою жизнь проще. Этот фреймворк позволяет создавать сложные pipeline’ы с зависимостями между задачами, автоматической обработкой ошибок и удобным мониторингом. Покажу, как быстро развернуть и настроить Luigi на Ubuntu 24, чтобы ты мог сразу начать автоматизировать свои процессы обработки данных.

Как работает Luigi и зачем он нужен

Luigi — это Python-фреймворк для создания пайплайнов обработки данных, который решает три основные проблемы:

  • Зависимости задач — автоматически определяет порядок выполнения
  • Отказоустойчивость — умеет перезапускать только провалившиеся задачи
  • Мониторинг — предоставляет веб-интерфейс для отслеживания прогресса

Основная фишка Luigi в том, что он работает по принципу “цели” (targets). Каждая задача производит определенный результат (файл, запись в БД), и Luigi автоматически строит граф зависимостей, выполняя только те задачи, которые действительно нужны.

Быстрая установка и настройка на Ubuntu 24

Начнем с подготовки окружения. Для полноценной работы понадобится VPS или выделенный сервер с Ubuntu 24:

# Обновляем систему
sudo apt update && sudo apt upgrade -y

# Устанавливаем Python и pip
sudo apt install python3 python3-pip python3-venv -y

# Создаем виртуальное окружение
python3 -m venv luigi_env
source luigi_env/bin/activate

# Устанавливаем Luigi
pip install luigi

# Дополнительные пакеты для работы с данными
pip install pandas numpy requests boto3 psycopg2-binary

Теперь создаем рабочую директорию и базовую конфигурацию:

# Создаем структуру проекта
mkdir -p ~/luigi_pipeline/{tasks,data,logs,config}
cd ~/luigi_pipeline

# Создаем конфигурационный файл
cat > config/luigi.cfg << 'EOF'
[core]
log_level=INFO
logging_conf_file=

[scheduler]
record_task_history=True
state_path=/tmp/luigi-state.pickle

[task_history]
db_connection=sqlite:///luigi_history.db

[worker]
keep_alive=True
count=1
EOF

Создание первого пайплайна

Давайте создадим простой, но реалистичный пример — пайплайн для обработки логов веб-сервера:

cat > tasks/log_pipeline.py << 'EOF'
import luigi
import pandas as pd
from datetime import datetime, timedelta
import os
import requests
from luigi.util import requires

class DownloadLogs(luigi.Task):
    """Скачивание логов с удаленного сервера"""
    date = luigi.DateParameter(default=datetime.now().date())
    
    def output(self):
        return luigi.LocalTarget(f'data/raw_logs_{self.date}.txt')
    
    def run(self):
        # Симулируем скачивание логов
        log_data = f"""
192.168.1.1 - - [{self.date} 10:30:45] "GET /api/users HTTP/1.1" 200 1234
192.168.1.2 - - [{self.date} 10:31:15] "POST /api/orders HTTP/1.1" 201 567
192.168.1.3 - - [{self.date} 10:32:30] "GET /api/products HTTP/1.1" 404 89
        """
        
        with self.output().open('w') as f:
            f.write(log_data.strip())

@requires(DownloadLogs)
class ParseLogs(luigi.Task):
    """Парсинг логов в структурированный формат"""
    
    def output(self):
        return luigi.LocalTarget(f'data/parsed_logs_{self.date}.csv')
    
    def run(self):
        # Простой парсер логов
        logs = []
        with self.input().open('r') as f:
            for line in f:
                if line.strip():
                    parts = line.split(' ')
                    logs.append({
                        'ip': parts[0],
                        'method': parts[5].strip('"'),
                        'url': parts[6],
                        'status': parts[8],
                        'size': parts[9]
                    })
        
        df = pd.DataFrame(logs)
        df.to_csv(self.output().path, index=False)

@requires(ParseLogs)
class GenerateReport(luigi.Task):
    """Генерация отчета по логам"""
    
    def output(self):
        return luigi.LocalTarget(f'data/report_{self.date}.html')
    
    def run(self):
        df = pd.read_csv(self.input().path)
        
        # Базовая аналитика
        status_counts = df['status'].value_counts()
        method_counts = df['method'].value_counts()
        
        html_report = f"""
        
        Log Report for {self.date}
        
            

Log Analysis Report

Status Codes

{''.join(f'' for status, count in status_counts.items())}
StatusCount
{status}{count}

HTTP Methods

{''.join(f'' for method, count in method_counts.items())}
MethodCount
{method}{count}
""" with self.output().open('w') as f: f.write(html_report) # Главная задача, которая запускает весь пайплайн class LogPipeline(luigi.WrapperTask): date = luigi.DateParameter(default=datetime.now().date()) def requires(self): return GenerateReport(date=self.date) EOF

Запуск и мониторинг

Теперь запускаем наш пайплайн. Luigi можно запускать в двух режимах:

# Запуск с локальным планировщиком (для тестирования)
cd ~/luigi_pipeline
python -m luigi --module tasks.log_pipeline LogPipeline --local-scheduler

# Запуск центрального планировщика (для продакшена)
# В одном терминале:
luigid --background --pidfile /tmp/luigi.pid --logdir logs/

# В другом терминале:
python -m luigi --module tasks.log_pipeline LogPipeline

Веб-интерфейс Luigi доступен по адресу http://localhost:8082. Здесь можно отслеживать выполнение задач, просматривать логи и анализировать зависимости.

Продвинутые возможности

Luigi становится по-настоящему мощным при работе с внешними системами. Вот несколько полезных паттернов:

# Работа с PostgreSQL
class DatabaseTask(luigi.Task):
    def output(self):
        return luigi.postgres.PostgresTarget(
            host='localhost',
            database='analytics',
            user='luigi',
            password='password',
            table='processed_data',
            update_id=f'task_{self.date}'
        )

# Работа с S3
class S3Task(luigi.Task):
    def output(self):
        return luigi.contrib.s3.S3Target(
            's3://my-bucket/data/processed_data.csv',
            aws_access_key_id='your_key',
            aws_secret_access_key='your_secret'
        )

# Параллельное выполнение задач
class ParallelProcessing(luigi.Task):
    def requires(self):
        return [ProcessChunk(chunk_id=i) for i in range(10)]
    
    def run(self):
        # Объединяем результаты всех чанков
        pass

Сравнение с альтернативами

Решение Плюсы Минусы Лучше для
Luigi Простота, веб-интерфейс, отказоустойчивость Нет динамических DAG Статичные пайплайны
Airflow Динамические DAG, мощные возможности Сложность настройки Комплексные системы
Prefect Современный API, облачная интеграция Относительно новый Cloud-native решения
Dagster Типизация, тестирование Крутая кривая обучения Data engineering

Интеграция с другими инструментами

Luigi отлично интегрируется с популярными инструментами data science:

# Интеграция с Apache Spark
class SparkTask(luigi.Task):
    def run(self):
        from pyspark.sql import SparkSession
        spark = SparkSession.builder.appName("LuigiSpark").getOrCreate()
        df = spark.read.csv(self.input().path, header=True)
        # Обработка данных
        df.write.csv(self.output().path)

# Интеграция с Docker
class DockerTask(luigi.Task):
    def run(self):
        import subprocess
        subprocess.run([
            'docker', 'run', '--rm',
            '-v', f'{os.getcwd()}:/data',
            'python:3.9',
            'python', '/data/processing_script.py'
        ])

# Интеграция с Kubernetes Jobs
class KubernetesTask(luigi.Task):
    def run(self):
        import yaml
        job_spec = {
            'apiVersion': 'batch/v1',
            'kind': 'Job',
            'metadata': {'name': f'luigi-job-{self.date}'},
            'spec': {
                'template': {
                    'spec': {
                        'containers': [{
                            'name': 'data-processor',
                            'image': 'my-processor:latest'
                        }],
                        'restartPolicy': 'Never'
                    }
                }
            }
        }
        # Запуск job в Kubernetes

Автоматизация и продакшен

Для продакшена создаем systemd service:

sudo tee /etc/systemd/system/luigi-scheduler.service << 'EOF'
[Unit]
Description=Luigi Scheduler
After=network.target

[Service]
Type=simple
User=luigi
Group=luigi
WorkingDirectory=/home/luigi/luigi_pipeline
Environment=PATH=/home/luigi/luigi_pipeline/luigi_env/bin
ExecStart=/home/luigi/luigi_pipeline/luigi_env/bin/luigid --port 8082 --background false
Restart=always

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable luigi-scheduler
sudo systemctl start luigi-scheduler

Настраиваем cron для автоматического запуска:

# Добавляем в crontab
crontab -e

# Запуск каждый день в 2:00
0 2 * * * cd /home/luigi/luigi_pipeline && ./luigi_env/bin/python -m luigi --module tasks.log_pipeline LogPipeline

Мониторинг и алерты

Для мониторинга можно использовать Prometheus метрики:

# Добавляем в пайплайн
from prometheus_client import Counter, Histogram, start_http_server

task_counter = Counter('luigi_tasks_total', 'Total Luigi tasks', ['status'])
task_duration = Histogram('luigi_task_duration_seconds', 'Task duration')

class MonitoredTask(luigi.Task):
    def run(self):
        start_time = time.time()
        try:
            # Основная логика
            task_counter.labels(status='success').inc()
        except Exception as e:
            task_counter.labels(status='failure').inc()
            raise
        finally:
            task_duration.observe(time.time() - start_time)

Полезные ссылки

Заключение и рекомендации

Luigi — это отличный выбор для создания надежных пайплайнов обработки данных, особенно если тебе нужно быстро начать работать без сложных настроек. Он идеально подходит для:

  • ETL процессов — извлечение, трансформация и загрузка данных
  • Batch обработки — регулярная обработка больших объемов данных
  • Аналитических пайплайнов — создание отчетов и дашбордов
  • ML пайплайнов — обучение и инференс моделей

Главные преимущества Luigi — простота освоения и надежность. Если твои задачи не требуют динамических DAG и сложной логики, Luigi справится на отлично. Для более комплексных сценариев стоит рассмотреть Airflow или Prefect.

Помни про мониторинг — всегда настраивай алерты на критические задачи и следи за производительностью через веб-интерфейс. Luigi хорошо масштабируется, поэтому не бойся использовать его даже для серьезных продакшен нагрузок.


В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.

Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.

Leave a reply

Your email address will not be published. Required fields are marked