- Home »

Алгоритм ближайших соседей (KNN) на Python
Когда поднимаешь мониторинг серверов или разворачиваешь систему аналитики, часто возникает потребность в быстром и эффективном алгоритме классификации. Один из самых простых, но при этом мощных инструментов для этой задачи — алгоритм K-ближайших соседей (KNN). Он отлично подходит для определения аномалий в метриках, классификации логов, предсказания нагрузки на сервер и многих других задач администрирования.
В этой статье разберём, как быстро настроить KNN на Python, рассмотрим практические примеры для серверных задач, и покажем, как интегрировать это решение в ваши скрипты автоматизации. Поехали!
Как работает алгоритм KNN
KNN — это ленивый алгоритм машинного обучения, который не строит модель заранее, а просто запоминает все обучающие данные. Когда нужно классифицировать новый объект, он:
- Находит k ближайших соседей в пространстве признаков
- Для классификации — выбирает самый популярный класс среди соседей
- Для регрессии — усредняет значения соседей
Простота алгоритма делает его идеальным для быстрых прототипов и задач, где нужно оперативно получить результат без долгого обучения модели.
Быстрая настройка KNN: пошаговое руководство
Для работы с KNN нам понадобится Python с библиотеками scikit-learn, numpy и pandas. Если разворачиваете на свежем VPS, начните с установки зависимостей:
# Обновляем систему
sudo apt update && sudo apt upgrade -y
# Устанавливаем Python и pip
sudo apt install python3 python3-pip python3-venv -y
# Создаём виртуальное окружение
python3 -m venv knn_env
source knn_env/bin/activate
# Устанавливаем необходимые пакеты
pip install scikit-learn numpy pandas matplotlib seaborn
Теперь создадим базовый скрипт для классификации:
#!/usr/bin/env python3
import numpy as np
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import StandardScaler
# Пример данных: метрики сервера (CPU, RAM, Load Average)
# 0 - нормальное состояние, 1 - аномалия
data = {
'cpu_usage': [20, 25, 30, 85, 90, 95, 22, 28, 88, 92],
'ram_usage': [40, 45, 50, 80, 85, 90, 42, 48, 82, 88],
'load_avg': [0.5, 0.6, 0.7, 2.5, 3.0, 3.5, 0.55, 0.65, 2.8, 3.2],
'status': [0, 0, 0, 1, 1, 1, 0, 0, 1, 1]
}
df = pd.DataFrame(data)
X = df[['cpu_usage', 'ram_usage', 'load_avg']]
y = df['status']
# Разделяем данные на обучающую и тестовую выборки
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# Нормализуем данные
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Создаём и обучаем модель KNN
knn = KNeighborsClassifier(n_neighbors=3)
knn.fit(X_train_scaled, y_train)
# Делаем предсказания
predictions = knn.predict(X_test_scaled)
# Оценка качества
accuracy = accuracy_score(y_test, predictions)
print(f"Accuracy: {accuracy:.2f}")
print("\nClassification Report:")
print(classification_report(y_test, predictions))
Практические примеры и кейсы
Мониторинг аномалий сервера
Один из самых полезных сценариев — детекция аномалий в реальном времени. Вот скрипт, который мониторит систему и использует KNN для определения необычного поведения:
#!/usr/bin/env python3
import psutil
import time
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
import pickle
class ServerMonitor:
def __init__(self, model_path='server_knn_model.pkl'):
self.model_path = model_path
self.scaler = StandardScaler()
self.knn = KNeighborsClassifier(n_neighbors=5)
self.is_trained = False
def get_metrics(self):
"""Получаем текущие метрики сервера"""
cpu = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory().percent
load_avg = psutil.getloadavg()[0] if hasattr(psutil, 'getloadavg') else 0
disk_io = psutil.disk_io_counters()
network_io = psutil.net_io_counters()
return np.array([
cpu,
memory,
load_avg,
disk_io.read_bytes / 1024 / 1024, # MB/s
network_io.bytes_sent / 1024 / 1024 # MB/s
])
def train_baseline(self, hours=1):
"""Обучаем модель на базовых метриках"""
print("Collecting baseline metrics...")
baseline_data = []
for i in range(hours * 60): # каждую минуту
metrics = self.get_metrics()
baseline_data.append(metrics)
time.sleep(60)
baseline_data = np.array(baseline_data)
labels = np.zeros(len(baseline_data)) # все базовые метрики помечаем как нормальные
# Обучаем модель
self.scaler.fit(baseline_data)
baseline_scaled = self.scaler.transform(baseline_data)
self.knn.fit(baseline_scaled, labels)
self.is_trained = True
# Сохраняем модель
with open(self.model_path, 'wb') as f:
pickle.dump((self.knn, self.scaler), f)
def load_model(self):
"""Загружаем предобученную модель"""
try:
with open(self.model_path, 'rb') as f:
self.knn, self.scaler = pickle.load(f)
self.is_trained = True
return True
except FileNotFoundError:
return False
def predict_anomaly(self, threshold=0.7):
"""Предсказываем аномалию"""
if not self.is_trained:
print("Model not trained!")
return None
current_metrics = self.get_metrics().reshape(1, -1)
current_scaled = self.scaler.transform(current_metrics)
# Получаем расстояния до ближайших соседей
distances, indices = self.knn.kneighbors(current_scaled)
avg_distance = np.mean(distances)
# Если среднее расстояние больше порога - аномалия
is_anomaly = avg_distance > threshold
return {
'is_anomaly': is_anomaly,
'distance': avg_distance,
'metrics': current_metrics[0],
'timestamp': time.time()
}
# Пример использования
if __name__ == "__main__":
monitor = ServerMonitor()
# Пытаемся загрузить существующую модель
if not monitor.load_model():
print("No existing model found. Training new model...")
monitor.train_baseline(hours=1) # обучаем на данных за час
# Мониторим в реальном времени
while True:
result = monitor.predict_anomaly()
if result and result['is_anomaly']:
print(f"ANOMALY DETECTED! Distance: {result['distance']:.2f}")
print(f"Metrics: CPU={result['metrics'][0]:.1f}%, RAM={result['metrics'][1]:.1f}%")
# Здесь можно добавить отправку уведомлений
time.sleep(30) # проверяем каждые 30 секунд
Классификация логов
Ещё один полезный кейс — автоматическая классификация типов ошибок в логах:
#!/usr/bin/env python3
import re
from collections import Counter
from sklearn.neighbors import KNeighborsClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
class LogClassifier:
def __init__(self):
self.pipeline = Pipeline([
('tfidf', TfidfVectorizer(max_features=1000, stop_words='english')),
('knn', KNeighborsClassifier(n_neighbors=5))
])
def extract_features(self, log_line):
"""Извлекаем признаки из строки лога"""
# Убираем timestamp и IP
cleaned = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', '', log_line)
cleaned = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', 'IP', cleaned)
cleaned = re.sub(r'\d+', 'NUM', cleaned)
return cleaned.lower().strip()
def train(self, log_samples):
"""Обучаем классификатор на примерах логов"""
X = [self.extract_features(log['message']) for log in log_samples]
y = [log['category'] for log in log_samples]
self.pipeline.fit(X, y)
def predict(self, log_line):
"""Классифицируем новую строку лога"""
features = self.extract_features(log_line)
prediction = self.pipeline.predict([features])[0]
probabilities = self.pipeline.predict_proba([features])[0]
return {
'category': prediction,
'confidence': max(probabilities),
'features': features
}
# Пример обучающих данных
training_data = [
{'message': '2024-01-01 10:00:00 ERROR: Database connection failed', 'category': 'database'},
{'message': '2024-01-01 10:01:00 WARNING: High memory usage detected', 'category': 'performance'},
{'message': '2024-01-01 10:02:00 INFO: User login successful', 'category': 'auth'},
{'message': '2024-01-01 10:03:00 ERROR: Permission denied for user', 'category': 'auth'},
{'message': '2024-01-01 10:04:00 ERROR: Disk space low', 'category': 'system'},
# Добавьте больше примеров для лучшего качества
]
classifier = LogClassifier()
classifier.train(training_data)
# Тестируем
test_log = "2024-01-01 10:05:00 ERROR: Cannot connect to MySQL server"
result = classifier.predict(test_log)
print(f"Category: {result['category']}, Confidence: {result['confidence']:.2f}")
Сравнение с другими алгоритмами
Алгоритм | Скорость обучения | Скорость предсказания | Точность | Интерпретируемость | Память |
---|---|---|---|---|---|
KNN | Мгновенная | Медленная | Средняя-высокая | Высокая | Высокая |
SVM | Медленная | Быстрая | Высокая | Средняя | Низкая |
Random Forest | Средняя | Быстрая | Высокая | Средняя | Средняя |
Naive Bayes | Быстрая | Быстрая | Средняя | Высокая | Низкая |
Оптимизация производительности
Для серверных задач критична скорость работы. Вот несколько способов ускорить KNN:
1. Использование KD-Tree и Ball Tree
# Для низкоразмерных данных (< 20 признаков)
knn_kd = KNeighborsClassifier(n_neighbors=5, algorithm='kd_tree')
# Для высокоразмерных данных
knn_ball = KNeighborsClassifier(n_neighbors=5, algorithm='ball_tree')
# Автоматический выбор оптимального алгоритма
knn_auto = KNeighborsClassifier(n_neighbors=5, algorithm='auto')
2. Уменьшение размерности с помощью PCA
from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline
# Создаём pipeline с PCA
pipeline = Pipeline([
('pca', PCA(n_components=10)),
('knn', KNeighborsClassifier(n_neighbors=5))
])
pipeline.fit(X_train, y_train)
3. Использование Approximate Nearest Neighbors
Для больших датасетов можно использовать библиотеку Annoy или FAISS:
# Установка: pip install annoy
from annoy import AnnoyIndex
# Создаём индекс для быстрого поиска
f = 3 # размерность векторов
t = AnnoyIndex(f, 'angular')
# Добавляем векторы
for i, vector in enumerate(training_vectors):
t.add_item(i, vector)
t.build(10) # 10 деревьев
# Быстрый поиск ближайших соседей
similar_items = t.get_nns_by_vector(query_vector, 5)
Интеграция с системами мониторинга
KNN отлично интегрируется с популярными системами мониторинга. Вот пример интеграции с Prometheus:
#!/usr/bin/env python3
import requests
import numpy as np
from prometheus_client import start_http_server, Gauge
from sklearn.neighbors import KNeighborsClassifier
# Метрики для Prometheus
anomaly_gauge = Gauge('server_anomaly_score', 'Anomaly score from KNN')
prediction_gauge = Gauge('server_anomaly_prediction', 'Anomaly prediction (0=normal, 1=anomaly)')
class PrometheusKNN:
def __init__(self, prometheus_url='http://localhost:9090'):
self.prometheus_url = prometheus_url
self.knn = KNeighborsClassifier(n_neighbors=5)
def query_prometheus(self, query):
"""Запрашиваем метрики из Prometheus"""
url = f"{self.prometheus_url}/api/v1/query"
response = requests.get(url, params={'query': query})
return response.json()
def get_current_metrics(self):
"""Получаем текущие метрики сервера"""
queries = {
'cpu': 'cpu_usage_percent',
'memory': 'memory_usage_percent',
'disk': 'disk_usage_percent',
'network': 'network_bytes_total'
}
metrics = []
for name, query in queries.items():
result = self.query_prometheus(query)
if result['data']['result']:
value = float(result['data']['result'][0]['value'][1])
metrics.append(value)
return np.array(metrics)
def run_monitoring(self):
"""Запускаем мониторинг с экспортом в Prometheus"""
start_http_server(8000) # Prometheus endpoint
while True:
try:
current_metrics = self.get_current_metrics()
if len(current_metrics) > 0:
# Здесь должна быть логика предсказания
anomaly_score = self.predict_anomaly(current_metrics)
# Обновляем метрики
anomaly_gauge.set(anomaly_score)
prediction_gauge.set(1 if anomaly_score > 0.7 else 0)
time.sleep(30)
except Exception as e:
print(f"Error: {e}")
time.sleep(30)
# Запуск мониторинга
monitor = PrometheusKNN()
monitor.run_monitoring()
Интересные факты и нестандартные применения
Несколько неочевидных способов использования KNN в серверном администрировании:
- Предсказание времени деплоя: На основе размера коммита, количества изменённых файлов и других метрик можно предсказывать время развёртывания
- Автоматическая балансировка нагрузки: KNN может помочь в выборе оптимального сервера для обработки запроса на основе текущих метрик
- Детекция DDoS: Анализ паттернов трафика для выявления аномальной активности
- Оптимизация кэширования: Предсказание того, какие данные стоит кэшировать на основе паттернов доступа
Автоматизация и скрипты
Для автоматизации задач с KNN создайте универсальную CLI-утилиту:
#!/usr/bin/env python3
import argparse
import json
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
import joblib
class KNNUtility:
def __init__(self):
self.model = None
self.scaler = None
def train(self, data_file, target_column, k=5):
"""Обучение модели"""
df = pd.read_csv(data_file)
X = df.drop(columns=[target_column])
y = df[target_column]
self.scaler = StandardScaler()
X_scaled = self.scaler.fit_transform(X)
self.model = KNeighborsClassifier(n_neighbors=k)
self.model.fit(X_scaled, y)
print(f"Model trained with {len(X)} samples")
def predict(self, input_data):
"""Предсказание"""
if isinstance(input_data, str):
with open(input_data, 'r') as f:
data = json.load(f)
else:
data = input_data
X = pd.DataFrame([data])
X_scaled = self.scaler.transform(X)
prediction = self.model.predict(X_scaled)[0]
probabilities = self.model.predict_proba(X_scaled)[0]
return {
'prediction': prediction,
'confidence': max(probabilities),
'probabilities': probabilities.tolist()
}
def save_model(self, filename):
"""Сохранение модели"""
joblib.dump((self.model, self.scaler), filename)
print(f"Model saved to {filename}")
def load_model(self, filename):
"""Загрузка модели"""
self.model, self.scaler = joblib.load(filename)
print(f"Model loaded from {filename}")
def main():
parser = argparse.ArgumentParser(description='KNN Utility for Server Administration')
parser.add_argument('action', choices=['train', 'predict', 'save', 'load'])
parser.add_argument('--data', help='Data file path')
parser.add_argument('--target', help='Target column name')
parser.add_argument('--model', help='Model file path')
parser.add_argument('--k', type=int, default=5, help='Number of neighbors')
parser.add_argument('--input', help='Input data for prediction')
args = parser.parse_args()
knn_util = KNNUtility()
if args.action == 'train':
knn_util.train(args.data, args.target, args.k)
elif args.action == 'predict':
knn_util.load_model(args.model)
result = knn_util.predict(args.input)
print(json.dumps(result, indent=2))
elif args.action == 'save':
knn_util.save_model(args.model)
elif args.action == 'load':
knn_util.load_model(args.model)
if __name__ == "__main__":
main()
Использование:
# Обучение модели
python knn_utility.py train --data server_metrics.csv --target anomaly --k 5
# Сохранение модели
python knn_utility.py save --model server_knn.pkl
# Предсказание
echo '{"cpu": 85, "memory": 90, "disk": 75}' | python knn_utility.py predict --model server_knn.pkl --input /dev/stdin
Масштабирование и развёртывание
Для продакшена на выделенном сервере рекомендуется использовать контейнеризацию:
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python", "knn_service.py"]
# docker-compose.yml
version: '3.8'
services:
knn-service:
build: .
ports:
- "8000:8000"
volumes:
- ./models:/app/models
- ./logs:/app/logs
environment:
- MODEL_PATH=/app/models/server_knn.pkl
- LOG_LEVEL=INFO
restart: unless-stopped
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
Заключение и рекомендации
KNN — это мощный инструмент для серверного администрирования, который особенно эффективен в следующих случаях:
- Быстрое прототипирование: Когда нужно быстро создать систему мониторинга или детекции аномалий
- Небольшие датасеты: До 10-50 тысяч записей KNN работает отлично
- Интерпретируемость: Легко понять, почему алгоритм принял определённое решение
- Динамические данные: Можно легко добавлять новые примеры без переобучения
Не рекомендуется использовать KNN для:
- Больших датасетов (> 100k записей) без оптимизации
- Высокоразмерных данных (> 50 признаков) без предварительной обработки
- Задач реального времени с жёсткими требованиями к латентности
Для максимальной эффективности комбинируйте KNN с другими инструментами: используйте PCA для уменьшения размерности, Annoy для ускорения поиска, и интегрируйте с системами мониторинга типа Prometheus и Grafana. Это даст вам мощную и гибкую систему для решения широкого спектра задач администрирования серверов.
В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.
Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.