"""Utiltites on top of the reference."""
from __future__ import annotations
import datetime
import itertools as itt
import logging
import warnings
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Iterable, Mapping, Sequence
from typing import TYPE_CHECKING, Literal, NamedTuple, Self, TypeAlias, overload
import curies
from curies import ReferenceTuple
from curies import vocabulary as _v
from curies.vocabulary import SynonymScope
from pydantic import BaseModel, ConfigDict
from ssslm import LiteralMapping
from . import vocabulary as v
from .reference import (
OBOLiteral,
Reference,
Referenced,
comma_separate_references,
default_reference,
get_preferred_curie,
multi_reference_escape,
reference_escape,
reference_or_literal_to_str,
unspecified_matching,
)
from .utils import obo_escape_slim
from ..identifier_utils import (
NotCURIEError,
ParseError,
_is_valid_identifier,
_parse_str_or_curie_or_uri_helper,
)
if TYPE_CHECKING:
from pyobo.struct.struct import Synonym, TypeDef
# Names exported by a star-import of this module.
__all__ = [
    "Annotation",
    "AnnotationsDict",
    "HasReferencesMixin",
    "ReferenceHint",
    "Stanza",
]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class Annotation(NamedTuple):
    """A tuple representing a predicate-object pair.

    The :meth:`float`, :meth:`uri`, and :meth:`string` constructors build an
    annotation whose value is an :class:`OBOLiteral`. Each of them accepts either
    a reference or a typedef as the predicate; a typedef is replaced by its
    reference.
    """

    predicate: Reference
    value: Reference | OBOLiteral

    @staticmethod
    def _predicate_reference(predicate: Reference | TypeDef) -> Reference:
        """Unwrap a typedef into its reference, passing references through unchanged."""
        # imported at call time; at module level TypeDef is only imported for type checking
        from .struct import TypeDef

        if isinstance(predicate, TypeDef):
            return predicate.reference
        return predicate

    @classmethod
    def float(cls, predicate: Reference | TypeDef, value: float) -> Self:
        """Return a literal property for a float."""
        return cls(cls._predicate_reference(predicate), OBOLiteral.float(value))

    @classmethod
    def uri(cls, predicate: Reference | TypeDef, uri: str) -> Self:
        """Return a literal property for a URI."""
        return cls(cls._predicate_reference(predicate), OBOLiteral.uri(uri))

    @classmethod
    def string(
        cls, predicate: Reference | TypeDef, value: str, *, language: str | None = None
    ) -> Self:
        """Return a literal property for a string, optionally tagged with a language."""
        return cls(
            cls._predicate_reference(predicate),
            OBOLiteral.string(value, language=language),
        )

    @staticmethod
    def _sort_key(x: Annotation) -> tuple[Reference, tuple[int, Reference | OBOLiteral]]:
        """Return a key that orders annotations by predicate, then by value."""
        return x.predicate, _reference_or_literal_key(x.value)
def _property_resolve(p: ReferenceHint, o: Reference | Referenced | OBOLiteral) -> Annotation:
    """Normalize a predicate hint and an object into an :class:`Annotation`.

    The predicate is passed through :func:`_ensure_ref`; an object that is a
    :class:`Referenced` is replaced by its ``reference`` attribute.
    """
    predicate = _ensure_ref(p)
    target = o.reference if isinstance(o, Referenced) else o
    return Annotation(predicate, target)
# A mapping from a predicate to the list of its values (references or literals).
PropertiesHint: TypeAlias = dict[Reference, list[Reference | OBOLiteral]]
# A mapping from a predicate to the list of its target references.
RelationsHint: TypeAlias = dict[Reference, list[Reference]]
# A mapping from a predicate-object pair to the annotations attached to that pair.
AnnotationsDict: TypeAlias = dict[Annotation, list[Annotation]]
# note that an intersection is not valid in ROBOT with a literal, even though this _might_ make sense.
IntersectionOfHint: TypeAlias = list[Reference | tuple[Reference, Reference]]
UnionOfHint: TypeAlias = list[Reference]
# The stanza kinds that the two lookup tables below are keyed by.
StanzaType: TypeAlias = Literal["Term", "Instance", "TypeDef"]

# The predicate under which a stanza of each type records its parents.
stanza_type_to_prop: dict[StanzaType, Reference] = {
    "Term": v.is_a,
    "Instance": v.rdf_type,
    "TypeDef": v.subproperty_of,
}
# The predicate under which a stanza of each type records its equivalences.
stanza_type_to_eq_prop: dict[StanzaType, Reference] = {
    "Term": v.equivalent_class,
    "Instance": v.owl_same_as,
    "TypeDef": v.equivalent_property,
}
class HasReferencesMixin(ABC):
    """An abstract mixin for objects that can enumerate the references they use."""

    def _get_prefixes(self) -> set[str]:
        """Return the prefixes that key the index from :meth:`_get_references`."""
        references_by_prefix = self._get_references()
        return set(references_by_prefix)

    @abstractmethod
    def _get_references(self) -> dict[str, set[Reference]]:
        """Return the references used by this object, grouped by prefix."""
        raise NotImplementedError
class Stanza(Referenced, HasReferencesMixin):
    """A high-level class for stanzas.

    Most attributes are only declared here, without values.
    NOTE(review): presumably concrete subclasses populate them - confirm.
    """

    #: The reference that identifies this stanza
    reference: Reference
    #: Target references, grouped by the relationship predicate
    relationships: RelationsHint
    #: Property values (references or literals), grouped by predicate
    properties: PropertiesHint
    #: Cross-references
    xrefs: list[Reference]
    #: Parents, recorded under the predicate given by ``stanza_type_to_prop``
    parents: list[Reference]
    #: Intersection components: a reference or a (predicate, target) pair
    intersection_of: IntersectionOfHint
    #: Equivalences, recorded under the predicate given by ``stanza_type_to_eq_prop``
    equivalent_to: list[Reference]
    #: Union components
    union_of: UnionOfHint
    #: Subsets this stanza belongs to
    subsets: list[Reference]
    #: References this stanza is disjoint from
    disjoint_from: list[Reference]
    #: Synonyms
    synonyms: list[Synonym]
    #: The kind of stanza, which selects the parent and equivalence predicates
    type: StanzaType
    #: Annotations keyed by the predicate-object pair they are attached to
    _axioms: AnnotationsDict

    #: An annotation for obsolescence. By default, is None, but this means that it is not obsolete.
    is_obsolete: bool | None

    #: A description of the entity
    definition: str | None = None
@staticmethod
def _reference(
    reference: Reference, ontology_prefix: str, add_name_comment: bool = False
) -> str:
    """Render a reference as a string via :func:`reference_escape`."""
    escaped = reference_escape(
        reference,
        add_name_comment=add_name_comment,
        ontology_prefix=ontology_prefix,
    )
    return escaped
def _get_prefixes(self) -> set[str]:
    """Return the prefixes of all references used by this stanza."""
    references_by_prefix = self._get_references()
    return set(references_by_prefix)
def _get_references(self) -> dict[str, set[Reference]]:
"""Get all prefixes used by the typedef."""
rv: defaultdict[str, set[Reference]] = defaultdict(set)
def _add(r: Reference) -> None:
rv[r.prefix].add(r)
_add(self.reference)
for synonym in self.synonyms:
for prefix, references in synonym._get_references().items():
rv[prefix].update(references)
if self.xrefs:
# xrefs themselves added in the chain below
_add(v.has_dbxref)
for predicate, values in self.properties.items():
_add(predicate)
for value in values:
if isinstance(value, Reference):
_add(value)
elif isinstance(value, OBOLiteral):
_add(v._c(value.datatype))
for parent in itt.chain(
self.parents,
self.union_of,
self.equivalent_to,
self.disjoint_from,
self.subsets,
self.xrefs,
):
_add(parent)
for intersection_of in self.intersection_of:
match intersection_of:
case Reference():
_add(intersection_of)
case (intersection_predicate, intersection_value):
_add(intersection_predicate)
_add(intersection_value)
for rel_predicate, rel_values in self.relationships.items():
_add(rel_predicate)
for r in rel_values:
_add(r)
for p_o, annotations_ in self._axioms.items():
_add(p_o.predicate)
if isinstance(p_o.value, Reference):
_add(p_o.value)
for prefix, references in _get_references_from_annotations(annotations_).items():
rv[prefix].update(references)
return rv
def get_literal_mappings(self) -> list[LiteralMapping]:
    """Get literal mappings for this stanza's synonyms, plus one for its label if it has a name."""
    mappings: list[LiteralMapping] = []
    for synonym in self.synonyms:
        mappings.append(_convert_synoynym(self, synonym))
    if self.reference.name:
        mappings.append(_get_stanza_name_synonym(self))
    return mappings
def append_relationship(
    self,
    typedef: ReferenceHint,
    reference: ReferenceHint,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a relationship.

    :param typedef: The predicate of the relationship
    :param reference: The target of the relationship
    :param annotations: Optional annotations to attach to the relationship
    :returns: This stanza, to allow chaining
    """
    predicate_reference = _ensure_ref(typedef)
    target_reference = _ensure_ref(reference)
    self.relationships[predicate_reference].append(target_reference)
    self._extend_annotations(predicate_reference, target_reference, annotations)
    return self
def _extend_annotations(
    self, p: Reference, o: Reference | OBOLiteral, annotations: Iterable[Annotation] | None
) -> None:
    """Attach each given annotation to the predicate-object pair; ``None`` is a no-op."""
    if annotations is not None:
        for annotation in annotations:
            self._append_annotation(p, o, annotation)
def _append_annotation(
    self, p: ReferenceHint, o: Reference | OBOLiteral, annotation: Annotation
) -> None:
    """Attach a single annotation to the axiom keyed by the predicate-object pair."""
    axiom_key = _property_resolve(p, o)
    self._axioms[axiom_key].append(annotation)
# TODO check different usages of this
def append_equivalent(
    self,
    reference: ReferenceHint,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append an equivalence as a relationship.

    The predicate is looked up from the stanza type in ``stanza_type_to_eq_prop``.
    """
    equivalence_predicate = stanza_type_to_eq_prop[self.type]
    return self.append_relationship(equivalence_predicate, reference, annotations=annotations)
def append_equivalent_to(
    self, reference: ReferenceHint, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Append to the "equivalent to" list.

    Annotations are recorded under the equivalence predicate for this stanza's type.
    """
    equivalent = _ensure_ref(reference)
    self.equivalent_to.append(equivalent)
    equivalence_predicate = stanza_type_to_eq_prop[self.type]
    self._extend_annotations(equivalence_predicate, equivalent, annotations)
    return self
def append_xref(
    self,
    reference: ReferenceHint,
    *,
    mapping_justification: Reference | None = None,
    confidence: float | None = None,
    contributor: Reference | None = None,
    annotations: list[Annotation] | None = None,
) -> Self:
    """Append an xref.

    :param reference: The cross-referenced entity
    :param mapping_justification: Optional justification, recorded as an annotation
    :param confidence: Optional confidence, recorded as a float annotation
    :param contributor: Optional contributor, recorded as an annotation
    :param annotations: Optional additional annotations. This list is not modified.
    :returns: This stanza, to allow chaining
    """
    xref = _ensure_ref(reference)
    self.xrefs.append(xref)
    # Build a fresh list instead of extending the argument in place, so that
    # a list reused by the caller does not accumulate mapping annotations.
    combined_annotations = [
        *(annotations or []),
        *self._prepare_mapping_annotations(
            mapping_justification=mapping_justification,
            confidence=confidence,
            contributor=contributor,
        ),
    ]
    self._extend_annotations(v.has_dbxref, xref, combined_annotations)
    return self
def _prepare_mapping_annotations(
    self,
    *,
    mapping_justification: Reference | None = None,
    confidence: float | None = None,
    contributor: Reference | None = None,
) -> Iterable[Annotation]:
    """Yield one annotation per mapping detail that is not ``None``.

    The order is justification, then contributor, then confidence.
    """
    if mapping_justification is not None:
        yield Annotation(predicate=v.mapping_has_justification, value=mapping_justification)
    if contributor is not None:
        yield Annotation(predicate=v.has_contributor, value=contributor)
    if confidence is not None:
        confidence_annotation = Annotation.float(v.mapping_has_confidence, confidence)
        yield confidence_annotation
def append_parent(
    self,
    reference: ReferenceHint,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Add a parent to this entity.

    A parent that is already present is not added again, but the given
    annotations are attached either way.
    """
    parent = _ensure_ref(reference)
    already_present = parent in self.parents
    if not already_present:
        self.parents.append(parent)
    parent_predicate = stanza_type_to_prop[self.type]
    self._extend_annotations(parent_predicate, parent, annotations)
    return self
def append_intersection_of(
    self,
    /,
    reference: ReferenceHint | tuple[ReferenceHint, ReferenceHint],
    r2: ReferenceHint | None = None,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append an intersection of.

    :param reference: Either a single reference hint or a pair of hints
        ``(predicate, target)``
    :param r2: When given, ``reference`` is the predicate and this is the target
    :param annotations: Accepted for signature parity with the other ``append_*``
        methods, but currently not recorded
    :returns: This stanza, to allow chaining
    :raises TypeError: If ``r2`` is given while ``reference`` is already a pair
    """
    if r2 is not None:
        if isinstance(reference, tuple):
            raise TypeError(
                "when r2 is given, reference must be a single reference hint, not a tuple"
            )
        self.intersection_of.append((_ensure_ref(reference), _ensure_ref(r2)))
    elif isinstance(reference, tuple):
        predicate_hint, target_hint = reference[0], reference[1]
        self.intersection_of.append((_ensure_ref(predicate_hint), _ensure_ref(target_hint)))
    else:
        self.intersection_of.append(_ensure_ref(reference))
    return self
def append_union_of(self, reference: ReferenceHint) -> Self:
    """Append to the "union of" list."""
    member = _ensure_ref(reference)
    self.union_of.append(member)
    return self
def _iterate_intersection_of_obo(self, *, ontology_prefix: str) -> Iterable[str]:
for element in sorted(self.intersection_of, key=self._intersection_of_key):
match element:
case Reference():
end = reference_escape(
element, ontology_prefix=ontology_prefix, add_name_comment=True
)
case (predicate, object):
match object:
case Reference():
end = multi_reference_escape(
[predicate, object],
ontology_prefix=ontology_prefix,
add_name_comment=True,
)
case OBOLiteral():
raise NotImplementedError
case _:
raise TypeError
yield f"intersection_of: {end}"
@staticmethod
def _intersection_of_key(
    io: Reference | tuple[Reference, Reference],
) -> tuple[Literal[0], Reference] | tuple[Literal[1], tuple[Reference, Reference]]:
    """Return a sort key that places plain references before (predicate, target) pairs."""
    return (0, io) if isinstance(io, Reference) else (1, io)
def _iterate_xref_obo(self, *, ontology_prefix: str) -> Iterable[str]:
    """Yield one ``xref:`` line per xref, in sorted order.

    Each line has the escaped reference, then any trailing modifiers, then
    a ``! name`` comment if the xref has a name.
    """
    for xref in sorted(self.xrefs):
        escaped = reference_escape(
            xref, ontology_prefix=ontology_prefix, add_name_comment=False
        )
        modifiers = _get_obo_trailing_modifiers(
            v.has_dbxref, xref, self._axioms, ontology_prefix=ontology_prefix
        )
        name_comment = f" ! {xref.name}" if xref.name else ""
        yield f"xref: {escaped}" + modifiers + name_comment
def _get_annotations(
    self, p: ReferenceHint, o: Reference | Referenced | OBOLiteral | str
) -> list[Annotation]:
    """Look up the annotations on a predicate-object pair.

    A plain string object is treated as a string literal. An empty list is
    returned when nothing is recorded for the pair.
    """
    target = OBOLiteral.string(o) if isinstance(o, str) else o
    axiom_key = _property_resolve(p, target)
    return self._axioms.get(axiom_key, [])
def _get_annotation(
    self, p: ReferenceHint, o: Reference | OBOLiteral, ap: Reference
) -> Reference | OBOLiteral | None:
    """Return the value of the first annotation on the pair whose predicate matches ``ap``.

    Predicates are compared by their ``pair``. Returns ``None`` if there is no match.
    """
    wanted = _ensure_ref(ap)
    matching_values = (
        annotation.value
        for annotation in self._get_annotations(p, o)
        if annotation.predicate.pair == wanted.pair
    )
    return next(matching_values, None)
def append_property(
    self, prop: Annotation, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Record a predicate-value pair as a property, with optional annotations on it."""
    predicate, value = prop.predicate, prop.value
    self.properties[predicate].append(value)
    self._extend_annotations(predicate, value, annotations)
    return self
def annotate_literal(
    self,
    prop: ReferenceHint,
    value: OBOLiteral,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a literal annotation.

    :param prop: The predicate
    :param value: The literal value
    :param annotations: Optional annotations to attach to the property
    :returns: This stanza, to allow chaining
    """
    predicate = _ensure_ref(prop)
    self.properties[predicate].append(value)
    self._extend_annotations(predicate, value, annotations)
    return self
def annotate_string(
    self,
    prop: ReferenceHint,
    value: str,
    *,
    annotations: Iterable[Annotation] | None = None,
    language: str | None = None,
) -> Self:
    """Append a string annotation, optionally tagged with a language."""
    literal = OBOLiteral.string(value, language=language)
    return self.annotate_literal(prop, literal, annotations=annotations)
def annotate_boolean(
    self,
    prop: ReferenceHint,
    value: bool,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a boolean annotation."""
    literal = OBOLiteral.boolean(value)
    return self.annotate_literal(prop, literal, annotations=annotations)
def annotate_integer(
    self,
    prop: ReferenceHint,
    value: int | str,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append an integer annotation."""
    literal = OBOLiteral.integer(value)
    return self.annotate_literal(prop, literal, annotations=annotations)
def annotate_float(
    self, prop: ReferenceHint, value: float, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Append a float annotation."""
    literal = OBOLiteral.float(value)
    return self.annotate_literal(prop, literal, annotations=annotations)
def annotate_decimal(
    self, prop: ReferenceHint, value: float, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Append a decimal annotation."""
    literal = OBOLiteral.decimal(value)
    return self.annotate_literal(prop, literal, annotations=annotations)
def annotate_year(
    self,
    prop: ReferenceHint,
    value: int | str,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a year annotation."""
    literal = OBOLiteral.year(value)
    return self.annotate_literal(prop, literal, annotations=annotations)
def annotate_uri(
    self, prop: ReferenceHint, value: str, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Append a URI annotation."""
    literal = OBOLiteral.uri(value)
    return self.annotate_literal(prop, literal, annotations=annotations)
def annotate_datetime(
    self,
    prop: ReferenceHint,
    value: datetime.datetime | str,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a datetime annotation."""
    literal = OBOLiteral.datetime(value)
    return self.annotate_literal(prop, literal, annotations=annotations)
def annotate_date(
    self,
    prop: ReferenceHint,
    value: datetime.datetime | datetime.date | str,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a date annotation."""
    literal = OBOLiteral.date(value)
    return self.annotate_literal(prop, literal, annotations=annotations)
def _iterate_obo_properties(
    self,
    *,
    ontology_prefix: str,
    skip_predicate_objects: Iterable[Reference] | None = None,
    skip_predicate_literals: Iterable[Reference] | None = None,
    typedefs: Mapping[ReferenceTuple, TypeDef],
) -> Iterable[str]:
    """Yield one ``property_value:`` line per property of this stanza.

    The line bodies come from the module-level ``_iterate_obo_relations``,
    which receives the two skip filters.
    """
    property_lines = _iterate_obo_relations(
        self.properties,
        self._axioms,
        ontology_prefix=ontology_prefix,
        skip_predicate_objects=skip_predicate_objects,
        skip_predicate_literals=skip_predicate_literals,
        typedefs=typedefs,
    )
    yield from (f"property_value: {line}" for line in property_lines)
def _iterate_obo_relations(
    self, *, ontology_prefix: str, typedefs: Mapping[ReferenceTuple, TypeDef]
) -> Iterable[str]:
    """Yield one ``relationship:`` line per relationship of this stanza.

    The line bodies come from the module-level function of the same name.
    """
    relation_lines = _iterate_obo_relations(
        self.relationships,
        self._axioms,
        ontology_prefix=ontology_prefix,
        typedefs=typedefs,
    )
    yield from (f"relationship: {line}" for line in relation_lines)
def append_subset(self, subset: ReferenceHint) -> Self:
    """Add a subset."""
    subset_reference = _ensure_ref(subset)
    self.subsets.append(subset_reference)
    return self
def append_disjoint_from(self, reference: ReferenceHint) -> Self:
    """Add a disjoint from."""
    disjoint = _ensure_ref(reference)
    self.disjoint_from.append(disjoint)
    return self
def annotate_object(
    self,
    typedef: ReferenceHint,
    value: ReferenceHint,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append an object annotation.

    :param typedef: The predicate
    :param value: The target reference
    :param annotations: Optional annotations to attach to the property
    :returns: This stanza, to allow chaining
    """
    predicate = _ensure_ref(typedef)
    target = _ensure_ref(value)
    self.properties[predicate].append(target)
    self._extend_annotations(predicate, target, annotations)
    return self
def append_contributor(self, reference: ReferenceHint) -> Self:
    """Append a contributor as an object annotation."""
    contributor_predicate = v.has_contributor
    return self.annotate_object(contributor_predicate, reference)
def append_creation_date(self, date: datetime.datetime | str) -> Self:
    """Append a creation date as a datetime annotation."""
    creation_predicate = v.obo_creation_date
    return self.annotate_datetime(creation_predicate, date)
def get_see_also(self) -> list[Reference]:
    """Get all references recorded under the "see also" property."""
    see_also = self.get_property_objects(v.see_also)
    return see_also
def get_replaced_by(self) -> list[Reference]:
    """Get all references recorded under the "term replaced by" property."""
    replacements = self.get_property_objects(v.term_replaced_by)
    return replacements
def append_replaced_by(
    self, reference: Reference, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Add a replaced by property."""
    replaced_by_predicate = v.term_replaced_by
    return self.annotate_object(replaced_by_predicate, reference, annotations=annotations)
def iterate_relations(self) -> Iterable[tuple[Reference, Reference]]:
    """Iterate over (typedef, target) pairs, sorted by typedef and then by target."""
    for typedef, targets in sorted(self.relationships.items()):
        yield from ((typedef, target) for target in sorted(targets))
def iterate_object_properties(self) -> Iterable[tuple[Reference, Reference]]:
    """Iterate over (predicate, value) pairs for property values that are references."""
    for predicate, values in self.properties.items():
        yield from ((predicate, value) for value in values if isinstance(value, Reference))
def iterate_literal_properties(self) -> Iterable[tuple[Reference, OBOLiteral]]:
    """Iterate over (predicate, value) pairs for property values that are literals."""
    for predicate, values in self.properties.items():
        yield from ((predicate, value) for value in values if isinstance(value, OBOLiteral))
def get_relationships(self, typedef: ReferenceHint) -> list[Reference]:
    """Get the targets of relationships with the given predicate, or an empty list."""
    predicate = _ensure_ref(typedef)
    return self.relationships.get(predicate, [])
# docstr-coverage:excused `overload`
@overload
def get_relationship(
    self, typedef: ReferenceHint, *, strict: Literal[False] = ...
) -> Reference | None: ...

# docstr-coverage:excused `overload`
@overload
def get_relationship(self, typedef: ReferenceHint, *, strict: Literal[True]) -> Reference: ...

def get_relationship(self, typedef: ReferenceHint, *, strict: bool = False) -> Reference | None:
    """Get a single relationship of the given type.

    :param typedef: The predicate to look up
    :param strict: If true, raise instead of returning ``None`` when there
        is no relationship with the given predicate
    :returns: The single target, or ``None`` if there is none and ``strict`` is false
    :raises ValueError: If there are multiple targets, or if there is none
        and ``strict`` is true
    """
    targets = self.get_relationships(typedef)
    if len(targets) > 1:
        raise ValueError(f"multiple relationships returned: {targets}")
    if targets:
        return targets[0]
    if strict:
        raise ValueError(f"no relationships found for: {typedef}")
    return None
def iterate_relation_targets(self, typedef: ReferenceHint) -> list[Reference]:
    """Get a sorted list of the targets of relationships with the given predicate."""
    predicate = _ensure_ref(typedef)
    targets = self.relationships.get(predicate, [])
    return sorted(targets)
def get_property_annotations(self) -> list[Annotation]:
    """Get all properties as annotations, sorted by predicate and then by value."""
    property_annotations: list[Annotation] = []
    for prop, values in sorted(self.properties.items()):
        for value in sorted(values, key=_reference_or_literal_key):
            property_annotations.append(Annotation(prop, value))
    return property_annotations
def get_property_values(self, typedef: ReferenceHint) -> list[Reference | OBOLiteral]:
    """Get the sorted values (references or literals) of the given property.

    :param typedef: The predicate to look up
    :returns: A new list with references first, then literals, each group sorted.
        It is empty if the property is not set.
    """
    values = self.properties.get(_ensure_ref(typedef), [])
    # Use the same key as the other sorts in this module, so a property that
    # mixes references and literals never compares a reference to a literal
    return sorted(values, key=_reference_or_literal_key)
def get_property_objects(self, prop: ReferenceHint) -> list[Reference]:
    """Get a sorted list of the values of the given property that are references."""
    values = self.properties.get(_ensure_ref(prop), [])
    objects = [value for value in values if isinstance(value, curies.Reference)]
    objects.sort()
    return objects
def append_exact_synonym(
    self,
    synonym: str | Synonym,
    *,
    type: Reference | Referenced | None = None,
    provenance: Sequence[Reference | OBOLiteral] | None = None,
    annotations: Iterable[Annotation] | None = None,
    language: str | None = None,
) -> Self:
    """Add an exact synonym by delegating to :meth:`append_synonym` with ``EXACT`` specificity."""
    synonym_kwargs = {
        "type": type,
        "specificity": "EXACT",
        "provenance": provenance,
        "annotations": annotations,
        "language": language,
    }
    return self.append_synonym(synonym, **synonym_kwargs)
def append_synonym(
    self,
    synonym: str | Synonym,
    *,
    type: Reference | Referenced | None = None,
    specificity: SynonymScope | None = None,
    provenance: Sequence[Reference | OBOLiteral] | None = None,
    annotations: Iterable[Annotation] | None = None,
    language: str | None = None,
) -> Self:
    """Add a synonym.

    :param synonym: Either the synonym text or a pre-built synonym object.
        A pre-built object is appended as-is and the keyword arguments are not used.
    :param type: The synonym type; a :class:`Referenced` is replaced by its reference
    :param specificity: The scope of the synonym
    :param provenance: Provenance for the synonym
    :param annotations: Annotations for the synonym
    :param language: The language of the synonym text
    :returns: This stanza, to allow chaining
    """
    type_reference = type.reference if isinstance(type, Referenced) else type
    if isinstance(synonym, str):
        # imported at call time; at module level Synonym is only imported for type checking
        from pyobo.struct.struct import Synonym

        synonym = Synonym(
            synonym,
            type=type_reference,
            specificity=specificity,
            provenance=list(provenance or []),
            annotations=list(annotations or []),
            language=language,
        )
    self.synonyms.append(synonym)
    return self
def append_alt(
    self, alt: Reference, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Add an alternative identifier."""
    alternative_predicate = v.alternative_term
    return self.annotate_object(alternative_predicate, alt, annotations=annotations)
def append_see_also(
    self, reference: ReferenceHint, *, annotations: Iterable[Annotation] | None = None
) -> Self:
    """Add a see also property."""
    return self.annotate_object(v.see_also, _ensure_ref(reference), annotations=annotations)
def append_comment(
    self,
    value: str,
    *,
    annotations: Iterable[Annotation] | None = None,
    language: str | None = None,
) -> Self:
    """Add a comment property as a string annotation."""
    comment_predicate = v.comment
    return self.annotate_string(
        comment_predicate, value, annotations=annotations, language=language
    )
def get_comments(self) -> list[str]:
    """Get the string values of all comment properties that are literals."""
    comments: list[str] = []
    for comment in self.get_property_values(v.comment):
        if isinstance(comment, OBOLiteral):
            comments.append(comment.value)
    return comments
@property
def alt_ids(self) -> Sequence[Reference]:
    """Get the alternative terms as a tuple."""
    alternatives = self.get_property_objects(v.alternative_term)
    return tuple(alternatives)
def get_edges(self, *, include_xrefs: bool = True) -> list[tuple[Reference, Reference]]:
    """Get edges as a list of (predicate, target) pairs."""
    return [*self._iter_edges(include_xrefs=include_xrefs)]
def _iter_parents(self) -> Iterable[tuple[Reference, Reference]]:
    """Yield the parents and then the union members, each paired with the parent predicate."""
    parent_prop = stanza_type_to_prop[self.type]
    yield from ((parent_prop, parent) for parent in itt.chain(self.parents, self.union_of))
def _iter_intersections(self) -> Iterable[tuple[Reference, Reference]]:
parent_prop = stanza_type_to_prop[self.type]
for intersection_of in self.intersection_of:
match intersection_of:
case Reference():
yield parent_prop, intersection_of
case (predicate, target):
yield predicate, target
def _iter_edges(self, *, include_xrefs: bool = True) -> Iterable[tuple[Reference, Reference]]:
    """Yield all edges of this stanza as (predicate, target) pairs.

    :param include_xrefs: Whether to also yield the xrefs
    """
    # The following are "object" properties, meaning
    # they're part of the definition of the object
    yield from self.iterate_relations()
    yield from self._iter_parents()
    yield from self._iter_intersections()
    yield from (
        (stanza_type_to_eq_prop[self.type], equivalent_to)
        for equivalent_to in self.equivalent_to
    )

    # The following are "annotation" properties
    yield from ((v.in_subset, subset) for subset in self.subsets)
    yield from self.iterate_object_properties()
    if include_xrefs:
        yield from ((v.has_dbxref, xref_reference) for xref_reference in self.xrefs)
    # TODO disjoint_from
# docstr-coverage:excused `overload`
@overload
def get_mappings(
    self, *, include_xrefs: bool = ..., add_context: Literal[False] = ...
) -> list[tuple[Reference, Reference]]: ...

# docstr-coverage:excused `overload`
@overload
def get_mappings(
    self, *, include_xrefs: bool = ..., add_context: Literal[True] = ...
) -> list[tuple[Reference, Reference, MappingContext]]: ...

def get_mappings(
    self, *, include_xrefs: bool = True, add_context: bool = False
) -> list[tuple[Reference, Reference]] | list[tuple[Reference, Reference, MappingContext]]:
    """Get the sorted, de-duplicated mappings of this stanza.

    :param include_xrefs: Whether to include the xrefs
    :param add_context: Whether to add a mapping context as a third element
    :returns: A list of (predicate, target) pairs, or of
        (predicate, target, context) triples if ``add_context`` is true
    """
    rows: list[tuple[Reference, Reference]] = []
    for predicate in v.extended_match_typedefs:
        rows.extend(
            (predicate, target)
            for target in itt.chain(
                self.get_property_objects(predicate), self.get_relationships(predicate)
            )
        )
    if include_xrefs:
        rows.extend((v.has_dbxref, xref_reference) for xref_reference in self.xrefs)
    # NOTE(review): this always uses the class equivalence predicate, while other
    # methods look it up by stanza type - confirm this is intended
    rows.extend((v.equivalent_class, equivalent_to) for equivalent_to in self.equivalent_to)

    unique_rows = sorted(set(rows))
    if add_context:
        return [
            (predicate, target, self._get_mapping_context(predicate, target))
            for predicate, target in unique_rows
        ]
    return unique_rows
def _get_object_annotation_target(
    self, p: Reference, o: Reference | OBOLiteral, ap: Reference
) -> Reference | None:
    """Get the annotation value for ``ap`` on the pair, requiring it to be a reference.

    :returns: The reference, or ``None`` if there is no such annotation
    :raises TypeError: If the annotation value is a literal or any other type
    """
    annotation_value = self._get_annotation(p, o, ap)
    if annotation_value is None:
        return None
    if isinstance(annotation_value, OBOLiteral):
        raise TypeError
    if isinstance(annotation_value, Reference):
        return annotation_value
    raise TypeError
def _get_str_annotation_target(
self, p: Reference, o: Reference | OBOLiteral, ap: Reference
) -> str | None:
match self._get_annotation(p, o, ap):
case OBOLiteral(value, _):
return value
case Reference():
raise TypeError
case None:
return None
case _:
raise TypeError
def _get_mapping_context(self, p: Reference, o: Reference) -> MappingContext:
    """Build the mapping context from the annotations on the predicate-object pair.

    The justification falls back to ``unspecified_matching`` when none is annotated.
    """
    justification = self._get_object_annotation_target(p, o, v.mapping_has_justification)
    contributor = self._get_object_annotation_target(p, o, v.has_contributor)
    confidence = self._get_str_annotation_target(p, o, v.mapping_has_confidence)
    return MappingContext(
        justification=justification or unspecified_matching,
        contributor=contributor,
        confidence=confidence,
    )
def _definition_fp(self) -> str:
    """Format the definition as a quoted string, followed by bracketed provenance if any."""
    text = obo_escape_slim(self.definition) if self.definition else ""
    quoted = f'"{text}"'
    provenance = self._get_definition_provenance()
    if not provenance:
        return quoted
    return f"{quoted} [{comma_separate_references(provenance)}]"
def _get_definition_provenance(self) -> Sequence[Reference | OBOLiteral]:
    """Get the xref annotations on the definition, or an empty list if there is no definition."""
    if self.definition is None:
        return []
    xref_pair = v.has_dbxref.pair
    provenance: list[Reference | OBOLiteral] = []
    for annotation in self._get_annotations(v.has_description, self.definition):
        if annotation.predicate.pair == xref_pair:
            provenance.append(annotation.value)
    return provenance
@property
def provenance(self) -> Sequence[Reference | OBOLiteral]:
    """Get the "mentioned by" references followed by the xref annotations on any axiom."""
    mentioned_by = self.get_property_objects(v.is_mentioned_by)
    # This gets all of the xrefs on _any_ axiom,
    # which includes the definition provenance
    axiom_xrefs = [
        annotation.value
        for axiom_annotations in self._axioms.values()
        for annotation in axiom_annotations
        if annotation.predicate.pair == v.has_dbxref.pair
    ]
    # return as a tuple to make sure nobody is appending on it
    return (*mentioned_by, *axiom_xrefs)
def append_definition_xref(self, reference: ReferenceHint) -> Self:
    """Add a reference to this term's definition.

    :raises ValueError: If no definition is set
    """
    if not self.definition:
        raise ValueError("can not append definition provenance if no definition is set")
    definition_literal = OBOLiteral.string(self.definition)
    xref_annotation = Annotation(v.has_dbxref, _ensure_ref(reference))
    self._append_annotation(v.has_description, definition_literal, xref_annotation)
    return self
def append_provenance(
    self,
    reference: Reference,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a creative work that mentions this term.

    Deprecated: emits a :class:`DeprecationWarning` and delegates to
    :meth:`append_mentioned_by`.
    """
    warnings.warn(
        "use append_mentioned_by instead",
        category=DeprecationWarning,
        stacklevel=2,
    )
    return self.append_mentioned_by(reference, annotations=annotations)
def append_mentioned_by(
    self,
    reference: Reference,
    *,
    annotations: Iterable[Annotation] | None = None,
) -> Self:
    """Append a creative work that mentions this term."""
    mentioned_by_predicate = v.is_mentioned_by
    return self.annotate_object(mentioned_by_predicate, reference, annotations=annotations)
# Anything that ``_ensure_ref`` can turn into a reference: a reference, an object
# carrying one, a (prefix, identifier) pair, or a string to be parsed.
ReferenceHint: TypeAlias = (
    Reference | Referenced | curies.Reference | curies.NamedReference | tuple[str, str] | str
)
def _ensure_ref(
reference: ReferenceHint,
*,
ontology_prefix: str | None = None,
) -> Reference:
if isinstance(reference, Referenced):
return reference.reference
if isinstance(reference, tuple):
return Reference(prefix=reference[0], identifier=reference[1])
if isinstance(reference, Reference):
return reference
if isinstance(reference, curies.NamedReference):
return Reference(
prefix=reference.prefix, identifier=reference.identifier, name=reference.name
)
if isinstance(reference, curies.Reference):
return Reference(prefix=reference.prefix, identifier=reference.identifier)
match _parse_str_or_curie_or_uri_helper(reference, ontology_prefix=ontology_prefix):
case Reference() as parsed_reference:
return parsed_reference
case NotCURIEError() as exc:
if ontology_prefix and _is_valid_identifier(reference):
return default_reference(ontology_prefix, reference)
else:
raise exc
case ParseError() as exc:
raise exc
raise TypeError
def _chain_tag(
tag: str, chains: list[list[Reference]] | None, ontology_prefix: str
) -> Iterable[str]:
for chain in chains or []:
yield f"{tag}: {multi_reference_escape(chain, ontology_prefix=ontology_prefix, add_name_comment=True)}"
def _tag_property_targets(
    tag: str, stanza: Stanza, prod: ReferenceHint, *, ontology_prefix: str
) -> Iterable[str]:
    """Yield one ``tag: ...`` line per value of the property that is a reference."""
    predicate = _ensure_ref(prod)
    for target in stanza.get_property_values(predicate):
        if not isinstance(target, Reference):
            continue
        escaped = reference_escape(
            target, ontology_prefix=ontology_prefix, add_name_comment=True
        )
        yield f"{tag}: {escaped}"
def _iterate_obo_relations(
relations: Mapping[Reference, Sequence[Reference | OBOLiteral]],
annotations: AnnotationsDict,
*,
ontology_prefix: str,
skip_predicate_objects: Iterable[Reference] | None = None,
skip_predicate_literals: Iterable[Reference] | None = None,
typedefs: Mapping[ReferenceTuple, TypeDef],
) -> Iterable[str]:
"""Iterate over relations/property values for OBO."""
skip_predicate_objects = set(skip_predicate_objects or [])
skip_predicate_literals = set(skip_predicate_literals or [])
for predicate, values in sorted(relations.items()):
_typedef_warn(prefix=ontology_prefix, predicate=predicate, typedefs=typedefs)
pc = reference_escape(predicate, ontology_prefix=ontology_prefix)
start = f"{pc} "
for value in sorted(values, key=_reference_or_literal_key):
match value:
case OBOLiteral(dd, datatype, _language):
if predicate in skip_predicate_literals:
continue
end = f'"{_escape_literal(dd)}" {get_preferred_curie(datatype)}'
name = None
case curies.Reference(): # it's a reference
if predicate in skip_predicate_objects:
# this allows us to special case out iterating over
# ones that are configured with their own tags
continue
end = reference_escape(value, ontology_prefix=ontology_prefix)
name = value.name
case _:
raise TypeError(f"got unexpected type {type(values)} with value: {values}")
end += _get_obo_trailing_modifiers(
predicate, value, annotations, ontology_prefix=ontology_prefix
)
if predicate.name and name:
end += f" ! {predicate.name} {name}"
yield start + end
def _escape_literal(s: str) -> str:
return s.replace('"', '\\"')
def _reference_or_literal_key(x: Reference | OBOLiteral) -> tuple[int, Reference | OBOLiteral]:
    """Return a sort key that places references before everything else."""
    rank = 0 if isinstance(x, Reference) else 1
    return rank, x
def _get_obo_trailing_modifiers(
    p: ReferenceHint,
    o: Reference | OBOLiteral,
    annotations_dict: AnnotationsDict,
    *,
    ontology_prefix: str,
) -> str:
    """Lookup then format a sequence of annotations for OBO trailing modifiers.

    :returns: The formatted trailing modifiers, or an empty string if the
        predicate-object pair has no annotations
    """
    found = annotations_dict.get(_property_resolve(p, o), [])
    if not found:
        return ""
    return _format_obo_trailing_modifiers(found, ontology_prefix=ontology_prefix)
def _format_obo_trailing_modifiers(
annotations: Sequence[Annotation], *, ontology_prefix: str
) -> str:
"""Format a sequence of annotations for OBO trailing modifiers.
:param annotations: A list of annnotations
:param ontology_prefix: The ontology prefix
:returns: The trailing modifiers string
See https://owlcollab.github.io/oboformat/doc/GO.format.obo-1_4.html#S.1.4 trailing
modifiers can be both annotations and some other implementation-specific things, so
split up the place where annotations are put in here.
"""
modifiers: list[tuple[str, str]] = []
for prop in sorted(annotations, key=Annotation._sort_key):
left = reference_escape(prop.predicate, ontology_prefix=ontology_prefix)
match prop.value:
case Reference():
right = reference_escape(prop.value, ontology_prefix=ontology_prefix)
case OBOLiteral(value, datatype, _language):
if datatype == v.xsd_string:
right = f'"{obo_escape_slim(value)}"'
else:
right = value
case _:
raise TypeError(f"invalid prop value: {type(prop.value)} - {prop.value}")
modifiers.append((left, right))
inner = ", ".join(f"{key}={value}" for key, value in modifiers)
return " {" + inner + "}"
#: A set of warnings, used to make sure we don't show the same one over and over
_TYPEDEF_WARNINGS: set[tuple[str, Reference]] = set()


def _typedef_warn(
    prefix: str, predicate: Reference, typedefs: Mapping[ReferenceTuple, TypeDef]
) -> None:
    """Log a warning, once per prefix-predicate pair, if the predicate has no typedef.

    Nothing is logged if the predicate is among the default typedefs or the given ones.
    """
    from pyobo.struct.typedef import default_typedefs

    if predicate.pair in default_typedefs or predicate.pair in typedefs:
        return None

    key = prefix, predicate
    if key in _TYPEDEF_WARNINGS:
        return None
    _TYPEDEF_WARNINGS.add(key)

    if predicate.prefix != "obo":
        logger.warning(f"[{prefix}] typedef not defined: {predicate.curie}")
        return None

    # Throw our hands up in the air. By using `obo` as the prefix,
    # we already threw using "real" definitions out the window
    logger.warning(
        f"[{prefix}] predicate with OBO prefix not defined: {predicate.curie}."
        f"\n\tThis might be because you used an unqualified prefix in an OBO file, "
        f"which automatically gets an OBO prefix."
    )
    return None
class MappingContext(BaseModel):
    """Context for a mapping, corresponding to SSSOM."""

    # The mapping justification; defaults to ``unspecified_matching``
    justification: Reference = unspecified_matching
    # The contributor of the mapping, if any
    contributor: Reference | None = None
    # The confidence in the mapping, if any
    confidence: float | None = None

    model_config = ConfigDict(
        frozen=True,  # Makes the model immutable and hashable
    )
def _get_prefixes_from_annotations(annotations: Iterable[Annotation]) -> set[str]:
    """Return the prefixes of all references used in the annotations."""
    references_by_prefix = _get_references_from_annotations(annotations)
    return set(references_by_prefix)
def _get_references_from_annotations(
annotations: Iterable[Annotation],
) -> dict[str, set[Reference]]:
rv: defaultdict[str, set[Reference]] = defaultdict(set)
for left, right in annotations:
rv[left.prefix].add(left)
if isinstance(right, Reference):
rv[right.prefix].add(right)
return dict(rv)
def _get_stanza_name_synonym(stanza: Stanza) -> LiteralMapping:
    """Build a literal mapping for the stanza's name, using the label predicate."""
    reference = stanza.reference
    reference_provenance = [
        entry for entry in stanza.provenance if isinstance(entry, curies.Reference)
    ]
    return LiteralMapping(
        text=reference.name,
        reference=reference,
        predicate=_v.has_label,
        type=None,
        provenance=reference_provenance,
        contributor=None,  # TODO
        comment=None,  # TODO
        source=reference.prefix,
        date=None,  # TODO
    )
def _convert_synoynym(stanza: Stanza, synonym: Synonym) -> LiteralMapping:
    """Build a literal mapping for a synonym of the stanza.

    The comment, contributor, and date are read from the annotations the stanza
    holds on the synonym's predicate and text.
    """
    synonym_literal = OBOLiteral.string(synonym.name, language=synonym.language)
    # TODO make this indexing reusable? similar code used for SSSOM export
    value_by_predicate: dict[Reference, Reference | OBOLiteral] = {}
    for annotation in stanza._get_annotations(synonym.predicate, synonym_literal):
        # a later annotation with the same predicate replaces an earlier one
        value_by_predicate[annotation.predicate] = annotation.value

    comment = _safe_str(value_by_predicate.get(v.comment))
    contributor = _safe_str(value_by_predicate.get(v.has_contributor))
    date = _safe_str(value_by_predicate.get(v.has_date))
    reference_provenance = [
        entry for entry in synonym.provenance if isinstance(entry, curies.Reference)
    ]
    return LiteralMapping(
        text=synonym.name,
        language=synonym.language,
        reference=stanza.reference,
        predicate=synonym.predicate,
        type=synonym.type,
        provenance=reference_provenance,
        contributor=contributor,
        comment=comment,
        source=stanza.reference.prefix,
        date=date,
    )
def _safe_str(x: Reference | OBOLiteral | None) -> str | None:
if x is None:
return None
return reference_or_literal_to_str(x)