- Home »

Работа с классами в Tkinter: учебное пособие
Тестирование скриптов и утилит администрирования всегда проще с графическим интерфейсом. Когда нужно быстро накидать GUI для мониторинга серверов, управления конфигурациями или автоматизации развёртывания, Tkinter становится палочкой-выручалочкой. Но монолитный код с кучей виджетов превращается в неуправляемый месив. Объектно-ориентированный подход с классами спасает положение — код становится структурированным, переиспользуемым и легко расширяемым. Разберём, как правильно организовать архитектуру Tkinter-приложений для админских задач.
Зачем классы в Tkinter
Процедурный подход работает для простых скриптов, но когда приложение растёт — становится кошмаром. Классы решают несколько критических проблем:
- Инкапсуляция — виджеты, методы и данные логически группируются
- Переиспользование — создал класс окна подключения к серверу, используй везде
- Наследование — базовый класс для всех админских окон с общим функционалом
- Полиморфизм — разные типы серверов обрабатываются единообразно
Базовая архитектура приложения
Начнём с простейшего примера — класса основного окна:
import tkinter as tk
from tkinter import ttk, messagebox
import subprocess
import threading
class ServerManager:
def __init__(self, root):
self.root = root
self.root.title("Server Manager")
self.root.geometry("800x600")
# Настройка основного интерфейса
self.setup_ui()
def setup_ui(self):
# Главное меню
menubar = tk.Menu(self.root)
self.root.config(menu=menubar)
# Меню серверов
server_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="Servers", menu=server_menu)
server_menu.add_command(label="Add Server", command=self.add_server)
server_menu.add_command(label="Connect", command=self.connect_server)
# Основная рабочая область
self.main_frame = ttk.Frame(self.root)
self.main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# Список серверов
self.server_listbox = tk.Listbox(self.main_frame, height=15)
self.server_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# Панель управления
self.control_panel = ttk.Frame(self.main_frame)
self.control_panel.pack(side=tk.RIGHT, fill=tk.Y, padx=(10, 0))
# Кнопки управления
ttk.Button(self.control_panel, text="Ping Server",
command=self.ping_server).pack(pady=5)
ttk.Button(self.control_panel, text="Check Uptime",
command=self.check_uptime).pack(pady=5)
ttk.Button(self.control_panel, text="Monitor Resources",
command=self.monitor_resources).pack(pady=5)
def add_server(self):
# Открываем диалог добавления сервера
AddServerDialog(self.root, self.on_server_added)
def on_server_added(self, server_info):
# Обработка добавленного сервера
self.server_listbox.insert(tk.END, f"{server_info['name']} ({server_info['ip']})")
def ping_server(self):
selected = self.server_listbox.curselection()
if not selected:
messagebox.showwarning("Warning", "Select a server first")
return
# Асинхронная проверка ping
threading.Thread(target=self._ping_worker, daemon=True).start()
def _ping_worker(self):
try:
# Здесь будет реальная логика ping
result = subprocess.run(['ping', '-c', '4', '8.8.8.8'],
capture_output=True, text=True)
self.root.after(0, self._show_ping_result, result.stdout)
except Exception as e:
self.root.after(0, self._show_ping_result, f"Error: {str(e)}")
def _show_ping_result(self, result):
messagebox.showinfo("Ping Result", result)
def check_uptime(self):
# Проверка uptime сервера
pass
def monitor_resources(self):
# Открытие окна мониторинга ресурсов
ResourceMonitor(self.root)
def connect_server(self):
# SSH подключение к серверу
pass
if __name__ == "__main__":
root = tk.Tk()
app = ServerManager(root)
root.mainloop()
Диалоговые окна как отдельные классы
Каждое диалоговое окно должно быть отдельным классом. Это упрощает управление и переиспользование:
class AddServerDialog:
def __init__(self, parent, callback):
self.callback = callback
self.dialog = tk.Toplevel(parent)
self.dialog.title("Add Server")
self.dialog.geometry("400x300")
self.dialog.transient(parent)
self.dialog.grab_set()
self.setup_ui()
def setup_ui(self):
# Основная форма
form_frame = ttk.LabelFrame(self.dialog, text="Server Information")
form_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)
# Поля ввода
ttk.Label(form_frame, text="Server Name:").grid(row=0, column=0, sticky=tk.W, pady=5)
self.name_entry = ttk.Entry(form_frame, width=30)
self.name_entry.grid(row=0, column=1, padx=10, pady=5)
ttk.Label(form_frame, text="IP Address:").grid(row=1, column=0, sticky=tk.W, pady=5)
self.ip_entry = ttk.Entry(form_frame, width=30)
self.ip_entry.grid(row=1, column=1, padx=10, pady=5)
ttk.Label(form_frame, text="SSH Port:").grid(row=2, column=0, sticky=tk.W, pady=5)
self.port_entry = ttk.Entry(form_frame, width=30)
self.port_entry.grid(row=2, column=1, padx=10, pady=5)
self.port_entry.insert(0, "22")
ttk.Label(form_frame, text="Username:").grid(row=3, column=0, sticky=tk.W, pady=5)
self.username_entry = ttk.Entry(form_frame, width=30)
self.username_entry.grid(row=3, column=1, padx=10, pady=5)
ttk.Label(form_frame, text="SSH Key Path:").grid(row=4, column=0, sticky=tk.W, pady=5)
key_frame = ttk.Frame(form_frame)
key_frame.grid(row=4, column=1, padx=10, pady=5)
self.key_entry = ttk.Entry(key_frame, width=25)
self.key_entry.pack(side=tk.LEFT)
ttk.Button(key_frame, text="Browse",
command=self.browse_key_file).pack(side=tk.RIGHT, padx=(5, 0))
# Кнопки управления
button_frame = ttk.Frame(self.dialog)
button_frame.pack(fill=tk.X, padx=10, pady=10)
ttk.Button(button_frame, text="Test Connection",
command=self.test_connection).pack(side=tk.LEFT, padx=(0, 5))
ttk.Button(button_frame, text="Cancel",
command=self.dialog.destroy).pack(side=tk.RIGHT)
ttk.Button(button_frame, text="Add Server",
command=self.add_server).pack(side=tk.RIGHT, padx=(0, 5))
def browse_key_file(self):
from tkinter import filedialog
filename = filedialog.askopenfilename(
title="Select SSH Key File",
filetypes=[("SSH Keys", "*.pem *.key"), ("All Files", "*.*")]
)
if filename:
self.key_entry.delete(0, tk.END)
self.key_entry.insert(0, filename)
def test_connection(self):
# Тестирование подключения к серверу
server_info = self.get_server_info()
if self.validate_input(server_info):
# Асинхронная проверка подключения
threading.Thread(target=self._test_connection_worker,
args=(server_info,), daemon=True).start()
def _test_connection_worker(self, server_info):
try:
# Здесь будет реальная проверка SSH подключения
cmd = [
'ssh', '-o', 'ConnectTimeout=5',
'-o', 'StrictHostKeyChecking=no',
f"{server_info['username']}@{server_info['ip']}",
'echo "Connection successful"'
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
self.dialog.after(0, self._show_connection_result, result.returncode == 0)
except Exception as e:
self.dialog.after(0, self._show_connection_result, False, str(e))
def _show_connection_result(self, success, error_msg=None):
if success:
messagebox.showinfo("Connection Test", "Connection successful!")
else:
messagebox.showerror("Connection Test",
f"Connection failed: {error_msg or 'Unknown error'}")
def add_server(self):
server_info = self.get_server_info()
if self.validate_input(server_info):
self.callback(server_info)
self.dialog.destroy()
def get_server_info(self):
return {
'name': self.name_entry.get().strip(),
'ip': self.ip_entry.get().strip(),
'port': self.port_entry.get().strip(),
'username': self.username_entry.get().strip(),
'key_path': self.key_entry.get().strip()
}
def validate_input(self, server_info):
if not server_info['name']:
messagebox.showerror("Error", "Server name is required")
return False
if not server_info['ip']:
messagebox.showerror("Error", "IP address is required")
return False
if not server_info['username']:
messagebox.showerror("Error", "Username is required")
return False
try:
port = int(server_info['port'])
if port < 1 or port > 65535:
raise ValueError()
except ValueError:
messagebox.showerror("Error", "Invalid port number")
return False
return True
Наследование для специализированных окон
Создаём базовый класс для всех окон мониторинга:
class BaseMonitorWindow:
def __init__(self, parent, title="Monitor"):
self.parent = parent
self.window = tk.Toplevel(parent)
self.window.title(title)
self.window.geometry("600x400")
self.window.protocol("WM_DELETE_WINDOW", self.on_close)
self.is_monitoring = False
self.monitor_thread = None
self.setup_base_ui()
self.setup_monitor_ui()
def setup_base_ui(self):
# Общие элементы интерфейса
self.toolbar = ttk.Frame(self.window)
self.toolbar.pack(fill=tk.X, padx=5, pady=5)
self.start_button = ttk.Button(self.toolbar, text="Start",
command=self.start_monitoring)
self.start_button.pack(side=tk.LEFT, padx=(0, 5))
self.stop_button = ttk.Button(self.toolbar, text="Stop",
command=self.stop_monitoring, state=tk.DISABLED)
self.stop_button.pack(side=tk.LEFT, padx=(0, 5))
self.clear_button = ttk.Button(self.toolbar, text="Clear",
command=self.clear_data)
self.clear_button.pack(side=tk.LEFT, padx=(0, 5))
self.auto_refresh_var = tk.BooleanVar()
self.auto_refresh_cb = ttk.Checkbutton(self.toolbar,
text="Auto Refresh",
variable=self.auto_refresh_var)
self.auto_refresh_cb.pack(side=tk.RIGHT)
# Интервал обновления
ttk.Label(self.toolbar, text="Interval (sec):").pack(side=tk.RIGHT, padx=(0, 5))
self.interval_var = tk.StringVar(value="5")
self.interval_entry = ttk.Entry(self.toolbar, textvariable=self.interval_var, width=5)
self.interval_entry.pack(side=tk.RIGHT, padx=(0, 5))
def setup_monitor_ui(self):
# Переопределяется в наследниках
pass
def start_monitoring(self):
self.is_monitoring = True
self.start_button.config(state=tk.DISABLED)
self.stop_button.config(state=tk.NORMAL)
self.monitor_thread = threading.Thread(target=self._monitor_worker, daemon=True)
self.monitor_thread.start()
def stop_monitoring(self):
self.is_monitoring = False
self.start_button.config(state=tk.NORMAL)
self.stop_button.config(state=tk.DISABLED)
def _monitor_worker(self):
while self.is_monitoring:
try:
data = self.collect_data()
self.window.after(0, self.update_display, data)
interval = int(self.interval_var.get())
for _ in range(interval * 10): # Проверяем каждые 0.1 сек
if not self.is_monitoring:
break
time.sleep(0.1)
except Exception as e:
self.window.after(0, self.show_error, str(e))
break
def collect_data(self):
# Переопределяется в наследниках
return {}
def update_display(self, data):
# Переопределяется в наследниках
pass
def clear_data(self):
# Переопределяется в наследниках
pass
def show_error(self, error_msg):
messagebox.showerror("Monitoring Error", error_msg)
self.stop_monitoring()
def on_close(self):
self.stop_monitoring()
self.window.destroy()
Практический пример — монитор ресурсов
Наследуем от базового класса для создания монитора ресурсов:
import time
import psutil
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
class ResourceMonitor(BaseMonitorWindow):
def __init__(self, parent):
self.cpu_data = []
self.memory_data = []
self.disk_data = []
self.timestamps = []
super().__init__(parent, "System Resource Monitor")
def setup_monitor_ui(self):
# Notebook для вкладок
self.notebook = ttk.Notebook(self.window)
self.notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Вкладка с графиками
self.chart_frame = ttk.Frame(self.notebook)
self.notebook.add(self.chart_frame, text="Charts")
# Создание графиков
self.fig = Figure(figsize=(10, 6), dpi=100)
self.canvas = FigureCanvasTkAgg(self.fig, self.chart_frame)
self.canvas.get_tk_widget().pack(fill=tk.BOTH, expand=True)
# Подграфики
self.cpu_subplot = self.fig.add_subplot(3, 1, 1)
self.memory_subplot = self.fig.add_subplot(3, 1, 2)
self.disk_subplot = self.fig.add_subplot(3, 1, 3)
# Настройка подграфиков
self.cpu_subplot.set_title("CPU Usage (%)")
self.cpu_subplot.set_ylim(0, 100)
self.memory_subplot.set_title("Memory Usage (%)")
self.memory_subplot.set_ylim(0, 100)
self.disk_subplot.set_title("Disk Usage (%)")
self.disk_subplot.set_ylim(0, 100)
self.fig.tight_layout()
# Вкладка с таблицей процессов
self.process_frame = ttk.Frame(self.notebook)
self.notebook.add(self.process_frame, text="Processes")
# Treeview для процессов
columns = ('PID', 'Name', 'CPU %', 'Memory %', 'Status')
self.process_tree = ttk.Treeview(self.process_frame, columns=columns, show='headings')
for col in columns:
self.process_tree.heading(col, text=col)
self.process_tree.column(col, width=100)
# Scrollbar для таблицы
scrollbar = ttk.Scrollbar(self.process_frame, orient=tk.VERTICAL,
command=self.process_tree.yview)
self.process_tree.configure(yscrollcommand=scrollbar.set)
self.process_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# Контекстное меню для процессов
self.process_menu = tk.Menu(self.process_tree, tearoff=0)
self.process_menu.add_command(label="Kill Process", command=self.kill_process)
self.process_menu.add_command(label="Process Details", command=self.show_process_details)
self.process_tree.bind("", self.show_process_menu)
# Вкладка с системной информацией
self.sysinfo_frame = ttk.Frame(self.notebook)
self.notebook.add(self.sysinfo_frame, text="System Info")
self.sysinfo_text = tk.Text(self.sysinfo_frame, wrap=tk.WORD)
self.sysinfo_text.pack(fill=tk.BOTH, expand=True, padx=5, pady=5)
# Заполняем системную информацию
self.update_system_info()
def collect_data(self):
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
disk = psutil.disk_usage('/')
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent', 'status']):
try:
processes.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return {
'cpu': cpu_percent,
'memory': memory.percent,
'disk': disk.percent,
'processes': sorted(processes, key=lambda x: x['cpu_percent'], reverse=True)[:20]
}
def update_display(self, data):
current_time = time.time()
# Обновляем данные для графиков
self.cpu_data.append(data['cpu'])
self.memory_data.append(data['memory'])
self.disk_data.append(data['disk'])
self.timestamps.append(current_time)
# Ограничиваем количество точек на графике
max_points = 50
if len(self.timestamps) > max_points:
self.cpu_data = self.cpu_data[-max_points:]
self.memory_data = self.memory_data[-max_points:]
self.disk_data = self.disk_data[-max_points:]
self.timestamps = self.timestamps[-max_points:]
# Обновляем графики
self.cpu_subplot.clear()
self.memory_subplot.clear()
self.disk_subplot.clear()
if self.timestamps:
times = [(t - self.timestamps[0]) for t in self.timestamps]
self.cpu_subplot.plot(times, self.cpu_data, 'b-', linewidth=2)
self.cpu_subplot.set_title(f"CPU Usage: {data['cpu']:.1f}%")
self.cpu_subplot.set_ylim(0, 100)
self.cpu_subplot.grid(True, alpha=0.3)
self.memory_subplot.plot(times, self.memory_data, 'r-', linewidth=2)
self.memory_subplot.set_title(f"Memory Usage: {data['memory']:.1f}%")
self.memory_subplot.set_ylim(0, 100)
self.memory_subplot.grid(True, alpha=0.3)
self.disk_subplot.plot(times, self.disk_data, 'g-', linewidth=2)
self.disk_subplot.set_title(f"Disk Usage: {data['disk']:.1f}%")
self.disk_subplot.set_ylim(0, 100)
self.disk_subplot.grid(True, alpha=0.3)
self.fig.tight_layout()
self.canvas.draw()
# Обновляем таблицу процессов
self.process_tree.delete(*self.process_tree.get_children())
for proc in data['processes']:
self.process_tree.insert('', tk.END, values=(
proc['pid'],
proc['name'][:20],
f"{proc['cpu_percent']:.1f}",
f"{proc['memory_percent']:.1f}",
proc['status']
))
def clear_data(self):
self.cpu_data.clear()
self.memory_data.clear()
self.disk_data.clear()
self.timestamps.clear()
self.cpu_subplot.clear()
self.memory_subplot.clear()
self.disk_subplot.clear()
self.canvas.draw()
self.process_tree.delete(*self.process_tree.get_children())
def show_process_menu(self, event):
item = self.process_tree.selection()[0]
if item:
self.process_menu.post(event.x_root, event.y_root)
def kill_process(self):
selected = self.process_tree.selection()
if selected:
item = self.process_tree.item(selected[0])
pid = int(item['values'][0])
try:
proc = psutil.Process(pid)
proc.terminate()
messagebox.showinfo("Success", f"Process {pid} terminated")
except Exception as e:
messagebox.showerror("Error", f"Failed to kill process: {str(e)}")
def show_process_details(self):
selected = self.process_tree.selection()
if selected:
item = self.process_tree.item(selected[0])
pid = int(item['values'][0])
ProcessDetailsDialog(self.window, pid)
def update_system_info(self):
info = []
# CPU информация
info.append("=== CPU Information ===")
info.append(f"Physical cores: {psutil.cpu_count(logical=False)}")
info.append(f"Total cores: {psutil.cpu_count(logical=True)}")
info.append(f"Max frequency: {psutil.cpu_freq().max:.2f} MHz")
info.append("")
# Memory информация
memory = psutil.virtual_memory()
info.append("=== Memory Information ===")
info.append(f"Total: {memory.total / (1024**3):.1f} GB")
info.append(f"Available: {memory.available / (1024**3):.1f} GB")
info.append(f"Used: {memory.used / (1024**3):.1f} GB")
info.append(f"Percentage: {memory.percent}%")
info.append("")
# Disk информация
info.append("=== Disk Information ===")
partitions = psutil.disk_partitions()
for partition in partitions:
try:
usage = psutil.disk_usage(partition.mountpoint)
info.append(f"Device: {partition.device}")
info.append(f" Mountpoint: {partition.mountpoint}")
info.append(f" File system: {partition.fstype}")
info.append(f" Total size: {usage.total / (1024**3):.1f} GB")
info.append(f" Used: {usage.used / (1024**3):.1f} GB")
info.append(f" Free: {usage.free / (1024**3):.1f} GB")
info.append(f" Percentage: {usage.percent}%")
info.append("")
except PermissionError:
info.append(f"Device: {partition.device} - Permission denied")
# Network информация
info.append("=== Network Information ===")
net_io = psutil.net_io_counters()
info.append(f"Bytes sent: {net_io.bytes_sent / (1024**2):.1f} MB")
info.append(f"Bytes received: {net_io.bytes_recv / (1024**2):.1f} MB")
info.append(f"Packets sent: {net_io.packets_sent}")
info.append(f"Packets received: {net_io.packets_recv}")
self.sysinfo_text.delete(1.0, tk.END)
self.sysinfo_text.insert(tk.END, "\n".join(info))
Паттерн MVC в Tkinter
Для сложных приложений используем паттерн Model-View-Controller:
# models.py
class ServerModel:
def __init__(self):
self.servers = []
self.observers = []
def add_observer(self, observer):
self.observers.append(observer)
def remove_observer(self, observer):
self.observers.remove(observer)
def notify_observers(self, event, data):
for observer in self.observers:
observer.update(event, data)
def add_server(self, server_info):
self.servers.append(server_info)
self.notify_observers('server_added', server_info)
def remove_server(self, index):
if 0 <= index < len(self.servers):
removed = self.servers.pop(index)
self.notify_observers('server_removed', removed)
def get_servers(self):
return self.servers.copy()
def ping_server(self, server_info):
# Асинхронная проверка сервера
import threading
threading.Thread(target=self._ping_worker, args=(server_info,), daemon=True).start()
def _ping_worker(self, server_info):
try:
result = subprocess.run(['ping', '-c', '4', server_info['ip']],
capture_output=True, text=True, timeout=30)
self.notify_observers('ping_result', {
'server': server_info,
'success': result.returncode == 0,
'output': result.stdout
})
except Exception as e:
self.notify_observers('ping_result', {
'server': server_info,
'success': False,
'error': str(e)
})
# controllers.py
class ServerController:
def __init__(self, model, view):
self.model = model
self.view = view
# Подписываемся на события модели
self.model.add_observer(self)
# Подписываемся на события представления
self.view.set_controller(self)
def update(self, event, data):
# Обновления от модели
if event == 'server_added':
self.view.add_server_to_list(data)
elif event == 'server_removed':
self.view.remove_server_from_list(data)
elif event == 'ping_result':
self.view.show_ping_result(data)
def add_server(self, server_info):
# Валидация данных
if self.validate_server_info(server_info):
self.model.add_server(server_info)
return True
return False
def remove_server(self, index):
self.model.remove_server(index)
def ping_server(self, server_info):
self.model.ping_server(server_info)
def validate_server_info(self, server_info):
# Валидация данных сервера
if not server_info.get('name'):
self.view.show_error("Server name is required")
return False
if not server_info.get('ip'):
self.view.show_error("IP address is required")
return False
# Проверка формата IP
import ipaddress
try:
ipaddress.ip_address(server_info['ip'])
except ValueError:
self.view.show_error("Invalid IP address format")
return False
return True
# views.py
class ServerView:
def __init__(self, root):
self.root = root
self.controller = None
self.setup_ui()
def set_controller(self, controller):
self.controller = controller
def setup_ui(self):
# Интерфейс аналогичен предыдущим примерам
pass
def add_server_to_list(self, server_info):
self.server_listbox.insert(tk.END, f"{server_info['name']} ({server_info['ip']})")
def remove_server_from_list(self, server_info):
# Находим и удаляем элемент из списка
for i in range(self.server_listbox.size()):
if server_info['name'] in self.server_listbox.get(i):
self.server_listbox.delete(i)
break
def show_ping_result(self, result):
if result['success']:
messagebox.showinfo("Ping Result", f"Server {result['server']['name']} is online")
else:
error_msg = result.get('error', 'Ping failed')
messagebox.showerror("Ping Result", f"Server {result['server']['name']} is offline: {error_msg}")
def show_error(self, message):
messagebox.showerror("Error", message)
def on_add_server_clicked(self):
# Получаем данные из формы и передаём контроллеру
server_info = self.get_server_form_data()
self.controller.add_server(server_info)
def on_ping_server_clicked(self):
selected_index = self.server_listbox.curselection()
if selected_index:
servers = self.controller.model.get_servers()
server = servers[selected_index[0]]
self.controller.ping_server(server)
Интеграция с системными утилитами
Для админских задач часто нужно интегрировать GUI с системными командами:
class SystemCommandExecutor:
def __init__(self):
self.process_queue = []
self.observers = []
def add_observer(self, observer):
self.observers.append(observer)
def notify_observers(self, event, data):
for observer in self.observers:
observer.command_update(event, data)
def execute_command(self, command, server_info=None, timeout=30):
"""Выполнение команды локально или на удалённом сервере"""
if server_info:
# SSH команда
ssh_command = [
'ssh', '-o', 'ConnectTimeout=10',
'-o', 'StrictHostKeyChecking=no',
'-p', str(server_info.get('port', 22))
]
if server_info.get('key_path'):
ssh_command.extend(['-i', server_info['key_path']])
ssh_command.append(f"{server_info['username']}@{server_info['ip']}")
ssh_command.append(command)
final_command = ssh_command
else:
# Локальная команда
final_command = command.split() if isinstance(command, str) else command
# Асинхронное выполнение
threading.Thread(target=self._execute_worker,
args=(final_command, timeout), daemon=True).start()
def _execute_worker(self, command, timeout):
try:
self.notify_observers('command_started', {'command': command})
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True
)
self.process_queue.append(process)
# Читаем вывод построчно
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
self.notify_observers('command_output', {'output': output.strip()})
# Получаем оставшийся вывод
stdout, stderr = process.communicate(timeout=timeout)
if stdout:
self.notify_observers('command_output', {'output': stdout})
if stderr:
self.notify_observers('command_error', {'error': stderr})
self.notify_observers('command_finished', {
'returncode': process.returncode,
'success': process.returncode == 0
})
except subprocess.TimeoutExpired:
process.kill()
self.notify_observers('command_timeout', {'timeout': timeout})
except Exception as e:
self.notify_observers('command_exception', {'exception': str(e)})
finally:
if process in self.process_queue:
self.process_queue.remove(process)
def kill_all_processes(self):
"""Убиваем все активные процессы"""
for process in self.process_queue[:]:
try:
process.kill()
self.process_queue.remove(process)
except:
pass
class CommandExecutorDialog:
def __init__(self, parent, server_info=None):
self.server_info = server_info
self.executor = SystemCommandExecutor()
self.executor.add_observer(self)
self.dialog = tk.Toplevel(parent)
self.dialog.title(f"Command Executor - {server_info['name'] if server_info else 'Local'}")
self.dialog.geometry("800x600")
self.dialog.protocol("WM_DELETE_WINDOW", self.on_close)
self.setup_ui()
def setup_ui(self):
# Панель команд
command_frame = ttk.LabelFrame(self.dialog, text="Command")
command_frame.pack(fill=tk.X, padx=10, pady=5)
# Поле ввода команды
self.command_entry = ttk.Entry(command_frame, font=('Courier', 10))
self.command_entry.pack(fill=tk.X, padx=5, pady=5)
self.command_entry.bind('', self.execute_command)
# Кнопки управления
button_frame = ttk.Frame(command_frame)
button_frame.pack(fill=tk.X, padx=5, pady=5)
self.execute_button = ttk.Button(button_frame, text="Execute",
command=self.execute_command)
self.execute_button.pack(side=tk.LEFT, padx=(0, 5))
self.kill_button = ttk.Button(button_frame, text="Kill All",
command=self.kill_processes)
self.kill_button.pack(side=tk.LEFT, padx=(0, 5))
self.clear_button = ttk.Button(button_frame, text="Clear Output",
command=self.clear_output)
self.clear_button.pack(side=tk.LEFT, padx=(0, 5))
# Быстрые команды
quick_frame = ttk.LabelFrame(self.dialog, text="Quick Commands")
quick_frame.pack(fill=tk.X, padx=10, pady=5)
quick_commands = [
("System Info", "uname -a"),
("Disk Usage", "df -h"),
("Memory Usage", "free -h"),
("Running Processes", "ps aux"),
("Network Interfaces", "ip addr show"),
("Service Status", "systemctl status"),
("Docker Containers", "docker ps -a"),
("Nginx Status", "systemctl status nginx")
]
quick_button_frame = ttk.Frame(quick_frame)
quick_button_frame.pack(fill=tk.X, padx=5, pady=5)
for i, (name, command) in enumerate(quick_commands):
button = ttk.Button(quick_button_frame, text=name,
command=lambda cmd=command: self.set_command(cmd))
button.grid(row=i//4, column=i%4, sticky=tk.W+tk.E, padx=2, pady=2)
# Конфигурируем колонки
for i in range(4):
quick_button_frame.columnconfigure(i, weight=1)
# Панель вывода
output_frame = ttk.LabelFrame(self.dialog, text="Output")
output_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
# Текстовое поле с прокруткой
self.output_text = tk.Text(output_frame, font=('Courier', 9),
wrap=tk.WORD, bg='black', fg='lightgreen')
scrollbar = ttk.Scrollbar(output_frame, orient=tk.VERTICAL,
command=self.output_text.yview)
self.output_text.configure(yscrollcommand=scrollbar.set)
self.output_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# Статус бар
self.status_var = tk.StringVar(value="Ready")
self.status_bar = ttk.Label(self.dialog, textvariable=self.status_var,
relief=tk.SUNKEN, anchor=tk.W)
self.status_bar.pack(fill=tk.X, padx=10, pady=5)
# Фокус на поле ввода
self.command_entry.focus()
def set_command(self, command):
self.command_entry.delete(0, tk.END)
self.command_entry.insert(0, command)
def execute_command(self, event=None):
command = self.command_entry.get().strip()
if command:
self.executor.execute_command(command, self.server_info)
def kill_processes(self):
self.executor.kill_all_processes()
def clear_output(self):
self.output_text.delete(1.0, tk.END)
def command_update(self, event, data):
if event == 'command_started':
self.status_var.set("Executing...")
self.execute_button.config(state=tk.DISABLED)
elif event == 'command_output':
self.output_text.insert(tk.END, data['output'] + '\n')
self.output_text.see(tk.END)
elif event == 'command_error':
self.output_text.insert(tk.END, f"ERROR: {data['error']}\n")
self.output_text.see(tk.END)
elif event == 'command_finished':
status = "Success" if data['success'] else f"Failed (code: {data['returncode']})"
self.status_var.set(status)
self.execute_button.config(state=tk.NORMAL)
elif event == 'command_timeout':
self.status_var.set(f"Timeout ({data['timeout']}s)")
self.execute_button.config(state=tk.NORMAL)
elif event == 'command_exception':
self.status_var.set(f"Exception: {data['exception']}")
self.execute_button.config(state=tk.NORMAL)
def on_close(self):
self.executor.kill_all_processes()
self.dialog.destroy()
Сравнение с альтернативами
Критерий | Tkinter | PyQt5/PySide2 | Kivy | Dear PyGui |
---|---|---|---|---|
Встроенность | ✅ Входит в Python | ❌ Отдельная установка | ❌ Отдельная установка | ❌ Отдельная установка |
Производительность | ⚠️ Средняя | ✅ Высокая | ✅ Высокая | ✅ Очень высокая |
Сложность изучения | ✅ Простая | ❌ Сложная | ⚠️ Средняя | ✅ Простая |
Размер приложения | ✅ Минимальный | ❌ Большой | ❌ Большой | ⚠️ Средний |
Кроссплатформенность | ✅ Полная | ✅ Полная | ✅ Полная | ✅ Полная |
Нативный вид | ⚠️ Частично | ✅ Полный | ❌ Собственный стиль | ❌ Собственный стиль |
Для админских утилит Tkinter остаётся оптимальным выбором — не требует дополнительных зависимостей, быстро разрабатывается, легко интегрируется с системными командами.
Продвинутые техники
Кастомные виджеты
class ServerStatusWidget(ttk.Frame):
def __init__(self, parent, server_info, **kwargs):
super().__init__(parent, **kwargs)
self.server_info = server_info
self
В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.
Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.