- Home »

Как тестировать данные с помощью Great Expectations
Привет! Сегодня разберём одну из самых недооценённых тем в DevOps — тестирование данных. Если вы когда-нибудь сталкивались с ситуацией, когда ваш ML-пайплайн развалился из-за того, что данные пришли в неожиданном формате, или когда ETL-процесс начал жрать терабайты места из-за дублей, то вы знаете, о чём я говорю. Great Expectations — это фреймворк для валидации данных, который поможет вам избежать головной боли и создать надёжные data pipelines.
Эта статья покажет вам, как настроить и использовать Great Expectations для тестирования данных в продакшене. Мы пройдём от установки до создания сложных тестовых сценариев, рассмотрим интеграцию с популярными базами данных и облачными сервисами, а также разберём реальные кейсы использования.
Что такое Great Expectations и зачем оно нужно
Great Expectations — это open-source фреймворк для валидации, документирования и профилирования данных. Если простыми словами, это юнит-тесты для ваших данных. Вместо того чтобы проверять, что функция возвращает правильное значение, вы проверяете, что ваши данные соответствуют ожидаемым критериям.
Основные возможности:
- Валидация данных с помощью декларативных тестов
- Автоматическое профилирование данных
- Генерация документации
- Интеграция с популярными инструментами (Airflow, Spark, Pandas)
- Красивые HTML-отчёты
Установка и первоначальная настройка
Для начала работы понадобится сервер с Python 3.7+. Если у вас ещё нет подходящего сервера, можете арендовать VPS или выделенный сервер.
Устанавливаем Great Expectations:
pip install great-expectations
# Для работы с SQL-базами
pip install great-expectations[sqlalchemy]
# Для работы с Spark
pip install great-expectations[spark]
# Для работы с S3
pip install great-expectations[s3]
Инициализируем новый проект:
mkdir my_data_validation
cd my_data_validation
great_expectations init
Эта команда создаст структуру проекта:
great_expectations/
├── checkpoints/
├── expectations/
├── plugins/
├── profilers/
├── uncommitted/
├── .gitignore
└── great_expectations.yml
Создание первого Expectation Suite
Expectation Suite — это набор правил (expectations), которые должны выполняться для ваших данных. Создадим простой пример:
great_expectations datasource new
Выберите тип источника данных. Для начала рассмотрим файловый источник:
# Создаём тестовый CSV-файл
echo "user_id,age,email
1,25,user1@example.com
2,30,user2@example.com
3,22,user3@example.com" > users.csv
# Создаём Expectation Suite
great_expectations suite new
Откроется Jupyter Notebook с интерактивным интерфейсом для создания expectations. Вот основные типы проверок:
import great_expectations as ge
# Загружаем данные
df = ge.read_csv('users.csv')
# Основные expectations
df.expect_column_to_exist('user_id')
df.expect_column_values_to_be_unique('user_id')
df.expect_column_values_to_not_be_null('email')
df.expect_column_values_to_be_between('age', min_value=18, max_value=100)
df.expect_column_values_to_match_regex('email', r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
# Сохраняем suite
df.save_expectation_suite()
Работа с базами данных
Для работы с PostgreSQL настроим соединение:
# great_expectations.yml
datasources:
my_postgres_db:
class_name: Datasource
execution_engine:
class_name: SqlAlchemyExecutionEngine
connection_string: postgresql://user:password@localhost:5432/mydb
data_connectors:
default_runtime_data_connector_name:
class_name: RuntimeDataConnector
batch_identifiers:
- default_identifier_name
default_inferred_data_connector_name:
class_name: InferredAssetSqlDataConnector
include_schema_name: true
Создаём expectations для таблицы:
import great_expectations as ge
# Подключаемся к базе
context = ge.get_context()
validator = context.get_validator(
datasource_name="my_postgres_db",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="users_table",
runtime_parameters={"query": "SELECT * FROM users"},
expectation_suite_name="users_suite"
)
# Проверки специфичные для SQL
validator.expect_column_to_exist("user_id")
validator.expect_table_row_count_to_be_between(min_value=1, max_value=1000000)
validator.expect_column_values_to_be_unique("user_id")
validator.expect_column_values_to_not_be_null("email")
# Сохраняем
validator.save_expectation_suite()
Создание Checkpoints и автоматизация
Checkpoints позволяют запускать валидацию автоматически:
great_expectations checkpoint new my_checkpoint
Конфигурируем checkpoint:
# checkpoints/my_checkpoint.yml
name: my_checkpoint
config_version: 1.0
template_name:
module_name: great_expectations.checkpoint
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
expectation_suite_name: users_suite
batch_request:
datasource_name: my_postgres_db
data_connector_name: default_runtime_data_connector_name
data_asset_name: users_table
runtime_parameters:
query: "SELECT * FROM users"
action_list:
- name: store_validation_result
action:
class_name: StoreValidationResultAction
- name: update_data_docs
action:
class_name: UpdateDataDocsAction
Запуск checkpoint:
great_expectations checkpoint run my_checkpoint
Интеграция с Airflow
Для автоматизации в production среде интегрируем с Apache Airflow:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import great_expectations as ge
def validate_data():
context = ge.get_context()
result = context.run_checkpoint(
checkpoint_name="my_checkpoint",
run_name=f"airflow_run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
)
if not result["success"]:
raise Exception("Data validation failed!")
return result
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'data_validation_pipeline',
default_args=default_args,
description='Data validation with Great Expectations',
schedule_interval=timedelta(hours=1),
catchup=False
)
validate_task = PythonOperator(
task_id='validate_data',
python_callable=validate_data,
dag=dag
)
Продвинутые возможности
Great Expectations поддерживает кастомные expectations:
from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.expectations.expectation import ColumnMapExpectation
class ExpectColumnValuesToBeValidPhoneNumber(ColumnMapExpectation):
"""Проверка на валидный номер телефона"""
map_metric = "column_values.match_regex"
success_keys = ("regex",)
default_kwarg_values = {
"regex": r"^\+?1?\d{9,15}$",
"mostly": 1
}
def validate_configuration(self, configuration):
super().validate_configuration(configuration)
return True
# Использование
validator.expect_column_values_to_be_valid_phone_number("phone_number")
Мониторинг и алерты
Настроим интеграцию с Slack для уведомлений:
# validation_action.py
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.actions import SlackNotificationAction
class CustomSlackAction(SlackNotificationAction):
def __init__(self, webhook_url, notify_on="failure"):
super().__init__(webhook_url, notify_on)
def run(self, validation_result_suite, validation_result_suite_identifier, data_asset, **kwargs):
if not validation_result_suite.success:
failed_expectations = [
exp for exp in validation_result_suite.results
if not exp.success
]
message = f"🚨 Data validation failed!\n"
message += f"Dataset: {data_asset.name}\n"
message += f"Failed expectations: {len(failed_expectations)}\n"
for exp in failed_expectations[:5]: # Показываем первые 5
message += f"• {exp.expectation_config.expectation_type}\n"
self.send_slack_notification(message)
Сравнение с альтернативами
Решение | Плюсы | Минусы | Лучше для |
---|---|---|---|
Great Expectations | Богатый функционал, хорошая документация, активное сообщество | Может быть избыточным для простых случаев | Комплексных data pipelines |
Deequ (Amazon) | Интеграция с AWS, оптимизирован для Spark | Привязка к экосистеме AWS | AWS-окружения |
Cerberus | Простота, легковесность | Ограниченный функционал | Простой валидации |
Pandera | Pandas-нативный, типизация | Только для Pandas | Аналитических задач |
Практические кейсы и рекомендации
Кейс 1: Валидация данных в реальном времени
# streaming_validation.py
import great_expectations as ge
from kafka import KafkaConsumer
import json
def validate_streaming_data():
consumer = KafkaConsumer(
'user_events',
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
context = ge.get_context()
for message in consumer:
df = pd.DataFrame([message.value])
validator = context.get_validator(
datasource_name="my_datasource",
data_connector_name="runtime_data_connector",
data_asset_name="streaming_data",
runtime_parameters={"batch_data": df},
expectation_suite_name="streaming_suite"
)
result = validator.validate()
if not result.success:
# Логируем или отправляем в DLQ
print(f"Invalid data: {message.value}")
Кейс 2: Мониторинг дрифта данных
# data_drift_monitoring.py
import great_expectations as ge
from scipy import stats
def monitor_data_drift():
context = ge.get_context()
# Получаем исторические данные
historical_validator = context.get_validator(
datasource_name="my_db",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="users",
runtime_parameters={"query": "SELECT * FROM users WHERE created_at < '2023-01-01'"},
expectation_suite_name="drift_monitoring"
)
# Получаем текущие данные
current_validator = context.get_validator(
datasource_name="my_db",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="users",
runtime_parameters={"query": "SELECT * FROM users WHERE created_at >= '2023-01-01'"},
expectation_suite_name="drift_monitoring"
)
# Проверяем дрифт в распределении возраста
hist_ages = historical_validator.get_column_values("age")
curr_ages = current_validator.get_column_values("age")
# Kolmogorov-Smirnov test
ks_stat, p_value = stats.ks_2samp(hist_ages, curr_ages)
if p_value < 0.05:
print("Data drift detected!")
# Отправляем алерт
Производительность и оптимизация
Для больших объёмов данных важно оптимизировать валидацию:
# Используем семплинг для больших таблиц
batch_request = RuntimeBatchRequest(
datasource_name="my_postgres_db",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="large_table",
runtime_parameters={
"query": "SELECT * FROM large_table TABLESAMPLE BERNOULLI(1)" # 1% выборка
},
batch_identifiers={"default_identifier_name": "sampled_batch"}
)
# Параллельная валидация
from concurrent.futures import ThreadPoolExecutor
def validate_partition(partition_query):
validator = context.get_validator(
datasource_name="my_postgres_db",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="partitioned_table",
runtime_parameters={"query": partition_query},
expectation_suite_name="partition_suite"
)
return validator.validate()
# Валидируем партиции параллельно
partitions = [
"SELECT * FROM users WHERE created_at >= '2023-01-01' AND created_at < '2023-02-01'",
"SELECT * FROM users WHERE created_at >= '2023-02-01' AND created_at < '2023-03-01'",
# ...
]
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(validate_partition, partitions))
Интеграция с CI/CD
Добавим валидацию данных в CI/CD пайплайн:
# .github/workflows/data-validation.yml
name: Data Validation
on:
schedule:
- cron: '0 */6 * * *' # Каждые 6 часов
push:
paths:
- 'great_expectations/**'
jobs:
validate:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.9'
- name: Install dependencies
run: |
pip install great-expectations[postgresql,s3]
- name: Run data validation
run: |
great_expectations checkpoint run production_checkpoint
env:
DATABASE_URL: ${{ secrets.DATABASE_URL }}
S3_BUCKET: ${{ secrets.S3_BUCKET }}
Заключение и рекомендации
Great Expectations — мощный инструмент для обеспечения качества данных в современных data pipelines. Основные рекомендации по использованию:
- Начните с простого — создайте базовые expectations для критически важных полей
- Автоматизируйте — интегрируйте валидацию в ваши ETL-процессы
- Мониторьте — настройте алерты для быстрого реагирования на проблемы
- Документируйте — используйте Data Docs для создания понятной документации
- Оптимизируйте — для больших объёмов данных используйте семплинг и партиционирование
Great Expectations особенно полезен в следующих сценариях:
- Валидация данных в ETL-пайплайнах
- Мониторинг качества данных в ML-моделях
- Проверка данных при миграции между системами
- Контроль качества в data lakes
Для небольших проектов может быть избыточным, но для enterprise-уровня это must-have инструмент. Активное сообщество и регулярные обновления делают его отличным выбором для долгосрочных проектов.
Официальная документация доступна на docs.greatexpectations.io, а исходный код — на GitHub.
В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.
Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.