"""Text, CSV, and JSON parsing helpers.
The centerpiece is :func:`csv_load`, which streams rows from any
:data:`TextProvider` into plain dictionaries or typed dataclass instances,
with optional column-name normalization (via
:class:`sevaht_utility.naming.NameStyle`) and explicit field mapping for
awkward headers. Supporting utilities include :func:`get_text` /
:func:`open_text` for uniform text access, :class:`StringParser` for
string-to-value conversion, and :func:`json5_load` for JSON with comments and
trailing commas.
"""
from __future__ import annotations
import csv
import json
import logging
import os
import re
import threading
from collections.abc import Callable, Iterable, Iterator, Mapping, Sequence
from contextlib import contextmanager
from dataclasses import dataclass, field, is_dataclass
from functools import cache
from io import StringIO
from pathlib import Path
from types import MappingProxyType, UnionType
from typing import Any, TextIO, TypeVar, cast, overload
from .hinting import get_callable_argument_hints, iterate_types, verify_type
from .naming import NameStyle, convert_name
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Generic placeholder for the caller-chosen target type (see StringParser.parse).
T = TypeVar("T")
# Anything get_text/open_text accept: literal text, a filesystem path, an
# already-open text stream, or a list of lines.
type TextProvider = str | Path | TextIO | list[str]
# A callable that turns one string into a value.
type StringConverter = Callable[[str], object]
# Recursive alias for decoded JSON data; the declared return type of json5_load.
type JsonValue = (
dict[str, JsonValue] | list[JsonValue] | str | int | float | bool | None
)
[docs]
def get_text(source: TextProvider) -> str:
"""Return the full text from any supported TextProvider."""
if isinstance(source, str):
return source
if isinstance(source, list):
return os.linesep.join(source)
if isinstance(source, Path):
return source.read_text(encoding="utf-8")
return source.read() # TextIO
[docs]
@contextmanager
def open_text(source: TextProvider) -> Iterator[TextIO]:
    """Yield a readable text stream for ``source``.

    Must always be used as a context manager.

    Args:
        source: A ``Path`` (opened as UTF-8 and closed on exit), a string or
            list of lines (wrapped in an in-memory ``StringIO``), or an
            already-open text stream (yielded as-is).

    Yields:
        A readable text stream. A stream passed in by the caller is not
        closed here; a file opened from a ``Path`` is closed when the
        ``with`` block exits.
    """
    if isinstance(source, Path):
        # Open inside ``with`` so the handle is released when the caller's
        # block ends, whether normally, by exception, or by generator close.
        with source.open(encoding="utf-8") as handle:
            yield handle
    elif isinstance(source, (str, list)):
        yield StringIO(get_text(source))
    else:
        yield source  # TextIO; already open, do not close
[docs]
def parse_bool(value: str) -> bool:
    """Interpret a string as a boolean flag.

    Args:
        value: The text to interpret.

    Returns:
        ``True`` when the lower-cased ``value`` is exactly ``"1"``,
        ``"true"``, or ``"yes"``; ``False`` for every other string.
    """
    truthy_spellings = ("1", "true", "yes")
    lowered = value.lower()
    return lowered in truthy_spellings
@cache
def default_string_converters() -> Mapping[type[Any], StringConverter]:
    """Return the built-in converter table as a read-only mapping.

    The table covers ``Any``, ``str``, ``int``, ``float`` and ``bool``
    (``bool`` goes through :func:`parse_bool`). The result is cached, so
    every call returns the same mapping proxy.
    """
    builtin_converters = {
        Any: str,
        str: str,
        int: int,
        float: float,
        bool: parse_bool,
    }
    return MappingProxyType(builtin_converters)
[docs]
class StringParserError(TypeError):
    """Raised when a string could not be converted by any converter.

    Attributes:
        value: The input that failed to parse.
    """

    def __init__(self, value: object) -> None:
        message = f"Could not parse string: {value}"
        super().__init__(message)
        self.value = value
[docs]
@dataclass
class StringParser:
    """Convert strings to typed values using a per-instance converter table.

    Every instance starts with its own copy of
    :func:`default_string_converters`. Reads and writes of the table are
    guarded by a per-instance lock.
    """

    _CONVERTERS: dict[type[Any], StringConverter] = field(
        default_factory=lambda: dict(default_string_converters()), init=False
    )
    _CONVERTER_LOCK: threading.Lock = field(
        default_factory=threading.Lock, init=False
    )

    @staticmethod
    @cache
    def default() -> StringParser:
        """Return the shared parser instance, created on first use."""
        return StringParser()

    def converters(
        self, target: type[Any] | UnionType
    ) -> list[tuple[StringConverter, type[Any]]]:
        """Collect the converters available for ``target``.

        Each type produced by ``iterate_types(target)`` is looked up in the
        table. A type with no entry but a callable ``from_string`` attribute
        has that attribute registered and used. Types with neither are
        skipped with a debug log message.

        Returns:
            ``(converter, type)`` pairs in the order the types were visited.
        """
        found: list[tuple[StringConverter, type[Any]]] = []
        with self._CONVERTER_LOCK:
            for candidate_type in iterate_types(target):
                converter = self._CONVERTERS.get(candidate_type)
                if converter is None:
                    from_string = getattr(candidate_type, "from_string", None)
                    if callable(from_string):
                        # Remember the discovery for later lookups.
                        converter = from_string
                        self._CONVERTERS[candidate_type] = converter
                if converter is None:
                    logger.debug(
                        f"Skipping type without converter: {candidate_type}"
                    )
                    continue
                found.append((converter, candidate_type))
        return found

    def parse(self, source: TextProvider, *, target: type[T]) -> T:
        """Parse the text of ``source`` into ``target``.

        Raises:
            StringParserError: No converter for ``target`` accepted the text.
                The failure is logged before being re-raised.
        """
        # first_valid_conversion verifies the result against one specific
        # type, whereas T might be a Union here; hence the cast.
        source = get_text(source)
        try:
            converted = type(self).first_valid_conversion(
                source, converters=self.converters(target)
            )
        except StringParserError:
            logger.exception(f"Could not parse to {target}: {source}")
            raise
        return cast("T", converted)

    @staticmethod
    def first_valid_conversion(
        source: TextProvider,
        *,
        converters: Iterable[tuple[StringConverter, type[Any]]],
    ) -> object:
        """Return the first conversion that succeeds and passes verification.

        Converters are tried in order. One that raises, or whose result is
        rejected by ``verify_type``, is logged at debug level and skipped.

        Raises:
            StringParserError: Every converter failed, or none were given.
        """
        source = get_text(source)
        for converter, converter_type in converters:
            try:
                candidate = converter(source)
                return verify_type(converter_type, candidate)
            except Exception:  # noqa: BLE001
                # User-provided converters may raise anything, so a broad
                # catch is the only way to move on to the next candidate.
                logger.debug(
                    f"Failed to convert to {converter_type}: {source}"
                )
        raise StringParserError(source)

    def set_converter(
        self, target: type[T], *, converter: StringConverter
    ) -> None:
        """Register ``converter`` for every type in ``iterate_types(target)``."""
        with self._CONVERTER_LOCK:
            for member_type in iterate_types(target):
                self._CONVERTERS[member_type] = converter
[docs]
class UnconsumedColumnsError(Exception):
    """Raised when columns matched no field and column subsets are disallowed."""

    def __init__(self, columns: Sequence[str]) -> None:
        column_list = ", ".join(columns)
        super().__init__(
            f"{len(columns)} columns were not consumed: {column_list}"
        )
[docs]
class NotADataclassError(TypeError):
    """Raised when an argument expected to be a dataclass is not one.

    Attributes:
        obj: The rejected object.
    """

    def __init__(self, obj: object) -> None:
        message = f"Dataclass argument isn't a dataclass: {obj}"
        super().__init__(message)
        self.obj = obj
[docs]
class ShortRowError(ValueError):
    """Raised when a CSV row has too few columns to fill a mapped field.

    Attributes:
        line_number: Line number reported for the offending row.
        field_name: The field that could not be filled.
        column_index: The column index the field needed.
        column_count: How many columns the row actually had.
    """

    def __init__(
        self,
        *,
        line_number: int,
        field_name: str,
        column_index: int,
        column_count: int,
    ) -> None:
        message = (
            f"Row at line {line_number} has {column_count} column(s), "
            f"too few to read field {field_name!r} from column index "
            f"{column_index}."
        )
        super().__init__(message)
        self.line_number = line_number
        self.field_name = field_name
        self.column_index = column_index
        self.column_count = column_count
[docs]
@dataclass
class DataMapping:
    """Describe how CSV columns are matched to target fields by :func:`csv_load`.

    All attributes default to ``None``. A default-constructed ``DataMapping``
    leaves ``csv_load`` to pair columns and fields by name; set attributes
    only to override that pairing for awkward or ambiguous headers. When
    more than one source applies, the order of precedence (highest first) is
    ``field_to_column_index`` -> ``field_to_column_name`` -> field/parameter
    names -> dataclass metadata -> raw column names.

    Attributes:
        column_names: Explicit column names used in place of a header row.
            Provide these for header-less data, or to rename an existing
            header by position.
        field_to_column_name: For each target field, the source column name
            to read. Useful when the header text differs from the field
            name (e.g. ``{"user_id": "acct#"}``).
        field_to_column_index: For each target field, a zero-based column
            index. Takes precedence over every name-based rule, so it can
            disambiguate duplicate headers (e.g. two ``"a"`` columns) or
            skip name matching altogether.
        name_style: When set, source column names and target field names
            are both normalized to this
            :class:`~sevaht_utility.naming.NameStyle` before matching, so a
            ``camelCase`` header can feed a ``snake_case`` field. Target
            names are normalized on purpose: the dataclass can keep
            idiomatic PEP 8 ``snake_case`` members however the file is
            cased. If normalization makes two columns collide on a name
            that a field needs, :class:`AmbiguousColumnNamesError` is
            raised.
    """

    column_names: Sequence[str] | None = None
    field_to_column_name: Mapping[str, str] | None = None
    field_to_column_index: Mapping[str, int] | None = None
    name_style: NameStyle | None = None
[docs]
class AmbiguousColumnNamesError(ValueError):
    """Raised when several columns share one name after normalization.

    Attributes:
        canonical_name: The normalized name the columns collide on.
        columns: ``(index, original name)`` pairs of the colliding columns.
    """

    def __init__(
        self, *, canonical_name: str, columns: Sequence[tuple[int, str]]
    ) -> None:
        described = [f"{name!r} at index {index}" for index, name in columns]
        formatted_columns = ", ".join(described)
        message = (
            "Ambiguous column names after normalization "
            f"for key {canonical_name!r}: {formatted_columns}. "
            "Use DataMapping.column_names or DataMapping.field_to_column_index "
            "to disambiguate."
        )
        super().__init__(message)
        self.canonical_name = canonical_name
        self.columns = columns
[docs]
class AmbiguousFieldMappingsError(ValueError):
    """Raised when several fields map to one column name after normalization.

    Attributes:
        canonical_name: The normalized name the fields collide on.
        fields: Names of the colliding fields.
    """

    def __init__(self, *, canonical_name: str, fields: Sequence[str]) -> None:
        field_list = ", ".join(fields)
        message = (
            "Ambiguous field mappings after normalization "
            f"for key {canonical_name!r}: {field_list}. "
            "Use DataMapping.field_to_column_index or distinct names to "
            "disambiguate."
        )
        super().__init__(message)
        self.canonical_name = canonical_name
        self.fields = fields
[docs]
class ColumnIndexOutOfRangeError(ValueError):
    """Raised when an explicit column index lies outside the known columns.

    Attributes:
        field_name: The field whose index was rejected.
        column_index: The rejected index.
        column_count: Number of columns the index was checked against.
    """

    def __init__(
        self, *, field_name: str, column_index: int, column_count: int
    ) -> None:
        message = (
            f"Column index out of range for field {field_name!r}: "
            f"{column_index} (column count: {column_count})"
        )
        super().__init__(message)
        self.field_name = field_name
        self.column_index = column_index
        self.column_count = column_count
[docs]
@dataclass
class CsvLoadOptions:
    """Settings that tune *how* :func:`csv_load` reads, not *what* it maps.

    Attributes:
        delimiter: Field delimiter handed to the underlying CSV reader.
        field_metadata_key: Key looked up in a dataclass field's metadata to
            find a custom column name, i.e.
            ``field(metadata={field_metadata_key: "Header"})``.
        allow_column_subset: When ``True`` (the default), columns that match
            no field are ignored. When ``False``, such a column raises
            :class:`UnconsumedColumnsError`.
        string_parser: The :class:`StringParser` that converts cell strings
            to field types. Defaults to the shared
            :meth:`StringParser.default` instance.
    """

    delimiter: str = ","
    field_metadata_key: str = "csv_key"
    allow_column_subset: bool = True
    string_parser: StringParser = field(default_factory=StringParser.default)
# Target field name -> source column name.
type FieldMapping = Mapping[str, str]
# Target field name -> zero-based column index.
type FieldToColumnIndexMapping = Mapping[str, int]
# Ordered (converter, type) candidates for one field, as returned by
# StringParser.converters.
type FieldConverters = list[tuple[StringConverter, type[Any]]]
# Target field name -> (column index, converters); the per-row conversion plan.
type FieldIndexAndConverters = dict[str, tuple[int, FieldConverters]]
# Normalized column name -> every column index that carries that name.
type CanonicalColumnIndices = dict[str, list[int]]
# Normalized column name -> (index, original name) of each colliding column.
type AmbiguousColumns = dict[str, list[tuple[int, str]]]
[docs]
@dataclass(frozen=True)
class ColumnResolution:
    """Immutable summary of how the available columns were resolved.

    Attributes:
        resolved_indices: Normalized column name -> index, for names carried
            by exactly one column.
        ambiguous_columns: Normalized column name -> ``(index, original
            name)`` pairs, for names carried by more than one column.
        column_count: Total number of columns.
        mapping: The :class:`DataMapping` whose ``name_style`` governs name
            normalization.
    """

    resolved_indices: Mapping[str, int]
    ambiguous_columns: AmbiguousColumns
    column_count: int
    mapping: DataMapping
def _convert_name_if_needed(value: str, *, mapping: DataMapping) -> str:
    """Normalize ``value`` to ``mapping.name_style`` when a style is set.

    Returns ``value`` unchanged when ``mapping.name_style`` is falsy.
    """
    if not mapping.name_style:
        return value
    return convert_name(value, style=mapping.name_style)
def _build_canonical_column_indices(
    *, column_names: Sequence[str], mapping: DataMapping
) -> CanonicalColumnIndices:
    """Group column indices by their normalized column name.

    Returns:
        A dict from normalized name to the indices of every column carrying
        it, in column order. A name with more than one index is a collision.
    """
    grouped: CanonicalColumnIndices = {}
    for position, raw_name in enumerate(column_names):
        key = _convert_name_if_needed(raw_name, mapping=mapping)
        if key in grouped:
            grouped[key].append(position)
        else:
            grouped[key] = [position]
    return grouped
def _split_ambiguous_columns(
    *,
    column_names: Sequence[str],
    canonical_column_indices: CanonicalColumnIndices,
) -> tuple[dict[str, int], AmbiguousColumns]:
    """Separate uniquely named columns from colliding ones.

    Returns:
        A pair ``(resolved, ambiguous)``. ``resolved`` maps each normalized
        name carried by exactly one column to that column's index.
        ``ambiguous`` maps each normalized name carried by several columns
        to their ``(index, original name)`` pairs.
    """
    unique: dict[str, int] = {}
    colliding: AmbiguousColumns = {}
    for canonical_name, indices in canonical_column_indices.items():
        occurrence_count = len(indices)
        if occurrence_count == 1:
            unique[canonical_name] = indices[0]
        elif occurrence_count > 1:
            colliding[canonical_name] = [
                (index, column_names[index]) for index in indices
            ]
    return unique, colliding
def _resolve_field_to_column_name(
    *,
    type_hints: Mapping[str, type[Any]],
    field_to_column_name: FieldMapping | None,
    mapping: DataMapping,
    options: CsvLoadOptions,
    dataclass_type: type[Any] | None,
) -> FieldMapping:
    """Decide which source column name each target field reads from.

    An explicit ``field_to_column_name`` is returned as given. Otherwise one
    entry is produced per key of ``type_hints``. Without a dataclass the
    source name is the key itself. With a dataclass it is the field's
    metadata value under ``options.field_metadata_key``, falling back to the
    field name. Derived names are passed through the mapping's name
    normalization.
    """
    if field_to_column_name is not None:
        return field_to_column_name
    derived: dict[str, str] = {}
    for field_name in type_hints:
        if dataclass_type is None:
            source_name = field_name
        else:
            field_metadata = dataclass_type.__dataclass_fields__[
                field_name
            ].metadata
            source_name = field_metadata.get(
                options.field_metadata_key, field_name
            )
        derived[field_name] = _convert_name_if_needed(
            source_name, mapping=mapping
        )
    return derived
def _build_field_indices_and_converters(
    *,
    field_to_column_name: Mapping[str, str],
    field_to_column_index: FieldToColumnIndexMapping | None,
    column_resolution: ColumnResolution,
    type_hints: Mapping[str, type[Any]],
    string_parser: StringParser,
) -> FieldIndexAndConverters:
    """Build the per-field plan of column index and converters.

    For each field, an explicit index from ``field_to_column_index`` wins
    and is range-checked. Otherwise the field's column name is normalized
    and looked up among the uniquely named columns. A field whose name
    matches no column is left out of the result.

    Raises:
        ColumnIndexOutOfRangeError: An explicit index is negative or not
            below the column count.
        AmbiguousColumnNamesError: A field's normalized name is carried by
            more than one column.
        AmbiguousFieldMappingsError: Several name-matched fields normalize
            to the same column name.
    """
    fields_by_canonical_name: dict[str, list[str]] = {}
    field_to_index: dict[str, int] = {}
    for field_name, column_name in field_to_column_name.items():
        explicit_index = (
            None
            if field_to_column_index is None
            else field_to_column_index.get(field_name)
        )
        if explicit_index is not None:
            column_count = column_resolution.column_count
            if not 0 <= explicit_index < column_count:
                raise ColumnIndexOutOfRangeError(
                    field_name=field_name,
                    column_index=explicit_index,
                    column_count=column_count,
                )
            field_to_index[field_name] = explicit_index
            continue

        canonical_name = _convert_name_if_needed(
            column_name, mapping=column_resolution.mapping
        )
        fields_by_canonical_name.setdefault(canonical_name, []).append(
            field_name
        )
        colliding_columns = column_resolution.ambiguous_columns.get(
            canonical_name
        )
        if colliding_columns:
            raise AmbiguousColumnNamesError(
                canonical_name=canonical_name, columns=colliding_columns
            )
        resolved_index = column_resolution.resolved_indices.get(canonical_name)
        if resolved_index is not None:
            field_to_index[field_name] = resolved_index

    # Only reported after every field was processed, so column-level
    # ambiguity always takes precedence over field-level ambiguity.
    for canonical_name, field_names in fields_by_canonical_name.items():
        if len(field_names) > 1:
            raise AmbiguousFieldMappingsError(
                canonical_name=canonical_name, fields=field_names
            )

    plan: FieldIndexAndConverters = {}
    for field_name, index in field_to_index.items():
        target_type = type_hints.get(field_name, str)
        plan[field_name] = (index, string_parser.converters(target_type))
    return plan
def _assert_or_allow_unconsumed_columns(
    *,
    column_names: Sequence[str],
    field_to_index_and_converters: FieldIndexAndConverters,
    allow_column_subset: bool,
) -> None:
    """Reject leftover columns unless column subsets are allowed.

    Raises:
        UnconsumedColumnsError: ``allow_column_subset`` is ``False`` and at
            least one column index is used by no field.
    """
    used_indices = {
        index for index, _ in field_to_index_and_converters.values()
    }
    # Cheap exit: at least as many distinct indices as columns.
    if len(used_indices) >= len(column_names):
        return
    leftover_names = [
        name
        for position, name in enumerate(column_names)
        if position not in used_indices
    ]
    if leftover_names and not allow_column_subset:
        raise UnconsumedColumnsError(leftover_names)
def _resolve_loader(
    *,
    dataclass: type[Any] | None,
    init_function: Callable[..., object] | None,
    column_names: Sequence[str],
    mapping: DataMapping,
    options: CsvLoadOptions,
) -> tuple[Callable[..., object], dict[str, type[Any]], FieldMapping]:
    """Resolve the row factory, its argument type hints, and field mapping.

    Keeps the dataclass-versus-dict branching in one place so that
    `csv_load` stays a straightforward read-resolve-iterate pipeline.

    Returns:
        ``(factory, type_hints, field_to_column_name)``. The factory is
        ``init_function`` when given, otherwise the dataclass, otherwise
        ``dict``.

    Raises:
        NotADataclassError: ``dataclass`` is given but is not a dataclass.
    """
    explicit_names = mapping.field_to_column_name
    hints = (
        None
        if init_function is None
        else get_callable_argument_hints(init_function)
    )

    if dataclass is None:
        # Dict mode: with no explicit mapping, every column feeds a key
        # named after the raw column.
        factory = init_function or dict
        hints = hints or {}
        names = explicit_names or {
            column_name: _convert_name_if_needed(column_name, mapping=mapping)
            for column_name in column_names
        }
        return factory, hints, names

    if not is_dataclass(dataclass):
        raise NotADataclassError(dataclass)
    factory = init_function or dataclass
    hints = hints or get_callable_argument_hints(dataclass)
    if explicit_names is None and init_function is not None:
        # A custom factory drives matching through its parameter names.
        names = {
            key: _convert_name_if_needed(key, mapping=mapping)
            for key in hints
        }
    else:
        names = _resolve_field_to_column_name(
            type_hints=hints,
            field_to_column_name=explicit_names,
            mapping=mapping,
            options=options,
            dataclass_type=dataclass,
        )
    return factory, hints, names
def _convert_row(
    *,
    row: Sequence[str],
    line_number: int,
    field_to_index_and_converters: FieldIndexAndConverters,
) -> dict[str, object]:
    """Convert one CSV row into keyword arguments for the row factory.

    Raises:
        ShortRowError: The row has no cell at a field's column index.
    """
    # Bound once; this function runs for every data row.
    convert = StringParser.first_valid_conversion
    cell_count = len(row)
    converted: dict[str, object] = {}
    for field_name, (
        column_index,
        field_converters,
    ) in field_to_index_and_converters.items():
        if column_index >= cell_count:
            raise ShortRowError(
                line_number=line_number,
                field_name=field_name,
                column_index=column_index,
                column_count=cell_count,
            )
        converted[field_name] = convert(
            row[column_index], converters=field_converters
        )
    return converted
@overload # dict case, no init_function for type hints; dict[str, str]
def csv_load(
source: TextProvider,
*,
dataclass: None = ...,
init_function: None = None,
mapping: DataMapping | None = ...,
options: CsvLoadOptions | None = ...,
) -> Iterator[dict[str, str]]: ...
@overload # dict case, YES init_function for type hints; dict[str, object]
def csv_load(
source: TextProvider,
*,
dataclass: None = None,
init_function: Callable[..., dict[str, object]], # REQUIRED
mapping: DataMapping | None = ...,
options: CsvLoadOptions | None = ...,
) -> Iterator[dict[str, object]]: ...
@overload # dataclass case
def csv_load[T](
source: TextProvider,
*,
dataclass: type[T], # REQUIRED
init_function: Callable[..., T] | None = ...,
mapping: DataMapping | None = ...,
options: CsvLoadOptions | None = ...,
) -> Iterator[T]: ...
[docs]
def csv_load[T](
source: TextProvider,
*,
dataclass: type[T] | None = None,
init_function: Callable[..., object] | None = None,
mapping: DataMapping | None = None,
options: CsvLoadOptions | None = None,
) -> Iterator[T] | Iterator[dict[str, str]] | Iterator[dict[str, object]]:
"""Stream CSV rows as dictionaries or typed dataclass instances.
Rows are yielded lazily, so very large inputs are processed without being
held in memory. Blank lines are skipped. With no ``dataclass`` the result
is a dict per row; with a ``dataclass`` each row becomes an instance, its
cells converted to the annotated field types by ``options.string_parser``.
A field type may define a ``from_string(cls, s)`` classmethod to control
its own conversion.
Columns are matched to fields by name. Override that for awkward headers
via ``mapping``; the precedence, highest first, is:
1. ``mapping.field_to_column_index`` (explicit zero-based index)
2. ``mapping.field_to_column_name`` (explicit source column name)
3. ``init_function`` parameter names
4. Dataclass field metadata (``options.field_metadata_key``) or field name
5. Dict mode: the raw column names
When ``mapping.name_style`` is set, both source and target names are
normalized to that style before matching (e.g. a ``camelCase`` header
feeding a ``snake_case`` field).
Args:
source: Any :data:`TextProvider` (string, ``Path``, open text stream,
or list of lines).
dataclass: When given, each row is built into an instance of this type.
init_function: A factory called with the resolved field values instead
of the dataclass constructor; its parameter names drive matching.
mapping: Column-to-field mapping overrides. See :class:`DataMapping`.
options: Reader/conversion tuning. See :class:`CsvLoadOptions`.
Yields:
``dict[str, str]`` per row in dict mode, or one ``dataclass`` instance
per row otherwise.
Raises:
NotADataclassError: ``dataclass`` is not a dataclass type.
AmbiguousColumnNamesError: Normalization collapses two columns onto one
name that a field needs.
ColumnIndexOutOfRangeError: A ``field_to_column_index`` entry is out of
range for the header.
ShortRowError: A row has too few columns to fill a mapped field.
UnconsumedColumnsError: ``options.allow_column_subset`` is ``False`` and
a column matched no field.
Example:
Dict mode reads the header and yields one dict per row::
>>> list(csv_load(["name,score", "Ada,95"]))
[{'name': 'Ada', 'score': '95'}]
Dataclass mode converts cells to the annotated types::
>>> from dataclasses import dataclass
>>> @dataclass
... class Person:
... name: str
... score: int
>>> list(csv_load(["name,score", "Ada,95"], dataclass=Person))
[Person(name='Ada', score=95)]
"""
mapping = mapping or DataMapping()
options = options or CsvLoadOptions()
field_to_column_index = mapping.field_to_column_index
with open_text(source) as source_io:
reader = csv.reader(source_io, delimiter=options.delimiter)
string_parser = options.string_parser or StringParser.default()
column_names = mapping.column_names
if column_names is None:
# Skip leading blank lines so a header preceded by empty rows is
# still detected rather than read as an empty header.
for candidate_row in reader:
if candidate_row:
column_names = candidate_row
break
else:
logger.debug("No column names provided and source is empty.")
return
canonical_column_indices = _build_canonical_column_indices(
column_names=column_names, mapping=mapping
)
resolved_column_indices, ambiguous_columns = _split_ambiguous_columns(
column_names=column_names,
canonical_column_indices=canonical_column_indices,
)
resolved_init_function, type_hints, field_to_column_name = (
_resolve_loader(
dataclass=dataclass,
init_function=init_function,
column_names=column_names,
mapping=mapping,
options=options,
)
)
field_to_index_and_converters = _build_field_indices_and_converters(
field_to_column_name=field_to_column_name,
field_to_column_index=field_to_column_index,
column_resolution=ColumnResolution(
resolved_indices=resolved_column_indices,
ambiguous_columns=ambiguous_columns,
column_count=len(column_names),
mapping=mapping,
),
type_hints=type_hints,
string_parser=string_parser,
)
_assert_or_allow_unconsumed_columns(
column_names=column_names,
field_to_index_and_converters=field_to_index_and_converters,
allow_column_subset=options.allow_column_subset,
)
for row in reader:
if not row:
continue # skip blank lines, which are common in CSV files
yield cast(
"T",
resolved_init_function(
**_convert_row(
row=row,
line_number=reader.line_num,
field_to_index_and_converters=(
field_to_index_and_converters
),
)
),
)
# Captures JSON string literals (groups 1 and 2) so that anything matched by a
# following alternative can be stripped without ever reaching string contents.
# The fragment is prepended to both patterns below and is compiled with
# re.VERBOSE, so its layout whitespace and "#" notes are not part of the match.
_JSON5_STRING_ALTERNATION = r"""
( # 1: double-quoted string
"(?:\\.|[^"\\])*"
)
| ( # 2: single-quoted string
'(?:\\.|[^'\\])*'
)
"""
# Matches either a string literal (captured, to be kept) or a "//" line
# comment / "/* ... */" block comment together with the spaces and tabs
# directly before it (not captured, to be dropped). re.DOTALL lets a block
# comment span several lines.
_JSON5_COMMENT_PATTERN = re.compile(
_JSON5_STRING_ALTERNATION + r"""
| (?:[ \t]*//[^\r\n]*) # remove spaces + single-line comment
| (?:[ \t]*/\*.*?\*/) # remove spaces + block comment (ungreedy)
""",
re.VERBOSE | re.DOTALL,
)
# Matches either a string literal (captured, to be kept) or a comma that is
# followed only by optional whitespace and a closing "]" or "}" (not captured,
# to be dropped).
_JSON5_TRAILING_COMMA_PATTERN = re.compile(
_JSON5_STRING_ALTERNATION + r"""
| ,(?=\s*[\]}]) # remove a comma before a closing ] or }
""",
re.VERBOSE | re.DOTALL,
)
def _keep_string_drop_match(match: re.Match[str]) -> str:
    """Return the replacement text for one JSON5 pattern match.

    Groups 1 and 2 hold string literals and are returned verbatim. A match
    of any other alternative (a comment or a trailing comma) is replaced by
    the empty string.
    """
    for group_number in (1, 2):
        literal = match.group(group_number)
        if literal:
            return literal
    return ""
[docs]
def json5_load(source: TextProvider) -> JsonValue:
    """Parse JSON that may contain comments and trailing commas.

    Comments are removed first, then trailing commas, and the remaining text
    is decoded with ``json.loads``. Both removals leave string literals
    alone, so contents such as ``"a,}"`` or ``"// not a comment"`` survive
    intact.
    """
    text = get_text(source)
    # Order matters: a comma followed by a comment only becomes a trailing
    # comma once the comment is gone.
    for pattern in (_JSON5_COMMENT_PATTERN, _JSON5_TRAILING_COMMA_PATTERN):
        text = pattern.sub(_keep_string_drop_match, text)
    return cast("JsonValue", json.loads(text))