# Source code for pyobo.struct.struct_utils

"""Utilities on top of the reference."""

from __future__ import annotations

import datetime
import itertools as itt
import logging
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from typing import TYPE_CHECKING, Literal, NamedTuple, Self, TypeAlias, overload

import curies
from curies import ReferenceTuple
from curies import vocabulary as _v
from curies.vocabulary import SynonymScope
from pydantic import BaseModel, ConfigDict
from ssslm import LiteralMapping

from . import vocabulary as v
from .reference import (
    OBOLiteral,
    Reference,
    Referenced,
    comma_separate_references,
    default_reference,
    get_preferred_curie,
    multi_reference_escape,
    reference_escape,
    reference_or_literal_to_str,
    unspecified_matching,
)
from .utils import obo_escape_slim
from ..identifier_utils import (
    NotCURIEError,
    ParseError,
    _is_valid_identifier,
    _parse_str_or_curie_or_uri_helper,
)

if TYPE_CHECKING:
    from pyobo.struct.struct import Synonym, TypeDef

__all__ = [
    "Annotation",
    "AnnotationsDict",
    "HasReferencesMixin",
    "ReferenceHint",
    "Stanza",
]

logger = logging.getLogger(__name__)


[docs] class Annotation(NamedTuple): """A tuple representing a predicate-object pair.""" predicate: Reference value: Reference | OBOLiteral
[docs] @classmethod def float(cls, predicate: Reference | TypeDef, value: float) -> Self: """Return a literal property for a float.""" from .struct import TypeDef if isinstance(predicate, TypeDef): predicate = predicate.reference return cls(predicate, OBOLiteral.float(value))
[docs] @classmethod def uri(cls, predicate: Reference | TypeDef, uri: str) -> Self: """Return a literal property for a URI.""" from .struct import TypeDef if isinstance(predicate, TypeDef): predicate = predicate.reference return cls(predicate, OBOLiteral.uri(uri))
[docs] @classmethod def string( cls, predicate: Reference | TypeDef, value: str, *, language: str | None = None ) -> Self: """Return a literal property for a float.""" from .struct import TypeDef if isinstance(predicate, TypeDef): predicate = predicate.reference return cls(predicate, OBOLiteral.string(value, language=language))
@staticmethod def _sort_key(x: Annotation) -> tuple[Reference, tuple[int, Reference | OBOLiteral]]: return x.predicate, _reference_or_literal_key(x.value)
def _property_resolve(p: ReferenceHint, o: Reference | Referenced | OBOLiteral) -> Annotation: p = _ensure_ref(p) if isinstance(o, Referenced): o = o.reference return Annotation(p, o) PropertiesHint: TypeAlias = dict[Reference, list[Reference | OBOLiteral]] RelationsHint: TypeAlias = dict[Reference, list[Reference]] AnnotationsDict: TypeAlias = dict[Annotation, list[Annotation]] # note that an intersection is not valid in ROBOT with a literal, even though this _might_ make sense. IntersectionOfHint: TypeAlias = list[Reference | tuple[Reference, Reference]] UnionOfHint: TypeAlias = list[Reference] StanzaType: TypeAlias = Literal["Term", "Instance", "TypeDef"] stanza_type_to_prop: dict[StanzaType, Reference] = { "Term": v.is_a, "Instance": v.rdf_type, "TypeDef": v.subproperty_of, } stanza_type_to_eq_prop: dict[StanzaType, Reference] = { "Term": v.equivalent_class, "Instance": v.owl_same_as, "TypeDef": v.equivalent_property, } class HasReferencesMixin(ABC): """A class that can report on the references it contains.""" def _get_prefixes(self) -> set[str]: return set(self._get_references()) @abstractmethod def _get_references(self) -> dict[str, set[Reference]]: raise NotImplementedError class Stanza(Referenced, HasReferencesMixin): """A high-level class for stanzas.""" reference: Reference relationships: RelationsHint properties: PropertiesHint xrefs: list[Reference] parents: list[Reference] intersection_of: IntersectionOfHint equivalent_to: list[Reference] union_of: UnionOfHint subsets: list[Reference] disjoint_from: list[Reference] synonyms: list[Synonym] type: StanzaType _axioms: AnnotationsDict #: An annotation for obsolescence. By default, is None, but this means that it is not obsolete. 
is_obsolete: bool | None #: A description of the entity definition: str | None = None @staticmethod def _reference( reference: Reference, ontology_prefix: str, add_name_comment: bool = False ) -> str: return reference_escape( reference, ontology_prefix=ontology_prefix, add_name_comment=add_name_comment ) def _get_prefixes(self) -> set[str]: return set(self._get_references()) def _get_references(self) -> dict[str, set[Reference]]: """Get all prefixes used by the typedef.""" rv: defaultdict[str, set[Reference]] = defaultdict(set) def _add(r: Reference) -> None: rv[r.prefix].add(r) _add(self.reference) for synonym in self.synonyms: for prefix, references in synonym._get_references().items(): rv[prefix].update(references) if self.xrefs: # xrefs themselves added in the chain below _add(v.has_dbxref) for predicate, values in self.properties.items(): _add(predicate) for value in values: if isinstance(value, Reference): _add(value) elif isinstance(value, OBOLiteral): _add(v._c(value.datatype)) for parent in itt.chain( self.parents, self.union_of, self.equivalent_to, self.disjoint_from, self.subsets, self.xrefs, ): _add(parent) for intersection_of in self.intersection_of: match intersection_of: case Reference(): _add(intersection_of) case (intersection_predicate, intersection_value): _add(intersection_predicate) _add(intersection_value) for rel_predicate, rel_values in self.relationships.items(): _add(rel_predicate) for r in rel_values: _add(r) for p_o, annotations_ in self._axioms.items(): _add(p_o.predicate) if isinstance(p_o.value, Reference): _add(p_o.value) for prefix, references in _get_references_from_annotations(annotations_).items(): rv[prefix].update(references) return rv def get_literal_mappings(self) -> list[LiteralMapping]: """Get synonym objects for this term, including one for its label.""" rv = [_convert_synoynym(self, synonym) for synonym in self.synonyms] if self.reference.name: rv.append(_get_stanza_name_synonym(self)) return rv def append_relationship( 
self, typedef: ReferenceHint, reference: ReferenceHint, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append a relationship.""" typedef = _ensure_ref(typedef) reference = _ensure_ref(reference) self.relationships[typedef].append(reference) self._extend_annotations(typedef, reference, annotations) return self def _extend_annotations( self, p: Reference, o: Reference | OBOLiteral, annotations: Iterable[Annotation] | None ) -> None: if annotations is None: return for annotation in annotations: self._append_annotation(p, o, annotation) def _append_annotation( self, p: ReferenceHint, o: Reference | OBOLiteral, annotation: Annotation ) -> None: self._axioms[_property_resolve(p, o)].append(annotation) # TODO check different usages of this def append_equivalent( self, reference: ReferenceHint, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append an equivalent class axiom.""" return self.append_relationship( stanza_type_to_eq_prop[self.type], reference, annotations=annotations ) def append_equivalent_to( self, reference: ReferenceHint, *, annotations: Iterable[Annotation] | None = None ) -> Self: """Append to the "equivalent to" list.""" reference = _ensure_ref(reference) self.equivalent_to.append(reference) self._extend_annotations(stanza_type_to_eq_prop[self.type], reference, annotations) return self def append_xref( self, reference: ReferenceHint, *, mapping_justification: Reference | None = None, confidence: float | None = None, contributor: Reference | None = None, annotations: list[Annotation] | None = None, ) -> Self: """Append an xref.""" reference = _ensure_ref(reference) self.xrefs.append(reference) if annotations is None: annotations = [] annotations.extend( self._prepare_mapping_annotations( mapping_justification=mapping_justification, confidence=confidence, contributor=contributor, ) ) self._extend_annotations(v.has_dbxref, reference, annotations) return self def _prepare_mapping_annotations( self, *, 
mapping_justification: Reference | None = None, confidence: float | None = None, contributor: Reference | None = None, ) -> Iterable[Annotation]: if mapping_justification is not None: yield Annotation(v.mapping_has_justification, mapping_justification) if contributor is not None: yield Annotation(v.has_contributor, contributor) if confidence is not None: yield Annotation.float(v.mapping_has_confidence, confidence) def append_parent( self, reference: ReferenceHint, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Add a parent to this entity.""" reference = _ensure_ref(reference) if reference not in self.parents: self.parents.append(reference) self._extend_annotations(stanza_type_to_prop[self.type], reference, annotations) return self def append_intersection_of( self, /, reference: ReferenceHint | tuple[ReferenceHint, ReferenceHint], r2: ReferenceHint | None = None, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append an intersection of.""" if r2 is not None: if isinstance(reference, tuple): raise TypeError self.intersection_of.append((_ensure_ref(reference), _ensure_ref(r2))) elif isinstance(reference, tuple): self.intersection_of.append((_ensure_ref(reference[0]), _ensure_ref(reference[1]))) else: self.intersection_of.append(_ensure_ref(reference)) return self def append_union_of(self, reference: ReferenceHint) -> Self: """Append to the "union of" list.""" self.union_of.append(_ensure_ref(reference)) return self def _iterate_intersection_of_obo(self, *, ontology_prefix: str) -> Iterable[str]: for element in sorted(self.intersection_of, key=self._intersection_of_key): match element: case Reference(): end = reference_escape( element, ontology_prefix=ontology_prefix, add_name_comment=True ) case (predicate, object): match object: case Reference(): end = multi_reference_escape( [predicate, object], ontology_prefix=ontology_prefix, add_name_comment=True, ) case OBOLiteral(): raise NotImplementedError case _: raise TypeError yield 
f"intersection_of: {end}" @staticmethod def _intersection_of_key( io: Reference | tuple[Reference, Reference], ) -> tuple[Literal[0], Reference] | tuple[Literal[1], tuple[Reference, Reference]]: if isinstance(io, Reference): return 0, io else: return 1, io def _iterate_xref_obo(self, *, ontology_prefix: str) -> Iterable[str]: for xref in sorted(self.xrefs): xref_yv = f"xref: {reference_escape(xref, ontology_prefix=ontology_prefix, add_name_comment=False)}" xref_yv += _get_obo_trailing_modifiers( v.has_dbxref, xref, self._axioms, ontology_prefix=ontology_prefix ) if xref.name: xref_yv += f" ! {xref.name}" yield xref_yv def _get_annotations( self, p: ReferenceHint, o: Reference | Referenced | OBOLiteral | str ) -> list[Annotation]: if isinstance(o, str): o = OBOLiteral.string(o) return self._axioms.get(_property_resolve(p, o), []) def _get_annotation( self, p: ReferenceHint, o: Reference | OBOLiteral, ap: Reference ) -> Reference | OBOLiteral | None: ap_norm = _ensure_ref(ap) for annotation in self._get_annotations(p, o): if annotation.predicate.pair == ap_norm.pair: return annotation.value return None def append_property( self, prop: Annotation, *, annotations: Iterable[Annotation] | None = None ) -> Self: """Annotate a property.""" self.properties[prop.predicate].append(prop.value) self._extend_annotations(prop.predicate, prop.value, annotations) return self def annotate_literal( self, prop: ReferenceHint, value: OBOLiteral, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append an object annotation.""" prop = _ensure_ref(prop) self.properties[prop].append(value) self._extend_annotations(prop, value, annotations) return self def annotate_string( self, prop: ReferenceHint, value: str, *, annotations: Iterable[Annotation] | None = None, language: str | None = None, ) -> Self: """Append an object annotation.""" return self.annotate_literal( prop, OBOLiteral.string(value, language=language), annotations=annotations ) def annotate_boolean( self, prop: 
ReferenceHint, value: bool, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append an object annotation.""" return self.annotate_literal(prop, OBOLiteral.boolean(value), annotations=annotations) def annotate_integer( self, prop: ReferenceHint, value: int | str, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append an object annotation.""" return self.annotate_literal(prop, OBOLiteral.integer(value), annotations=annotations) def annotate_float( self, prop: ReferenceHint, value: float, *, annotations: Iterable[Annotation] | None = None ) -> Self: """Append a float annotation.""" return self.annotate_literal(prop, OBOLiteral.float(value), annotations=annotations) def annotate_decimal( self, prop: ReferenceHint, value: float, *, annotations: Iterable[Annotation] | None = None ) -> Self: """Append a decimal annotation.""" return self.annotate_literal(prop, OBOLiteral.decimal(value), annotations=annotations) def annotate_year( self, prop: ReferenceHint, value: int | str, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append a year annotation.""" return self.annotate_literal(prop, OBOLiteral.year(value), annotations=annotations) def annotate_uri( self, prop: ReferenceHint, value: str, *, annotations: Iterable[Annotation] | None = None ) -> Self: """Append a URI annotation.""" return self.annotate_literal(prop, OBOLiteral.uri(value), annotations=annotations) def annotate_datetime( self, prop: ReferenceHint, value: datetime.datetime | str, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append a datetime annotation.""" return self.annotate_literal(prop, OBOLiteral.datetime(value), annotations=annotations) def annotate_date( self, prop: ReferenceHint, value: datetime.datetime | datetime.date | str, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append a date annotation.""" return self.annotate_literal(prop, OBOLiteral.date(value), annotations=annotations) def _iterate_obo_properties( 
self, *, ontology_prefix: str, skip_predicate_objects: Iterable[Reference] | None = None, skip_predicate_literals: Iterable[Reference] | None = None, typedefs: Mapping[ReferenceTuple, TypeDef], ) -> Iterable[str]: for line in _iterate_obo_relations( self.properties, self._axioms, ontology_prefix=ontology_prefix, skip_predicate_objects=skip_predicate_objects, skip_predicate_literals=skip_predicate_literals, typedefs=typedefs, ): yield f"property_value: {line}" def _iterate_obo_relations( self, *, ontology_prefix: str, typedefs: Mapping[ReferenceTuple, TypeDef] ) -> Iterable[str]: for line in _iterate_obo_relations( self.relationships, self._axioms, ontology_prefix=ontology_prefix, typedefs=typedefs, ): yield f"relationship: {line}" def append_subset(self, subset: ReferenceHint) -> Self: """Add a subset.""" self.subsets.append(_ensure_ref(subset)) return self def append_disjoint_from(self, reference: ReferenceHint) -> Self: """Add a disjoint from.""" self.disjoint_from.append(_ensure_ref(reference)) return self def annotate_object( self, typedef: ReferenceHint, value: ReferenceHint, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append an object annotation.""" typedef = _ensure_ref(typedef) value = _ensure_ref(value) self.properties[typedef].append(value) self._extend_annotations(typedef, value, annotations) return self def append_contributor(self, reference: ReferenceHint) -> Self: """Append contributor.""" return self.annotate_object(v.has_contributor, reference) def append_creation_date(self, date: datetime.datetime | str) -> Self: """Append contributor.""" return self.annotate_datetime(v.obo_creation_date, date) def get_see_also(self) -> list[Reference]: """Get all see also objects.""" return self.get_property_objects(v.see_also) def get_replaced_by(self) -> list[Reference]: """Get all replaced by.""" return self.get_property_objects(v.term_replaced_by) def append_replaced_by( self, reference: Reference, *, annotations: Iterable[Annotation] | 
None = None ) -> Self: """Add a replaced by property.""" return self.annotate_object(v.term_replaced_by, reference, annotations=annotations) def iterate_relations(self) -> Iterable[tuple[Reference, Reference]]: """Iterate over pairs of typedefs and targets.""" for typedef, targets in sorted(self.relationships.items()): for target in sorted(targets): yield typedef, target def iterate_object_properties(self) -> Iterable[tuple[Reference, Reference]]: """Iterate over properties with references as their targets.""" for predicate, values in self.properties.items(): for value in values: if isinstance(value, Reference): yield predicate, value def iterate_literal_properties(self) -> Iterable[tuple[Reference, OBOLiteral]]: """Iterate over properties with literals as their targets.""" for predicate, values in self.properties.items(): for value in values: if isinstance(value, OBOLiteral): yield predicate, value def get_relationships(self, typedef: ReferenceHint) -> list[Reference]: """Get relationships from the given type.""" return self.relationships.get(_ensure_ref(typedef), []) # docstr-coverage:excused `overload` @overload def get_relationship( self, typedef: ReferenceHint, *, strict: Literal[False] = ... ) -> Reference | None: ... # docstr-coverage:excused `overload` @overload def get_relationship( self, typedef: ReferenceHint, *, strict: Literal[True] = ... ) -> Reference: ... 
def get_relationship(self, typedef: ReferenceHint, *, strict: bool = False) -> Reference | None: """Get a single relationship of the given type.""" r = self.get_relationships(typedef) if not r: if strict: raise ValueError return None if len(r) > 1: raise ValueError(f"multiple relationships returned: {r}") return r[0] def iterate_relation_targets(self, typedef: ReferenceHint) -> list[Reference]: """Iterate over pairs of typedefs and targets.""" return sorted(self.relationships.get(_ensure_ref(typedef), [])) def get_property_annotations(self) -> list[Annotation]: """Iterate over pairs of property and values.""" return [ Annotation(prop, value) for prop, values in sorted(self.properties.items()) for value in sorted(values, key=_reference_or_literal_key) ] def get_property_values(self, typedef: ReferenceHint) -> list[Reference | OBOLiteral]: """Iterate over references or values.""" return sorted(self.properties.get(_ensure_ref(typedef), [])) def get_property_objects(self, prop: ReferenceHint) -> list[Reference]: """Get properties from the given key.""" return sorted( reference for reference in self.properties.get(_ensure_ref(prop), []) if isinstance(reference, curies.Reference) ) def append_exact_synonym( self, synonym: str | Synonym, *, type: Reference | Referenced | None = None, provenance: Sequence[Reference | OBOLiteral] | None = None, annotations: Iterable[Annotation] | None = None, language: str | None = None, ) -> Self: """Add an exact synonym.""" return self.append_synonym( synonym, type=type, specificity="EXACT", provenance=provenance, annotations=annotations, language=language, ) def append_synonym( self, synonym: str | Synonym, *, type: Reference | Referenced | None = None, specificity: SynonymScope | None = None, provenance: Sequence[Reference | OBOLiteral] | None = None, annotations: Iterable[Annotation] | None = None, language: str | None = None, ) -> Self: """Add a synonym.""" if isinstance(type, Referenced): type = type.reference if isinstance(synonym, 
str): from pyobo.struct.struct import Synonym synonym = Synonym( synonym, type=type, specificity=specificity, provenance=list(provenance or []), annotations=list(annotations or []), language=language, ) self.synonyms.append(synonym) return self def append_alt( self, alt: Reference, *, annotations: Iterable[Annotation] | None = None ) -> Self: """Add an alternative identifier.""" return self.annotate_object(v.alternative_term, alt, annotations=annotations) def append_see_also( self, reference: ReferenceHint, *, annotations: Iterable[Annotation] | None = None ) -> Self: """Add a see also property.""" _reference = _ensure_ref(reference) return self.annotate_object(v.see_also, _reference, annotations=annotations) def append_comment( self, value: str, *, annotations: Iterable[Annotation] | None = None, language: str | None = None, ) -> Self: """Add a comment property.""" return self.annotate_string(v.comment, value, annotations=annotations, language=language) def get_comments(self) -> list[str]: """Get all comment strings.""" return [x.value for x in self.get_property_values(v.comment) if isinstance(x, OBOLiteral)] @property def alt_ids(self) -> Sequence[Reference]: """Get alternative terms.""" return tuple(self.get_property_objects(v.alternative_term)) def get_edges(self, *, include_xrefs: bool = True) -> list[tuple[Reference, Reference]]: """Get edges.""" return list(self._iter_edges(include_xrefs=include_xrefs)) def _iter_parents(self) -> Iterable[tuple[Reference, Reference]]: parent_prop = stanza_type_to_prop[self.type] for parent in itt.chain(self.parents, self.union_of): yield parent_prop, parent def _iter_intersections(self) -> Iterable[tuple[Reference, Reference]]: parent_prop = stanza_type_to_prop[self.type] for intersection_of in self.intersection_of: match intersection_of: case Reference(): yield parent_prop, intersection_of case (predicate, target): yield predicate, target def _iter_edges(self, *, include_xrefs: bool = True) -> Iterable[tuple[Reference, 
Reference]]: # The following are "object" properties, meaning # they're part of the definition of the object yield from self.iterate_relations() yield from self._iter_parents() yield from self._iter_intersections() for equivalent_to in self.equivalent_to: yield stanza_type_to_eq_prop[self.type], equivalent_to # The following are "annotation" properties for subset in self.subsets: yield v.in_subset, subset yield from self.iterate_object_properties() if include_xrefs: for xref_reference in self.xrefs: yield v.has_dbxref, xref_reference # TODO disjoint_from # docstr-coverage:excused `overload` @overload def get_mappings( self, *, include_xrefs: bool = ..., add_context: Literal[False] = ... ) -> list[tuple[Reference, Reference]]: ... # docstr-coverage:excused `overload` @overload def get_mappings( self, *, include_xrefs: bool = ..., add_context: Literal[True] = ... ) -> list[tuple[Reference, Reference, MappingContext]]: ... def get_mappings( self, *, include_xrefs: bool = True, add_context: bool = False ) -> list[tuple[Reference, Reference]] | list[tuple[Reference, Reference, MappingContext]]: """Get mappings with preferred curies.""" rows = [] for predicate in v.extended_match_typedefs: for xref_reference in itt.chain( self.get_property_objects(predicate), self.get_relationships(predicate) ): rows.append((predicate, xref_reference)) if include_xrefs: for xref_reference in self.xrefs: rows.append((v.has_dbxref, xref_reference)) for equivalent_to in self.equivalent_to: rows.append((v.equivalent_class, equivalent_to)) rv = sorted(set(rows)) if not add_context: return rv return [(k, v, self._get_mapping_context(k, v)) for k, v in rv] def _get_object_annotation_target( self, p: Reference, o: Reference | OBOLiteral, ap: Reference ) -> Reference | None: match self._get_annotation(p, o, ap): case OBOLiteral(): raise TypeError case Reference() as target: return target case None: return None case _: raise TypeError def _get_str_annotation_target( self, p: Reference, o: 
Reference | OBOLiteral, ap: Reference ) -> str | None: match self._get_annotation(p, o, ap): case OBOLiteral(value, _): return value case Reference(): raise TypeError case None: return None case _: raise TypeError def _get_mapping_context(self, p: Reference, o: Reference) -> MappingContext: return MappingContext( justification=self._get_object_annotation_target(p, o, v.mapping_has_justification) or unspecified_matching, contributor=self._get_object_annotation_target(p, o, v.has_contributor), confidence=self._get_str_annotation_target(p, o, v.mapping_has_confidence), ) def _definition_fp(self) -> str: definition = obo_escape_slim(self.definition) if self.definition else "" dp = self._get_definition_provenance() if dp: return f'"{definition}" [{comma_separate_references(dp)}]' else: return f'"{definition}"' def _get_definition_provenance(self) -> Sequence[Reference | OBOLiteral]: if self.definition is None: return [] return [ annotation.value for annotation in self._get_annotations(v.has_description, self.definition) if annotation.predicate.pair == v.has_dbxref.pair ] @property def provenance(self) -> Sequence[Reference | OBOLiteral]: """Get definition provenance.""" # return as a tuple to make sure nobody is appending on it return ( *self.get_property_objects(v.is_mentioned_by), # This gets all of the xrefs on _any_ axiom, # which includes the definition provenance *( annotation.value for annotation in itt.chain.from_iterable(self._axioms.values()) if annotation.predicate.pair == v.has_dbxref.pair ), ) def append_definition_xref(self, reference: ReferenceHint) -> Self: """Add a reference to this term's definition.""" if not self.definition: raise ValueError("can not append definition provenance if no definition is set") self._append_annotation( v.has_description, OBOLiteral.string(self.definition), Annotation(v.has_dbxref, _ensure_ref(reference)), ) return self def append_provenance( self, reference: Reference, *, annotations: Iterable[Annotation] | None = None, ) 
-> Self: """Append a creative work that mentions this term.""" warnings.warn("use append_mentioned_by instead", DeprecationWarning, stacklevel=2) return self.append_mentioned_by(reference, annotations=annotations) def append_mentioned_by( self, reference: Reference, *, annotations: Iterable[Annotation] | None = None, ) -> Self: """Append a creative work that mentions this term.""" return self.annotate_object(v.is_mentioned_by, reference, annotations=annotations) ReferenceHint: TypeAlias = ( Reference | Referenced | curies.Reference | curies.NamedReference | tuple[str, str] | str ) def _ensure_ref( reference: ReferenceHint, *, ontology_prefix: str | None = None, ) -> Reference: if isinstance(reference, Referenced): return reference.reference if isinstance(reference, tuple): return Reference(prefix=reference[0], identifier=reference[1]) if isinstance(reference, Reference): return reference if isinstance(reference, curies.NamedReference): return Reference( prefix=reference.prefix, identifier=reference.identifier, name=reference.name ) if isinstance(reference, curies.Reference): return Reference(prefix=reference.prefix, identifier=reference.identifier) match _parse_str_or_curie_or_uri_helper(reference, ontology_prefix=ontology_prefix): case Reference() as parsed_reference: return parsed_reference case NotCURIEError() as exc: if ontology_prefix and _is_valid_identifier(reference): return default_reference(ontology_prefix, reference) else: raise exc case ParseError() as exc: raise exc raise TypeError def _chain_tag( tag: str, chains: list[list[Reference]] | None, ontology_prefix: str ) -> Iterable[str]: for chain in chains or []: yield f"{tag}: {multi_reference_escape(chain, ontology_prefix=ontology_prefix, add_name_comment=True)}" def _tag_property_targets( tag: str, stanza: Stanza, prod: ReferenceHint, *, ontology_prefix: str ) -> Iterable[str]: for x in stanza.get_property_values(_ensure_ref(prod)): if isinstance(x, Reference): yield f"{tag}: {reference_escape(x, 
ontology_prefix=ontology_prefix, add_name_comment=True)}" def _iterate_obo_relations( relations: Mapping[Reference, Sequence[Reference | OBOLiteral]], annotations: AnnotationsDict, *, ontology_prefix: str, skip_predicate_objects: Iterable[Reference] | None = None, skip_predicate_literals: Iterable[Reference] | None = None, typedefs: Mapping[ReferenceTuple, TypeDef], ) -> Iterable[str]: """Iterate over relations/property values for OBO.""" skip_predicate_objects = set(skip_predicate_objects or []) skip_predicate_literals = set(skip_predicate_literals or []) for predicate, values in sorted(relations.items()): _typedef_warn(prefix=ontology_prefix, predicate=predicate, typedefs=typedefs) pc = reference_escape(predicate, ontology_prefix=ontology_prefix) start = f"{pc} " for value in sorted(values, key=_reference_or_literal_key): match value: case OBOLiteral(dd, datatype, _language): if predicate in skip_predicate_literals: continue end = f'"{_escape_literal(dd)}" {get_preferred_curie(datatype)}' name = None case curies.Reference(): # it's a reference if predicate in skip_predicate_objects: # this allows us to special case out iterating over # ones that are configured with their own tags continue end = reference_escape(value, ontology_prefix=ontology_prefix) name = value.name case _: raise TypeError(f"got unexpected type {type(values)} with value: {values}") end += _get_obo_trailing_modifiers( predicate, value, annotations, ontology_prefix=ontology_prefix ) if predicate.name and name: end += f" ! 
{predicate.name} {name}" yield start + end def _escape_literal(s: str) -> str: return s.replace('"', '\\"') def _reference_or_literal_key(x: Reference | OBOLiteral) -> tuple[int, Reference | OBOLiteral]: if isinstance(x, Reference): return 0, x else: return 1, x def _get_obo_trailing_modifiers( p: ReferenceHint, o: Reference | OBOLiteral, annotations_dict: AnnotationsDict, *, ontology_prefix: str, ) -> str: """Lookup then format a sequence of annotations for OBO trailing modifiers.""" if annotations := annotations_dict.get(_property_resolve(p, o), []): return _format_obo_trailing_modifiers(annotations, ontology_prefix=ontology_prefix) return "" def _format_obo_trailing_modifiers( annotations: Sequence[Annotation], *, ontology_prefix: str ) -> str: """Format a sequence of annotations for OBO trailing modifiers. :param annotations: A list of annnotations :param ontology_prefix: The ontology prefix :returns: The trailing modifiers string See https://owlcollab.github.io/oboformat/doc/GO.format.obo-1_4.html#S.1.4 trailing modifiers can be both annotations and some other implementation-specific things, so split up the place where annotations are put in here. 
""" modifiers: list[tuple[str, str]] = [] for prop in sorted(annotations, key=Annotation._sort_key): left = reference_escape(prop.predicate, ontology_prefix=ontology_prefix) match prop.value: case Reference(): right = reference_escape(prop.value, ontology_prefix=ontology_prefix) case OBOLiteral(value, datatype, _language): if datatype == v.xsd_string: right = f'"{obo_escape_slim(value)}"' else: right = value case _: raise TypeError(f"invalid prop value: {type(prop.value)} - {prop.value}") modifiers.append((left, right)) inner = ", ".join(f"{key}={value}" for key, value in modifiers) return " {" + inner + "}" #: A set of warnings, used to make sure we don't show the same one over and over _TYPEDEF_WARNINGS: set[tuple[str, Reference]] = set() def _typedef_warn( prefix: str, predicate: Reference, typedefs: Mapping[ReferenceTuple, TypeDef] ) -> None: from pyobo.struct.typedef import default_typedefs if predicate.pair in default_typedefs or predicate.pair in typedefs: return None key = prefix, predicate if key not in _TYPEDEF_WARNINGS: _TYPEDEF_WARNINGS.add(key) if predicate.prefix == "obo": # Throw our hands up in the air. By using `obo` as the prefix, # we already threw using "real" definitions out the window logger.warning( f"[{prefix}] predicate with OBO prefix not defined: {predicate.curie}." f"\n\tThis might be because you used an unqualified prefix in an OBO file, " f"which automatically gets an OBO prefix." 
) else: logger.warning(f"[{prefix}] typedef not defined: {predicate.curie}") class MappingContext(BaseModel): """Context for a mapping, corresponding to SSSOM.""" justification: Reference = unspecified_matching contributor: Reference | None = None confidence: float | None = None model_config = ConfigDict( frozen=True, # Makes the model immutable and hashable ) def _get_prefixes_from_annotations(annotations: Iterable[Annotation]) -> set[str]: return set(_get_references_from_annotations(annotations)) def _get_references_from_annotations( annotations: Iterable[Annotation], ) -> dict[str, set[Reference]]: rv: defaultdict[str, set[Reference]] = defaultdict(set) for left, right in annotations: rv[left.prefix].add(left) if isinstance(right, Reference): rv[right.prefix].add(right) return dict(rv) def _get_stanza_name_synonym(stanza: Stanza) -> LiteralMapping: return LiteralMapping( text=stanza.reference.name, reference=stanza.reference, predicate=_v.has_label, type=None, provenance=[p for p in stanza.provenance if isinstance(p, curies.Reference)], contributor=None, # TODO comment=None, # TODO source=stanza.reference.prefix, date=None, # TODO ) def _convert_synoynym(stanza: Stanza, synonym: Synonym) -> LiteralMapping: o = OBOLiteral.string(synonym.name, language=synonym.language) # TODO make this indexing reusable? 
similar code used for SSSOM export idx: dict[Reference, Reference | OBOLiteral] = { annotation.predicate: annotation.value for annotation in stanza._get_annotations(synonym.predicate, o) } comment = _safe_str(idx.get(v.comment)) contributor = _safe_str(idx.get(v.has_contributor)) date = _safe_str(idx.get(v.has_date)) return LiteralMapping( text=synonym.name, language=synonym.language, reference=stanza.reference, predicate=synonym.predicate, type=synonym.type, provenance=[p for p in synonym.provenance if isinstance(p, curies.Reference)], contributor=contributor, comment=comment, source=stanza.reference.prefix, date=date, ) def _safe_str(x: Reference | OBOLiteral | None) -> str | None: if x is None: return None return reference_or_literal_to_str(x)