Home » Как тестировать данные с помощью Great Expectations
Как тестировать данные с помощью Great Expectations

Как тестировать данные с помощью Great Expectations

Привет! Сегодня разберём одну из самых недооценённых тем в DevOps — тестирование данных. Если вы когда-нибудь сталкивались с ситуацией, когда ваш ML-пайплайн развалился из-за того, что данные пришли в неожиданном формате, или когда ETL-процесс начал жрать терабайты места из-за дублей, то вы знаете, о чём я говорю. Great Expectations — это фреймворк для валидации данных, который поможет вам избежать головной боли и создать надёжные data pipelines.

Эта статья покажет вам, как настроить и использовать Great Expectations для тестирования данных в продакшене. Мы пройдём от установки до создания сложных тестовых сценариев, рассмотрим интеграцию с популярными базами данных и облачными сервисами, а также разберём реальные кейсы использования.

Что такое Great Expectations и зачем оно нужно

Great Expectations — это open-source фреймворк для валидации, документирования и профилирования данных. Если простыми словами, это юнит-тесты для ваших данных. Вместо того чтобы проверять, что функция возвращает правильное значение, вы проверяете, что ваши данные соответствуют ожидаемым критериям.

Основные возможности:

  • Валидация данных с помощью декларативных тестов
  • Автоматическое профилирование данных
  • Генерация документации
  • Интеграция с популярными инструментами (Airflow, Spark, Pandas)
  • Красивые HTML-отчёты

Установка и первоначальная настройка

Для начала работы понадобится сервер с Python 3.7+. Если у вас ещё нет подходящего сервера, можете арендовать VPS или выделенный сервер.

Устанавливаем Great Expectations:

pip install great-expectations

# Для работы с SQL-базами
pip install great-expectations[sqlalchemy]

# Для работы с Spark
pip install great-expectations[spark]

# Для работы с S3
pip install great-expectations[s3]

Инициализируем новый проект:

mkdir my_data_validation
cd my_data_validation
great_expectations init

Эта команда создаст структуру проекта:

great_expectations/
├── checkpoints/
├── expectations/
├── plugins/
├── profilers/
├── uncommitted/
├── .gitignore
└── great_expectations.yml

Создание первого Expectation Suite

Expectation Suite — это набор правил (expectations), которые должны выполняться для ваших данных. Создадим простой пример:

great_expectations datasource new

Выберите тип источника данных. Для начала рассмотрим файловый источник:

# Создаём тестовый CSV-файл
echo "user_id,age,email
1,25,user1@example.com
2,30,user2@example.com
3,22,user3@example.com" > users.csv

# Создаём Expectation Suite
great_expectations suite new

Откроется Jupyter Notebook с интерактивным интерфейсом для создания expectations. Вот основные типы проверок:

import great_expectations as ge

# Загружаем данные
df = ge.read_csv('users.csv')

# Основные expectations
df.expect_column_to_exist('user_id')
df.expect_column_values_to_be_unique('user_id')
df.expect_column_values_to_not_be_null('email')
df.expect_column_values_to_be_between('age', min_value=18, max_value=100)
df.expect_column_values_to_match_regex('email', r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')

# Сохраняем suite
df.save_expectation_suite()

Работа с базами данных

Для работы с PostgreSQL настроим соединение:

# great_expectations.yml
datasources:
  my_postgres_db:
    class_name: Datasource
    execution_engine:
      class_name: SqlAlchemyExecutionEngine
      connection_string: postgresql://user:password@localhost:5432/mydb
    data_connectors:
      default_runtime_data_connector_name:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - default_identifier_name
      default_inferred_data_connector_name:
        class_name: InferredAssetSqlDataConnector
        include_schema_name: true

Создаём expectations для таблицы:

import great_expectations as ge

# Подключаемся к базе
context = ge.get_context()
validator = context.get_validator(
    datasource_name="my_postgres_db",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="users_table",
    runtime_parameters={"query": "SELECT * FROM users"},
    expectation_suite_name="users_suite"
)

# Проверки специфичные для SQL
validator.expect_column_to_exist("user_id")
validator.expect_table_row_count_to_be_between(min_value=1, max_value=1000000)
validator.expect_column_values_to_be_unique("user_id")
validator.expect_column_values_to_not_be_null("email")

# Сохраняем
validator.save_expectation_suite()

Создание Checkpoints и автоматизация

Checkpoints позволяют запускать валидацию автоматически:

great_expectations checkpoint new my_checkpoint

Конфигурируем checkpoint:

# checkpoints/my_checkpoint.yml
name: my_checkpoint
config_version: 1.0
template_name:
module_name: great_expectations.checkpoint
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
expectation_suite_name: users_suite
batch_request:
  datasource_name: my_postgres_db
  data_connector_name: default_runtime_data_connector_name
  data_asset_name: users_table
  runtime_parameters:
    query: "SELECT * FROM users"
action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction
  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction

Запуск checkpoint:

great_expectations checkpoint run my_checkpoint

Интеграция с Airflow

Для автоматизации в production среде интегрируем с Apache Airflow:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import great_expectations as ge

def validate_data():
    context = ge.get_context()
    result = context.run_checkpoint(
        checkpoint_name="my_checkpoint",
        run_name=f"airflow_run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    )
    
    if not result["success"]:
        raise Exception("Data validation failed!")
    
    return result

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'data_validation_pipeline',
    default_args=default_args,
    description='Data validation with Great Expectations',
    schedule_interval=timedelta(hours=1),
    catchup=False
)

validate_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag
)

Продвинутые возможности

Great Expectations поддерживает кастомные expectations:

from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.expectations.expectation import ColumnMapExpectation

class ExpectColumnValuesToBeValidPhoneNumber(ColumnMapExpectation):
    """Проверка на валидный номер телефона"""
    
    map_metric = "column_values.match_regex"
    success_keys = ("regex",)
    
    default_kwarg_values = {
        "regex": r"^\+?1?\d{9,15}$",
        "mostly": 1
    }

    def validate_configuration(self, configuration):
        super().validate_configuration(configuration)
        return True

# Использование
validator.expect_column_values_to_be_valid_phone_number("phone_number")

Мониторинг и алерты

Настроим интеграцию с Slack для уведомлений:

# validation_action.py
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.actions import SlackNotificationAction

class CustomSlackAction(SlackNotificationAction):
    def __init__(self, webhook_url, notify_on="failure"):
        super().__init__(webhook_url, notify_on)
        
    def run(self, validation_result_suite, validation_result_suite_identifier, data_asset, **kwargs):
        if not validation_result_suite.success:
            failed_expectations = [
                exp for exp in validation_result_suite.results 
                if not exp.success
            ]
            
            message = f"🚨 Data validation failed!\n"
            message += f"Dataset: {data_asset.name}\n"
            message += f"Failed expectations: {len(failed_expectations)}\n"
            
            for exp in failed_expectations[:5]:  # Показываем первые 5
                message += f"• {exp.expectation_config.expectation_type}\n"
                
            self.send_slack_notification(message)

Сравнение с альтернативами

Решение Плюсы Минусы Лучше для
Great Expectations Богатый функционал, хорошая документация, активное сообщество Может быть избыточным для простых случаев Комплексных data pipelines
Deequ (Amazon) Интеграция с AWS, оптимизирован для Spark Привязка к экосистеме AWS AWS-окружения
Cerberus Простота, легковесность Ограниченный функционал Простой валидации
Pandera Pandas-нативный, типизация Только для Pandas Аналитических задач

Практические кейсы и рекомендации

Кейс 1: Валидация данных в реальном времени

# streaming_validation.py
import great_expectations as ge
from kafka import KafkaConsumer
import json

def validate_streaming_data():
    consumer = KafkaConsumer(
        'user_events',
        bootstrap_servers=['localhost:9092'],
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )
    
    context = ge.get_context()
    
    for message in consumer:
        df = pd.DataFrame([message.value])
        
        validator = context.get_validator(
            datasource_name="my_datasource",
            data_connector_name="runtime_data_connector",
            data_asset_name="streaming_data",
            runtime_parameters={"batch_data": df},
            expectation_suite_name="streaming_suite"
        )
        
        result = validator.validate()
        
        if not result.success:
            # Логируем или отправляем в DLQ
            print(f"Invalid data: {message.value}")

Кейс 2: Мониторинг дрифта данных

# data_drift_monitoring.py
import great_expectations as ge
from scipy import stats

def monitor_data_drift():
    context = ge.get_context()
    
    # Получаем исторические данные
    historical_validator = context.get_validator(
        datasource_name="my_db",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="users",
        runtime_parameters={"query": "SELECT * FROM users WHERE created_at < '2023-01-01'"},
        expectation_suite_name="drift_monitoring"
    )
    
    # Получаем текущие данные
    current_validator = context.get_validator(
        datasource_name="my_db",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="users",
        runtime_parameters={"query": "SELECT * FROM users WHERE created_at >= '2023-01-01'"},
        expectation_suite_name="drift_monitoring"
    )
    
    # Проверяем дрифт в распределении возраста
    hist_ages = historical_validator.get_column_values("age")
    curr_ages = current_validator.get_column_values("age")
    
    # Kolmogorov-Smirnov test
    ks_stat, p_value = stats.ks_2samp(hist_ages, curr_ages)
    
    if p_value < 0.05:
        print("Data drift detected!")
        # Отправляем алерт

Производительность и оптимизация

Для больших объёмов данных важно оптимизировать валидацию:

# Используем семплинг для больших таблиц
batch_request = RuntimeBatchRequest(
    datasource_name="my_postgres_db",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="large_table",
    runtime_parameters={
        "query": "SELECT * FROM large_table TABLESAMPLE BERNOULLI(1)"  # 1% выборка
    },
    batch_identifiers={"default_identifier_name": "sampled_batch"}
)

# Параллельная валидация
from concurrent.futures import ThreadPoolExecutor

def validate_partition(partition_query):
    validator = context.get_validator(
        datasource_name="my_postgres_db",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="partitioned_table",
        runtime_parameters={"query": partition_query},
        expectation_suite_name="partition_suite"
    )
    return validator.validate()

# Валидируем партиции параллельно
partitions = [
    "SELECT * FROM users WHERE created_at >= '2023-01-01' AND created_at < '2023-02-01'",
    "SELECT * FROM users WHERE created_at >= '2023-02-01' AND created_at < '2023-03-01'",
    # ...
]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(validate_partition, partitions))

Интеграция с CI/CD

Добавим валидацию данных в CI/CD пайплайн:

# .github/workflows/data-validation.yml
name: Data Validation
on:
  schedule:
    - cron: '0 */6 * * *'  # Каждые 6 часов
  push:
    paths:
      - 'great_expectations/**'

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: |
          pip install great-expectations[postgresql,s3]
      - name: Run data validation
        run: |
          great_expectations checkpoint run production_checkpoint
        env:
          DATABASE_URL: ${{ secrets.DATABASE_URL }}
          S3_BUCKET: ${{ secrets.S3_BUCKET }}

Заключение и рекомендации

Great Expectations — мощный инструмент для обеспечения качества данных в современных data pipelines. Основные рекомендации по использованию:

  • Начните с простого — создайте базовые expectations для критически важных полей
  • Автоматизируйте — интегрируйте валидацию в ваши ETL-процессы
  • Мониторьте — настройте алерты для быстрого реагирования на проблемы
  • Документируйте — используйте Data Docs для создания понятной документации
  • Оптимизируйте — для больших объёмов данных используйте семплинг и партиционирование

Great Expectations особенно полезен в следующих сценариях:

  • Валидация данных в ETL-пайплайнах
  • Мониторинг качества данных в ML-моделях
  • Проверка данных при миграции между системами
  • Контроль качества в data lakes

Для небольших проектов может быть избыточным, но для enterprise-уровня это must-have инструмент. Активное сообщество и регулярные обновления делают его отличным выбором для долгосрочных проектов.

Официальная документация доступна на docs.greatexpectations.io, а исходный код — на GitHub.


В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.

Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.

Leave a reply

Your email address will not be published. Required fields are marked