Source code for mdr.core.transformation

"""
Transformation module for Macrodata Refinement (MDR).

This module provides functions for transforming macrodata
through various statistical and mathematical operations.
"""

from typing import Dict, List, Tuple, Union, Optional, Any, Callable
import numpy as np
import pandas as pd
from enum import Enum, auto


[docs] class NormalizationType(Enum): """Types of normalization methods.""" MINMAX = auto() ZSCORE = auto() ROBUST = auto() DECIMAL_SCALING = auto()
[docs] def normalize_data( data: np.ndarray, method: Union[str, NormalizationType] = "minmax", params: Optional[Dict[str, Any]] = None ) -> Tuple[np.ndarray, Dict[str, float]]: """ Normalize data using the specified method. Args: data: Input data array method: Normalization method ('minmax', 'zscore', 'robust', 'decimal_scaling') params: Additional parameters for normalization Returns: Tuple of (normalized data array, normalization parameters) """ assert isinstance(data, np.ndarray), "data must be a numpy ndarray" if isinstance(method, str): try: method = NormalizationType[method.upper()] except KeyError: raise ValueError(f"Unknown normalization method: {method}") assert isinstance(method, NormalizationType), "method must be a string or NormalizationType" if params is None: params = {} assert isinstance(params, dict), "params must be a dictionary" # Handle missing values valid_mask = ~np.isnan(data) valid_data = data[valid_mask] if len(valid_data) == 0: # All values are missing, return the original data and empty parameters return data.copy(), {} # Create output array normalized = np.full_like(data, np.nan) # Apply normalization based on the specified method if method == NormalizationType.MINMAX: # Min-max normalization data_min = np.min(valid_data) data_max = np.max(valid_data) # If min and max are the same, return zeros if data_min == data_max: normalized[valid_mask] = 0.0 normalization_params = {"min": float(data_min), "max": float(data_max)} else: # Apply min-max scaling normalized[valid_mask] = (valid_data - data_min) / (data_max - data_min) normalization_params = {"min": float(data_min), "max": float(data_max)} elif method == NormalizationType.ZSCORE: # Z-score normalization data_mean = np.mean(valid_data) data_std = np.std(valid_data) # If standard deviation is zero, return zeros if data_std == 0: normalized[valid_mask] = 0.0 normalization_params = {"mean": float(data_mean), "std": float(data_std)} else: # Apply z-score normalization normalized[valid_mask] = (valid_data - data_mean) / data_std normalization_params = {"mean": float(data_mean), "std": float(data_std)} elif method == NormalizationType.ROBUST: # Robust normalization using median and IQR data_median = np.median(valid_data) q1, q3 = np.percentile(valid_data, [25, 75]) iqr = q3 - q1 # If IQR is zero, return zeros if iqr == 0: normalized[valid_mask] = 0.0 normalization_params = {"median": float(data_median), "iqr": float(iqr)} else: # Apply robust normalization normalized[valid_mask] = (valid_data - data_median) / iqr normalization_params = {"median": float(data_median), "iqr": float(iqr)} elif method == NormalizationType.DECIMAL_SCALING: # Decimal scaling normalization max_abs = np.max(np.abs(valid_data)) if max_abs == 0: normalized[valid_mask] = 0.0 normalization_params = {"scale": 1.0} else: # Calculate the number of digits in the maximum absolute value scale = 10 ** np.ceil(np.log10(max_abs)) # Apply decimal scaling normalized[valid_mask] = valid_data / scale normalization_params = {"scale": float(scale)} return normalized, normalization_params
[docs] def scale_data( data: np.ndarray, factor: float, offset: float = 0.0 ) -> np.ndarray: """ Scale data by a factor and add an offset. Args: data: Input data array factor: Scaling factor offset: Offset to add after scaling Returns: Scaled data array """ assert isinstance(data, np.ndarray), "data must be a numpy ndarray" assert isinstance(factor, float), "factor must be a floating-point number" assert isinstance(offset, float), "offset must be a floating-point number" return data * factor + offset
[docs] def apply_logarithmic_transform( data: np.ndarray, base: float = 10.0, epsilon: float = 1e-10 ) -> np.ndarray: """ Apply logarithmic transformation to the data. Args: data: Input data array base: Logarithm base epsilon: Small value to add to prevent log(0) Returns: Log-transformed data array """ assert isinstance(data, np.ndarray), "data must be a numpy ndarray" assert isinstance(base, float), "base must be a floating-point number" assert base > 0.0, "base must be positive" assert isinstance(epsilon, float), "epsilon must be a floating-point number" assert epsilon > 0.0, "epsilon must be positive" # Create a copy to avoid modifying the original data transformed = data.copy() # Replace negative values with NaN transformed[transformed <= 0] = np.nan # Apply log transformation valid_mask = ~np.isnan(transformed) transformed[valid_mask] = np.log(transformed[valid_mask] + epsilon) / np.log(base) return transformed
[docs] def apply_power_transform( data: np.ndarray, power: float, preserve_sign: bool = True ) -> np.ndarray: """ Apply power transformation to the data. Args: data: Input data array power: Power to raise the data to preserve_sign: Whether to preserve the sign of the original data Returns: Power-transformed data array """ assert isinstance(data, np.ndarray), "data must be a numpy ndarray" assert isinstance(power, float), "power must be a floating-point number" assert isinstance(preserve_sign, bool), "preserve_sign must be a boolean" # Create a copy to avoid modifying the original data transformed = data.copy() if preserve_sign: # Preserve the sign of the original data signs = np.sign(transformed) # Transform the absolute values valid_mask = ~np.isnan(transformed) transformed[valid_mask] = np.abs(transformed[valid_mask]) ** power * signs[valid_mask] else: # Apply power transformation directly valid_mask = ~np.isnan(transformed) transformed[valid_mask] = transformed[valid_mask] ** power return transformed
[docs] def apply_rolling_window( data: np.ndarray, window_size: int, window_function: Callable[[np.ndarray], float], center: bool = True ) -> np.ndarray: """ Apply a rolling window function to the data. Args: data: Input data array window_size: Size of the rolling window window_function: Function to apply to each window (e.g., np.mean, np.median) center: Whether to center the window Returns: Data array with the rolling window function applied """ assert isinstance(data, np.ndarray), "data must be a numpy ndarray" assert isinstance(window_size, int), "window_size must be an integer" assert window_size > 0, "window_size must be positive" assert callable(window_function), "window_function must be callable" assert isinstance(center, bool), "center must be a boolean" # Create pandas Series for easy rolling window operations series = pd.Series(data) # Apply rolling window function rolled = series.rolling(window=window_size, center=center).apply( lambda x: window_function(x.values) ) return rolled.values
[docs] def transform_data( data: np.ndarray, transformations: List[Dict[str, Any]] ) -> np.ndarray: """ Apply a sequence of transformations to the data. Args: data: Input data array transformations: List of transformation specifications Returns: Transformed data array """ assert isinstance(data, np.ndarray), "data must be a numpy ndarray" assert isinstance(transformations, list), "transformations must be a list" # Create a copy to avoid modifying the original data transformed = data.copy() for transform_spec in transformations: assert isinstance(transform_spec, dict), "Each transformation specification must be a dictionary" assert "type" in transform_spec, "Each transformation specification must have a 'type' field" transform_type = transform_spec["type"] if transform_type == "normalize": method = transform_spec.get("method", "minmax") params = transform_spec.get("params", {}) transformed, _ = normalize_data(transformed, method=method, params=params) elif transform_type == "scale": factor = float(transform_spec.get("factor", 1.0)) offset = float(transform_spec.get("offset", 0.0)) transformed = scale_data(transformed, factor=factor, offset=offset) elif transform_type == "log": base = float(transform_spec.get("base", 10.0)) epsilon = float(transform_spec.get("epsilon", 1e-10)) transformed = apply_logarithmic_transform(transformed, base=base, epsilon=epsilon) elif transform_type == "power": power = float(transform_spec.get("power", 2.0)) preserve_sign = transform_spec.get("preserve_sign", True) transformed = apply_power_transform(transformed, power=power, preserve_sign=preserve_sign) elif transform_type == "rolling": window_size = int(transform_spec.get("window_size", 3)) # Get window function func_name = transform_spec.get("function", "mean") if func_name == "mean": window_function = np.nanmean elif func_name == "median": window_function = np.nanmedian elif func_name == "sum": window_function = np.nansum elif func_name == "min": window_function = np.nanmin elif func_name == "max": window_function = np.nanmax else: raise ValueError(f"Unknown window function: {func_name}") center = transform_spec.get("center", True) transformed = apply_rolling_window( transformed, window_size=window_size, window_function=window_function, center=center ) else: raise ValueError(f"Unknown transformation type: {transform_type}") return transformed