Home » Алгоритм ближайших соседей (KNN) на Python
Алгоритм ближайших соседей (KNN) на Python

Алгоритм ближайших соседей (KNN) на Python

Когда поднимаешь мониторинг серверов или разворачиваешь систему аналитики, часто возникает потребность в быстром и эффективном алгоритме классификации. Один из самых простых, но при этом мощных инструментов для этой задачи — алгоритм K-ближайших соседей (KNN). Он отлично подходит для определения аномалий в метриках, классификации логов, предсказания нагрузки на сервер и многих других задач администрирования.

В этой статье разберём, как быстро настроить KNN на Python, рассмотрим практические примеры для серверных задач, и покажем, как интегрировать это решение в ваши скрипты автоматизации. Поехали!

Как работает алгоритм KNN

KNN — это ленивый алгоритм машинного обучения, который не строит модель заранее, а просто запоминает все обучающие данные. Когда нужно классифицировать новый объект, он:

  • Находит k ближайших соседей в пространстве признаков
  • Для классификации — выбирает самый популярный класс среди соседей
  • Для регрессии — усредняет значения соседей

Простота алгоритма делает его идеальным для быстрых прототипов и задач, где нужно оперативно получить результат без долгого обучения модели.

Быстрая настройка KNN: пошаговое руководство

Для работы с KNN нам понадобится Python с библиотеками scikit-learn, numpy и pandas. Если разворачиваете на свежем VPS, начните с установки зависимостей:

# Обновляем систему
sudo apt update && sudo apt upgrade -y

# Устанавливаем Python и pip
sudo apt install python3 python3-pip python3-venv -y

# Создаём виртуальное окружение
python3 -m venv knn_env
source knn_env/bin/activate

# Устанавливаем необходимые пакеты
pip install scikit-learn numpy pandas matplotlib seaborn

Теперь создадим базовый скрипт для классификации:

#!/usr/bin/env python3
import numpy as np
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from sklearn.preprocessing import StandardScaler

# Пример данных: метрики сервера (CPU, RAM, Load Average)
# 0 - нормальное состояние, 1 - аномалия
data = {
    'cpu_usage': [20, 25, 30, 85, 90, 95, 22, 28, 88, 92],
    'ram_usage': [40, 45, 50, 80, 85, 90, 42, 48, 82, 88],
    'load_avg': [0.5, 0.6, 0.7, 2.5, 3.0, 3.5, 0.55, 0.65, 2.8, 3.2],
    'status': [0, 0, 0, 1, 1, 1, 0, 0, 1, 1]
}

df = pd.DataFrame(data)
X = df[['cpu_usage', 'ram_usage', 'load_avg']]
y = df['status']

# Разделяем данные на обучающую и тестовую выборки
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

# Нормализуем данные
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Создаём и обучаем модель KNN
knn = KNeighborsClassifier(n_neighbors=3)
knn.fit(X_train_scaled, y_train)

# Делаем предсказания
predictions = knn.predict(X_test_scaled)

# Оценка качества
accuracy = accuracy_score(y_test, predictions)
print(f"Accuracy: {accuracy:.2f}")
print("\nClassification Report:")
print(classification_report(y_test, predictions))

Практические примеры и кейсы

Мониторинг аномалий сервера

Один из самых полезных сценариев — детекция аномалий в реальном времени. Вот скрипт, который мониторит систему и использует KNN для определения необычного поведения:

#!/usr/bin/env python3
import psutil
import time
import numpy as np
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
import pickle

class ServerMonitor:
    def __init__(self, model_path='server_knn_model.pkl'):
        self.model_path = model_path
        self.scaler = StandardScaler()
        self.knn = KNeighborsClassifier(n_neighbors=5)
        self.is_trained = False
        
    def get_metrics(self):
        """Получаем текущие метрики сервера"""
        cpu = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory().percent
        load_avg = psutil.getloadavg()[0] if hasattr(psutil, 'getloadavg') else 0
        disk_io = psutil.disk_io_counters()
        network_io = psutil.net_io_counters()
        
        return np.array([
            cpu,
            memory,
            load_avg,
            disk_io.read_bytes / 1024 / 1024,  # MB/s
            network_io.bytes_sent / 1024 / 1024  # MB/s
        ])
    
    def train_baseline(self, hours=1):
        """Обучаем модель на базовых метриках"""
        print("Collecting baseline metrics...")
        baseline_data = []
        
        for i in range(hours * 60):  # каждую минуту
            metrics = self.get_metrics()
            baseline_data.append(metrics)
            time.sleep(60)
            
        baseline_data = np.array(baseline_data)
        labels = np.zeros(len(baseline_data))  # все базовые метрики помечаем как нормальные
        
        # Обучаем модель
        self.scaler.fit(baseline_data)
        baseline_scaled = self.scaler.transform(baseline_data)
        self.knn.fit(baseline_scaled, labels)
        self.is_trained = True
        
        # Сохраняем модель
        with open(self.model_path, 'wb') as f:
            pickle.dump((self.knn, self.scaler), f)
            
    def load_model(self):
        """Загружаем предобученную модель"""
        try:
            with open(self.model_path, 'rb') as f:
                self.knn, self.scaler = pickle.load(f)
            self.is_trained = True
            return True
        except FileNotFoundError:
            return False
    
    def predict_anomaly(self, threshold=0.7):
        """Предсказываем аномалию"""
        if not self.is_trained:
            print("Model not trained!")
            return None
            
        current_metrics = self.get_metrics().reshape(1, -1)
        current_scaled = self.scaler.transform(current_metrics)
        
        # Получаем расстояния до ближайших соседей
        distances, indices = self.knn.kneighbors(current_scaled)
        avg_distance = np.mean(distances)
        
        # Если среднее расстояние больше порога - аномалия
        is_anomaly = avg_distance > threshold
        
        return {
            'is_anomaly': is_anomaly,
            'distance': avg_distance,
            'metrics': current_metrics[0],
            'timestamp': time.time()
        }

# Пример использования
if __name__ == "__main__":
    monitor = ServerMonitor()
    
    # Пытаемся загрузить существующую модель
    if not monitor.load_model():
        print("No existing model found. Training new model...")
        monitor.train_baseline(hours=1)  # обучаем на данных за час
    
    # Мониторим в реальном времени
    while True:
        result = monitor.predict_anomaly()
        if result and result['is_anomaly']:
            print(f"ANOMALY DETECTED! Distance: {result['distance']:.2f}")
            print(f"Metrics: CPU={result['metrics'][0]:.1f}%, RAM={result['metrics'][1]:.1f}%")
            # Здесь можно добавить отправку уведомлений
        
        time.sleep(30)  # проверяем каждые 30 секунд

Классификация логов

Ещё один полезный кейс — автоматическая классификация типов ошибок в логах:

#!/usr/bin/env python3
import re
from collections import Counter
from sklearn.neighbors import KNeighborsClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline

class LogClassifier:
    def __init__(self):
        self.pipeline = Pipeline([
            ('tfidf', TfidfVectorizer(max_features=1000, stop_words='english')),
            ('knn', KNeighborsClassifier(n_neighbors=5))
        ])
        
    def extract_features(self, log_line):
        """Извлекаем признаки из строки лога"""
        # Убираем timestamp и IP
        cleaned = re.sub(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', '', log_line)
        cleaned = re.sub(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', 'IP', cleaned)
        cleaned = re.sub(r'\d+', 'NUM', cleaned)
        
        return cleaned.lower().strip()
    
    def train(self, log_samples):
        """Обучаем классификатор на примерах логов"""
        X = [self.extract_features(log['message']) for log in log_samples]
        y = [log['category'] for log in log_samples]
        
        self.pipeline.fit(X, y)
    
    def predict(self, log_line):
        """Классифицируем новую строку лога"""
        features = self.extract_features(log_line)
        prediction = self.pipeline.predict([features])[0]
        probabilities = self.pipeline.predict_proba([features])[0]
        
        return {
            'category': prediction,
            'confidence': max(probabilities),
            'features': features
        }

# Пример обучающих данных
training_data = [
    {'message': '2024-01-01 10:00:00 ERROR: Database connection failed', 'category': 'database'},
    {'message': '2024-01-01 10:01:00 WARNING: High memory usage detected', 'category': 'performance'},
    {'message': '2024-01-01 10:02:00 INFO: User login successful', 'category': 'auth'},
    {'message': '2024-01-01 10:03:00 ERROR: Permission denied for user', 'category': 'auth'},
    {'message': '2024-01-01 10:04:00 ERROR: Disk space low', 'category': 'system'},
    # Добавьте больше примеров для лучшего качества
]

classifier = LogClassifier()
classifier.train(training_data)

# Тестируем
test_log = "2024-01-01 10:05:00 ERROR: Cannot connect to MySQL server"
result = classifier.predict(test_log)
print(f"Category: {result['category']}, Confidence: {result['confidence']:.2f}")

Сравнение с другими алгоритмами

Алгоритм Скорость обучения Скорость предсказания Точность Интерпретируемость Память
KNN Мгновенная Медленная Средняя-высокая Высокая Высокая
SVM Медленная Быстрая Высокая Средняя Низкая
Random Forest Средняя Быстрая Высокая Средняя Средняя
Naive Bayes Быстрая Быстрая Средняя Высокая Низкая

Оптимизация производительности

Для серверных задач критична скорость работы. Вот несколько способов ускорить KNN:

1. Использование KD-Tree и Ball Tree

# Для низкоразмерных данных (< 20 признаков)
knn_kd = KNeighborsClassifier(n_neighbors=5, algorithm='kd_tree')

# Для высокоразмерных данных
knn_ball = KNeighborsClassifier(n_neighbors=5, algorithm='ball_tree')

# Автоматический выбор оптимального алгоритма
knn_auto = KNeighborsClassifier(n_neighbors=5, algorithm='auto')

2. Уменьшение размерности с помощью PCA

from sklearn.decomposition import PCA
from sklearn.pipeline import Pipeline

# Создаём pipeline с PCA
pipeline = Pipeline([
    ('pca', PCA(n_components=10)),
    ('knn', KNeighborsClassifier(n_neighbors=5))
])

pipeline.fit(X_train, y_train)

3. Использование Approximate Nearest Neighbors

Для больших датасетов можно использовать библиотеку Annoy или FAISS:

# Установка: pip install annoy
from annoy import AnnoyIndex

# Создаём индекс для быстрого поиска
f = 3  # размерность векторов
t = AnnoyIndex(f, 'angular')

# Добавляем векторы
for i, vector in enumerate(training_vectors):
    t.add_item(i, vector)

t.build(10)  # 10 деревьев

# Быстрый поиск ближайших соседей
similar_items = t.get_nns_by_vector(query_vector, 5)

Интеграция с системами мониторинга

KNN отлично интегрируется с популярными системами мониторинга. Вот пример интеграции с Prometheus:

#!/usr/bin/env python3
import requests
import numpy as np
from prometheus_client import start_http_server, Gauge
from sklearn.neighbors import KNeighborsClassifier

# Метрики для Prometheus
anomaly_gauge = Gauge('server_anomaly_score', 'Anomaly score from KNN')
prediction_gauge = Gauge('server_anomaly_prediction', 'Anomaly prediction (0=normal, 1=anomaly)')

class PrometheusKNN:
    def __init__(self, prometheus_url='http://localhost:9090'):
        self.prometheus_url = prometheus_url
        self.knn = KNeighborsClassifier(n_neighbors=5)
        
    def query_prometheus(self, query):
        """Запрашиваем метрики из Prometheus"""
        url = f"{self.prometheus_url}/api/v1/query"
        response = requests.get(url, params={'query': query})
        return response.json()
    
    def get_current_metrics(self):
        """Получаем текущие метрики сервера"""
        queries = {
            'cpu': 'cpu_usage_percent',
            'memory': 'memory_usage_percent',
            'disk': 'disk_usage_percent',
            'network': 'network_bytes_total'
        }
        
        metrics = []
        for name, query in queries.items():
            result = self.query_prometheus(query)
            if result['data']['result']:
                value = float(result['data']['result'][0]['value'][1])
                metrics.append(value)
        
        return np.array(metrics)
    
    def run_monitoring(self):
        """Запускаем мониторинг с экспортом в Prometheus"""
        start_http_server(8000)  # Prometheus endpoint
        
        while True:
            try:
                current_metrics = self.get_current_metrics()
                if len(current_metrics) > 0:
                    # Здесь должна быть логика предсказания
                    anomaly_score = self.predict_anomaly(current_metrics)
                    
                    # Обновляем метрики
                    anomaly_gauge.set(anomaly_score)
                    prediction_gauge.set(1 if anomaly_score > 0.7 else 0)
                    
                time.sleep(30)
            except Exception as e:
                print(f"Error: {e}")
                time.sleep(30)

# Запуск мониторинга
monitor = PrometheusKNN()
monitor.run_monitoring()

Интересные факты и нестандартные применения

Несколько неочевидных способов использования KNN в серверном администрировании:

  • Предсказание времени деплоя: На основе размера коммита, количества изменённых файлов и других метрик можно предсказывать время развёртывания
  • Автоматическая балансировка нагрузки: KNN может помочь в выборе оптимального сервера для обработки запроса на основе текущих метрик
  • Детекция DDoS: Анализ паттернов трафика для выявления аномальной активности
  • Оптимизация кэширования: Предсказание того, какие данные стоит кэшировать на основе паттернов доступа

Автоматизация и скрипты

Для автоматизации задач с KNN создайте универсальную CLI-утилиту:

#!/usr/bin/env python3
import argparse
import json
import pandas as pd
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler
import joblib

class KNNUtility:
    def __init__(self):
        self.model = None
        self.scaler = None
        
    def train(self, data_file, target_column, k=5):
        """Обучение модели"""
        df = pd.read_csv(data_file)
        X = df.drop(columns=[target_column])
        y = df[target_column]
        
        self.scaler = StandardScaler()
        X_scaled = self.scaler.fit_transform(X)
        
        self.model = KNeighborsClassifier(n_neighbors=k)
        self.model.fit(X_scaled, y)
        
        print(f"Model trained with {len(X)} samples")
        
    def predict(self, input_data):
        """Предсказание"""
        if isinstance(input_data, str):
            with open(input_data, 'r') as f:
                data = json.load(f)
        else:
            data = input_data
            
        X = pd.DataFrame([data])
        X_scaled = self.scaler.transform(X)
        
        prediction = self.model.predict(X_scaled)[0]
        probabilities = self.model.predict_proba(X_scaled)[0]
        
        return {
            'prediction': prediction,
            'confidence': max(probabilities),
            'probabilities': probabilities.tolist()
        }
    
    def save_model(self, filename):
        """Сохранение модели"""
        joblib.dump((self.model, self.scaler), filename)
        print(f"Model saved to {filename}")
        
    def load_model(self, filename):
        """Загрузка модели"""
        self.model, self.scaler = joblib.load(filename)
        print(f"Model loaded from {filename}")

def main():
    parser = argparse.ArgumentParser(description='KNN Utility for Server Administration')
    parser.add_argument('action', choices=['train', 'predict', 'save', 'load'])
    parser.add_argument('--data', help='Data file path')
    parser.add_argument('--target', help='Target column name')
    parser.add_argument('--model', help='Model file path')
    parser.add_argument('--k', type=int, default=5, help='Number of neighbors')
    parser.add_argument('--input', help='Input data for prediction')
    
    args = parser.parse_args()
    
    knn_util = KNNUtility()
    
    if args.action == 'train':
        knn_util.train(args.data, args.target, args.k)
    elif args.action == 'predict':
        knn_util.load_model(args.model)
        result = knn_util.predict(args.input)
        print(json.dumps(result, indent=2))
    elif args.action == 'save':
        knn_util.save_model(args.model)
    elif args.action == 'load':
        knn_util.load_model(args.model)

if __name__ == "__main__":
    main()

Использование:

# Обучение модели
python knn_utility.py train --data server_metrics.csv --target anomaly --k 5

# Сохранение модели
python knn_utility.py save --model server_knn.pkl

# Предсказание
echo '{"cpu": 85, "memory": 90, "disk": 75}' | python knn_utility.py predict --model server_knn.pkl --input /dev/stdin

Масштабирование и развёртывание

Для продакшена на выделенном сервере рекомендуется использовать контейнеризацию:

# Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

EXPOSE 8000

CMD ["python", "knn_service.py"]
# docker-compose.yml
version: '3.8'

services:
  knn-service:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./models:/app/models
      - ./logs:/app/logs
    environment:
      - MODEL_PATH=/app/models/server_knn.pkl
      - LOG_LEVEL=INFO
    restart: unless-stopped
    
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      
  grafana:
    image: grafana/grafana
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

Заключение и рекомендации

KNN — это мощный инструмент для серверного администрирования, который особенно эффективен в следующих случаях:

  • Быстрое прототипирование: Когда нужно быстро создать систему мониторинга или детекции аномалий
  • Небольшие датасеты: До 10-50 тысяч записей KNN работает отлично
  • Интерпретируемость: Легко понять, почему алгоритм принял определённое решение
  • Динамические данные: Можно легко добавлять новые примеры без переобучения

Не рекомендуется использовать KNN для:

  • Больших датасетов (> 100k записей) без оптимизации
  • Высокоразмерных данных (> 50 признаков) без предварительной обработки
  • Задач реального времени с жёсткими требованиями к латентности

Для максимальной эффективности комбинируйте KNN с другими инструментами: используйте PCA для уменьшения размерности, Annoy для ускорения поиска, и интегрируйте с системами мониторинга типа Prometheus и Grafana. Это даст вам мощную и гибкую систему для решения широкого спектра задач администрирования серверов.


В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.

Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.

Leave a reply

Your email address will not be published. Required fields are marked