"""Utilities on top of the reference."""
from __future__ import annotations
import datetime
import itertools as itt
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from typing import TYPE_CHECKING, Literal, NamedTuple, Self, TypeAlias, overload
import curies
from curies import ReferenceTuple
from curies import vocabulary as _v
from curies.vocabulary import SynonymScope
from pydantic import BaseModel, ConfigDict
from ssslm import LiteralMapping
from . import vocabulary as v
from .reference import (
OBOLiteral,
Reference,
Referenced,
comma_separate_references,
default_reference,
get_preferred_curie,
multi_reference_escape,
reference_escape,
reference_or_literal_to_str,
unspecified_matching,
)
from .utils import obo_escape_slim
from ..identifier_utils import (
NotCURIEError,
ParseError,
_is_valid_identifier,
_parse_str_or_curie_or_uri_helper,
)
if TYPE_CHECKING:
from pyobo.struct.struct import Synonym, TypeDef
# Names exported by ``from <this module> import *``.
__all__ = [
    "Annotation",
    "AnnotationsDict",
    "HasReferencesMixin",
    "ReferenceHint",
    "Stanza",
]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
[docs]
class Annotation(NamedTuple):
"""A tuple representing a predicate-object pair."""
predicate: Reference
value: Reference | OBOLiteral
[docs]
@classmethod
def float(cls, predicate: Reference | TypeDef, value: float) -> Self:
"""Return a literal property for a float."""
from .struct import TypeDef
if isinstance(predicate, TypeDef):
predicate = predicate.reference
return cls(predicate, OBOLiteral.float(value))
[docs]
@classmethod
def uri(cls, predicate: Reference | TypeDef, uri: str) -> Self:
"""Return a literal property for a URI."""
from .struct import TypeDef
if isinstance(predicate, TypeDef):
predicate = predicate.reference
return cls(predicate, OBOLiteral.uri(uri))
[docs]
@classmethod
def string(
cls, predicate: Reference | TypeDef, value: str, *, language: str | None = None
) -> Self:
"""Return a literal property for a float."""
from .struct import TypeDef
if isinstance(predicate, TypeDef):
predicate = predicate.reference
return cls(predicate, OBOLiteral.string(value, language=language))
@staticmethod
def _sort_key(x: Annotation) -> tuple[Reference, tuple[int, Reference | OBOLiteral]]:
return x.predicate, _reference_or_literal_key(x.value)
def _property_resolve(p: ReferenceHint, o: Reference | Referenced | OBOLiteral) -> Annotation:
    """Build the annotation tuple for a predicate/object pair.

    The predicate is normalized with ``_ensure_ref``. An object that is a
    ``Referenced`` instance is replaced by its ``reference``; anything else
    is used as given.
    """
    predicate = _ensure_ref(p)
    target = o.reference if isinstance(o, Referenced) else o
    return Annotation(predicate, target)
# Predicate -> list of property values (references or literals)
PropertiesHint: TypeAlias = dict[Reference, list[Reference | OBOLiteral]]
# Predicate -> list of related references
RelationsHint: TypeAlias = dict[Reference, list[Reference]]
# (predicate, object) pair -> annotations attached to that pair
AnnotationsDict: TypeAlias = dict[Annotation, list[Annotation]]
# note that an intersection is not valid in ROBOT with a literal, even though this _might_ make sense.
IntersectionOfHint: TypeAlias = list[Reference | tuple[Reference, Reference]]
UnionOfHint: TypeAlias = list[Reference]
# The kinds of stanza handled in this module
StanzaType: TypeAlias = Literal["Term", "Instance", "TypeDef"]
# The predicate used for a stanza's parents, by stanza type
stanza_type_to_prop: dict[StanzaType, Reference] = {
    "Term": v.is_a,
    "Instance": v.rdf_type,
    "TypeDef": v.subproperty_of,
}
# The predicate used for a stanza's equivalences, by stanza type
stanza_type_to_eq_prop: dict[StanzaType, Reference] = {
    "Term": v.equivalent_class,
    "Instance": v.owl_same_as,
    "TypeDef": v.equivalent_property,
}
class HasReferencesMixin(ABC):
    """A class that can report on the references it contains."""

    @abstractmethod
    def _get_references(self) -> dict[str, set[Reference]]:
        """Return the contained references, grouped by prefix."""
        raise NotImplementedError

    def _get_prefixes(self) -> set[str]:
        """Return the prefixes, i.e., the keys of :meth:`_get_references`."""
        return {*self._get_references()}
class Stanza(Referenced, HasReferencesMixin):
"""A high-level class for stanzas."""
reference: Reference
relationships: RelationsHint
properties: PropertiesHint
xrefs: list[Reference]
parents: list[Reference]
intersection_of: IntersectionOfHint
equivalent_to: list[Reference]
union_of: UnionOfHint
subsets: list[Reference]
disjoint_from: list[Reference]
synonyms: list[Synonym]
type: StanzaType
_axioms: AnnotationsDict
#: An annotation for obsolescence. By default, is None, but this means that it is not obsolete.
is_obsolete: bool | None
#: A description of the entity
definition: str | None = None
@staticmethod
def _reference(
    reference: Reference, ontology_prefix: str, add_name_comment: bool = False
) -> str:
    """Escape a reference by delegating to :func:`reference_escape`.

    :param reference: The reference to escape
    :param ontology_prefix: The ontology prefix, passed through
    :param add_name_comment: Passed through to :func:`reference_escape`
    :returns: The escaped string
    """
    return reference_escape(
        reference, ontology_prefix=ontology_prefix, add_name_comment=add_name_comment
    )
def _get_prefixes(self) -> set[str]:
    """Return the set of prefixes, i.e., the keys of :meth:`_get_references`."""
    return {*self._get_references()}
def _get_references(self) -> dict[str, set[Reference]]:
"""Get all prefixes used by the typedef."""
rv: defaultdict[str, set[Reference]] = defaultdict(set)
def _add(r: Reference) -> None:
rv[r.prefix].add(r)
_add(self.reference)
for synonym in self.synonyms:
for prefix, references in synonym._get_references().items():
rv[prefix].update(references)
if self.xrefs:
# xrefs themselves added in the chain below
_add(v.has_dbxref)
for predicate, values in self.properties.items():
_add(predicate)
for value in values:
if isinstance(value, Reference):
_add(value)
elif isinstance(value, OBOLiteral):
_add(v._c(value.datatype))
for parent in itt.chain(
self.parents,
self.union_of,
self.equivalent_to,
self.disjoint_from,
self.subsets,
self.xrefs,
):
_add(parent)
for intersection_of in self.intersection_of:
match intersection_of:
case Reference():
_add(intersection_of)
case (intersection_predicate, intersection_value):
_add(intersection_predicate)
_add(intersection_value)
for rel_predicate, rel_values in self.relationships.items():
_add(rel_predicate)
for r in rel_values:
_add(r)
for p_o, annotations_ in self._axioms.items():
_add(p_o.predicate)
if isinstance(p_o.value, Reference):
_add(p_o.value)
for prefix, references in _get_references_from_annotations(annotations_).items():
rv[prefix].update(references)
return rv
def get_literal_mappings(self) -> list[LiteralMapping]:
    """Get literal mappings for this stanza's synonyms, plus one for its label if it has a name."""
    mappings = []
    for synonym in self.synonyms:
        mappings.append(_convert_synoynym(self, synonym))
    if self.reference.name:
        mappings.append(_get_stanza_name_synonym(self))
    return mappings
def append_relationship(
    self,
    typedef: ReferenceHint,
    reference: ReferenceHint,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a relationship.

    :param typedef: The predicate of the relationship
    :param reference: The target of the relationship
    :param annotations: Optional annotations to record on the relationship
    :returns: This stanza, to allow chaining
    """
    predicate = _ensure_ref(typedef)
    target = _ensure_ref(reference)
    self.relationships[predicate].append(target)
    self._extend_annotations(predicate, target, annotations)
    return self
def _extend_annotations(
    self, p: Reference, o: Reference | OBOLiteral, annotations: Iterable[Annotation] | None
) -> None:
    """Record each given annotation on the (p, o) pair; do nothing when ``annotations`` is None."""
    if annotations is not None:
        for entry in annotations:
            self._append_annotation(p, o, entry)
def _append_annotation(
    self, p: ReferenceHint, o: Reference | OBOLiteral, annotation: Annotation
) -> None:
    """Append one annotation to the axiom list keyed by the resolved (p, o) pair."""
    axiom_key = _property_resolve(p, o)
    self._axioms[axiom_key].append(annotation)
# TODO check different usages of this
def append_equivalent(
    self,
    reference: ReferenceHint,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append an equivalence as a relationship.

    The predicate is looked up from ``stanza_type_to_eq_prop`` using this
    stanza's type, and the pair is stored via :meth:`append_relationship`.

    :param reference: The equivalent entity
    :param annotations: Optional annotations to record on the relationship
    :returns: This stanza, to allow chaining
    """
    return self.append_relationship(
        stanza_type_to_eq_prop[self.type], reference, annotations=annotations
    )
def append_equivalent_to(
    self, reference: ReferenceHint, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Append to the "equivalent to" list.

    :param reference: The equivalent entity
    :param annotations: Optional annotations, recorded under the equivalence
        predicate for this stanza's type
    :returns: This stanza, to allow chaining
    """
    target = _ensure_ref(reference)
    self.equivalent_to.append(target)
    eq_predicate = stanza_type_to_eq_prop[self.type]
    self._extend_annotations(eq_predicate, target, annotations)
    return self
def append_xref(
    self,
    reference: ReferenceHint,
    *,
    mapping_justification: Reference | None = None,
    confidence: float | None = None,
    contributor: Reference | None = None,
    annotations: list[Annotation] | None = None,
) -> Self:
    """Append an xref.

    :param reference: The cross-referenced entity
    :param mapping_justification: Optional mapping justification, recorded as an annotation
    :param confidence: Optional mapping confidence, recorded as a float annotation
    :param contributor: Optional contributor, recorded as an annotation
    :param annotations: Optional additional annotations. This list is read
        but never modified.
    :returns: This stanza, to allow chaining
    """
    target = _ensure_ref(reference)
    self.xrefs.append(target)
    # Build a fresh list rather than extending the caller's list in place,
    # so a list reused across calls does not accumulate mapping annotations.
    combined: list[Annotation] = [
        *(annotations or []),
        *self._prepare_mapping_annotations(
            mapping_justification=mapping_justification,
            confidence=confidence,
            contributor=contributor,
        ),
    ]
    self._extend_annotations(v.has_dbxref, target, combined)
    return self
def _prepare_mapping_annotations(
    self,
    *,
    mapping_justification: Reference | None = None,
    confidence: float | None = None,
    contributor: Reference | None = None,
) -> Iterable[Annotation]:
    """Yield one annotation for each mapping metadata argument that is not None.

    The order is fixed: justification, then contributor, then confidence.
    """
    if mapping_justification is not None:
        yield Annotation(v.mapping_has_justification, mapping_justification)
    if contributor is not None:
        yield Annotation(v.has_contributor, contributor)
    if confidence is not None:
        yield Annotation.float(v.mapping_has_confidence, confidence)
def append_parent(
    self,
    reference: ReferenceHint,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Add a parent to this entity.

    A parent that is already present is not added a second time.

    :param reference: The parent entity
    :param annotations: Optional annotations, recorded under the parent
        predicate for this stanza's type
    :returns: This stanza, to allow chaining
    """
    reference = _ensure_ref(reference)
    if reference not in self.parents:
        self.parents.append(reference)
        # NOTE(review): annotations are recorded only when the parent is newly
        # added - confirm this nesting is intended
        self._extend_annotations(stanza_type_to_prop[self.type], reference, annotations)
    return self
def append_intersection_of(
    self,
    /,
    reference: ReferenceHint | tuple[ReferenceHint, ReferenceHint],
    r2: ReferenceHint | None = None,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append an intersection of.

    Accepts three call shapes: a single reference, a ``(predicate, target)``
    tuple, or a predicate and target given as two arguments.

    :param reference: A reference, a predicate, or a (predicate, target) tuple
    :param r2: The target, when ``reference`` is the predicate
    :param annotations: Accepted for signature parity with the other
        ``append_*`` methods, but currently not recorded anywhere
    :returns: This stanza, to allow chaining
    :raises TypeError: If ``reference`` is a tuple and ``r2`` is also given
    """
    if r2 is not None:
        if isinstance(reference, tuple):
            raise TypeError(
                "can not pass both a (predicate, target) tuple and a second reference"
            )
        self.intersection_of.append((_ensure_ref(reference), _ensure_ref(r2)))
    elif isinstance(reference, tuple):
        self.intersection_of.append((_ensure_ref(reference[0]), _ensure_ref(reference[1])))
    else:
        self.intersection_of.append(_ensure_ref(reference))
    return self
def append_union_of(self, reference: ReferenceHint) -> Self:
    """Append to the "union of" list.

    :param reference: The entity to add, normalized with ``_ensure_ref``
    :returns: This stanza, to allow chaining
    """
    self.union_of.append(_ensure_ref(reference))
    return self
def _iterate_intersection_of_obo(self, *, ontology_prefix: str) -> Iterable[str]:
for element in sorted(self.intersection_of, key=self._intersection_of_key):
match element:
case Reference():
end = reference_escape(
element, ontology_prefix=ontology_prefix, add_name_comment=True
)
case (predicate, object):
match object:
case Reference():
end = multi_reference_escape(
[predicate, object],
ontology_prefix=ontology_prefix,
add_name_comment=True,
)
case OBOLiteral():
raise NotImplementedError
case _:
raise TypeError
yield f"intersection_of: {end}"
@staticmethod
def _intersection_of_key(
    io: Reference | tuple[Reference, Reference],
) -> tuple[Literal[0], Reference] | tuple[Literal[1], tuple[Reference, Reference]]:
    """Return a sort key that ranks bare references (0) before pairs (1)."""
    rank = 0 if isinstance(io, Reference) else 1
    return rank, io
def _iterate_xref_obo(self, *, ontology_prefix: str) -> Iterable[str]:
    """Yield one ``xref:`` line per xref, sorted, with trailing modifiers and a name comment."""
    for xref in sorted(self.xrefs):
        escaped = reference_escape(
            xref, ontology_prefix=ontology_prefix, add_name_comment=False
        )
        modifiers = _get_obo_trailing_modifiers(
            v.has_dbxref, xref, self._axioms, ontology_prefix=ontology_prefix
        )
        # the name comment goes after the modifiers, so it is appended here
        # instead of by reference_escape
        name_comment = f" ! {xref.name}" if xref.name else ""
        yield f"xref: {escaped}" + modifiers + name_comment
def _get_annotations(
    self, p: ReferenceHint, o: Reference | Referenced | OBOLiteral | str
) -> list[Annotation]:
    """Get the annotations recorded on the (p, o) pair, or an empty list.

    A plain string object is first wrapped with :meth:`OBOLiteral.string`.
    """
    target = OBOLiteral.string(o) if isinstance(o, str) else o
    return self._axioms.get(_property_resolve(p, target), [])
def _get_annotation(
    self, p: ReferenceHint, o: Reference | OBOLiteral, ap: Reference
) -> Reference | OBOLiteral | None:
    """Get the value of the first annotation on (p, o) whose predicate matches ``ap``.

    Predicates are compared by their ``pair``. Returns None if none match.
    """
    wanted_pair = _ensure_ref(ap).pair
    matches = (
        candidate.value
        for candidate in self._get_annotations(p, o)
        if candidate.predicate.pair == wanted_pair
    )
    return next(matches, None)
def append_property(
    self, prop: Annotation, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Append a property given as a predicate-value pair.

    :param prop: The predicate-value pair to store
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    predicate = prop.predicate
    value = prop.value
    self.properties[predicate].append(value)
    self._extend_annotations(predicate, value, annotations)
    return self
def annotate_literal(
    self,
    prop: ReferenceHint,
    value: OBOLiteral,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a literal annotation.

    :param prop: The predicate
    :param value: The literal to store
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    predicate = _ensure_ref(prop)
    self.properties[predicate].append(value)
    self._extend_annotations(predicate, value, annotations)
    return self
def annotate_string(
    self,
    prop: ReferenceHint,
    value: str,
    *,
    annotations: Iterable[Annotation] | None = None,
    language: str | None = None,
) -> Self:
    """Append a string annotation.

    :param prop: The predicate
    :param value: The text, wrapped with :meth:`OBOLiteral.string`
    :param annotations: Optional annotations to record on the pair
    :param language: An optional language, passed to :meth:`OBOLiteral.string`
    :returns: This stanza, to allow chaining
    """
    return self.annotate_literal(
        prop, OBOLiteral.string(value, language=language), annotations=annotations
    )
def annotate_boolean(
    self,
    prop: ReferenceHint,
    value: bool,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a boolean annotation.

    :param prop: The predicate
    :param value: The boolean, wrapped with :meth:`OBOLiteral.boolean`
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_literal(prop, OBOLiteral.boolean(value), annotations=annotations)
def annotate_integer(
    self,
    prop: ReferenceHint,
    value: int | str,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append an integer annotation.

    :param prop: The predicate
    :param value: The integer, wrapped with :meth:`OBOLiteral.integer`
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_literal(prop, OBOLiteral.integer(value), annotations=annotations)
def annotate_float(
    self, prop: ReferenceHint, value: float, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Append a float annotation.

    :param prop: The predicate
    :param value: The float, wrapped with :meth:`OBOLiteral.float`
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_literal(prop, OBOLiteral.float(value), annotations=annotations)
def annotate_decimal(
    self, prop: ReferenceHint, value: float, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Append a decimal annotation.

    :param prop: The predicate
    :param value: The number, wrapped with :meth:`OBOLiteral.decimal`
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_literal(prop, OBOLiteral.decimal(value), annotations=annotations)
def annotate_year(
    self,
    prop: ReferenceHint,
    value: int | str,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a year annotation.

    :param prop: The predicate
    :param value: The year, wrapped with :meth:`OBOLiteral.year`
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_literal(prop, OBOLiteral.year(value), annotations=annotations)
def annotate_uri(
    self, prop: ReferenceHint, value: str, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Append a URI annotation.

    :param prop: The predicate
    :param value: The URI, wrapped with :meth:`OBOLiteral.uri`
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_literal(prop, OBOLiteral.uri(value), annotations=annotations)
def annotate_datetime(
    self,
    prop: ReferenceHint,
    value: datetime.datetime | str,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a datetime annotation.

    :param prop: The predicate
    :param value: The datetime, wrapped with :meth:`OBOLiteral.datetime`
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_literal(prop, OBOLiteral.datetime(value), annotations=annotations)
def annotate_date(
    self,
    prop: ReferenceHint,
    value: datetime.datetime | datetime.date | str,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a date annotation.

    :param prop: The predicate
    :param value: The date, wrapped with :meth:`OBOLiteral.date`
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_literal(prop, OBOLiteral.date(value), annotations=annotations)
def _iterate_obo_properties(
    self,
    *,
    ontology_prefix: str,
    skip_predicate_objects: Iterable[Reference] | None = None,
    skip_predicate_literals: Iterable[Reference] | None = None,
    typedefs: Mapping[ReferenceTuple, TypeDef],
) -> Iterable[str]:
    """Yield ``property_value:`` lines for this stanza's properties.

    All arguments are passed through to the module-level
    ``_iterate_obo_relations`` function.
    """
    lines = _iterate_obo_relations(
        self.properties,
        self._axioms,
        ontology_prefix=ontology_prefix,
        skip_predicate_objects=skip_predicate_objects,
        skip_predicate_literals=skip_predicate_literals,
        typedefs=typedefs,
    )
    yield from (f"property_value: {line}" for line in lines)
def _iterate_obo_relations(
    self, *, ontology_prefix: str, typedefs: Mapping[ReferenceTuple, TypeDef]
) -> Iterable[str]:
    """Yield ``relationship:`` lines for this stanza's relationships.

    Inside this method the name ``_iterate_obo_relations`` resolves to the
    module-level function of the same name, which does the formatting.
    """
    lines = _iterate_obo_relations(
        self.relationships,
        self._axioms,
        ontology_prefix=ontology_prefix,
        typedefs=typedefs,
    )
    yield from (f"relationship: {line}" for line in lines)
def append_subset(self, subset: ReferenceHint) -> Self:
    """Add a subset.

    :param subset: The subset, normalized with ``_ensure_ref``
    :returns: This stanza, to allow chaining
    """
    self.subsets.append(_ensure_ref(subset))
    return self
def append_disjoint_from(self, reference: ReferenceHint) -> Self:
    """Add a disjoint from.

    :param reference: The disjoint entity, normalized with ``_ensure_ref``
    :returns: This stanza, to allow chaining
    """
    self.disjoint_from.append(_ensure_ref(reference))
    return self
def annotate_object(
    self,
    typedef: ReferenceHint,
    value: ReferenceHint,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append an object annotation, i.e., a property whose value is a reference.

    :param typedef: The predicate
    :param value: The target reference
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    predicate = _ensure_ref(typedef)
    target = _ensure_ref(value)
    self.properties[predicate].append(target)
    self._extend_annotations(predicate, target, annotations)
    return self
def append_contributor(self, reference: ReferenceHint) -> Self:
    """Append a contributor as an object annotation with the ``has_contributor`` predicate."""
    return self.annotate_object(v.has_contributor, reference)
def append_creation_date(self, date: datetime.datetime | str) -> Self:
    """Append a creation date as a datetime annotation with the ``obo_creation_date`` predicate."""
    return self.annotate_datetime(v.obo_creation_date, date)
def get_see_also(self) -> list[Reference]:
    """Get all see also objects, i.e., reference-valued ``see_also`` properties."""
    return self.get_property_objects(v.see_also)
def get_replaced_by(self) -> list[Reference]:
    """Get all replaced by, i.e., reference-valued ``term_replaced_by`` properties."""
    return self.get_property_objects(v.term_replaced_by)
def append_replaced_by(
    self, reference: Reference, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Add a replaced by property.

    :param reference: The replacing entity
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_object(v.term_replaced_by, reference, annotations=annotations)
def iterate_relations(self) -> Iterable[tuple[Reference, Reference]]:
    """Iterate over (typedef, target) pairs, sorted by typedef and then by target."""
    for typedef, targets in sorted(self.relationships.items()):
        yield from ((typedef, target) for target in sorted(targets))
def iterate_object_properties(self) -> Iterable[tuple[Reference, Reference]]:
    """Iterate over (predicate, value) pairs for properties whose value is a reference."""
    for predicate, values in self.properties.items():
        yield from (
            (predicate, candidate) for candidate in values if isinstance(candidate, Reference)
        )
def iterate_literal_properties(self) -> Iterable[tuple[Reference, OBOLiteral]]:
    """Iterate over (predicate, value) pairs for properties whose value is a literal."""
    for predicate, values in self.properties.items():
        yield from (
            (predicate, candidate) for candidate in values if isinstance(candidate, OBOLiteral)
        )
def get_relationships(self, typedef: ReferenceHint) -> list[Reference]:
    """Get the relationship targets stored under the given typedef, or an empty list."""
    predicate = _ensure_ref(typedef)
    return self.relationships.get(predicate, [])
# docstr-coverage:excused `overload`
@overload
def get_relationship(
    self, typedef: ReferenceHint, *, strict: Literal[False] = ...
) -> Reference | None: ...

# docstr-coverage:excused `overload`
@overload
def get_relationship(self, typedef: ReferenceHint, *, strict: Literal[True]) -> Reference: ...

def get_relationship(self, typedef: ReferenceHint, *, strict: bool = False) -> Reference | None:
    """Get a single relationship of the given type.

    Only the non-strict overload carries a default for ``strict``, so a call
    that omits it resolves to that overload alone.

    :param typedef: The predicate to look up
    :param strict: If true, raise instead of returning None when there is no
        relationship of the given type
    :returns: The single target, or None if there is none and ``strict`` is false
    :raises ValueError: If there is no target and ``strict`` is true, or if
        there is more than one target
    """
    targets = self.get_relationships(typedef)
    if not targets:
        if strict:
            raise ValueError(f"no relationship found for typedef: {typedef}")
        return None
    if len(targets) > 1:
        raise ValueError(f"multiple relationships returned: {targets}")
    return targets[0]
def iterate_relation_targets(self, typedef: ReferenceHint) -> list[Reference]:
    """Get a sorted list of the relationship targets for the given typedef.

    Despite the name, this returns a new list rather than an iterator.
    """
    return sorted(self.relationships.get(_ensure_ref(typedef), []))
def get_property_annotations(self) -> list[Annotation]:
    """Get all properties as annotations, sorted by predicate and then by value.

    Within each predicate, values are ordered by ``_reference_or_literal_key``.
    """
    collected = []
    for prop, values in sorted(self.properties.items()):
        for value in sorted(values, key=_reference_or_literal_key):
            collected.append(Annotation(prop, value))
    return collected
def get_property_values(self, typedef: ReferenceHint) -> list[Reference | OBOLiteral]:
    """Get a sorted list of the values (references or literals) for the given property.

    The list can mix references and literals, so it is sorted with
    ``_reference_or_literal_key``, the same key used by
    :meth:`get_property_annotations`. The two kinds are then never compared
    with each other directly.

    :param typedef: The predicate to look up
    :returns: A new sorted list, empty if the predicate has no values
    """
    values = self.properties.get(_ensure_ref(typedef), [])
    return sorted(values, key=_reference_or_literal_key)
def get_property_objects(self, prop: ReferenceHint) -> list[Reference]:
    """Get a sorted list of the reference-valued entries for the given property.

    Entries that are not :class:`curies.Reference` instances are left out.
    """
    values = self.properties.get(_ensure_ref(prop), [])
    objects = [value for value in values if isinstance(value, curies.Reference)]
    objects.sort()
    return objects
def append_exact_synonym(
    self,
    synonym: str | Synonym,
    *,
    type: Reference | Referenced | None = None,
    provenance: Sequence[Reference | OBOLiteral] | None = None,
    annotations: Iterable[Annotation] | None = None,
    language: str | None = None,
) -> Self:
    """Add an exact synonym.

    Delegates to :meth:`append_synonym` with ``specificity="EXACT"``; all
    other arguments are passed through unchanged.
    """
    return self.append_synonym(
        synonym,
        type=type,
        specificity="EXACT",
        provenance=provenance,
        annotations=annotations,
        language=language,
    )
def append_synonym(
    self,
    synonym: str | Synonym,
    *,
    type: Reference | Referenced | None = None,
    specificity: SynonymScope | None = None,
    provenance: Sequence[Reference | OBOLiteral] | None = None,
    annotations: Iterable[Annotation] | None = None,
    language: str | None = None,
) -> Self:
    """Add a synonym.

    :param synonym: Either the synonym text or a ready-made synonym object.
        A ready-made object is appended as is, and the remaining arguments
        are not applied to it.
    :param type: The synonym type; a ``Referenced`` is replaced by its reference
    :param specificity: The synonym scope
    :param provenance: Optional provenance, copied into a new list
    :param annotations: Optional annotations, copied into a new list
    :param language: An optional language
    :returns: This stanza, to allow chaining
    """
    synonym_type = type.reference if isinstance(type, Referenced) else type
    if not isinstance(synonym, str):
        self.synonyms.append(synonym)
        return self

    # imported lazily; ``Synonym`` is only imported at module level for type checking
    from pyobo.struct.struct import Synonym

    built = Synonym(
        synonym,
        type=synonym_type,
        specificity=specificity,
        provenance=list(provenance or []),
        annotations=list(annotations or []),
        language=language,
    )
    self.synonyms.append(built)
    return self
def append_alt(
    self, alt: Reference, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Add an alternative identifier.

    Stored as an object annotation with the ``alternative_term`` predicate.

    :param alt: The alternative identifier
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_object(v.alternative_term, alt, annotations=annotations)
def append_see_also(
    self, reference: ReferenceHint, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Add a see also property.

    :param reference: The entity to point to
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_object(v.see_also, _ensure_ref(reference), annotations=annotations)
def append_comment(
    self,
    value: str,
    *,
    annotations: Iterable[Annotation] | None = None,
    language: str | None = None,
) -> Self:
    """Add a comment property.

    Stored as a string annotation with the ``comment`` predicate.

    :param value: The comment text
    :param annotations: Optional annotations to record on the pair
    :param language: An optional language for the text
    :returns: This stanza, to allow chaining
    """
    return self.annotate_string(v.comment, value, annotations=annotations, language=language)
def get_comments(self) -> list[str]:
    """Get all comment strings, i.e., the values of literal ``comment`` properties."""
    comments = []
    for entry in self.get_property_values(v.comment):
        if isinstance(entry, OBOLiteral):
            comments.append(entry.value)
    return comments
@property
def alt_ids(self) -> Sequence[Reference]:
    """Get alternative terms as a tuple of the ``alternative_term`` property objects."""
    return tuple(self.get_property_objects(v.alternative_term))
def get_edges(self, *, include_xrefs: bool = True) -> list[tuple[Reference, Reference]]:
    """Get edges as a list of (predicate, target) pairs.

    :param include_xrefs: Passed through to :meth:`_iter_edges`
    :returns: The materialized output of :meth:`_iter_edges`
    """
    return list(self._iter_edges(include_xrefs=include_xrefs))
def _iter_parents(self) -> Iterable[tuple[Reference, Reference]]:
    """Yield (parent predicate, target) pairs for the parents followed by the union-of entries."""
    parent_prop = stanza_type_to_prop[self.type]
    yield from ((parent_prop, target) for target in itt.chain(self.parents, self.union_of))
def _iter_intersections(self) -> Iterable[tuple[Reference, Reference]]:
parent_prop = stanza_type_to_prop[self.type]
for intersection_of in self.intersection_of:
match intersection_of:
case Reference():
yield parent_prop, intersection_of
case (predicate, target):
yield predicate, target
def _iter_edges(self, *, include_xrefs: bool = True) -> Iterable[tuple[Reference, Reference]]:
    """Yield the (predicate, target) edges of this stanza.

    Order: relationships, parents, intersections, equivalences, subsets,
    object properties, and finally xrefs when ``include_xrefs`` is true.
    """
    # The following are "object" properties, meaning
    # they're part of the definition of the object
    yield from self.iterate_relations()
    yield from self._iter_parents()
    yield from self._iter_intersections()
    yield from (
        (stanza_type_to_eq_prop[self.type], equivalent) for equivalent in self.equivalent_to
    )
    # The following are "annotation" properties
    yield from ((v.in_subset, subset) for subset in self.subsets)
    yield from self.iterate_object_properties()
    if include_xrefs:
        yield from ((v.has_dbxref, xref) for xref in self.xrefs)
    # TODO disjoint_from
# docstr-coverage:excused `overload`
@overload
def get_mappings(
    self, *, include_xrefs: bool = ..., add_context: Literal[False] = ...
) -> list[tuple[Reference, Reference]]: ...

# docstr-coverage:excused `overload`
@overload
def get_mappings(
    self, *, include_xrefs: bool = ..., add_context: Literal[True]
) -> list[tuple[Reference, Reference, MappingContext]]: ...

def get_mappings(
    self, *, include_xrefs: bool = True, add_context: bool = False
) -> list[tuple[Reference, Reference]] | list[tuple[Reference, Reference, MappingContext]]:
    """Get the mappings of this stanza as sorted, de-duplicated rows.

    Rows come from properties and relationships whose predicate is in
    ``v.extended_match_typedefs``, from the xrefs (optional), and from the
    "equivalent to" list.

    Only the ``add_context=False`` overload carries a default for
    ``add_context``, so a call that omits it resolves to that overload alone.

    :param include_xrefs: If true, include xrefs with the ``has_dbxref`` predicate
    :param add_context: If true, add a :class:`MappingContext` as a third
        element of each row
    :returns: A sorted list of (predicate, target) pairs, or of
        (predicate, target, context) triples when ``add_context`` is true
    """
    rows: list[tuple[Reference, Reference]] = []
    for predicate in v.extended_match_typedefs:
        rows.extend(
            (predicate, target)
            for target in itt.chain(
                self.get_property_objects(predicate), self.get_relationships(predicate)
            )
        )
    if include_xrefs:
        rows.extend((v.has_dbxref, xref) for xref in self.xrefs)
    rows.extend((v.equivalent_class, equivalent) for equivalent in self.equivalent_to)
    unique_rows = sorted(set(rows))
    if not add_context:
        return unique_rows
    # named ``predicate``/``target`` so the loop variable does not shadow the
    # ``v`` vocabulary module alias
    return [
        (predicate, target, self._get_mapping_context(predicate, target))
        for predicate, target in unique_rows
    ]
def _get_object_annotation_target(
    self, p: Reference, o: Reference | OBOLiteral, ap: Reference
) -> Reference | None:
    """Get the reference value of annotation ``ap`` on the (p, o) pair.

    :returns: The reference, or None if there is no such annotation
    :raises TypeError: If the annotation value is not a reference
    """
    found = self._get_annotation(p, o, ap)
    if found is None:
        return None
    if isinstance(found, OBOLiteral):
        raise TypeError
    if isinstance(found, Reference):
        return found
    raise TypeError
def _get_str_annotation_target(
self, p: Reference, o: Reference | OBOLiteral, ap: Reference
) -> str | None:
match self._get_annotation(p, o, ap):
case OBOLiteral(value, _):
return value
case Reference():
raise TypeError
case None:
return None
case _:
raise TypeError
def _get_mapping_context(self, p: Reference, o: Reference) -> MappingContext:
    """Build the mapping context from the annotations recorded on the (p, o) pair.

    The justification falls back to ``unspecified_matching`` when none is recorded.
    """
    justification = self._get_object_annotation_target(p, o, v.mapping_has_justification)
    contributor = self._get_object_annotation_target(p, o, v.has_contributor)
    confidence = self._get_str_annotation_target(p, o, v.mapping_has_confidence)
    return MappingContext(
        justification=justification or unspecified_matching,
        contributor=contributor,
        confidence=confidence,
    )
def _definition_fp(self) -> str:
    """Format the definition as a quoted string, followed by bracketed provenance if any.

    A missing or empty definition is rendered as an empty quoted string.
    """
    text = obo_escape_slim(self.definition) if self.definition else ""
    provenance = self._get_definition_provenance()
    if not provenance:
        return f'"{text}"'
    return f'"{text}" [{comma_separate_references(provenance)}]'
def _get_definition_provenance(self) -> Sequence[Reference | OBOLiteral]:
    """Get the values of the ``has_dbxref`` annotations recorded on the definition.

    Returns an empty list when no definition is set.
    """
    if self.definition is None:
        return []
    xref_pair = v.has_dbxref.pair
    provenance = []
    for entry in self._get_annotations(v.has_description, self.definition):
        if entry.predicate.pair == xref_pair:
            provenance.append(entry.value)
    return provenance
@property
def provenance(self) -> Sequence[Reference | OBOLiteral]:
    """Get provenance: the "mentioned by" objects followed by the xrefs on any axiom."""
    mentioned_by = self.get_property_objects(v.is_mentioned_by)
    # This gets all of the xrefs on _any_ axiom,
    # which includes the definition provenance
    axiom_xrefs = [
        entry.value
        for entries in self._axioms.values()
        for entry in entries
        if entry.predicate.pair == v.has_dbxref.pair
    ]
    # return as a tuple to make sure nobody is appending on it
    return (*mentioned_by, *axiom_xrefs)
def append_definition_xref(self, reference: ReferenceHint) -> Self:
    """Add a reference to this term's definition.

    :param reference: The provenance reference for the definition
    :returns: This stanza, to allow chaining
    :raises ValueError: If no definition is set
    """
    if not self.definition:
        raise ValueError("can not append definition provenance if no definition is set")
    definition_literal = OBOLiteral.string(self.definition)
    xref_annotation = Annotation(v.has_dbxref, _ensure_ref(reference))
    self._append_annotation(v.has_description, definition_literal, xref_annotation)
    return self
def append_mentioned_by(
    self,
    reference: Reference,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a creative work that mentions this term.

    Stored as an object annotation with the ``is_mentioned_by`` predicate.

    :param reference: The work that mentions this term
    :param annotations: Optional annotations to record on the pair
    :returns: This stanza, to allow chaining
    """
    return self.annotate_object(v.is_mentioned_by, reference, annotations=annotations)
# Anything that ``_ensure_ref`` can turn into a :class:`Reference`
ReferenceHint: TypeAlias = (
    Reference | Referenced | curies.Reference | curies.NamedReference | tuple[str, str] | str
)
def _ensure_ref(
reference: ReferenceHint,
*,
ontology_prefix: str | None = None,
) -> Reference:
if isinstance(reference, Referenced):
return reference.reference
if isinstance(reference, tuple):
return Reference(prefix=reference[0], identifier=reference[1])
if isinstance(reference, Reference):
return reference
if isinstance(reference, curies.NamedReference):
return Reference(
prefix=reference.prefix, identifier=reference.identifier, name=reference.name
)
if isinstance(reference, curies.Reference):
return Reference(prefix=reference.prefix, identifier=reference.identifier)
match _parse_str_or_curie_or_uri_helper(reference, ontology_prefix=ontology_prefix):
case Reference() as parsed_reference:
return parsed_reference
case NotCURIEError() as exc:
if ontology_prefix and _is_valid_identifier(reference):
return default_reference(ontology_prefix, reference)
else:
raise exc
case ParseError() as exc:
raise exc
raise TypeError
def _chain_tag(
tag: str, chains: list[list[Reference]] | None, ontology_prefix: str
) -> Iterable[str]:
for chain in chains or []:
yield f"{tag}: {multi_reference_escape(chain, ontology_prefix=ontology_prefix, add_name_comment=True)}"
def _tag_property_targets(
    tag: str, stanza: Stanza, prod: ReferenceHint, *, ontology_prefix: str
) -> Iterable[str]:
    """Yield one ``tag: ...`` line per reference-valued entry of the given property.

    Entries that are not references are skipped.
    """
    predicate = _ensure_ref(prod)
    for target in stanza.get_property_values(predicate):
        if not isinstance(target, Reference):
            continue
        yield f"{tag}: {reference_escape(target, ontology_prefix=ontology_prefix, add_name_comment=True)}"
def _iterate_obo_relations(
relations: Mapping[Reference, Sequence[Reference | OBOLiteral]],
annotations: AnnotationsDict,
*,
ontology_prefix: str,
skip_predicate_objects: Iterable[Reference] | None = None,
skip_predicate_literals: Iterable[Reference] | None = None,
typedefs: Mapping[ReferenceTuple, TypeDef],
) -> Iterable[str]:
"""Iterate over relations/property values for OBO."""
skip_predicate_objects = set(skip_predicate_objects or [])
skip_predicate_literals = set(skip_predicate_literals or [])
for predicate, values in sorted(relations.items()):
_typedef_warn(prefix=ontology_prefix, predicate=predicate, typedefs=typedefs)
pc = reference_escape(predicate, ontology_prefix=ontology_prefix)
start = f"{pc} "
for value in sorted(values, key=_reference_or_literal_key):
match value:
case OBOLiteral(dd, datatype, _language):
if predicate in skip_predicate_literals:
continue
end = f'"{_escape_literal(dd)}" {get_preferred_curie(datatype)}'
name = None
case curies.Reference(): # it's a reference
if predicate in skip_predicate_objects:
# this allows us to special case out iterating over
# ones that are configured with their own tags
continue
end = reference_escape(value, ontology_prefix=ontology_prefix)
name = value.name
case _:
raise TypeError(f"got unexpected type {type(values)} with value: {values}")
end += _get_obo_trailing_modifiers(
predicate, value, annotations, ontology_prefix=ontology_prefix
)
if predicate.name and name:
end += f" ! {predicate.name} {name}"
yield start + end
def _escape_literal(s: str) -> str:
return s.replace('"', '\\"')
def _reference_or_literal_key(x: Reference | OBOLiteral) -> tuple[int, Reference | OBOLiteral]:
    """Return a sort key that ranks references (0) before everything else (1)."""
    rank = 0 if isinstance(x, Reference) else 1
    return rank, x
def _get_obo_trailing_modifiers(
    p: ReferenceHint,
    o: Reference | OBOLiteral,
    annotations_dict: AnnotationsDict,
    *,
    ontology_prefix: str,
) -> str:
    """Look up the annotations on (p, o) and format them as OBO trailing modifiers.

    Returns an empty string when the pair has no annotations.
    """
    found = annotations_dict.get(_property_resolve(p, o), [])
    if not found:
        return ""
    return _format_obo_trailing_modifiers(found, ontology_prefix=ontology_prefix)
def _format_obo_trailing_modifiers(
annotations: Sequence[Annotation], *, ontology_prefix: str
) -> str:
"""Format a sequence of annotations for OBO trailing modifiers.
:param annotations: A list of annnotations
:param ontology_prefix: The ontology prefix
:returns: The trailing modifiers string
See https://owlcollab.github.io/oboformat/doc/GO.format.obo-1_4.html#S.1.4 trailing
modifiers can be both annotations and some other implementation-specific things, so
split up the place where annotations are put in here.
"""
modifiers: list[tuple[str, str]] = []
for prop in sorted(annotations, key=Annotation._sort_key):
left = reference_escape(prop.predicate, ontology_prefix=ontology_prefix)
match prop.value:
case Reference():
right = reference_escape(prop.value, ontology_prefix=ontology_prefix)
case OBOLiteral(value, datatype, _language):
if datatype == v.xsd_string:
right = f'"{obo_escape_slim(value)}"'
else:
right = value
case _:
raise TypeError(f"invalid prop value: {type(prop.value)} - {prop.value}")
modifiers.append((left, right))
inner = ", ".join(f"{key}={value}" for key, value in modifiers)
return " {" + inner + "}"
#: A set of (ontology prefix, predicate) pairs that were already warned about,
#: used to make sure we don't show the same warning over and over
_TYPEDEF_WARNINGS: set[tuple[str, Reference]] = set()
def _typedef_warn(
    prefix: str, predicate: Reference, typedefs: Mapping[ReferenceTuple, TypeDef]
) -> None:
    """Log a one-time warning when a predicate has no typedef.

    Nothing is logged if the predicate is among the default typedefs or the
    given ones, or if the same (prefix, predicate) pair was warned about before.
    """
    from pyobo.struct.typedef import default_typedefs

    pair = predicate.pair
    if pair in default_typedefs or pair in typedefs:
        return None

    key = prefix, predicate
    if key in _TYPEDEF_WARNINGS:
        return None
    _TYPEDEF_WARNINGS.add(key)

    if predicate.prefix != "obo":
        logger.warning(f"[{prefix}] typedef not defined: {predicate.curie}")
        return None

    # Throw our hands up in the air. By using `obo` as the prefix,
    # we already threw using "real" definitions out the window
    logger.warning(
        f"[{prefix}] predicate with OBO prefix not defined: {predicate.curie}."
        f"\n\tThis might be because you used an unqualified prefix in an OBO file, "
        f"which automatically gets an OBO prefix."
    )
    return None
class MappingContext(BaseModel):
    """Context for a mapping, corresponding to SSSOM."""

    # The mapping justification; defaults to ``unspecified_matching``
    justification: Reference = unspecified_matching
    # The contributor of the mapping, if known
    contributor: Reference | None = None
    # The confidence of the mapping, if known
    confidence: float | None = None
    model_config = ConfigDict(
        frozen=True,  # Makes the model immutable and hashable
    )
def _get_prefixes_from_annotations(annotations: Iterable[Annotation]) -> set[str]:
    """Return the set of prefixes used by the references in the given annotations."""
    return {*_get_references_from_annotations(annotations)}
def _get_references_from_annotations(
annotations: Iterable[Annotation],
) -> dict[str, set[Reference]]:
rv: defaultdict[str, set[Reference]] = defaultdict(set)
for left, right in annotations:
rv[left.prefix].add(left)
if isinstance(right, Reference):
rv[right.prefix].add(right)
return dict(rv)
def _get_stanza_name_synonym(stanza: Stanza) -> LiteralMapping:
    """Build a literal mapping for the stanza's name, using the ``has_label`` predicate."""
    reference = stanza.reference
    reference_provenance = [
        entry for entry in stanza.provenance if isinstance(entry, curies.Reference)
    ]
    return LiteralMapping(
        text=reference.name,
        reference=reference,
        predicate=_v.has_label,
        type=None,
        provenance=reference_provenance,
        contributor=None,  # TODO
        comment=None,  # TODO
        source=reference.prefix,
        date=None,  # TODO
    )
def _convert_synoynym(stanza: Stanza, synonym: Synonym) -> LiteralMapping:
    """Convert a synonym of the given stanza into a literal mapping.

    The comment, contributor, and date are read from the annotations that the
    stanza records on the synonym's (predicate, text) pair.
    """
    literal = OBOLiteral.string(synonym.name, language=synonym.language)
    # TODO make this indexing reusable? similar code used for SSSOM export
    by_predicate: dict[Reference, Reference | OBOLiteral] = {}
    for entry in stanza._get_annotations(synonym.predicate, literal):
        # later annotations overwrite earlier ones that share a predicate
        by_predicate[entry.predicate] = entry.value
    comment = _safe_str(by_predicate.get(v.comment))
    contributor = _safe_str(by_predicate.get(v.has_contributor))
    date = _safe_str(by_predicate.get(v.has_date))
    reference_provenance = [
        entry for entry in synonym.provenance if isinstance(entry, curies.Reference)
    ]
    return LiteralMapping(
        text=synonym.name,
        language=synonym.language,
        reference=stanza.reference,
        predicate=synonym.predicate,
        type=synonym.type,
        provenance=reference_provenance,
        contributor=contributor,
        comment=comment,
        source=stanza.reference.prefix,
        date=date,
    )
def _safe_str(x: Reference | OBOLiteral | None) -> str | None:
if x is None:
return None
return reference_or_literal_to_str(x)