"""
Data writers for Macrodata Refinement (MDR).
This module provides functions and classes for writing macrodata
to various file formats.
"""
import os
from typing import Dict, List, Union, Optional, Any, Tuple, Type, BinaryIO
from abc import ABC, abstractmethod
import numpy as np
import pandas as pd
from enum import Enum, auto
import json
import csv
class DataDestination(Enum):
    """Kinds of targets that a data writer can send its output to."""

    # Values are assigned by ``auto()`` in declaration order.
    FILE = auto()
    DATABASE = auto()
    API = auto()
    MEMORY = auto()
class DataWriter(ABC):
    """Interface shared by all data writers; subclasses supply the actual I/O."""

    def __init__(self, dest_type: DataDestination = DataDestination.FILE):
        """
        Remember which kind of destination this writer targets.

        Args:
            dest_type: Type of data destination
        """
        assert isinstance(dest_type, DataDestination), "dest_type must be a DataDestination enum"
        self.dest_type = dest_type

    @abstractmethod
    def write(self, data: Dict[str, np.ndarray], destination: str, **options) -> None:
        """
        Send ``data`` to ``destination``.

        Args:
            data: Dictionary mapping variable names to data arrays
            destination: Destination identifier (file path, table name, etc.)
            **options: Additional writing options
        """

    @abstractmethod
    def validate_destination(self, destination: str) -> bool:
        """
        Report whether ``destination`` can be written to.

        Args:
            destination: Destination identifier

        Returns:
            True if the destination is valid, False otherwise
        """
class FileWriter(DataWriter):
    """Base class for file-based data writers."""

    def __init__(self, encoding: str = "utf-8", overwrite: bool = False):
        """
        Initialize the file writer.

        Args:
            encoding: File encoding
            overwrite: Whether to overwrite existing files
        """
        super().__init__(DataDestination.FILE)
        assert isinstance(encoding, str), "encoding must be a string"
        assert isinstance(overwrite, bool), "overwrite must be a boolean"
        self.encoding = encoding
        self.overwrite = overwrite

    def validate_destination(self, destination: str) -> bool:
        """
        Validate if the file can be written to.

        The path is rejected when it is empty, when its parent directory
        does not exist, when it names a directory, when it already exists
        and ``self.overwrite`` is False, or when the process lacks the
        permissions needed to write it.

        Args:
            destination: File path

        Returns:
            True if the file is valid, False otherwise
        """
        assert isinstance(destination, str), "destination must be a string"

        # An empty path can never name a file.
        if not destination:
            return False

        # A bare filename has an empty dirname; it lives in the current directory.
        parent_dir = os.path.dirname(destination) or os.curdir
        if not os.path.isdir(parent_dir):
            return False

        if os.path.exists(destination):
            # A directory is not a file destination, even if it is writable.
            if os.path.isdir(destination):
                return False
            if not self.overwrite:
                return False
            return os.access(destination, os.W_OK)

        # Creating a new file needs write and search permission on its parent.
        return os.access(parent_dir, os.W_OK | os.X_OK)
class CSVWriter(FileWriter):
    """Writes a dictionary of arrays to a delimited text file through pandas."""

    def __init__(
        self,
        delimiter: str = ",",
        quotechar: str = '"',
        encoding: str = "utf-8",
        overwrite: bool = False
    ):
        """
        Store the CSV dialect alongside the inherited file settings.

        Args:
            delimiter: Field delimiter (exactly one character)
            quotechar: Character for quoting fields (exactly one character)
            encoding: File encoding
            overwrite: Whether to overwrite existing files
        """
        super().__init__(encoding=encoding, overwrite=overwrite)
        assert isinstance(delimiter, str), "delimiter must be a string"
        assert isinstance(quotechar, str), "quotechar must be a string"
        assert len(delimiter) == 1, "delimiter must be a single character"
        assert len(quotechar) == 1, "quotechar must be a single character"
        self.delimiter = delimiter
        self.quotechar = quotechar

    def write(
        self,
        data: Dict[str, np.ndarray],
        destination: str,
        index: bool = False,
        float_format: Optional[str] = "%.6f",
        date_format: Optional[str] = None,
        **options
    ) -> None:
        """
        Build a DataFrame from ``data`` and save it as CSV.

        Args:
            data: Dictionary mapping column names to data arrays
            destination: File path
            index: Whether to write row indices
            float_format: Format string for float values
            date_format: Format string for date values
            **options: Additional pandas.to_csv options

        Raises:
            ValueError: If ``destination`` fails ``validate_destination``.
        """
        assert isinstance(data, dict), "data must be a dictionary"
        assert all(isinstance(name, str) for name in data), "All keys in data must be strings"
        assert all(isinstance(column, np.ndarray) for column in data.values()), "All values in data must be numpy arrays"
        assert isinstance(destination, str), "destination must be a string"
        assert isinstance(index, bool), "index must be a boolean"
        assert float_format is None or isinstance(float_format, str), "float_format must be a string"
        assert date_format is None or isinstance(date_format, str), "date_format must be a string"

        if not self.validate_destination(destination):
            raise ValueError(f"Invalid or inaccessible destination: {destination}")

        # Each dictionary entry becomes one column of the frame.
        frame = pd.DataFrame(data)
        frame.to_csv(
            destination,
            sep=self.delimiter,
            index=index,
            quotechar=self.quotechar,
            encoding=self.encoding,
            float_format=float_format,
            date_format=date_format,
            **options
        )
class JSONWriter(FileWriter):
    """Writes a dictionary of arrays to a JSON file through pandas."""

    def write(
        self,
        data: Dict[str, np.ndarray],
        destination: str,
        orient: str = "columns",
        date_format: str = "iso",
        indent: Optional[int] = 4,
        **options
    ) -> None:
        """
        Build a DataFrame from ``data`` and save it as JSON.

        Args:
            data: Dictionary mapping column names to data arrays
            destination: File path
            orient: JSON format, one of ['columns', 'records', 'index', 'split', 'values']
            date_format: Format for date values
            indent: Number of spaces for indentation (None for no indentation)
            **options: Additional pandas.to_json options

        Raises:
            ValueError: If ``destination`` fails ``validate_destination``.
        """
        supported_orients = ("columns", "records", "index", "split", "values")

        assert isinstance(data, dict), "data must be a dictionary"
        assert all(isinstance(name, str) for name in data), "All keys in data must be strings"
        assert all(isinstance(column, np.ndarray) for column in data.values()), "All values in data must be numpy arrays"
        assert isinstance(destination, str), "destination must be a string"
        assert isinstance(orient, str), "orient must be a string"
        assert orient in supported_orients, \
            "orient must be one of ['columns', 'records', 'index', 'split', 'values']"
        assert isinstance(date_format, str), "date_format must be a string"
        if indent is not None:
            assert isinstance(indent, int), "indent must be an integer"
            assert indent >= 0, "indent must be a non-negative integer"

        if not self.validate_destination(destination):
            raise ValueError(f"Invalid or inaccessible destination: {destination}")

        # Each dictionary entry becomes one column of the frame.
        frame = pd.DataFrame(data)
        frame.to_json(
            destination,
            orient=orient,
            date_format=date_format,
            indent=indent,
            **options
        )
class ExcelWriter(FileWriter):
    """Writes a dictionary of arrays to an Excel workbook through pandas."""

    def write(
        self,
        data: Dict[str, np.ndarray],
        destination: str,
        sheet_name: str = "Sheet1",
        float_format: Optional[str] = "%.6f",
        freeze_panes: Optional[Tuple[int, int]] = None,
        **options
    ) -> None:
        """
        Build a DataFrame from ``data`` and save it as an Excel sheet.

        Args:
            data: Dictionary mapping column names to data arrays
            destination: File path
            sheet_name: Name of the sheet
            float_format: Format string for float values
            freeze_panes: Tuple of (rows, cols) to freeze
            **options: Additional pandas.to_excel options

        Raises:
            ValueError: If ``destination`` fails ``validate_destination``.
        """
        assert isinstance(data, dict), "data must be a dictionary"
        assert all(isinstance(name, str) for name in data), "All keys in data must be strings"
        assert all(isinstance(column, np.ndarray) for column in data.values()), "All values in data must be numpy arrays"
        assert isinstance(destination, str), "destination must be a string"
        assert isinstance(sheet_name, str), "sheet_name must be a string"
        assert float_format is None or isinstance(float_format, str), "float_format must be a string"
        if freeze_panes is not None:
            assert isinstance(freeze_panes, tuple), "freeze_panes must be a tuple"
            assert len(freeze_panes) == 2, "freeze_panes must be a tuple of length 2"
            # Unpacking is safe here: the length was just checked.
            frozen_rows, frozen_cols = freeze_panes
            assert isinstance(frozen_rows, int), "freeze_panes[0] must be an integer"
            assert isinstance(frozen_cols, int), "freeze_panes[1] must be an integer"
            assert frozen_rows >= 0, "freeze_panes[0] must be a non-negative integer"
            assert frozen_cols >= 0, "freeze_panes[1] must be a non-negative integer"

        if not self.validate_destination(destination):
            raise ValueError(f"Invalid or inaccessible destination: {destination}")

        # Each dictionary entry becomes one column of the frame.
        frame = pd.DataFrame(data)
        frame.to_excel(
            destination,
            sheet_name=sheet_name,
            float_format=float_format,
            freeze_panes=freeze_panes,
            **options
        )
class ParquetWriter(FileWriter):
    """Writer for Parquet files."""

    def write(
        self,
        data: Dict[str, np.ndarray],
        destination: str,
        compression: str = "snappy",
        index: bool = False,
        **options
    ) -> None:
        """
        Write data to a Parquet file.

        Args:
            data: Dictionary mapping column names to data arrays
            destination: File path
            compression: Compression method
            index: Whether to include row indices
            **options: Additional pandas.to_parquet options

        Raises:
            ValueError: If ``destination`` fails ``validate_destination``.
            ImportError: If pandas raises ImportError while writing; the
                original error is attached as ``__cause__``.
        """
        assert isinstance(data, dict), "data must be a dictionary"
        assert all(isinstance(k, str) for k in data.keys()), "All keys in data must be strings"
        assert all(isinstance(v, np.ndarray) for v in data.values()), "All values in data must be numpy arrays"
        assert isinstance(destination, str), "destination must be a string"
        assert isinstance(compression, str), "compression must be a string"
        assert isinstance(index, bool), "index must be a boolean"

        if not self.validate_destination(destination):
            raise ValueError(f"Invalid or inaccessible destination: {destination}")

        # Convert dictionary of arrays to DataFrame (kept outside the try so
        # only the engine-dependent call is guarded).
        df = pd.DataFrame(data)

        try:
            df.to_parquet(
                destination,
                compression=compression,
                index=index,
                **options
            )
        except ImportError as exc:
            # Chain explicitly so the underlying import failure stays visible.
            raise ImportError(
                "pyarrow or fastparquet is required for writing Parquet files"
            ) from exc
class HDF5Writer(FileWriter):
    """Writer for HDF5 files."""

    def write(
        self,
        data: Dict[str, np.ndarray],
        destination: str,
        key: str,
        mode: str = "a",
        complevel: Optional[int] = 9,
        complib: Optional[str] = "zlib",
        **options
    ) -> None:
        """
        Write data to an HDF5 file.

        Args:
            data: Dictionary mapping column names to data arrays
            destination: File path
            key: Group identifier in the HDF5 file
            mode: File open mode ('a' for append, 'w' for write)
            complevel: Compression level (0-9, 0 for no compression)
            complib: Compression library
            **options: Additional pandas.to_hdf options

        Raises:
            ValueError: If ``destination`` fails ``validate_destination``.
            ImportError: If pandas raises ImportError while writing; the
                original error is attached as ``__cause__``.
        """
        assert isinstance(data, dict), "data must be a dictionary"
        assert all(isinstance(k, str) for k in data.keys()), "All keys in data must be strings"
        assert all(isinstance(v, np.ndarray) for v in data.values()), "All values in data must be numpy arrays"
        assert isinstance(destination, str), "destination must be a string"
        assert isinstance(key, str), "key must be a string"
        assert isinstance(mode, str), "mode must be a string"
        assert mode in ["a", "w"], "mode must be 'a' (append) or 'w' (write)"
        if complevel is not None:
            assert isinstance(complevel, int), "complevel must be an integer"
            assert 0 <= complevel <= 9, "complevel must be an integer between 0 and 9"
        if complib is not None:
            assert isinstance(complib, str), "complib must be a string"

        # NOTE(review): validation runs regardless of ``mode``, so a writer
        # created with overwrite=False rejects an existing file even when
        # mode="a" -- confirm whether appending should be allowed in that case.
        if not self.validate_destination(destination):
            raise ValueError(f"Invalid or inaccessible destination: {destination}")

        # Convert dictionary of arrays to DataFrame (kept outside the try so
        # only the PyTables-dependent call is guarded).
        df = pd.DataFrame(data)

        try:
            df.to_hdf(
                destination,
                key=key,
                mode=mode,
                complevel=complevel,
                complib=complib,
                **options
            )
        except ImportError as exc:
            # Chain explicitly so the underlying import failure stays visible.
            raise ImportError("tables is required for writing HDF5 files") from exc
# Factory function to create writers for different file types
def get_writer(file_type: str, **options) -> DataWriter:
    """
    Build the writer that matches a file-type name.

    The name is compared case-insensitively. ``options`` are forwarded
    unchanged to the selected writer's constructor.

    Args:
        file_type: Type of file ('csv', 'json', 'excel', 'parquet', 'hdf5');
            'xlsx'/'xls' and 'h5' are accepted as aliases
        **options: Additional options for the writer

    Returns:
        Appropriate DataWriter instance

    Raises:
        ValueError: If ``file_type`` is not one of the supported names.
    """
    assert isinstance(file_type, str), "file_type must be a string"
    file_type = file_type.lower()

    if file_type == 'csv':
        return CSVWriter(**options)
    if file_type == 'json':
        return JSONWriter(**options)
    if file_type in ('excel', 'xlsx', 'xls'):
        return ExcelWriter(**options)
    if file_type == 'parquet':
        return ParquetWriter(**options)
    if file_type in ('hdf5', 'h5'):
        return HDF5Writer(**options)

    raise ValueError(f"Unsupported file type: {file_type}")
# Convenience functions for writing data to different file formats
def write_csv(
    data: Dict[str, np.ndarray],
    filepath: str,
    delimiter: str = ",",
    float_format: Optional[str] = "%.6f",
    **options
) -> None:
    """
    Write data to a CSV file.

    The file is written by a ``CSVWriter`` created with ``overwrite=True``,
    so an existing file at ``filepath`` is replaced.

    Args:
        data: Dictionary mapping column names to data arrays
        filepath: Path to the CSV file
        delimiter: Field delimiter
        float_format: Format string for float values, or None to pass no
            float format through to the writer
        **options: Additional writing options
    """
    assert isinstance(data, dict), "data must be a dictionary"
    assert all(isinstance(k, str) for k in data.keys()), "All keys in data must be strings"
    assert all(isinstance(v, np.ndarray) for v in data.values()), "All values in data must be numpy arrays"
    assert isinstance(filepath, str), "filepath must be a string"
    assert isinstance(delimiter, str), "delimiter must be a string"
    assert len(delimiter) == 1, "delimiter must be a single character"
    # None is allowed here because CSVWriter.write accepts Optional[str].
    if float_format is not None:
        assert isinstance(float_format, str), "float_format must be a string"

    writer = CSVWriter(delimiter=delimiter, overwrite=True)
    writer.write(data, filepath, float_format=float_format, **options)
def write_json(
    data: Dict[str, np.ndarray],
    filepath: str,
    orient: str = "columns",
    **options
) -> None:
    """
    Save a dictionary of arrays as JSON, replacing any existing file.

    Args:
        data: Dictionary mapping column names to data arrays
        filepath: Path to the JSON file
        orient: JSON format
        **options: Additional writing options
    """
    assert isinstance(data, dict), "data must be a dictionary"
    for name in data:
        assert isinstance(name, str), "All keys in data must be strings"
    for column in data.values():
        assert isinstance(column, np.ndarray), "All values in data must be numpy arrays"
    assert isinstance(filepath, str), "filepath must be a string"
    assert isinstance(orient, str), "orient must be a string"
    supported_orients = ("columns", "records", "index", "split", "values")
    assert orient in supported_orients, \
        "orient must be one of ['columns', 'records', 'index', 'split', 'values']"

    # overwrite=True: this helper always replaces an existing file.
    JSONWriter(overwrite=True).write(data, filepath, orient=orient, **options)
def write_excel(
    data: Dict[str, np.ndarray],
    filepath: str,
    sheet_name: str = "Sheet1",
    **options
) -> None:
    """
    Save a dictionary of arrays as an Excel sheet, replacing any existing file.

    Args:
        data: Dictionary mapping column names to data arrays
        filepath: Path to the Excel file
        sheet_name: Name of the sheet
        **options: Additional writing options
    """
    assert isinstance(data, dict), "data must be a dictionary"
    for name in data:
        assert isinstance(name, str), "All keys in data must be strings"
    for column in data.values():
        assert isinstance(column, np.ndarray), "All values in data must be numpy arrays"
    assert isinstance(filepath, str), "filepath must be a string"
    assert isinstance(sheet_name, str), "sheet_name must be a string"

    # overwrite=True: this helper always replaces an existing file.
    ExcelWriter(overwrite=True).write(data, filepath, sheet_name=sheet_name, **options)
def write_parquet(
    data: Dict[str, np.ndarray],
    filepath: str,
    compression: str = "snappy",
    **options
) -> None:
    """
    Save a dictionary of arrays as Parquet, replacing any existing file.

    Args:
        data: Dictionary mapping column names to data arrays
        filepath: Path to the Parquet file
        compression: Compression method
        **options: Additional writing options
    """
    assert isinstance(data, dict), "data must be a dictionary"
    for name in data:
        assert isinstance(name, str), "All keys in data must be strings"
    for column in data.values():
        assert isinstance(column, np.ndarray), "All values in data must be numpy arrays"
    assert isinstance(filepath, str), "filepath must be a string"
    assert isinstance(compression, str), "compression must be a string"

    # overwrite=True: this helper always replaces an existing file.
    ParquetWriter(overwrite=True).write(data, filepath, compression=compression, **options)
def write_hdf5(
    data: Dict[str, np.ndarray],
    filepath: str,
    key: str,
    **options
) -> None:
    """
    Save a dictionary of arrays under ``key`` in an HDF5 file.

    The write goes through an ``HDF5Writer`` created with ``overwrite=True``,
    so an existing file at ``filepath`` is accepted as a destination.

    Args:
        data: Dictionary mapping column names to data arrays
        filepath: Path to the HDF5 file
        key: Group identifier in the HDF5 file
        **options: Additional writing options
    """
    assert isinstance(data, dict), "data must be a dictionary"
    for name in data:
        assert isinstance(name, str), "All keys in data must be strings"
    for column in data.values():
        assert isinstance(column, np.ndarray), "All values in data must be numpy arrays"
    assert isinstance(filepath, str), "filepath must be a string"
    assert isinstance(key, str), "key must be a string"

    HDF5Writer(overwrite=True).write(data, filepath, key=key, **options)