"""
Refinement module for Macrodata Refinement (MDR).
This module provides functions and classes for refining macrodata
through various statistical and analytical methods.
"""
from typing import Dict, List, Optional, Union, Any, Tuple
from dataclasses import dataclass
import numpy as np
import pandas as pd
[docs]
@dataclass
class RefinementConfig:
    """
    Settings consumed by the refinement pipeline.

    Attributes:
        smoothing_factor: Float in the half-open range (0, 1].
        outlier_threshold: Float strictly greater than 0.
        imputation_method: Name of the imputation strategy (any string is
            accepted here; it is not checked against a list of methods).
        normalization_type: Name of a normalization scheme (only checked to
            be a string here).

    Raises:
        AssertionError: If a field has the wrong type or is out of range.
    """

    smoothing_factor: float
    outlier_threshold: float
    imputation_method: str
    normalization_type: str

    def __post_init__(self) -> None:
        """Check field types first, then the numeric ranges."""
        type_rules = (
            (self.smoothing_factor, float,
             "smoothing_factor must be a floating-point number"),
            (self.outlier_threshold, float,
             "outlier_threshold must be a floating-point number"),
            (self.imputation_method, str,
             "imputation_method must be a string"),
            (self.normalization_type, str,
             "normalization_type must be a string"),
        )
        for value, expected_type, message in type_rules:
            assert isinstance(value, expected_type), message

        # Chained comparisons are kept as-is so that NaN is rejected.
        in_unit_interval = 0.0 < self.smoothing_factor <= 1.0
        assert in_unit_interval, "smoothing_factor must be between 0 and 1"
        threshold_is_positive = self.outlier_threshold > 0.0
        assert threshold_is_positive, "outlier_threshold must be greater than 0"
[docs]
def smooth_data(data: np.ndarray, factor: float) -> np.ndarray:
    """
    Smooth a 1-D array with an exponentially weighted moving window.

    The window holds ``max(2, int(1 / factor))`` weights, capped at the
    number of samples so the result always has the same length as ``data``.
    The weights are ``exp`` of evenly spaced points in [-1, 0], normalised to
    sum to 1, and applied with ``np.convolve(..., mode="same")``.

    Args:
        data: Input data array to smooth (1-D; ``np.convolve`` rejects
            higher-dimensional input).
        factor: Smoothing factor (0 < factor <= 1). Smaller values give a
            longer window.

    Returns:
        A new array of the same length as ``data``. The first and last
        samples are copied from the input unchanged. An empty input yields
        an empty copy.

    Raises:
        AssertionError: If ``data`` is not an ndarray, or ``factor`` is not
            a float in (0, 1].
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"
    assert isinstance(factor, float), "factor must be a floating-point number"
    assert 0.0 < factor <= 1.0, "factor must be between 0 and 1"

    # np.convolve raises on empty input; there is nothing to smooth anyway.
    if data.size == 0:
        return data.copy()

    # mode="same" returns max(len(data), len(weights)) samples, so a window
    # longer than the data would make the output longer than the input.
    window_size = min(max(2, int(1.0 / factor)), data.size)
    weights = np.exp(np.linspace(-1.0, 0.0, window_size))
    weights /= weights.sum()
    smoothed = np.convolve(data, weights, mode="same")

    # The implicit zero padding distorts the ends; pin the two end points
    # to the original values.
    smoothed[0] = data[0]
    smoothed[-1] = data[-1]
    return smoothed
[docs]
def remove_outliers(data: np.ndarray, threshold: float) -> np.ndarray:
    """
    Replace outliers with the median, using a MAD-based modified z-score.

    Each value is scored as ``0.6745 * (x - median) / MAD`` where MAD is the
    median absolute deviation from the median. Values whose absolute score
    exceeds ``threshold`` are replaced by the median.

    Args:
        data: Input data array.
        threshold: Modified z-score threshold for outlier detection (> 0).

    Returns:
        A new array with outliers replaced by the median. The input is never
        modified and never returned directly. When MAD is zero no score can
        be computed and an unchanged copy is returned.

    Raises:
        AssertionError: If ``data`` is not an ndarray, or ``threshold`` is
            not a float greater than 0.
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"
    assert isinstance(threshold, float), "threshold must be a floating-point number"
    assert threshold > 0.0, "threshold must be greater than 0"

    # Work on a copy on every path so callers can mutate the result freely.
    refined_data = data.copy()

    median = np.median(data)
    mad = np.median(np.abs(data - median))  # Median Absolute Deviation
    if mad == 0:
        # MAD is zero whenever more than half of the values equal the median
        # (not only when all values are identical); the score would divide
        # by zero, so nothing is flagged.
        return refined_data

    # 0.6745 rescales MAD so the score is comparable to a standard z-score.
    z_scores = 0.6745 * (data - median) / mad
    outlier_mask = np.abs(z_scores) > threshold
    refined_data[outlier_mask] = median
    return refined_data
[docs]
def impute_missing_values(
    data: np.ndarray,
    method: str = "mean",
    window_size: int = 3
) -> np.ndarray:
    """
    Fill NaN entries of an array using the chosen strategy.

    Args:
        data: Input data array with potential NaN values.
        method: Imputation method:
            'mean'    - fill with the mean of the non-NaN values;
            'median'  - fill with the median of the non-NaN values;
            'linear'  - interpolate linearly by position; NaNs before the
                        first or after the last valid sample take the
                        nearest valid value;
            'forward' - carry the last valid value forward; leading NaNs
                        take the first valid value.
        window_size: Validated to be a positive integer but not used by any
            of the current methods.

    Returns:
        A new array with missing values imputed; the input is not modified.
        For 'linear' and 'forward' an all-NaN input is filled with 0.0. For
        'mean' and 'median' an all-NaN input stays NaN, because the
        NaN-aware aggregate itself is NaN.

    Raises:
        AssertionError: If an argument has the wrong type or ``window_size``
            is not positive.
        ValueError: If ``method`` is unknown and there is something to
            impute, or if 'linear'/'forward' is used on a non-1-D array.
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"
    assert isinstance(method, str), "method must be a string"
    assert isinstance(window_size, int), "window_size must be an integer"
    assert window_size > 0, "window_size must be greater than 0"

    # Create a copy to avoid modifying the original data
    imputed_data = data.copy()

    # A boolean mask (rather than row indices) keeps 'mean'/'median' correct
    # for arrays with more than one dimension.
    missing_mask = np.isnan(imputed_data)
    if not missing_mask.any():
        # Nothing to impute; the method name is deliberately not checked here.
        return imputed_data

    if method == "mean":
        imputed_data[missing_mask] = np.nanmean(imputed_data)
    elif method == "median":
        imputed_data[missing_mask] = np.nanmedian(imputed_data)
    elif method in ("linear", "forward"):
        if imputed_data.ndim != 1:
            raise ValueError(f"method '{method}' requires a 1-D array")
        valid_indices = np.flatnonzero(~missing_mask)
        if valid_indices.size == 0:
            # If all values are NaN, set to 0
            imputed_data[:] = 0.0
        elif method == "linear":
            # np.interp interpolates between valid samples and clamps to the
            # edge values outside them, so leading and trailing NaNs are
            # filled too; a single valid sample fills the whole array.
            imputed_data[missing_mask] = np.interp(
                np.flatnonzero(missing_mask),
                valid_indices,
                imputed_data[valid_indices],
            )
        else:
            # Series.ffill() replaces the deprecated fillna(method='ffill').
            # copy=True guarantees a writable array for the assignment below.
            imputed_data = pd.Series(imputed_data).ffill().to_numpy(copy=True)
            # Forward fill cannot reach NaNs that precede the first valid
            # sample; give them that first valid value.
            first_valid = valid_indices[0]
            imputed_data[:first_valid] = imputed_data[first_valid]
    else:
        raise ValueError(f"Unknown imputation method: {method}")
    return imputed_data
[docs]
def refine_data(
    data: np.ndarray,
    config: RefinementConfig
) -> np.ndarray:
    """
    Run the full refinement pipeline on a single array.

    The stages run in a fixed order: missing-value imputation, then outlier
    removal, then smoothing. Each stage receives the previous stage's output.

    Args:
        data: Input data array
        config: Refinement configuration supplying the imputation method,
            outlier threshold and smoothing factor

    Returns:
        Refined data array
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"
    assert isinstance(config, RefinementConfig), "config must be a RefinementConfig object"

    # Stage 1: fill the gaps.
    filled = impute_missing_values(data, method=config.imputation_method)
    # Stage 2: replace outliers.
    cleaned = remove_outliers(filled, threshold=config.outlier_threshold)
    # Stage 3: smooth, and hand back the result.
    return smooth_data(cleaned, factor=config.smoothing_factor)
[docs]
def apply_refinement_pipeline(
    data_dict: Dict[str, np.ndarray],
    config: RefinementConfig
) -> Dict[str, np.ndarray]:
    """
    Refine every array in a name-to-array mapping with one shared config.

    Entries are processed in the mapping's iteration order; each value is
    type-checked immediately before it is refined.

    Args:
        data_dict: Dictionary mapping variable names to data arrays
        config: Refinement configuration

    Returns:
        New dictionary with the same keys and refined data arrays
    """
    assert isinstance(data_dict, dict), "data_dict must be a dictionary"
    assert isinstance(config, RefinementConfig), "config must be a RefinementConfig object"

    def _refine_entry(key, series):
        # Validate, then refine, one entry at a time.
        assert isinstance(series, np.ndarray), f"Value for key '{key}' must be a numpy ndarray"
        return refine_data(series, config)

    return {key: _refine_entry(key, series) for key, series in data_dict.items()}