Source code for mdr.utils.helpers

"""
Helper utilities for Macrodata Refinement (MDR).

This module provides utility functions for data validation, transformation,
and management.
"""

import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Union, Optional, Any, Callable
import sys
import os
import gc
from collections import deque


[docs] def validate_numeric_array( arr: np.ndarray, allow_nan: bool = True, min_val: Optional[float] = None, max_val: Optional[float] = None ) -> bool: """ Validate that an array consists of numeric values. Args: arr: Input array to validate allow_nan: Whether to allow NaN values min_val: Minimum allowed value (optional) max_val: Maximum allowed value (optional) Returns: True if the array is valid, False otherwise """ assert isinstance(arr, np.ndarray), "arr must be a numpy ndarray" assert isinstance(allow_nan, bool), "allow_nan must be a boolean" if min_val is not None: assert isinstance(min_val, float), "min_val must be a floating-point number" if max_val is not None: assert isinstance(max_val, float), "max_val must be a floating-point number" if min_val is not None and max_val is not None: assert min_val <= max_val, "min_val must be less than or equal to max_val" # Check if the array is numeric if not np.issubdtype(arr.dtype, np.number): return False # Check for NaN values if not allowed if not allow_nan and np.isnan(arr).any(): return False # Check minimum value if specified if min_val is not None: if not allow_nan: if np.any(arr < min_val): return False else: if np.any(np.logical_and(~np.isnan(arr), arr < min_val)): return False # Check maximum value if specified if max_val is not None: if not allow_nan: if np.any(arr > max_val): return False else: if np.any(np.logical_and(~np.isnan(arr), arr > max_val)): return False return True
[docs] def validate_range( value: float, min_val: float, max_val: float, inclusive: bool = True ) -> bool: """ Check if a value is within a specified range. Args: value: The value to check min_val: Minimum allowed value max_val: Maximum allowed value inclusive: Whether the range bounds are inclusive Returns: True if the value is within the range, False otherwise """ assert isinstance(value, float), "value must be a floating-point number" assert isinstance(min_val, float), "min_val must be a floating-point number" assert isinstance(max_val, float), "max_val must be a floating-point number" assert isinstance(inclusive, bool), "inclusive must be a boolean" assert min_val <= max_val, "min_val must be less than or equal to max_val" if inclusive: return min_val <= value <= max_val else: return min_val < value < max_val
[docs] def moving_average( data: np.ndarray, window_size: int, center: bool = False ) -> np.ndarray: """ Calculate the moving average of a data array. Args: data: Input data array window_size: Size of the moving window center: Whether to center the window Returns: Array of moving averages """ assert isinstance(data, np.ndarray), "data must be a numpy ndarray" assert isinstance(window_size, int), "window_size must be an integer" assert window_size > 0, "window_size must be positive" assert isinstance(center, bool), "center must be a boolean" # Convert to pandas Series for easy rolling calculation series = pd.Series(data) # Calculate rolling mean rolling_mean = series.rolling(window=window_size, center=center).mean() # Convert back to numpy array result = rolling_mean.to_numpy() return result
[docs] def detect_seasonality( data: np.ndarray, max_lag: int = 365, threshold: float = 0.3 ) -> Tuple[bool, Optional[int]]: """ Detect seasonality in a time series using autocorrelation. Args: data: Input time series data max_lag: Maximum lag to consider threshold: Correlation threshold for seasonality detection Returns: Tuple of (is_seasonal, period), where period is the detected seasonal period or None if no seasonality is detected """ assert isinstance(data, np.ndarray), "data must be a numpy ndarray" assert isinstance(max_lag, int), "max_lag must be an integer" assert max_lag > 0, "max_lag must be positive" assert isinstance(threshold, float), "threshold must be a floating-point number" assert 0.0 <= threshold <= 1.0, "threshold must be between 0 and 1" # Ensure we have enough data if len(data) < 2 * max_lag: max_lag = len(data) // 2 # Remove NaN values clean_data = data[~np.isnan(data)] if len(clean_data) < 2 * max_lag: return False, None # Compute autocorrelation autocorr = np.correlate(clean_data, clean_data, mode='full') autocorr = autocorr[len(clean_data)-1:len(clean_data)+max_lag] autocorr = autocorr / np.max(autocorr) # Normalize # Find peaks peaks = [] for i in range(1, len(autocorr)-1): if autocorr[i] > autocorr[i-1] and autocorr[i] > autocorr[i+1] and autocorr[i] > threshold: peaks.append((i, autocorr[i])) # Sort peaks by correlation value peaks.sort(key=lambda x: x[1], reverse=True) # Return the period of the highest peak (excluding lag 0) if peaks and peaks[0][0] > 0: return True, peaks[0][0] else: return False, None
[docs] def interpolate_missing( data: np.ndarray, method: str = 'linear', max_gap: Optional[int] = None, order: Optional[int] = None ) -> np.ndarray: """ Interpolate missing values in a data array. Args: data: Input data array with potential NaN values method: Interpolation method ('linear', 'nearest', 'cubic', 'spline') max_gap: Maximum gap size to interpolate (None for no limit) Returns: Data array with missing values interpolated """ assert isinstance(data, np.ndarray), "data must be a numpy ndarray" assert isinstance(method, str), "method must be a string" assert method in ['linear', 'nearest', 'cubic', 'spline'], \ "method must be one of ['linear', 'nearest', 'cubic', 'spline']" if max_gap is not None: assert isinstance(max_gap, int), "max_gap must be an integer" assert max_gap > 0, "max_gap must be positive" # Create a pandas Series for interpolation series = pd.Series(data) # Find missing value indices missing_mask = series.isna() if not missing_mask.any(): # No missing values to interpolate return data if max_gap is not None: # Find runs of missing values missing_indices = np.where(missing_mask)[0] # Find gaps between consecutive missing indices gaps = np.diff(missing_indices) # Find runs of consecutive missing values run_starts = np.where(gaps > 1)[0] + 1 run_starts = np.insert(run_starts, 0, 0) # Find runs that are too large for i in range(len(run_starts) - 1): start_idx = missing_indices[run_starts[i]] end_idx = missing_indices[run_starts[i+1] - 1] if end_idx - start_idx >= max_gap: # Don't interpolate this gap series.iloc[start_idx:end_idx+1] = np.nan # Interpolate using the specified method kwargs = {} if method in ['spline', 'polynomial'] and order is not None: kwargs['order'] = order interpolated = series.interpolate(method=method, **kwargs) # Return as numpy array return interpolated.to_numpy()
[docs] def flatten_dict( d: Dict[str, Any], parent_key: str = '', sep: str = '.' ) -> Dict[str, Any]: """ Flatten a nested dictionary. Args: d: Input dictionary to flatten parent_key: Prefix for flattened keys sep: Separator for nested keys Returns: Flattened dictionary """ assert isinstance(d, dict), "d must be a dictionary" assert isinstance(parent_key, str), "parent_key must be a string" assert isinstance(sep, str), "sep must be a string" items = [] for k, v in d.items(): assert isinstance(k, str), "All dictionary keys must be strings" new_key = parent_key + sep + k if parent_key else k if isinstance(v, dict): items.extend(flatten_dict(v, new_key, sep=sep).items()) else: items.append((new_key, v)) return dict(items)
[docs] def unflatten_dict( d: Dict[str, Any], sep: str = '.' ) -> Dict[str, Any]: """ Convert a flattened dictionary back to a nested dictionary. Args: d: Flattened dictionary sep: Separator used in flattened keys Returns: Nested dictionary """ assert isinstance(d, dict), "d must be a dictionary" assert isinstance(sep, str), "sep must be a string" assert all(isinstance(k, str) for k in d.keys()), "All dictionary keys must be strings" result = {} for key, value in d.items(): parts = key.split(sep) # Navigate to the correct nested dictionary current = result for part in parts[:-1]: if part not in current: current[part] = {} current = current[part] # Set the value current[parts[-1]] = value return result
[docs] def get_memory_usage( obj: Any = None, unit: str = 'MB' ) -> float: """ Get the memory usage of an object or the current process. Args: obj: Python object to measure (None for current process) unit: Unit for the result ('B', 'KB', 'MB', 'GB') Returns: Memory usage in the specified unit """ assert unit in ['B', 'KB', 'MB', 'GB'], "unit must be one of ['B', 'KB', 'MB', 'GB']" if obj is None: # Get memory usage of the current process import psutil process = psutil.Process(os.getpid()) memory_bytes = process.memory_info().rss else: # Get memory usage of the specified object import sys memory_bytes = sys.getsizeof(obj) # For containers, recursively add the size of their contents if isinstance(obj, (list, tuple, set, dict)): if isinstance(obj, dict): memory_bytes += sum(sys.getsizeof(k) + sys.getsizeof(v) for k, v in obj.items()) else: memory_bytes += sum(sys.getsizeof(x) for x in obj) # Convert to the requested unit if unit == 'KB': return float(memory_bytes / 1024) elif unit == 'MB': return float(memory_bytes / (1024 * 1024)) elif unit == 'GB': return float(memory_bytes / (1024 * 1024 * 1024)) else: return float(memory_bytes)