# Source code for pyobo.struct.reference

"""Data structures for OBO."""

from __future__ import annotations

import datetime
import logging
from collections import Counter
from collections.abc import Iterable, Sequence
from datetime import date as date_cls
from datetime import datetime as datetime_cls
from typing import Any, NamedTuple

import bioregistry
import curies
import dateutil.parser
import pytz
from bioregistry import NormalizedNamableReference as Reference
from curies import ReferenceTuple
from curies import vocabulary as v
from curies.preprocessing import BlocklistError

from ..identifier_utils import (
    NotCURIEError,
    ParseError,
    UnparsableIRIError,
    _is_valid_identifier,
    _parse_str_or_curie_or_uri_helper,
)

__all__ = [
    "OBOLiteral",
    "Reference",
    "Referenced",
    "default_reference",
    "get_preferred_curie",
    "multi_reference_escape",
    "reference_escape",
    "unspecified_matching",
]

logger = logging.getLogger(__name__)


def _parse_str_or_curie_or_uri(
    str_curie_or_uri: str,
    name: str | None = None,
    *,
    strict: bool = False,
    ontology_prefix: str | None = None,
    node: Reference | None = None,
    predicate: Reference | None = None,
    line: str | None = None,
    context: str | None = None,
    upgrade: bool = False,
) -> Reference | None:
    """Parse a string, CURIE, or URI into a reference.

    The actual parsing is delegated to ``_parse_str_or_curie_or_uri_helper``,
    which hands back either a reference or an error *object*. This wrapper
    turns that result into a return value or a raised exception.

    :param str_curie_or_uri: The text to parse
    :param name: An optional name, forwarded to the helper
    :param strict: If true, a :class:`ParseError` returned by the helper is
        raised instead of being converted to ``None``
    :param ontology_prefix: Forwarded to the helper
    :param node: Forwarded to the helper
    :param predicate: Forwarded to the helper
    :param line: Forwarded to the helper
    :param context: Forwarded to the helper
    :param upgrade: Forwarded to the helper
    :returns: The parsed reference, or ``None`` if the helper returned a
        :class:`BlocklistError`, or a :class:`ParseError` in non-strict mode
    :raises ParseError: If the helper returned a parse error and ``strict`` is set
    :raises TypeError: If the helper returned anything else
    """
    outcome = _parse_str_or_curie_or_uri_helper(
        str_curie_or_uri,
        ontology_prefix=ontology_prefix,
        name=name,
        node=node,
        predicate=predicate,
        line=line,
        context=context,
        upgrade=upgrade,
    )

    if isinstance(outcome, Reference):
        return outcome

    # blocklist hits are checked before generic parse errors and are
    # dropped silently regardless of ``strict``
    if isinstance(outcome, BlocklistError):
        return None

    if isinstance(outcome, ParseError):
        if strict:
            raise outcome
        return None

    raise TypeError(f"Got invalid: ({type(outcome)}) {outcome}")


class Referenced:
    """A class that contains a reference."""

    reference: Reference

    def __hash__(self) -> int:
        return self.reference.__hash__()

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, curies.Reference | Referenced):
            return self.prefix == other.prefix and self.identifier == other.identifier
        raise TypeError

    def __lt__(self, other: curies.Reference | Referenced) -> bool:
        if isinstance(other, curies.Reference):
            return self.reference < other
        if isinstance(other, Referenced):
            return self.reference < other.reference
        raise TypeError

    @property
    def prefix(self) -> str:
        """The prefix of the typedef."""
        return self.reference.prefix

    @property
    def name(self) -> str | None:
        """The name of the typedef."""
        return self.reference.name

    @property
    def identifier(self) -> str:
        """The local unique identifier for this typedef."""
        return self.reference.identifier

    @property
    def curie(self) -> str:
        """The CURIE for this typedef."""
        return self.reference.curie

    @property
    def pair(self) -> ReferenceTuple:
        """The pair of namespace/identifier."""
        return self.reference.pair


def get_preferred_prefix(
    ref: curies.Reference | Reference | Referenced,
) -> str:
    """Get the prefix to use when writing the given reference-like object.

    :param ref: A reference or an object wrapping one
    :returns: For :class:`Referenced` and :class:`Reference` inputs, the
        preferred prefix reported by :mod:`bioregistry`, or the object's own
        prefix if that lookup gives nothing. A plain
        :class:`curies.Reference` keeps its prefix unchanged.
    """
    if isinstance(ref, (Referenced, Reference)):
        preferred = bioregistry.get_preferred_prefix(ref.prefix)
        return preferred or ref.prefix
    if isinstance(ref, curies.Reference):
        return ref.prefix
    # NOTE(review): any other input type falls through here and the
    # function implicitly returns None, despite the ``str`` annotation


def get_preferred_curie(
    ref: curies.Reference | Reference | Referenced,
) -> str:
    """Get the CURIE to use when writing the given reference-like object.

    :param ref: A reference or an object wrapping one
    :returns: For :class:`Referenced` and :class:`Reference` inputs, a CURIE
        built from :func:`get_preferred_prefix` and the identifier. A plain
        :class:`curies.Reference` yields its own ``curie`` unchanged.
    """
    if isinstance(ref, (Referenced, Reference)):
        preferred_prefix = get_preferred_prefix(ref)
        return f"{preferred_prefix}:{ref.identifier}"
    if isinstance(ref, curies.Reference):
        return ref.curie
    # NOTE(review): any other input type falls through here and the
    # function implicitly returns None, despite the ``str`` annotation


def default_reference(prefix: str, identifier: str, name: str | None = None) -> Reference:
    """Create a CURIE for an "unqualified" reference.

    :param prefix: The prefix of the ontology in which the "unqualified" reference is made
    :param identifier: The "unqualified" reference. For example, if you just write
        "located_in" somewhere there is supposed to be a CURIE
    :param name: An optional name to attach to the resulting reference
    :returns: A CURIE for the "unqualified" reference based on the OBO semantic space
    :raises ValueError: If the identifier is empty or consists only of whitespace

    >>> default_reference("chebi", "conjugate_base_of")
    Reference(prefix="obo", identifier="chebi#conjugate_base_of", name=None)
    """
    # only emptiness is checked on the stripped text; the identifier itself
    # is embedded exactly as given
    if not identifier.strip():
        raise ValueError("default identifier is empty")
    return Reference(prefix="obo", identifier=f"{prefix}#{identifier}", name=name)
def _get_ref_name(reference: curies.Reference | Referenced) -> str | None: if isinstance(reference, curies.NamableReference | Referenced): return reference.name return None def reference_escape( reference: curies.Reference | Referenced, *, ontology_prefix: str, add_name_comment: bool = False, ) -> str: """Write a reference with default namespace removed.""" if reference.prefix == "obo" and reference.identifier.startswith(f"{ontology_prefix}#"): return reference.identifier.removeprefix(f"{ontology_prefix}#") rv = get_preferred_curie(reference) if add_name_comment and (name := _get_ref_name(reference)): rv += f" ! {name}" return rv def multi_reference_escape( references: Sequence[Reference | Referenced], *, ontology_prefix: str, add_name_comment: bool = False, ) -> str: """Write multiple references with default namespace normalized.""" rv = " ".join( reference_escape(r, ontology_prefix=ontology_prefix, add_name_comment=False) for r in references ) names = [r.name or "" for r in references] if add_name_comment and all(names): rv += " ! 
" + " ".join(names) return rv def comma_separate_references(elements: Iterable[Reference | OBOLiteral]) -> str: """Map a list to strings and make comma separated.""" return ", ".join(reference_or_literal_to_str(element) for element in elements) def _obo_parse_identifier( str_or_curie_or_uri: str, *, ontology_prefix: str, strict: bool = False, node: Reference | None = None, predicate: Reference | None = None, line: str | None = None, context: str | None = None, name: str | None = None, upgrade: bool = True, counter: Counter[tuple[str, str]] | None = None, ) -> Reference | None: """Parse from a CURIE, URI, or default string in the ontology prefix's IDspace using OBO semantics.""" match _parse_str_or_curie_or_uri_helper( str_or_curie_or_uri, ontology_prefix=ontology_prefix, node=node, predicate=predicate, line=line, context=context, name=name, upgrade=upgrade, ): case Reference() as reference: return reference case BlocklistError(): return None case NotCURIEError() as exc: # this means there's no colon `:` if _is_valid_identifier(str_or_curie_or_uri): return default_reference(prefix=ontology_prefix, identifier=str_or_curie_or_uri) elif strict: raise exc else: return None case ParseError() as exc: if strict: raise exc if counter is None: logger.warning(str(exc)) else: if not counter[ontology_prefix, str_or_curie_or_uri]: logger.warning(str(exc)) counter[ontology_prefix, str_or_curie_or_uri] += 1 return None def _parse_reference_or_uri_literal( str_or_curie_or_uri: str, *, ontology_prefix: str, strict: bool = False, node: Reference, predicate: Reference | None = None, line: str, context: str, name: str | None = None, upgrade: bool = True, # counter: Counter[tuple[str, str]] | None = None, ) -> None | Reference | OBOLiteral: match _parse_str_or_curie_or_uri_helper( str_or_curie_or_uri, node=node, predicate=predicate, ontology_prefix=ontology_prefix, line=line, context=context, name=name, upgrade=upgrade, ): case Reference() as reference: return reference case 
BlocklistError(): return None case UnparsableIRIError(): # this means that it's defininitely a URI, # but it couldn't be parsed with Bioregistry return OBOLiteral.uri(str_or_curie_or_uri) case NotCURIEError() as exc: # this means there's no colon `:` if _is_valid_identifier(str_or_curie_or_uri): return default_reference(prefix=ontology_prefix, identifier=str_or_curie_or_uri) elif strict: raise exc else: return None case ParseError() as exc: if strict: raise exc if counter is None: logger.warning(str(exc)) else: if not counter[ontology_prefix, str_or_curie_or_uri]: logger.warning(str(exc)) counter[ontology_prefix, str_or_curie_or_uri] += 1 return None unspecified_matching = Reference.from_reference(v.unspecified_matching_process) class OBOLiteral(NamedTuple): """A tuple representing a property with a literal value.""" value: str datatype: curies.Reference language: str | None @classmethod def string(cls, value: str, *, language: str | None = None) -> OBOLiteral: """Get a string literal.""" return cls(value, v.xsd_string, language) @classmethod def boolean(cls, value: bool) -> OBOLiteral: """Get a boolean literal.""" return cls(str(value).lower(), v.xsd_boolean, None) @classmethod def decimal(cls, value: float) -> OBOLiteral: """Get a decimal literal.""" return cls(str(value), v.xsd_decimal, None) @classmethod def float(cls, value: float) -> OBOLiteral: """Get a float literal.""" return cls(str(value), v.xsd_float, None) @classmethod def integer(cls, value: int | str) -> OBOLiteral: """Get a integer literal.""" return cls(str(int(value)), v.xsd_integer, None) @classmethod def year(cls, value: int | str) -> OBOLiteral: """Get a year (gYear) literal.""" return cls(str(int(value)), v.xsd_year, None) @classmethod def uri(cls, uri: str) -> OBOLiteral: """Get a string literal for a URI.""" return cls(uri, v.xsd_uri, None) @classmethod def datetime(cls, dt: datetime_cls | str) -> OBOLiteral: """Get a datetime literal.""" if isinstance(dt, str): dt = _parse_datetime(dt) 
return cls(dt.isoformat(), v.xsd_datetime, None) @classmethod def date(cls, dt: datetime_cls | date_cls | str) -> OBOLiteral: """Get a datetime literal.""" if isinstance(dt, str): dt = datetime.date.fromisoformat(dt) elif isinstance(dt, datetime.datetime): dt = dt.date() return cls(dt.isoformat(), v.xsd_date, None) def _parse_datetime(dd: str) -> datetime.datetime: xx = dateutil.parser.parse(dd) xx = xx.astimezone(pytz.UTC) return xx def _reference_list_tag( tag: str, references: Iterable[Reference], ontology_prefix: str ) -> Iterable[str]: for reference in references: yield f"{tag}: {reference_escape(reference, ontology_prefix=ontology_prefix, add_name_comment=True)}" def reference_or_literal_to_str(x: OBOLiteral | curies.Reference | Reference | Referenced) -> str: """Get a string from a reference or literal.""" if isinstance(x, OBOLiteral): return x.value return get_preferred_curie(x)