Source code for mdr.io.formats

"""
Format utilities for Macrodata Refinement (MDR).

This module provides functions for detecting, validating, and converting
between different data formats.
"""

import os
import mimetypes
from typing import Dict, List, Union, Optional, Any, Tuple
import numpy as np
import pandas as pd
from enum import Enum, auto
import json
import csv
import tempfile
from datetime import datetime
import warnings

# Initialize mimetypes
# NOTE(review): this runs at import time. Nothing in the visible part of this
# module reads the mimetypes tables (format detection below relies on file
# extensions and magic bytes) — confirm the call is still needed.
mimetypes.init()


class FormatType(Enum):
    """Enumeration of the data formats this module knows how to handle.

    ``UNKNOWN`` is the fallback used when a format cannot be determined.
    Member values are the consecutive integers starting at 1.
    """

    CSV = 1
    JSON = 2
    EXCEL = 3
    PARQUET = 4
    HDF5 = 5
    UNKNOWN = 6
def _looks_like_json(filepath: str, text: str, truncated: bool) -> bool:
    """Return True when the file content parses as a JSON document."""
    if not text.lstrip().startswith(('{', '[')):
        return False
    try:
        if truncated:
            # The sniffed prefix is only part of the file, so parsing it would
            # always fail; validate the complete document instead. This reads
            # the whole file, which is the price of a correct answer.
            with open(filepath, 'r', encoding='utf-8-sig') as f:
                json.load(f)
        else:
            json.loads(text)
    except (ValueError, OSError):
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        return False
    return True


def _looks_like_delimited(text: str, truncated: bool) -> bool:
    """Return True when the first lines have a consistent delimited layout."""
    lines = text.split('\n')
    if truncated and len(lines) > 1:
        # The last sniffed line may be cut mid-record; do not judge it.
        lines = lines[:-1]

    header = lines[0]
    following = [line for line in lines[1:3] if line.strip()]

    for delimiter in (',', '\t'):
        field_count = header.count(delimiter) + 1
        if field_count < 2:
            continue
        if all(line.count(delimiter) + 1 == field_count for line in following):
            return True
    return False


def detect_format(filepath: str) -> FormatType:
    """
    Detect the format of a file based on its extension or content.

    The extension is checked first. If it is not recognised, up to the first
    4 KB of the file are inspected: binary signatures (ZIP/OLE2 containers are
    reported as Excel, ``PAR1`` as Parquet, the HDF5 magic number as HDF5),
    then JSON, then comma- or tab-delimited text.

    Args:
        filepath: Path to the file

    Returns:
        Detected format type, or ``FormatType.UNKNOWN`` if the format cannot
        be determined or the file cannot be read

    Raises:
        AssertionError: If ``filepath`` is not a string
        ValueError: If ``filepath`` does not point to an existing file
    """
    import codecs

    assert isinstance(filepath, str), "filepath must be a string"

    # Check if the file exists
    if not os.path.isfile(filepath):
        raise ValueError(f"File does not exist: {filepath}")

    # Map extensions to format types
    ext = os.path.splitext(filepath)[1].lower()
    if ext in ('.csv', '.tsv', '.txt'):
        return FormatType.CSV
    if ext == '.json':
        return FormatType.JSON
    if ext in ('.xls', '.xlsx', '.xlsm', '.xlsb'):
        return FormatType.EXCEL
    if ext == '.parquet':
        return FormatType.PARQUET
    if ext in ('.h5', '.hdf5', '.he5'):
        return FormatType.HDF5

    # Extension not recognised: sniff the content. One extra byte is read so
    # we can tell whether the sniffed prefix is the whole file.
    sniff_size = 4096
    try:
        with open(filepath, 'rb') as f:
            head = f.read(sniff_size + 1)
    except OSError:
        # Best effort: an unreadable file simply has an unknown format.
        return FormatType.UNKNOWN
    truncated = len(head) > sniff_size
    content = head[:sniff_size]

    # Binary signatures. A ZIP signature is shared by every ZIP-based format,
    # so reporting it as Excel is a heuristic rather than a guarantee.
    if content.startswith((b'\x50\x4B\x03\x04', b'\xD0\xCF\x11\xE0')):
        return FormatType.EXCEL
    if content.startswith(b'PAR1'):
        return FormatType.PARQUET
    if content.startswith(b'\x89HDF\r\n\x1a\n'):
        return FormatType.HDF5

    # Decode incrementally so a multi-byte character split at the sniff
    # boundary is not mistaken for invalid UTF-8; 'utf-8-sig' drops a BOM.
    decoder = codecs.getincrementaldecoder('utf-8-sig')()
    try:
        text_content = decoder.decode(content, final=not truncated)
    except UnicodeDecodeError:
        return FormatType.UNKNOWN

    if _looks_like_json(filepath, text_content, truncated):
        return FormatType.JSON
    if _looks_like_delimited(text_content, truncated):
        return FormatType.CSV

    # If we can't determine the format, return UNKNOWN
    return FormatType.UNKNOWN
def validate_format(
    filepath: str,
    expected_format: FormatType
) -> bool:
    """
    Report whether a file's detected format equals the expected one.

    Args:
        filepath: Path to the file
        expected_format: Format the file is supposed to have

    Returns:
        True when detection yields ``expected_format``; False when it yields
        something else or when detection raises (for example because the
        file does not exist)
    """
    assert isinstance(filepath, str), "filepath must be a string"
    assert isinstance(expected_format, FormatType), "expected_format must be a FormatType enum"

    try:
        matches = detect_format(filepath) == expected_format
    except Exception:
        # Any detection failure counts as "not the expected format".
        matches = False
    return matches
def convert_format(
    data: Dict[str, np.ndarray],
    source_format: FormatType,
    target_format: FormatType,
    **options
) -> bytes:
    """
    Convert data from one format to another.

    The data is written to a scratch file in the target format and the bytes
    of that file are returned. The scratch file lives in a private temporary
    directory and carries the extension of the target format: pandas picks
    the Excel writer engine from the extension, and the HDF5 writer needs a
    path that does not already exist as an empty file.

    Args:
        data: Dictionary mapping column names to data arrays
        source_format: Source format type (validated, otherwise unused)
        target_format: Target format type
        **options: Additional options for the conversion:
            CSV: ``delimiter`` (default ``','``), ``float_format``
            (default ``'%.6f'``);
            JSON: ``orient`` (default ``'columns'``), ``date_format``
            (default ``'iso'``), ``indent`` (default ``4``);
            Excel: ``sheet_name`` (default ``'Sheet1'``);
            Parquet: ``compression`` (default ``'snappy'``);
            HDF5: ``key`` (default ``'data'``), ``complevel`` (default ``9``),
            ``complib`` (default ``'zlib'``)

    Returns:
        Converted data as bytes

    Raises:
        AssertionError: If an argument or option has an invalid type or value
        ValueError: If ``target_format`` is not a writable format
    """
    assert isinstance(data, dict), "data must be a dictionary"
    assert all(isinstance(k, str) for k in data.keys()), "All keys in data must be strings"
    assert all(isinstance(v, np.ndarray) for v in data.values()), "All values in data must be numpy arrays"
    assert isinstance(source_format, FormatType), "source_format must be a FormatType enum"
    assert isinstance(target_format, FormatType), "target_format must be a FormatType enum"

    # Convert data to DataFrame
    df = pd.DataFrame(data)

    suffixes = {
        FormatType.CSV: '.csv',
        FormatType.JSON: '.json',
        FormatType.EXCEL: '.xlsx',
        FormatType.PARQUET: '.parquet',
        FormatType.HDF5: '.h5',
    }
    if target_format not in suffixes:
        raise ValueError(f"Unsupported target format: {target_format}")

    # The directory (and the scratch file inside it) is removed on exit,
    # whether or not the write succeeds.
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = os.path.join(temp_dir, "converted" + suffixes[target_format])

        if target_format == FormatType.CSV:
            delimiter = options.get('delimiter', ',')
            float_format = options.get('float_format', '%.6f')

            assert isinstance(delimiter, str), "delimiter must be a string"
            assert len(delimiter) == 1, "delimiter must be a single character"
            assert isinstance(float_format, str), "float_format must be a string"

            df.to_csv(temp_path, sep=delimiter, index=False, float_format=float_format)

        elif target_format == FormatType.JSON:
            orient = options.get('orient', 'columns')
            date_format = options.get('date_format', 'iso')
            indent = options.get('indent', 4)

            assert isinstance(orient, str), "orient must be a string"
            assert orient in ["columns", "records", "index", "split", "values"], \
                "orient must be one of ['columns', 'records', 'index', 'split', 'values']"
            assert isinstance(date_format, str), "date_format must be a string"
            if indent is not None:
                assert isinstance(indent, int), "indent must be an integer"
                assert indent >= 0, "indent must be a non-negative integer"

            df.to_json(temp_path, orient=orient, date_format=date_format, indent=indent)

        elif target_format == FormatType.EXCEL:
            sheet_name = options.get('sheet_name', 'Sheet1')

            assert isinstance(sheet_name, str), "sheet_name must be a string"

            df.to_excel(temp_path, sheet_name=sheet_name, index=False)

        elif target_format == FormatType.PARQUET:
            compression = options.get('compression', 'snappy')

            assert isinstance(compression, str), "compression must be a string"

            df.to_parquet(temp_path, compression=compression, index=False)

        else:  # FormatType.HDF5
            key = options.get('key', 'data')
            complevel = options.get('complevel', 9)
            complib = options.get('complib', 'zlib')

            assert isinstance(key, str), "key must be a string"
            if complevel is not None:
                assert isinstance(complevel, int), "complevel must be an integer"
                assert 0 <= complevel <= 9, "complevel must be an integer between 0 and 9"
            if complib is not None:
                assert isinstance(complib, str), "complib must be a string"

            df.to_hdf(temp_path, key=key, complevel=complevel, complib=complib)

        # Read the converted data
        with open(temp_path, 'rb') as f:
            return f.read()
def convert_file_format(
    source_filepath: str,
    target_filepath: str,
    **options
) -> None:
    """
    Convert a file from one format to another.

    The source format is detected with ``detect_format``; the target format
    is chosen from the extension of ``target_filepath``. The source is loaded
    into a DataFrame and written back out in the target format.

    Args:
        source_filepath: Path to the source file
        target_filepath: Path to the target file
        **options: Additional options for the conversion.
            Reading: ``source_delimiter`` (CSV; default is a tab for ``.tsv``
            files and a comma otherwise), ``source_orient`` (JSON, default
            ``'columns'``), ``source_sheet_name`` (Excel, default ``0``),
            ``source_key`` (HDF5, required).
            Writing: ``target_delimiter`` (CSV; default is a tab for ``.tsv``
            files and a comma otherwise), ``float_format`` (CSV, default
            ``'%.6f'``), ``target_orient`` / ``date_format`` / ``indent``
            (JSON), ``target_sheet_name`` (Excel, default ``'Sheet1'``),
            ``compression`` (Parquet, default ``'snappy'``), ``target_key`` /
            ``complevel`` / ``complib`` (HDF5)

    Raises:
        AssertionError: If an argument or option has an invalid type or value
        ValueError: If the source file does not exist, its format cannot be
            detected, or the target extension is not supported
    """
    assert isinstance(source_filepath, str), "source_filepath must be a string"
    assert isinstance(target_filepath, str), "target_filepath must be a string"

    # Detect source format
    source_format = detect_format(source_filepath)
    if source_format == FormatType.UNKNOWN:
        raise ValueError(f"Could not detect format of source file: {source_filepath}")

    # Detect target format based on extension
    source_ext = os.path.splitext(source_filepath)[1].lower()
    ext = os.path.splitext(target_filepath)[1].lower()

    if ext in ['.csv', '.tsv', '.txt']:
        target_format = FormatType.CSV
    elif ext in ['.json']:
        target_format = FormatType.JSON
    elif ext in ['.xls', '.xlsx']:
        target_format = FormatType.EXCEL
    elif ext in ['.parquet']:
        target_format = FormatType.PARQUET
    elif ext in ['.h5', '.hdf5']:
        target_format = FormatType.HDF5
    else:
        raise ValueError(f"Unsupported target file extension: {ext}")

    # Read the source file
    if source_format == FormatType.CSV:
        # A .tsv file is tab-separated unless the caller says otherwise.
        default_delimiter = '\t' if source_ext == '.tsv' else ','
        delimiter = options.get('source_delimiter', default_delimiter)

        assert isinstance(delimiter, str), "source_delimiter must be a string"
        assert len(delimiter) == 1, "source_delimiter must be a single character"

        df = pd.read_csv(source_filepath, sep=delimiter)

    elif source_format == FormatType.JSON:
        orient = options.get('source_orient', 'columns')

        assert isinstance(orient, str), "source_orient must be a string"
        assert orient in ["columns", "records", "index", "split", "values"], \
            "source_orient must be one of ['columns', 'records', 'index', 'split', 'values']"

        df = pd.read_json(source_filepath, orient=orient)

    elif source_format == FormatType.EXCEL:
        sheet_name = options.get('source_sheet_name', 0)

        df = pd.read_excel(source_filepath, sheet_name=sheet_name)

    elif source_format == FormatType.PARQUET:
        df = pd.read_parquet(source_filepath)

    elif source_format == FormatType.HDF5:
        key = options.get('source_key', None)

        assert key is not None, "source_key must be provided for HDF5 files"
        assert isinstance(key, str), "source_key must be a string"

        df = pd.read_hdf(source_filepath, key=key)

    # Write to the target file
    if target_format == FormatType.CSV:
        default_delimiter = '\t' if ext == '.tsv' else ','
        delimiter = options.get('target_delimiter', default_delimiter)
        float_format = options.get('float_format', '%.6f')

        assert isinstance(delimiter, str), "target_delimiter must be a string"
        assert len(delimiter) == 1, "target_delimiter must be a single character"
        assert isinstance(float_format, str), "float_format must be a string"

        df.to_csv(target_filepath, sep=delimiter, index=False, float_format=float_format)

    elif target_format == FormatType.JSON:
        orient = options.get('target_orient', 'columns')
        date_format = options.get('date_format', 'iso')
        indent = options.get('indent', 4)

        assert isinstance(orient, str), "target_orient must be a string"
        assert orient in ["columns", "records", "index", "split", "values"], \
            "target_orient must be one of ['columns', 'records', 'index', 'split', 'values']"
        assert isinstance(date_format, str), "date_format must be a string"
        if indent is not None:
            assert isinstance(indent, int), "indent must be an integer"
            assert indent >= 0, "indent must be a non-negative integer"

        df.to_json(target_filepath, orient=orient, date_format=date_format, indent=indent)

    elif target_format == FormatType.EXCEL:
        sheet_name = options.get('target_sheet_name', 'Sheet1')

        assert isinstance(sheet_name, str), "target_sheet_name must be a string"

        df.to_excel(target_filepath, sheet_name=sheet_name, index=False)

    elif target_format == FormatType.PARQUET:
        compression = options.get('compression', 'snappy')

        assert isinstance(compression, str), "compression must be a string"

        df.to_parquet(target_filepath, compression=compression, index=False)

    elif target_format == FormatType.HDF5:
        key = options.get('target_key', 'data')
        complevel = options.get('complevel', 9)
        complib = options.get('complib', 'zlib')

        assert isinstance(key, str), "target_key must be a string"
        if complevel is not None:
            assert isinstance(complevel, int), "complevel must be an integer"
            assert 0 <= complevel <= 9, "complevel must be an integer between 0 and 9"
        if complib is not None:
            assert isinstance(complib, str), "complib must be a string"

        df.to_hdf(target_filepath, key=key, complevel=complevel, complib=complib)
def _is_boolean_like(non_null: pd.Series) -> bool:
    """Return True when every non-null value is a recognised boolean literal."""
    if non_null.empty:
        # An empty column carries no evidence of being boolean.
        return False
    try:
        distinct = set(non_null.unique())
    except TypeError:
        # Unhashable values (lists, dicts, ...) cannot be boolean literals.
        return False
    return distinct.issubset({True, False, "True", "False", 1, 0})


def _infer_series_type(col_data: pd.Series) -> str:
    """Classify one column as boolean, integer, float, datetime or string."""
    types = pd.api.types

    # Bool must be tested before numeric: pandas treats bool as numeric.
    if types.is_bool_dtype(col_data):
        return "boolean"

    if types.is_numeric_dtype(col_data):
        if types.is_integer_dtype(col_data):
            return "integer"
        if types.is_float_dtype(col_data):
            # Floats whose non-null values are all whole numbers count as
            # integers (this includes a column with no non-null values).
            whole = col_data.dropna().apply(lambda x: float(x).is_integer())
            if whole.all():
                return "integer"
        return "float"

    # Covers both naive and timezone-aware datetime columns.
    if types.is_datetime64_any_dtype(col_data):
        return "datetime"

    # Boolean literals are checked before date parsing so that an object
    # column holding only 1/0 is not read as epoch timestamps.
    if _is_boolean_like(col_data.dropna()):
        return "boolean"

    is_textual = types.is_string_dtype(col_data) or types.is_object_dtype(col_data)
    if is_textual and try_common_datetime_formats(col_data):
        return "datetime"

    return "string"


def infer_column_types(
    data: pd.DataFrame
) -> Dict[str, str]:
    """
    Infer the data types of columns in a DataFrame.

    Each column is classified as one of ``"boolean"``, ``"integer"``,
    ``"float"``, ``"datetime"`` or ``"string"``. Float columns whose non-null
    values are all whole numbers are reported as ``"integer"``. Text/object
    columns are reported as ``"boolean"`` when they hold only boolean
    literals, as ``"datetime"`` when ``try_common_datetime_formats`` accepts
    them, and as ``"string"`` otherwise.

    Args:
        data: The DataFrame to analyze

    Returns:
        Dictionary mapping column names (as strings) to inferred types

    Raises:
        AssertionError: If ``data`` is not a pandas DataFrame
    """
    assert isinstance(data, pd.DataFrame), "data must be a pandas DataFrame"

    return {str(column): _infer_series_type(data[column]) for column in data.columns}
def _coerce_to_bool(value: Any) -> bool:
    """Translate one cell to a bool; unrecognised or missing values give False."""
    if isinstance(value, str):
        value = value.strip().lower()
    try:
        # The numeric keys also match True/False and 1.0/0.0 (equal hashes).
        return {'true': True, 'false': False, '1': True, '0': False,
                1: True, 0: False}.get(value, False)
    except TypeError:
        # Unhashable cell (list, dict, ...): not a boolean literal.
        return False


def cast_column_types(
    data: pd.DataFrame,
    type_map: Dict[str, str]
) -> pd.DataFrame:
    """
    Cast columns in a DataFrame to specified types.

    Supported target types are ``"integer"``, ``"float"``, ``"datetime"``,
    ``"boolean"`` and ``"string"``. Columns named in ``type_map`` that are
    not present in ``data``, and unrecognised target types, are skipped.

    Values that cannot be converted are handled per type: integers become
    ``0``, floats become NaN, datetimes become NaT and booleans become
    ``False``. Boolean parsing accepts ``True``/``False``, ``1``/``0`` and
    the strings ``"true"``/``"false"``/``"1"``/``"0"`` in any letter case.

    Args:
        data: The DataFrame to modify (it is copied, not changed in place)
        type_map: Dictionary mapping column names to target types

    Returns:
        DataFrame with columns cast to specified types

    Raises:
        AssertionError: If an argument has an invalid type
    """
    assert isinstance(data, pd.DataFrame), "data must be a pandas DataFrame"
    assert isinstance(type_map, dict), "type_map must be a dictionary"
    assert all(isinstance(k, str) for k in type_map.keys()), "All keys in type_map must be strings"
    assert all(isinstance(v, str) for v in type_map.values()), "All values in type_map must be strings"

    # Create a copy to avoid modifying the original
    result = data.copy()

    for column, target_type in type_map.items():
        if column not in result.columns:
            continue

        if target_type == "integer":
            # Unparseable values become NaN and are then replaced by 0,
            # because a plain int column cannot hold missing values.
            result[column] = pd.to_numeric(result[column], errors='coerce').fillna(0).astype(int)

        elif target_type == "float":
            result[column] = pd.to_numeric(result[column], errors='coerce').astype(float)

        elif target_type == "datetime":
            # Suppress pandas' warning about falling back to per-value parsing.
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", message="Could not infer format", category=UserWarning)
                result[column] = pd.to_datetime(result[column], errors='coerce')

        elif target_type == "boolean":
            # Map every cell explicitly: casting NaN with astype(bool) would
            # silently turn missing or unrecognised values into True.
            result[column] = result[column].map(_coerce_to_bool).astype(bool)

        elif target_type == "string":
            result[column] = result[column].astype(str)

    return result
def is_numeric_column(data: np.ndarray) -> bool:
    """
    Check if a numpy array contains numeric data.

    Arrays with a numeric dtype are numeric by definition. Object arrays are
    considered numeric when more than 80% of their values can be converted
    to numbers. Every other dtype, and an empty object array, is not numeric.

    Args:
        data: The array to check

    Returns:
        True if the array contains numeric data, False otherwise

    Raises:
        AssertionError: If ``data`` is not a numpy ndarray
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"

    # Check if the array's dtype is numeric
    if np.issubdtype(data.dtype, np.number):
        return True

    if data.dtype != np.dtype('O') or data.size == 0:
        return False

    try:
        # pd.to_numeric returns a plain ndarray for ndarray input, which has
        # no .notna(); wrapping in a Series keeps the pandas API available.
        numeric_data = pd.to_numeric(pd.Series(data), errors='coerce')
    except (ValueError, TypeError):
        # E.g. multi-dimensional input or values pandas cannot inspect.
        return False

    # More than 80% convertible values means the column is numeric.
    return bool(numeric_data.notna().mean() > 0.8)
def try_common_datetime_formats(col_data: pd.Series) -> bool:
    """
    Try to parse a column with common datetime formats.

    Up to the first 100 non-null values are sampled. The column is accepted
    when more than 90% of the sample are strings matching one of the known
    formats. Failing that, pandas' own parser is tried and the column is
    accepted when more than 80% of the sample parse. Plain numbers and
    booleans never count as datetimes: pandas would otherwise read them as
    offsets from the epoch.

    Args:
        col_data: The pandas Series to check

    Returns:
        True if the column contains datetime data, False otherwise
    """
    # Get a sample of the column (up to 100 non-null values) to check formats
    sample = list(col_data.dropna().head(100))
    total = len(sample)
    if total == 0:
        return False

    # Common datetime formats to try
    formats = [
        '%Y-%m-%d',              # 2023-01-31
        '%Y/%m/%d',              # 2023/01/31
        '%d-%m-%Y',              # 31-01-2023
        '%d/%m/%Y',              # 31/01/2023
        '%m-%d-%Y',              # 01-31-2023
        '%m/%d/%Y',              # 01/31/2023
        '%Y-%m-%d %H:%M:%S',     # 2023-01-31 14:30:45
        '%Y-%m-%d %H:%M',        # 2023-01-31 14:30
        '%Y/%m/%d %H:%M:%S',     # 2023/01/31 14:30:45
        '%d-%m-%Y %H:%M:%S',     # 31-01-2023 14:30:45
        '%d/%m/%Y %H:%M:%S',     # 31/01/2023 14:30:45
        '%m-%d-%Y %H:%M:%S',     # 01-31-2023 14:30:45
        '%m/%d/%Y %H:%M:%S',     # 01/31/2023 14:30:45
        '%Y-%m-%dT%H:%M:%S',     # 2023-01-31T14:30:45 (ISO format)
        '%Y-%m-%dT%H:%M:%S.%f',  # 2023-01-31T14:30:45.123456
        '%Y%m%d',                # 20230131
        '%Y%m%d%H%M%S',          # 20230131143045
    ]

    # Only strings can match an explicit format; non-strings still count
    # towards the sample size, so they lower the match ratio.
    strings = [value for value in sample if isinstance(value, str)]
    for fmt in formats:
        matched = 0
        for text in strings:
            try:
                datetime.strptime(text, fmt)
            except ValueError:
                continue
            matched += 1
        # If more than 90% of values match this format, it is a datetime column
        if matched / total > 0.9:
            return True

    # Fall back to pandas, skipping numbers so they are not read as epochs.
    numeric_types = (int, float, complex, np.number, np.bool_)
    candidates = [value for value in sample if not isinstance(value, numeric_types)]
    if not candidates:
        return False

    try:
        # Suppress the specific warning about format inference
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", message="Could not infer format", category=UserWarning)
            parsed = pd.to_datetime(pd.Series(candidates, dtype=object), errors='coerce')
    except Exception:
        # Best-effort sniffing: a parser failure means "not a datetime column".
        return False

    # More than 80% of the whole sample must have parsed.
    return bool(parsed.notna().sum() / total > 0.8)
def is_datetime_column(data: np.ndarray) -> bool:
    """
    Tell whether a numpy array holds datetime values.

    Args:
        data: The array to inspect

    Returns:
        True for ``datetime64`` arrays and for object arrays accepted by
        ``try_common_datetime_formats``; False for every other dtype or
        when that check raises
    """
    assert isinstance(data, np.ndarray), "data must be a numpy ndarray"

    dtype = data.dtype

    # Native datetime arrays need no further inspection.
    if np.issubdtype(dtype, np.datetime64):
        return True

    # Only object arrays can still hide datetime-like values.
    if dtype != np.dtype('O'):
        return False

    try:
        # A Series gives the format checker the pandas API it expects.
        as_series = pd.Series(data)
        return try_common_datetime_formats(as_series)
    except Exception:
        return False