"""
Validation module for Macrodata Refinement (MDR).
This module provides functions and classes for validating macrodata
and ensuring data quality.
"""
from typing import Dict, List, Tuple, Union, Optional, Any
from dataclasses import dataclass
import numpy as np
import pandas as pd
[docs]
@dataclass
class ValidationResult:
    """Result of a data validation operation.

    Attributes:
        is_valid: Whether the validated data passed.
        error_messages: Messages describing each failure that was found.
        invalid_indices: Indices of the offending entries, or None when no
            indices were recorded.
        statistics: Summary values keyed by name, or None. Every value must
            be a float (ints are rejected by ``__post_init__``).
    """

    is_valid: bool
    error_messages: List[str]
    invalid_indices: Optional[np.ndarray] = None
    statistics: Optional[Dict[str, float]] = None

    def __post_init__(self) -> None:
        """Validate the field types of the freshly built instance.

        Raises:
            AssertionError: If any field has an unexpected type.
        """
        assert isinstance(self.is_valid, bool), "is_valid must be a boolean"
        assert isinstance(self.error_messages, list), "error_messages must be a list"

        if self.invalid_indices is not None:
            assert isinstance(self.invalid_indices, np.ndarray), (
                "invalid_indices must be a numpy ndarray"
            )

        if self.statistics is not None:
            assert isinstance(self.statistics, dict), "statistics must be a dictionary"
            for key, value in self.statistics.items():
                assert isinstance(key, str), "statistics keys must be strings"
                assert isinstance(value, float), (
                    f"statistics value for key '{key}' must be a floating-point number"
                )
[docs]
def check_data_range(
    data: np.ndarray,
    min_value: float,
    max_value: float
) -> ValidationResult:
    """
    Check if all values in the data are within the specified range.

    NaN entries compare as neither below ``min_value`` nor above
    ``max_value``, so they are not reported as out of range here.

    Args:
        data: Input data array
        min_value: Minimum allowed value (inclusive)
        max_value: Maximum allowed value (inclusive)

    Returns:
        ValidationResult whose ``invalid_indices`` holds the out-of-range
        positions (None when everything is in range) and whose
        ``statistics`` holds "min", "max", "mean", "median" and "std" of
        the data. For an empty array the result is valid and every
        statistic is NaN.

    Raises:
        AssertionError: If an argument has the wrong type or
            ``min_value > max_value``.
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"
    assert isinstance(min_value, float), "min_value must be a floating-point number"
    assert isinstance(max_value, float), "max_value must be a floating-point number"
    assert min_value <= max_value, "min_value must be less than or equal to max_value"

    out_of_range = (data < min_value) | (data > max_value)
    invalid_indices = np.where(out_of_range)[0]
    is_valid = len(invalid_indices) == 0

    error_messages = []
    if not is_valid:
        error_messages.append(
            f"Data contains {len(invalid_indices)} values outside the range [{min_value}, {max_value}]"
        )

    # Calculate statistics
    if data.size == 0:
        # np.min / np.max raise ValueError on a zero-size array, so report
        # NaN for every statistic instead of crashing on empty input.
        statistics = {
            name: float("nan")
            for name in ("min", "max", "mean", "median", "std")
        }
    else:
        statistics = {
            "min": float(np.min(data)),
            "max": float(np.max(data)),
            "mean": float(np.mean(data)),
            "median": float(np.median(data)),
            "std": float(np.std(data))
        }

    return ValidationResult(
        is_valid=is_valid,
        error_messages=error_messages,
        invalid_indices=invalid_indices if not is_valid else None,
        statistics=statistics
    )
[docs]
def check_missing_values(
    data: np.ndarray,
    threshold: float = 0.1
) -> ValidationResult:
    """
    Check for missing values in the data.

    A value counts as missing when ``np.isnan`` reports it as NaN.

    Args:
        data: Input data array
        threshold: Maximum allowed fraction of missing values (0 to 1)

    Returns:
        ValidationResult that is valid when the missing fraction does not
        exceed ``threshold``. ``invalid_indices`` holds the positions of
        the missing values only when the check fails. ``statistics`` holds
        "missing_count", "missing_fraction" and "threshold". An empty
        array has a missing fraction of 0.0.

    Raises:
        AssertionError: If an argument has the wrong type or ``threshold``
            is outside [0, 1].
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"
    assert isinstance(threshold, float), "threshold must be a floating-point number"
    assert 0.0 <= threshold <= 1.0, "threshold must be between 0 and 1"

    missing_indices = np.where(np.isnan(data))[0]
    total_count = len(data)
    # Guard the division: an empty array has nothing missing, and dividing
    # by its zero length would raise ZeroDivisionError.
    missing_fraction = len(missing_indices) / total_count if total_count else 0.0
    is_valid = missing_fraction <= threshold

    error_messages = []
    if not is_valid:
        error_messages.append(
            f"Data contains {len(missing_indices)} missing values ({missing_fraction:.2%}), "
            f"which exceeds the threshold of {threshold:.2%}"
        )

    # Calculate statistics related to missing values
    statistics = {
        "missing_count": float(len(missing_indices)),
        "missing_fraction": float(missing_fraction),
        "threshold": float(threshold)
    }

    return ValidationResult(
        is_valid=is_valid,
        error_messages=error_messages,
        invalid_indices=missing_indices if not is_valid else None,
        statistics=statistics
    )
[docs]
def check_outliers(
    data: np.ndarray,
    threshold: float = 3.0,
    method: str = "zscore",
    max_outlier_fraction: float = 0.1
) -> ValidationResult:
    """
    Check for outliers in the data.

    NaN entries are ignored when estimating the centre and spread and are
    never reported as outliers.

    Args:
        data: Input data array
        threshold: Threshold for outlier detection. For 'zscore' and 'mad'
            it is the score above which a value is an outlier; for 'iqr'
            it is the multiple of the IQR added beyond the quartiles.
        method: Method for outlier detection ('zscore', 'iqr', 'mad')
        max_outlier_fraction: Largest fraction of outliers for which the
            data is still considered valid (0 to 1).

    Returns:
        ValidationResult that is valid when the outlier fraction does not
        exceed ``max_outlier_fraction``. ``invalid_indices`` holds the
        outlier positions only when the check fails. ``statistics`` holds
        "outlier_count" and "outlier_fraction". If no non-NaN values
        exist, the result is invalid with zeroed statistics.

    Raises:
        AssertionError: If an argument has the wrong type or is out of range.
        ValueError: If ``method`` is not one of the supported names.
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"
    assert isinstance(threshold, float), "threshold must be a floating-point number"
    assert threshold > 0.0, "threshold must be greater than 0"
    assert isinstance(method, str), "method must be a string"
    assert isinstance(max_outlier_fraction, float), (
        "max_outlier_fraction must be a floating-point number"
    )
    assert 0.0 <= max_outlier_fraction <= 1.0, (
        "max_outlier_fraction must be between 0 and 1"
    )

    # Reject unknown methods before the all-missing early return below, so
    # a misspelled method is reported regardless of the data's contents.
    if method not in ("zscore", "iqr", "mad"):
        raise ValueError(f"Unknown outlier detection method: {method}")

    # Remove NaN values for outlier detection
    valid_data = data[~np.isnan(data)]
    if len(valid_data) == 0:
        return ValidationResult(
            is_valid=False,
            error_messages=["Cannot check outliers: all values are missing"],
            statistics={"outlier_count": 0.0, "outlier_fraction": 0.0}
        )

    # Integer dtype keeps the "no outliers" result consistent with the
    # index arrays produced by np.where.
    no_outliers = np.array([], dtype=np.intp)

    if method == "zscore":
        mean = np.mean(valid_data)
        std = np.std(valid_data)
        if std == 0:
            # All values are the same, no outliers
            outlier_indices = no_outliers
        else:
            z_scores = np.abs((data - mean) / std)
            outlier_indices = np.where((~np.isnan(z_scores)) & (z_scores > threshold))[0]
    elif method == "iqr":
        q1, q3 = np.percentile(valid_data, [25, 75])
        iqr = q3 - q1
        if iqr == 0:
            # IQR is zero, all values are the same or nearly the same
            outlier_indices = no_outliers
        else:
            lower_bound = q1 - threshold * iqr
            upper_bound = q3 + threshold * iqr
            outlier_indices = np.where(
                (~np.isnan(data)) & ((data < lower_bound) | (data > upper_bound))
            )[0]
    else:
        # method == "mad": Median Absolute Deviation
        median = np.median(valid_data)
        mad = np.median(np.abs(valid_data - median))
        if mad == 0:
            # MAD is zero, all values are the same or nearly the same
            outlier_indices = no_outliers
        else:
            # The 0.6745 factor rescales the MAD-based score so that it is
            # approximately comparable to a z-score.
            z_scores = 0.6745 * np.abs(data - median) / mad
            outlier_indices = np.where((~np.isnan(z_scores)) & (z_scores > threshold))[0]

    outlier_fraction = len(outlier_indices) / len(data)
    is_valid = outlier_fraction <= max_outlier_fraction

    error_messages = []
    if not is_valid:
        error_messages.append(
            f"Data contains {len(outlier_indices)} outliers ({outlier_fraction:.2%}), "
            f"which may indicate data quality issues"
        )

    # Calculate statistics related to outliers
    statistics = {
        "outlier_count": float(len(outlier_indices)),
        "outlier_fraction": float(outlier_fraction)
    }

    return ValidationResult(
        is_valid=is_valid,
        error_messages=error_messages,
        invalid_indices=outlier_indices if not is_valid else None,
        statistics=statistics
    )
[docs]
def check_data_integrity(
    data: np.ndarray,
    checks: Optional[List[str]] = None,
    params: Optional[Dict[str, Any]] = None
) -> ValidationResult:
    """
    Perform a comprehensive data integrity check.

    Args:
        data: Input data array
        checks: List of checks to perform; any of "range", "missing" and
            "outliers". Defaults to all three when None.
        params: Parameters for each check, keyed by check name. Supplied
            values override these defaults:
            range -> min_value=-inf, max_value=inf;
            missing -> threshold=0.1;
            outliers -> threshold=3.0, method="zscore".

    Returns:
        ValidationResult combining every requested check: valid only if
        all checks pass, with all error messages concatenated, the sorted
        unique union of the invalid indices (None if no check reported
        any), and each check's statistics stored under
        "<check>_<statistic>" keys.

    Raises:
        AssertionError: If an argument has the wrong type.
        ValueError: If ``checks`` contains an unknown check name.
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"
    # A None sentinel avoids sharing one mutable default list between calls.
    if checks is None:
        checks = ["range", "missing", "outliers"]
    assert isinstance(checks, list), "checks must be a list"
    if params is None:
        params = {}
    assert isinstance(params, dict), "params must be a dictionary"

    # Default parameters; built fresh on every call so the updates below
    # never leak between calls.
    effective_params = {
        "range": {"min_value": -np.inf, "max_value": np.inf},
        "missing": {"threshold": 0.1},
        "outliers": {"threshold": 3.0, "method": "zscore"}
    }

    # Fail on unknown names before doing any work.
    for check in checks:
        if check not in effective_params:
            raise ValueError(f"Unknown check: {check}")

    # Overlay the caller's parameters on the defaults
    for check, check_params in effective_params.items():
        if check in params:
            check_params.update(params[check])

    results = []
    all_errors = []
    invalid_indices_sets = []
    all_statistics = {}

    # Perform each requested check
    for check in checks:
        if check == "range":
            result = check_data_range(
                data,
                min_value=float(effective_params["range"]["min_value"]),
                max_value=float(effective_params["range"]["max_value"])
            )
        elif check == "missing":
            result = check_missing_values(
                data,
                threshold=float(effective_params["missing"]["threshold"])
            )
        else:
            # check == "outliers"; other names were rejected above
            result = check_outliers(
                data,
                threshold=float(effective_params["outliers"]["threshold"]),
                method=effective_params["outliers"]["method"]
            )

        results.append(result)
        all_errors.extend(result.error_messages)
        if result.invalid_indices is not None:
            invalid_indices_sets.append(result.invalid_indices)
        if result.statistics is not None:
            # Prefix with the check name so statistics of different checks
            # cannot overwrite one another.
            all_statistics.update({f"{check}_{k}": v for k, v in result.statistics.items()})

    # Combine invalid indices from all checks
    combined_invalid_indices = (
        np.unique(np.concatenate(invalid_indices_sets)) if invalid_indices_sets else None
    )

    # Data is valid if all checks pass
    is_valid = all(result.is_valid for result in results)

    return ValidationResult(
        is_valid=is_valid,
        error_messages=all_errors,
        invalid_indices=combined_invalid_indices,
        statistics=all_statistics
    )
[docs]
def validate_data(
    data_dict: Dict[str, np.ndarray],
    checks: Optional[List[str]] = None,
    params: Optional[Dict[str, Dict[str, Any]]] = None
) -> Dict[str, ValidationResult]:
    """
    Validate multiple data arrays.

    Args:
        data_dict: Dictionary mapping variable names to data arrays
        checks: List of checks to perform. Defaults to
            ["range", "missing", "outliers"] when None.
        params: Parameters for each check - can be structured in two ways:
            1. Global parameters: {check_name: {parameters}}
            2. Variable-specific parameters: {variable_name: {check_name: {parameters}}}
            Variable-specific parameters take precedence over global ones.
            The dictionary passed in is never modified.

    Returns:
        Dictionary mapping variable names to ValidationResult objects

    Raises:
        AssertionError: If an argument, or a value in ``data_dict``, has
            the wrong type.
    """
    assert isinstance(data_dict, dict), "data_dict must be a dictionary"
    # A None sentinel avoids sharing one mutable default list between calls.
    if checks is None:
        checks = ["range", "missing", "outliers"]
    assert isinstance(checks, list), "checks must be a list"
    if params is None:
        params = {}
    assert isinstance(params, dict), "params must be a dictionary"

    # Global parameters are the entries keyed directly by a check name
    global_params = {check: params[check] for check in checks if check in params}

    validation_results = {}
    for key, data in data_dict.items():
        assert isinstance(data, np.ndarray), f"Value for key '{key}' must be a numpy ndarray"

        # Get variable-specific parameters, if specified
        var_params = params.get(key, {})

        # Copy each per-check dict, not just the outer mapping. A shallow
        # copy would let the update below write into the caller's params
        # and carry one variable's overrides over to later variables.
        merged_params = {
            check: dict(check_params)
            for check, check_params in global_params.items()
        }
        for check in checks:
            if check in var_params:
                merged_params.setdefault(check, {}).update(var_params[check])

        # Validate this variable's data
        validation_results[key] = check_data_integrity(data, checks, merged_params)

    return validation_results