Home » Как запускать внешние программы с помощью subprocess в Python 3
Как запускать внешние программы с помощью subprocess в Python 3

Как запускать внешние программы с помощью subprocess в Python 3

Когда администрируешь серверы, часто нужно запускать внешние команды из Python-скриптов. Backup’ы, мониторинг, управление процессами — без этого никуда. Модуль subprocess в Python 3 — это твой швейцарский нож для таких задач. Он позволяет выполнять системные команды, получать их вывод и контролировать выполнение. В этой статье разберём, как правильно использовать subprocess, избежать распространённых ошибок и построить надёжную автоматизацию.

Основы subprocess — как это работает

Модуль subprocess заменил старые os.system() и os.popen(), предоставив более мощный и безопасный интерфейс. Основная идея — создать новый процесс, выполнить в нём команду и получить результат.

Главные функции для работы:

  • subprocess.run() — основная функция для большинства задач
  • subprocess.Popen() — низкоуровневый интерфейс для сложных сценариев
  • subprocess.call() — устаревшая, но иногда встречается в legacy коде

Простейший пример:

import subprocess

# Выполнение команды и получение результата
result = subprocess.run(['ls', '-la'], capture_output=True, text=True)
print(result.stdout)
print(result.stderr)
print(f"Exit code: {result.returncode}")

Пошаговая настройка и основные приёмы

Для серверного администрирования нужно понимать несколько ключевых моментов:

1. Выбор правильной функции

# Для простых задач — subprocess.run()
result = subprocess.run(['systemctl', 'status', 'nginx'], 
                       capture_output=True, text=True)

# Для интерактивных команд — subprocess.Popen()
process = subprocess.Popen(['top'], 
                          stdout=subprocess.PIPE, 
                          stderr=subprocess.PIPE,
                          text=True)

2. Обработка аргументов

# Правильно — список аргументов
subprocess.run(['git', 'clone', 'https://github.com/user/repo.git'])

# Неправильно — строка (небезопасно!)
subprocess.run('git clone https://github.com/user/repo.git', shell=True)

# Если нужен shell — используй shlex
import shlex
command = 'find /var/log -name "*.log" -size +100M'
subprocess.run(shlex.split(command))

3. Управление выводом

# Захват вывода
result = subprocess.run(['df', '-h'], capture_output=True, text=True)
if result.returncode == 0:
    print("Disk usage:")
    print(result.stdout)
else:
    print("Error:", result.stderr)

# Перенаправление в файл
with open('/var/log/backup.log', 'a') as log_file:
    subprocess.run(['rsync', '-av', '/data/', '/backup/'], 
                   stdout=log_file, stderr=log_file)

Практические примеры и кейсы

Мониторинг системы

import subprocess
import json

def get_system_info():
    """Получение информации о системе"""
    info = {}
    
    # CPU информация
    result = subprocess.run(['top', '-bn1'], capture_output=True, text=True)
    if result.returncode == 0:
        lines = result.stdout.split('\n')
        for line in lines:
            if 'load average' in line:
                info['load_average'] = line.split('load average: ')[1]
                break
    
    # Дисковое пространство
    result = subprocess.run(['df', '-h', '/'], capture_output=True, text=True)
    if result.returncode == 0:
        lines = result.stdout.strip().split('\n')
        if len(lines) > 1:
            fields = lines[1].split()
            info['disk_usage'] = {
                'total': fields[1],
                'used': fields[2],
                'available': fields[3],
                'percent': fields[4]
            }
    
    # Память
    result = subprocess.run(['free', '-h'], capture_output=True, text=True)
    if result.returncode == 0:
        lines = result.stdout.split('\n')
        for line in lines:
            if line.startswith('Mem:'):
                fields = line.split()
                info['memory'] = {
                    'total': fields[1],
                    'used': fields[2],
                    'available': fields[6]
                }
                break
    
    return info

# Использование
system_info = get_system_info()
print(json.dumps(system_info, indent=2))

Управление сервисами

def service_manager(service_name, action):
    """Управление системными сервисами"""
    valid_actions = ['start', 'stop', 'restart', 'status', 'enable', 'disable']
    
    if action not in valid_actions:
        return False, f"Invalid action. Use: {', '.join(valid_actions)}"
    
    try:
        result = subprocess.run(['systemctl', action, service_name], 
                               capture_output=True, text=True, timeout=30)
        
        if result.returncode == 0:
            return True, result.stdout
        else:
            return False, result.stderr
    
    except subprocess.TimeoutExpired:
        return False, "Command timed out"
    except Exception as e:
        return False, f"Error: {str(e)}"

# Примеры использования
success, output = service_manager('nginx', 'status')
if success:
    print("Nginx status:", output)
else:
    print("Error:", output)

# Перезапуск сервиса
success, output = service_manager('nginx', 'restart')
if success:
    print("Nginx restarted successfully")

Backup скрипт

import subprocess
import datetime
import os

def create_backup(source_dir, backup_dir, exclude_patterns=None):
    """Создание backup'а с помощью rsync"""
    timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    backup_path = os.path.join(backup_dir, f'backup_{timestamp}')
    
    # Создаём директорию для backup'а
    os.makedirs(backup_path, exist_ok=True)
    
    # Формируем команду rsync
    cmd = ['rsync', '-av', '--progress']
    
    # Добавляем исключения
    if exclude_patterns:
        for pattern in exclude_patterns:
            cmd.extend(['--exclude', pattern])
    
    cmd.extend([source_dir, backup_path])
    
    # Выполняем backup
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
        
        if result.returncode == 0:
            return True, f"Backup created: {backup_path}"
        else:
            return False, result.stderr
    
    except subprocess.TimeoutExpired:
        return False, "Backup timeout (1 hour exceeded)"

# Использование
exclude_list = ['*.log', '*.tmp', '__pycache__', '.git']
success, message = create_backup('/var/www/', '/backup/', exclude_list)
print(message)

Обработка ошибок и best practices

Проблема Плохой подход Хороший подход
Обработка ошибок Игнорирование return code Всегда проверяй result.returncode
Безопасность shell=True с пользовательским вводом Список аргументов или shlex.split()
Блокировка Без timeout’ов Всегда используй timeout
Кодировка Работа с bytes text=True для строк

Правильная обработка ошибок:

def safe_command(cmd, timeout=30):
    """Безопасное выполнение команды"""
    try:
        result = subprocess.run(cmd, 
                               capture_output=True, 
                               text=True, 
                               timeout=timeout,
                               check=False)  # Не raise exception при ненулевом exit code
        
        return {
            'success': result.returncode == 0,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode
        }
    
    except subprocess.TimeoutExpired:
        return {
            'success': False,
            'error': 'Command timed out',
            'timeout': True
        }
    except FileNotFoundError:
        return {
            'success': False,
            'error': 'Command not found'
        }
    except Exception as e:
        return {
            'success': False,
            'error': str(e)
        }

# Использование
result = safe_command(['ping', '-c', '3', 'google.com'])
if result['success']:
    print("Ping successful:", result['stdout'])
else:
    print("Ping failed:", result.get('error', result['stderr']))

Продвинутые техники

Streaming output

def stream_command(cmd):
    """Потоковый вывод команды"""
    process = subprocess.Popen(cmd, 
                              stdout=subprocess.PIPE, 
                              stderr=subprocess.STDOUT,
                              text=True,
                              bufsize=1,
                              universal_newlines=True)
    
    for line in process.stdout:
        print(f"[{datetime.datetime.now()}] {line.strip()}")
    
    process.wait()
    return process.returncode

# Пример с tail -f
stream_command(['tail', '-f', '/var/log/nginx/access.log'])

Параллельное выполнение

import concurrent.futures
import subprocess

def check_host(host):
    """Проверка доступности хоста"""
    result = subprocess.run(['ping', '-c', '1', '-W', '2', host], 
                           capture_output=True, text=True)
    return host, result.returncode == 0

def check_multiple_hosts(hosts):
    """Параллельная проверка нескольких хостов"""
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        future_to_host = {executor.submit(check_host, host): host for host in hosts}
        results = {}
        
        for future in concurrent.futures.as_completed(future_to_host):
            host, is_alive = future.result()
            results[host] = is_alive
    
    return results

# Проверка списка серверов
servers = ['google.com', 'github.com', 'stackoverflow.com', '8.8.8.8']
results = check_multiple_hosts(servers)
for host, status in results.items():
    print(f"{host}: {'UP' if status else 'DOWN'}")

Интеграция с другими инструментами

Логирование

import logging
import subprocess

# Настройка логирования
logging.basicConfig(level=logging.INFO, 
                   format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def logged_subprocess(cmd, description="Command"):
    """Выполнение команды с логированием"""
    logger.info(f"Executing: {description}")
    logger.debug(f"Command: {' '.join(cmd)}")
    
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        
        if result.returncode == 0:
            logger.info(f"Success: {description}")
            logger.debug(f"Output: {result.stdout}")
        else:
            logger.error(f"Failed: {description}")
            logger.error(f"Error: {result.stderr}")
        
        return result
    
    except Exception as e:
        logger.error(f"Exception in {description}: {str(e)}")
        raise

# Использование
result = logged_subprocess(['systemctl', 'reload', 'nginx'], "Nginx reload")

Конфигурация через JSON

import json
import subprocess

# config.json
config = {
    "commands": {
        "backup": {
            "cmd": ["rsync", "-av"],
            "timeout": 3600,
            "critical": True
        },
        "cleanup": {
            "cmd": ["find", "/tmp", "-type", "f", "-mtime", "+7", "-delete"],
            "timeout": 300,
            "critical": False
        }
    }
}

def execute_from_config(config_file, command_name, args=None):
    """Выполнение команды из конфигурации"""
    with open(config_file, 'r') as f:
        config = json.load(f)
    
    if command_name not in config['commands']:
        return False, f"Command {command_name} not found"
    
    cmd_config = config['commands'][command_name]
    cmd = cmd_config['cmd'][:]
    
    if args:
        cmd.extend(args)
    
    try:
        result = subprocess.run(cmd, 
                               capture_output=True, 
                               text=True, 
                               timeout=cmd_config.get('timeout', 60))
        
        return result.returncode == 0, result.stdout if result.returncode == 0 else result.stderr
    
    except subprocess.TimeoutExpired:
        return False, "Command timed out"

# Использование
success, output = execute_from_config('config.json', 'backup', ['/data/', '/backup/'])

Альтернативы и сравнения

Хотя subprocess — стандартный выбор, есть альтернативы:

  • sh — более питоничный синтаксис для shell команд
  • plumbum — shell-like синтаксис в Python
  • invoke — фреймворк для task automation
  • fabric — для удалённого выполнения команд

Сравнение производительности:

import time
import subprocess

# Тест производительности
def benchmark_methods():
    commands = [['echo', 'hello'] for _ in range(100)]
    
    # subprocess.run
    start = time.time()
    for cmd in commands:
        subprocess.run(cmd, capture_output=True)
    print(f"subprocess.run: {time.time() - start:.2f}s")
    
    # subprocess.Popen
    start = time.time()
    for cmd in commands:
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        p.wait()
    print(f"subprocess.Popen: {time.time() - start:.2f}s")

benchmark_methods()

Автоматизация и DevOps интеграция

Для серверного администрирования subprocess открывает массу возможностей:

  • Мониторинг — создание custom метрик и алертов
  • Деплой — автоматизация развёртывания приложений
  • Backup — гибкие стратегии резервного копирования
  • Управление конфигурацией — динамическая настройка сервисов

Пример интеграции с мониторингом:

import subprocess
import requests
import time

def monitor_service(service_name, webhook_url):
    """Мониторинг сервиса с уведомлениями"""
    while True:
        result = subprocess.run(['systemctl', 'is-active', service_name], 
                               capture_output=True, text=True)
        
        if result.returncode != 0:
            # Сервис не работает, отправляем уведомление
            message = f"Service {service_name} is down!"
            requests.post(webhook_url, json={'text': message})
            
            # Пытаемся перезапустить
            restart_result = subprocess.run(['systemctl', 'restart', service_name], 
                                          capture_output=True, text=True)
            
            if restart_result.returncode == 0:
                requests.post(webhook_url, json={'text': f"Service {service_name} restarted"})
        
        time.sleep(60)  # Проверяем каждую минуту

# Запуск мониторинга
monitor_service('nginx', 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL')

Безопасность и производительность

Важные аспекты для production использования:

  • Валидация входных данных — никогда не передавай пользовательский ввод напрямую в команды
  • Ограничения ресурсов — используй timeout и контроль памяти
  • Права доступа — запускай с минимальными привилегиями
  • Аудит — логируй все выполняемые команды

Пример безопасного выполнения:

import subprocess
import re
import resource

def secure_execute(cmd, allowed_commands=None, max_memory=100*1024*1024):
    """Безопасное выполнение команды"""
    if allowed_commands and cmd[0] not in allowed_commands:
        raise ValueError(f"Command {cmd[0]} not allowed")
    
    # Валидация аргументов
    for arg in cmd:
        if not re.match(r'^[a-zA-Z0-9/_.-]+$', arg):
            raise ValueError(f"Invalid argument: {arg}")
    
    # Ограничение памяти
    def limit_memory():
        resource.setrlimit(resource.RLIMIT_AS, (max_memory, max_memory))
    
    try:
        result = subprocess.run(cmd, 
                               capture_output=True, 
                               text=True, 
                               timeout=30,
                               preexec_fn=limit_memory)
        return result
    except Exception as e:
        raise SecurityError(f"Command execution failed: {str(e)}")

# Использование
allowed_cmds = ['ls', 'ps', 'df', 'free']
result = secure_execute(['ls', '-la'], allowed_cmds)

Заключение и рекомендации

subprocess — мощный инструмент для системного администрирования в Python. Для эффективного использования:

  • Всегда используй subprocess.run() для простых задач
  • Обрабатывай ошибки — проверяй returncode и stderr
  • Используй timeout’ы — предотвращай зависание скриптов
  • Избегай shell=True — это небезопасно
  • Логируй выполнение — помогает при отладке

Если администрируешь серверы, обязательно изучи subprocess — он существенно упростит автоматизацию рутинных задач. Для development и staging окружений можешь попробовать VPS, а для production нагрузок лучше взять выделенный сервер.

subprocess превращает Python в полноценный инструмент системного администрирования, позволяя создавать сложные скрипты автоматизации с минимальными усилиями. Главное — помни о безопасности и правильной обработке ошибок.


В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.

Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.

Leave a reply

Your email address will not be published. Required fields are marked