- Home »

Обнаружение аномалий с Isolation Forest — учебник по машинному обучению
Отличный вопрос! Представь: у тебя есть сервер с важным приложением, оно работает, логи идут рекой, метрики мониторинга в норме. Всё круто! Но иногда происходят странные вещи — внезапные скачки CPU, аномальные обращения к базе данных, подозрительная активность в сетевом трафике. Isolation Forest — это ML-алгоритм, который может автоматически выявлять такие нестандартные события в твоих данных. Это как автомат для поиска иголки в стоге сена серверных логов. Метод работает по принципу “изолировать аномалии легче, чем нормальные данные”, что делает его быстрым и эффективным для мониторинга систем в реальном времени.
Зачем это нужно системному администратору? Простой пример: у тебя есть веб-сервер с обычным трафиком, но иногда появляются подозрительные запросы от ботов или DDoS-атаки. Isolation Forest поможет автоматически обнаружить эти аномалии без настройки сложных правил. Алгоритм учится на обычных данных и потом сам находит всё, что выбивается из паттерна.
Как это работает под капотом
Isolation Forest использует довольно хитрый подход. Вместо того чтобы описывать “нормальные” данные, он пытается изолировать аномалии. Принцип простой: аномальные точки данных стоят особняком, поэтому их легче отделить от основной массы.
Алгоритм строит случайные деревья (как в Random Forest, но с другой логикой):
- Случайно выбирается признак из dataset’а
- Случайно выбирается точка разделения между минимальным и максимальным значением этого признака
- Данные разделяются на две группы
- Процесс повторяется рекурсивно до тех пор, пока каждая точка не будет изолирована
Аномалии изолируются быстрее (меньше разбиений), чем нормальные данные. Это означает, что они будут располагаться ближе к корню дерева. Гениально просто!
Практическая настройка: делаем всё по шагам
Для начала нужно настроить рабочее окружение. Я рекомендую использовать виртуальный сервер — будет удобнее экспериментировать с разными конфигурациями. Можешь взять VPS или если данных много и нужна производительность — выделенный сервер.
Устанавливаем необходимые пакеты:
# Обновляем систему
sudo apt update && sudo apt upgrade -y
# Устанавливаем Python и pip
sudo apt install python3 python3-pip python3-venv -y
# Создаём виртуальное окружение
python3 -m venv isolation_forest_env
source isolation_forest_env/bin/activate
# Устанавливаем необходимые библиотеки
pip install scikit-learn pandas numpy matplotlib seaborn
pip install psutil # для системных метрик
pip install flask # если будем делать API
Теперь создаём базовый скрипт для обнаружения аномалий:
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import psutil
import time
import json
class ServerAnomalyDetector:
def __init__(self, contamination=0.1, random_state=42):
"""
contamination: ожидаемая доля аномалий в данных (0.1 = 10%)
random_state: для воспроизводимости результатов
"""
self.model = IsolationForest(
contamination=contamination,
random_state=random_state,
n_estimators=100
)
self.scaler = StandardScaler()
self.is_fitted = False
def collect_system_metrics(self):
"""Собираем системные метрики"""
return {
'cpu_percent': psutil.cpu_percent(),
'memory_percent': psutil.virtual_memory().percent,
'disk_io_read': psutil.disk_io_counters().read_bytes,
'disk_io_write': psutil.disk_io_counters().write_bytes,
'network_sent': psutil.net_io_counters().bytes_sent,
'network_recv': psutil.net_io_counters().bytes_recv,
'load_avg': psutil.getloadavg()[0],
'timestamp': time.time()
}
def prepare_data(self, data):
"""Подготавливаем данные для обучения"""
df = pd.DataFrame(data)
# Убираем timestamp для обучения
features = df.drop(['timestamp'], axis=1)
# Нормализуем данные
scaled_features = self.scaler.fit_transform(features)
return scaled_features, df
def train(self, training_data):
"""Обучаем модель на исторических данных"""
scaled_data, _ = self.prepare_data(training_data)
self.model.fit(scaled_data)
self.is_fitted = True
print(f"Модель обучена на {len(training_data)} образцах")
def predict(self, new_data):
"""Предсказываем аномалии"""
if not self.is_fitted:
raise Exception("Модель не обучена!")
if isinstance(new_data, dict):
new_data = [new_data]
scaled_data, df = self.prepare_data(new_data)
# Получаем предсказания (-1 = аномалия, 1 = норма)
predictions = self.model.predict(scaled_data)
# Получаем scores (чем меньше, тем более аномальная точка)
scores = self.model.decision_function(scaled_data)
results = []
for i, (pred, score) in enumerate(zip(predictions, scores)):
results.append({
'is_anomaly': pred == -1,
'anomaly_score': score,
'timestamp': df.iloc[i]['timestamp'],
'metrics': df.iloc[i].to_dict()
})
return results
# Пример использования
if __name__ == "__main__":
detector = ServerAnomalyDetector(contamination=0.05)
# Собираем данные для обучения (имитируем сбор за несколько дней)
print("Собираем данные для обучения...")
training_data = []
# В реальной ситуации здесь был бы код для загрузки исторических данных
# Для демонстрации генерируем нормальные данные
for _ in range(1000):
metrics = detector.collect_system_metrics()
training_data.append(metrics)
time.sleep(0.01) # небольшая задержка
# Обучаем модель
detector.train(training_data)
# Тестируем на новых данных
print("Мониторинг аномалий...")
while True:
current_metrics = detector.collect_system_metrics()
try:
results = detector.predict(current_metrics)
for result in results:
if result['is_anomaly']:
print(f"⚠️ АНОМАЛИЯ ОБНАРУЖЕНА!")
print(f" Время: {time.ctime(result['timestamp'])}")
print(f" Оценка: {result['anomaly_score']:.4f}")
print(f" CPU: {result['metrics']['cpu_percent']:.1f}%")
print(f" Memory: {result['metrics']['memory_percent']:.1f}%")
print(f" Load: {result['metrics']['load_avg']:.2f}")
print(" " + "-"*50)
else:
print(f"✅ Нормальная активность (score: {result['anomaly_score']:.4f})")
except Exception as e:
print(f"Ошибка: {e}")
time.sleep(5) # проверяем каждые 5 секунд
Настройка параметров: что важно знать
Isolation Forest имеет несколько ключевых параметров, которые сильно влияют на результат:
Параметр | Описание | Рекомендуемые значения | Влияние на результат |
---|---|---|---|
contamination |
Ожидаемая доля аномалий | 0.05-0.1 для серверов 0.01-0.05 для стабильных систем |
Чем больше, тем больше аномалий будет найдено |
n_estimators |
Количество деревьев | 100-200 для общего случая 50-100 для real-time |
Больше деревьев = стабильнее, но медленнее |
max_samples |
Размер выборки для обучения | “auto” или 256-512 | Меньше = быстрее, но может хуже работать |
max_features |
Количество признаков на дерево | 1.0 (все признаки) | Влияет на разнообразие деревьев |
Вот улучшенная версия с гибкими настройками:
class AdvancedServerAnomalyDetector:
def __init__(self, config=None):
if config is None:
config = {
'contamination': 0.05,
'n_estimators': 100,
'max_samples': 'auto',
'random_state': 42,
'sensitivity': 'medium' # low, medium, high
}
self.config = config
self.model = IsolationForest(
contamination=config['contamination'],
n_estimators=config['n_estimators'],
max_samples=config['max_samples'],
random_state=config.get('random_state', 42)
)
self.scaler = StandardScaler()
self.is_fitted = False
# Настройки чувствительности
sensitivity_thresholds = {
'low': -0.1,
'medium': -0.05,
'high': 0.0
}
self.threshold = sensitivity_thresholds.get(config['sensitivity'], -0.05)
def predict_with_threshold(self, new_data):
"""Предсказание с настраиваемым порогом"""
if not self.is_fitted:
raise Exception("Модель не обучена!")
if isinstance(new_data, dict):
new_data = [new_data]
scaled_data, df = self.prepare_data(new_data)
scores = self.model.decision_function(scaled_data)
results = []
for i, score in enumerate(scores):
is_anomaly = score < self.threshold
results.append({
'is_anomaly': is_anomaly,
'anomaly_score': score,
'confidence': abs(score - self.threshold),
'timestamp': df.iloc[i]['timestamp'],
'metrics': df.iloc[i].to_dict()
})
return results
def get_feature_importance(self, data_sample):
"""Анализ важности признаков для аномалии"""
if not self.is_fitted:
raise Exception("Модель не обучена!")
baseline_score = self.model.decision_function([data_sample])[0]
feature_importance = {}
original_data = data_sample.copy()
for i, feature in enumerate(['cpu_percent', 'memory_percent', 'disk_io_read',
'disk_io_write', 'network_sent', 'network_recv', 'load_avg']):
# Заменяем признак на медианное значение
modified_data = original_data.copy()
modified_data[i] = 0 # нормализованное значение
modified_score = self.model.decision_function([modified_data])[0]
importance = abs(baseline_score - modified_score)
feature_importance[feature] = importance
return sorted(feature_importance.items(), key=lambda x: x[1], reverse=True)
Реальные кейсы использования
Давайте разберём несколько практических сценариев, где Isolation Forest показывает себя с лучшей стороны:
Кейс 1: Обнаружение DDoS-атак
class DDoSDetector:
def __init__(self):
self.detector = IsolationForest(contamination=0.02, n_estimators=200)
self.scaler = StandardScaler()
def extract_network_features(self, log_line):
"""Извлекаем признаки из логов веб-сервера"""
# Парсим строку лога nginx/apache
parts = log_line.split(' ')
return {
'requests_per_second': self.count_requests_in_window(),
'unique_ips': self.count_unique_ips(),
'avg_response_time': self.calculate_avg_response_time(),
'error_rate': self.calculate_error_rate(),
'bandwidth_usage': self.calculate_bandwidth(),
'suspicious_user_agents': self.count_suspicious_agents()
}
def is_ddos_attack(self, current_metrics):
"""Определяем, является ли текущая активность DDoS-атакой"""
features = [list(current_metrics.values())]
scaled_features = self.scaler.transform(features)
score = self.detector.decision_function(scaled_features)[0]
prediction = self.detector.predict(scaled_features)[0]
if prediction == -1:
return {
'is_attack': True,
'severity': 'high' if score < -0.2 else 'medium',
'score': score,
'metrics': current_metrics
}
return {'is_attack': False, 'score': score}
Кейс 2: Мониторинг производительности базы данных
class DatabaseAnomalyDetector:
def __init__(self):
self.detector = IsolationForest(contamination=0.08, n_estimators=150)
def collect_db_metrics(self, connection):
"""Собираем метрики из PostgreSQL/MySQL"""
return {
'active_connections': self.get_active_connections(connection),
'query_duration_avg': self.get_avg_query_duration(connection),
'deadlocks_per_minute': self.get_deadlocks_count(connection),
'cache_hit_ratio': self.get_cache_hit_ratio(connection),
'index_usage': self.get_index_usage_stats(connection),
'table_scans': self.get_seq_scans_count(connection),
'locks_waiting': self.get_waiting_locks(connection)
}
def analyze_slow_queries(self, metrics):
"""Анализируем медленные запросы"""
if metrics['query_duration_avg'] > 1000: # больше 1 секунды
return {
'issue': 'slow_queries',
'recommendation': 'Проверьте план выполнения запросов',
'priority': 'high'
}
return None
Кейс 3: Детекция вторжений в файловую систему
import os
import hashlib
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class FileSystemAnomalyDetector(FileSystemEventHandler):
def __init__(self, watch_paths):
self.detector = IsolationForest(contamination=0.03)
self.watch_paths = watch_paths
self.file_stats = {}
def collect_file_metrics(self, filepath):
"""Собираем метрики файла"""
try:
stat = os.stat(filepath)
return {
'file_size': stat.st_size,
'access_time': stat.st_atime,
'modify_time': stat.st_mtime,
'permissions': stat.st_mode,
'owner_uid': stat.st_uid,
'file_type': self.get_file_type(filepath),
'is_executable': os.access(filepath, os.X_OK)
}
except OSError:
return None
def on_modified(self, event):
if not event.is_directory:
metrics = self.collect_file_metrics(event.src_path)
if metrics:
result = self.detector.predict([list(metrics.values())])
if result[0] == -1:
self.alert_suspicious_activity(event.src_path, metrics)
def alert_suspicious_activity(self, filepath, metrics):
"""Оповещение о подозрительной активности"""
print(f"🚨 Подозрительная активность: {filepath}")
print(f" Размер: {metrics['file_size']} байт")
print(f" Права: {oct(metrics['permissions'])}")
# Здесь можно добавить отправку в Slack, Telegram, etc.
Сравнение с другими решениями
Isolation Forest — не единственный инструмент для обнаружения аномалий. Давайте сравним его с альтернативами:
Метод | Скорость обучения | Скорость предсказания | Память | Точность | Лучше всего для |
---|---|---|---|---|---|
Isolation Forest | Быстрая | Очень быстрая | Низкая | Высокая | Многомерные данные, real-time |
One-Class SVM | Медленная | Средняя | Высокая | Высокая | Небольшие dataset’ы |
Local Outlier Factor | Быстрая | Медленная | Очень высокая | Очень высокая | Локальные аномалии |
Autoencoder | Очень медленная | Быстрая | Высокая | Средняя | Сложные паттерны |
Statistical methods | Очень быстрая | Очень быстрая | Очень низкая | Низкая | Одномерные данные |
Пример сравнения производительности:
import time
from sklearn.ensemble import IsolationForest
from sklearn.svm import OneClassSVM
from sklearn.neighbors import LocalOutlierFactor
import numpy as np
def benchmark_anomaly_detectors(data_size=10000):
"""Бенчмарк различных детекторов аномалий"""
# Генерируем тестовые данные
np.random.seed(42)
normal_data = np.random.normal(0, 1, (data_size, 5))
anomalous_data = np.random.normal(5, 1, (int(data_size * 0.05), 5))
X = np.vstack([normal_data, anomalous_data])
detectors = {
'Isolation Forest': IsolationForest(contamination=0.05, random_state=42),
'One-Class SVM': OneClassSVM(gamma='scale', nu=0.05),
'Local Outlier Factor': LocalOutlierFactor(contamination=0.05, novelty=True)
}
results = {}
for name, detector in detectors.items():
# Замеряем время обучения
start_time = time.time()
detector.fit(normal_data)
training_time = time.time() - start_time
# Замеряем время предсказания
start_time = time.time()
predictions = detector.predict(X)
prediction_time = time.time() - start_time
# Считаем точность (упрощённо)
true_anomalies = len(anomalous_data)
detected_anomalies = np.sum(predictions == -1)
results[name] = {
'training_time': training_time,
'prediction_time': prediction_time,
'detected_anomalies': detected_anomalies,
'true_anomalies': true_anomalies
}
return results
# Запускаем бенчмарк
benchmark_results = benchmark_anomaly_detectors()
for name, metrics in benchmark_results.items():
print(f"{name}:")
print(f" Время обучения: {metrics['training_time']:.4f}s")
print(f" Время предсказания: {metrics['prediction_time']:.4f}s")
print(f" Найдено аномалий: {metrics['detected_anomalies']}")
print()
Интеграция с системами мониторинга
Isolation Forest отлично интегрируется с популярными системами мониторинга. Вот несколько примеров:
Интеграция с Prometheus и Grafana
from prometheus_client import start_http_server, Gauge, Counter
import time
# Метрики для Prometheus
anomaly_score_gauge = Gauge('anomaly_score', 'Current anomaly score')
anomalies_total = Counter('anomalies_total', 'Total number of anomalies detected')
normal_events_total = Counter('normal_events_total', 'Total number of normal events')
class PrometheusAnomalyDetector:
def __init__(self, port=8000):
self.detector = IsolationForest(contamination=0.05)
self.port = port
start_http_server(port)
def update_metrics(self, is_anomaly, score):
"""Обновляем метрики для Prometheus"""
anomaly_score_gauge.set(score)
if is_anomaly:
anomalies_total.inc()
else:
normal_events_total.inc()
def run_monitoring(self):
"""Основной цикл мониторинга"""
while True:
metrics = self.collect_system_metrics()
results = self.predict(metrics)
for result in results:
self.update_metrics(
result['is_anomaly'],
result['anomaly_score']
)
time.sleep(10)
Интеграция с ELK Stack
import json
import logging
from elasticsearch import Elasticsearch
class ElasticsearchAnomalyDetector:
def __init__(self, es_host='localhost', es_port=9200):
self.es = Elasticsearch([{'host': es_host, 'port': es_port}])
self.detector = IsolationForest(contamination=0.05)
def index_anomaly(self, result):
"""Индексируем аномалию в Elasticsearch"""
doc = {
'timestamp': result['timestamp'],
'is_anomaly': result['is_anomaly'],
'anomaly_score': result['anomaly_score'],
'metrics': result['metrics'],
'severity': self.calculate_severity(result['anomaly_score'])
}
self.es.index(
index='server-anomalies',
doc_type='anomaly',
body=doc
)
def calculate_severity(self, score):
"""Рассчитываем серьёзность аномалии"""
if score < -0.3:
return 'critical'
elif score < -0.1:
return 'high'
elif score < 0:
return 'medium'
else:
return 'low'
def search_anomalies(self, time_range='1h'):
"""Поиск аномалий за определённый период"""
query = {
'query': {
'bool': {
'must': [
{'term': {'is_anomaly': True}},
{'range': {'timestamp': {'gte': f'now-{time_range}'}}}
]
}
},
'sort': [{'timestamp': {'order': 'desc'}}]
}
return self.es.search(index='server-anomalies', body=query)
Автоматизация и скрипты
Одно из главных преимуществ Isolation Forest — возможность легко автоматизировать процесс обнаружения аномалий. Вот несколько готовых скриптов для разных задач:
Скрипт для cron с уведомлениями
#!/usr/bin/env python3
"""
Скрипт для периодической проверки аномалий
Добавить в cron: */5 * * * * /path/to/anomaly_checker.py
"""
import sys
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
class AnomalyNotifier:
def __init__(self, config_file='anomaly_config.json'):
with open(config_file, 'r') as f:
self.config = json.load(f)
self.detector = IsolationForest(
contamination=self.config['contamination'],
random_state=42
)
def send_email_alert(self, anomaly_details):
"""Отправка уведомления по email"""
msg = MIMEMultipart()
msg['From'] = self.config['email']['from']
msg['To'] = self.config['email']['to']
msg['Subject'] = f"🚨 Аномалия обнаружена на сервере {self.config['server_name']}"
body = f"""
Обнаружена аномалия на сервере {self.config['server_name']}
Время: {anomaly_details['timestamp']}
Оценка аномалии: {anomaly_details['anomaly_score']:.4f}
Метрики:
- CPU: {anomaly_details['metrics']['cpu_percent']:.1f}%
- Memory: {anomaly_details['metrics']['memory_percent']:.1f}%
- Load Average: {anomaly_details['metrics']['load_avg']:.2f}
- Network In: {anomaly_details['metrics']['network_recv']} bytes
- Network Out: {anomaly_details['metrics']['network_sent']} bytes
Проверьте состояние сервера.
"""
msg.attach(MIMEText(body, 'plain'))
try:
server = smtplib.SMTP(self.config['email']['smtp_server'], 587)
server.starttls()
server.login(self.config['email']['username'], self.config['email']['password'])
server.send_message(msg)
server.quit()
print("Email отправлен успешно")
except Exception as e:
print(f"Ошибка отправки email: {e}")
def send_slack_alert(self, anomaly_details):
"""Отправка уведомления в Slack"""
webhook_url = self.config['slack']['webhook_url']
message = {
"text": f"🚨 Аномалия на сервере {self.config['server_name']}",
"attachments": [
{
"color": "danger",
"fields": [
{"title": "Время", "value": anomaly_details['timestamp'], "short": True},
{"title": "Оценка", "value": f"{anomaly_details['anomaly_score']:.4f}", "short": True},
{"title": "CPU", "value": f"{anomaly_details['metrics']['cpu_percent']:.1f}%", "short": True},
{"title": "Memory", "value": f"{anomaly_details['metrics']['memory_percent']:.1f}%", "short": True}
]
}
]
}
try:
response = requests.post(webhook_url, json=message)
if response.status_code == 200:
print("Slack уведомление отправлено")
else:
print(f"Ошибка Slack: {response.status_code}")
except Exception as e:
print(f"Ошибка отправки в Slack: {e}")
def run_check(self):
"""Основная функция проверки"""
try:
# Загружаем обученную модель
detector = self.load_trained_model()
# Собираем текущие метрики
current_metrics = self.collect_system_metrics()
# Проверяем на аномалии
results = detector.predict(current_metrics)
for result in results:
if result['is_anomaly']:
print(f"Аномалия обнаружена: {result['anomaly_score']:.4f}")
# Отправляем уведомления
if self.config.get('email', {}).get('enabled', False):
self.send_email_alert(result)
if self.config.get('slack', {}).get('enabled', False):
self.send_slack_alert(result)
# Логируем в файл
self.log_anomaly(result)
sys.exit(1) # Сигнализируем об аномалии
print("Всё в порядке")
sys.exit(0)
except Exception as e:
print(f"Ошибка при проверке: {e}")
sys.exit(2)
def log_anomaly(self, anomaly_details):
"""Логирование аномалии в файл"""
log_file = self.config.get('log_file', '/var/log/anomaly_detector.log')
with open(log_file, 'a') as f:
f.write(f"{anomaly_details['timestamp']}: {json.dumps(anomaly_details)}\n")
if __name__ == "__main__":
notifier = AnomalyNotifier()
notifier.run_check()
Конфигурационный файл
{
"server_name": "web-server-01",
"contamination": 0.05,
"model_path": "/opt/anomaly_detector/models/server_model.pkl",
"log_file": "/var/log/anomaly_detector.log",
"email": {
"enabled": true,
"smtp_server": "smtp.gmail.com",
"from": "alerts@yourcompany.com",
"to": "admin@yourcompany.com",
"username": "alerts@yourcompany.com",
"password": "your_app_password"
},
"slack": {
"enabled": true,
"webhook_url": "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
}
}
Продвинутые техники и трюки
Несколько интересных способов улучшить работу Isolation Forest:
Ансамблевый подход
class EnsembleAnomalyDetector:
def __init__(self):
"""Создаём ансамбль из нескольких детекторов"""
self.detectors = {
'conservative': IsolationForest(contamination=0.02, n_estimators=200),
'moderate': IsolationForest(contamination=0.05, n_estimators=150),
'aggressive': IsolationForest(contamination=0.1, n_estimators=100)
}
self.weights = {'conservative': 0.5, 'moderate': 0.3, 'aggressive': 0.2}
def predict_ensemble(self, data):
"""Предсказание с учётом всех моделей"""
scores = {}
predictions = {}
for name, detector in self.detectors.items():
scores[name] = detector.decision_function(data)
predictions[name] = detector.predict(data)
# Взвешенная сумма оценок
final_score = sum(scores[name] * self.weights[name] for name in scores)
# Голосование по предсказаниям
votes = sum(predictions[name] for name in predictions)
final_prediction = 1 if votes > 0 else -1
return final_prediction, final_score
Адаптивное обучение
class AdaptiveAnomalyDetector:
def __init__(self, adaptation_window=1000):
self.detector = IsolationForest(contamination=0.05)
self.adaptation_window = adaptation_window
self.data_buffer = []
self.retrain_threshold = 0.8 # Переобучаемся при изменении distribution
def needs_retraining(self, new_data):
"""Определяем, нужно ли переобучение"""
if len(self.data_buffer) < self.adaptation_window: return False # Сравниваем distribution новых данных со старыми old_mean = np.mean(self.data_buffer[-self.adaptation_window//2:], axis=0) new_mean = np.mean(new_data, axis=0) # Если среднее сильно изменилось, нужно переобучение distance = np.linalg.norm(old_mean - new_mean) return distance > self.retrain_threshold
def adaptive_predict(self, new_data):
"""Предсказание с адаптивным переобучением"""
self.data_buffer.extend(new_data)
if self.needs_retraining(new_data):
print("Переобучение модели...")
recent_data = self.data_buffer[-self.adaptation_window:]
self.detector.fit(recent_data)
# Ограничиваем размер буфера
if len(self.data_buffer) > self.adaptation_window * 2:
self.data_buffer = self.data_buffer[-self.adaptation_window:]
return self.detector.predict(new_data)
Создание API для детекции аномалий
from flask import Flask, request, jsonify
import joblib
import numpy as np
app = Flask(__name__)
# Загружаем обученную модель при старте
detector = joblib.load('/opt/models/anomaly_detector.pkl')
@app.route('/detect', methods=['POST'])
def detect_anomaly():
"""API endpoint для детекции аномалий"""
try:
data = request.get_json()
# Проверяем формат данных
required_fields = ['cpu_percent', 'memory_percent', 'load_avg',
'network_sent', 'network_recv', 'disk_io_read', 'disk_io_write']
if not all(field in data for field in required_fields):
return jsonify({'error': 'Missing required fields'}), 400
# Подготавливаем данные для модели
features = np.array([[data[field] for field in required_fields]])
# Получаем предсказание
prediction = detector.predict(features)[0]
score = detector.decision_function(features)[0]
result = {
'is_anomaly': prediction == -1,
'anomaly_score': float(score),
'severity': 'high' if score < -0.2 else 'medium' if score < -0.1 else 'low',
'timestamp': data.get('timestamp', time.time())
}
return jsonify(result)
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/health', methods=['GET'])
def health_check():
"""Проверка здоровья сервиса"""
return jsonify({'status': 'healthy', 'model_loaded': detector is not None})
@app.route('/retrain', methods=['POST'])
def retrain_model():
"""Переобучение модели новыми данными"""
try:
data = request.get_json()
training_data = np.array(data['training_data'])
global detector
detector = IsolationForest(contamination=0.05, random_state=42)
detector.fit(training_data)
# Сохраняем обновлённую модель
joblib.dump(detector, '/opt/models/anomaly_detector.pkl')
return jsonify({'status': 'retrained', 'samples': len(training_data)})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
Оптимизация производительности
Несколько советов для оптимизации Isolation Forest в production:
- Используй минимальное количество признаков — больше не всегда лучше
- Настрой max_samples — для больших dataset’ов используй значение 256-512
- Кэшируй модель — не переобучай без необходимости
- Используй batch prediction — обрабатывай данные пачками
- Мониторь drift — отслеживай изменения в данных
class OptimizedAnomalyDetector:
def __init__(self):
self.detector = IsolationForest(
contamination=0.05,
n_estimators=100,
max_samples=256, # Ограничиваем для скорости
n_jobs=-1, # Используем все CPU
random_state=42
)
self.model_cache = {}
self.last_training_time = 0
def batch_predict(self, data_batch, batch_size=1000):
"""Обработка данных батчами"""
results = []
for i in range(0, len(data_batch), batch_size):
batch = data_batch[i:i+batch_size]
predictions = self.detector.predict(batch)
scores = self.detector.decision_function(batch)
for pred, score in zip(predictions, scores):
results.append({
'is_anomaly': pred == -1,
'anomaly_score': score
})
return results
def should_retrain(self, current_time):
"""Проверяем, нужно ли переобучение"""
return current_time - self.last_training_time > 3600 # Раз в час
Интересные факты и нестандартные применения
Несколько интересных способов использования Isolation Forest, о которых мало кто знает:
1. Детекция мошенничества в логах
Isolation Forest отлично подходит для поиска подозрительных паттернов в логах авторизации:
class LoginAnomalyDetector:
def __init__(self):
self.detector = IsolationForest(contamination=0.02)
def extract_login_features(self, log_entry):
"""Извлекаем признаки из лога авторизации"""
return {
'hour_of_day': log_entry['timestamp'].hour,
'day_of_week': log_entry['timestamp'].weekday(),
'login_duration': log_entry['session_duration'],
'failed_attempts': log_entry['failed_attempts'],
'ip_reputation_score': self.get_ip_reputation(log_entry['ip']),
'geolocation_distance': self.calculate_geo_distance(log_entry['ip']),
'user_agent_similarity': self.calculate_ua_similarity(log_entry['user_agent'])
}
2. Мониторинг качества данных
Можно использовать для поиска “плохих” данных в ETL-процессах:
class DataQualityMonitor:
def __init__(self):
self.detector = IsolationForest(contamination=0.03)
def check_data_quality(self, dataframe):
"""Проверяем качество данных"""
quality_metrics = []
for _, row in dataframe.iterrows():
metrics = {
'null_count': row.isnull().sum(),
'string_length_avg': row.astype(str).str.len().mean(),
'numeric_range': row.select_dtypes(include=[np.number]).max() -
row.select_dtypes(include=[np.number]).min(),
'duplicate_values': row.duplicated().sum()
}
quality_metrics.append(list(metrics.values()))
predictions = self.detector.predict(quality_metrics)
return [i for i, pred in enumerate(predictions) if pred == -1]
3. Анализ производительности приложений
Поиск аномалий в метриках приложений:
class AppPerformanceDetector:
def __init__(self):
self.detector = IsolationForest(contamination=0.05)
def analyze_app_metrics(self, metrics):
“””Анализ метрик приложения”””
features = {
‘response_time_p95’: metrics[‘response_time_p95’],
‘error_rate’: metrics[‘error_rate’],
‘throughput’: metrics[‘requests_per_second’],
‘memory_usage’: metrics[‘memory_usage’],
‘gc_time’: metrics[‘gc_time’],
‘db_connection_pool’: metrics[‘db_connections’],
‘cache_hit_ratio’: metrics[‘cache_hit_ratio’]
}
prediction = self.detector.predict([list(features.values())])[0]
if prediction == -1:
return {
‘performance_issue
В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.
Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.