"""High-level API for hierarchies."""
from __future__ import annotations
import logging
import warnings
from collections.abc import Iterable
from functools import lru_cache
from typing import Literal, NotRequired, cast
import networkx as nx
from curies import ReferenceTuple
from typing_extensions import Unpack
from .edges import get_edges
from .names import get_name, get_references
from .properties import get_literal_properties
from .utils import _get_pi
from ..constants import GetOntologyKwargs
from ..struct import has_member, has_part, is_a, member_of, part_of
from ..struct.reference import Reference
from ..struct.struct_utils import ReferenceHint, _ensure_ref
__all__ = [
"get_ancestors",
"get_children",
"get_descendants",
"get_hierarchy",
"get_subhierarchy",
"has_ancestor",
"is_descendent",
]
logger = logging.getLogger(__name__)
class HierarchyKwargs(GetOntologyKwargs):
"""Keyword argument hints for hierarchy getter functions."""
include_part_of: NotRequired[bool]
include_has_member: NotRequired[bool]
[docs]
def get_hierarchy(
prefix: str,
*,
extra_relations: Iterable[ReferenceHint] | None = None,
properties: Iterable[ReferenceHint] | None = None,
**kwargs: Unpack[HierarchyKwargs],
) -> nx.DiGraph[Reference]:
"""Get hierarchy of parents as a directed graph.
:param prefix: The name of the namespace.
:param include_part_of: Add "part of" relations. Only works if the relations are
properly defined using bfo:0000050 ! part of or bfo:0000051 ! has part
:param include_has_member: Add "has member" relations. These aren't part of the BFO,
but are hacked into PyOBO using :data:`pyobo.struct.typedef.has_member` for
relationships like from protein families to their actual proteins.
:param extra_relations: Other relations that you want to include in the hierarchy.
For example, it might be useful to include the positively_regulates
:param properties: Properties to include in the data part of each node. For example,
might want to include SMILES strings with the ChEBI tree.
:param force: should the resources be reloaded when extracting relations?
:returns: A directional graph representing the hierarchy
This function thinly wraps :func:`_get_hierarchy_helper` to make it easier to work
with the lru_cache mechanism.
"""
return _get_hierarchy_helper(
prefix=prefix,
extra_relations=_tp(prefix, extra_relations),
properties=_tp(prefix, properties),
**kwargs,
)
def _tp(prefix: str, references: Iterable[ReferenceHint] | None) -> tuple[Reference, ...]:
return tuple(
sorted(_ensure_ref(reference, ontology_prefix=prefix) for reference in references or [])
)
@lru_cache
def _get_hierarchy_helper(
prefix: str,
*,
extra_relations: tuple[Reference, ...],
properties: tuple[Reference, ...],
include_part_of: bool = False,
include_has_member: bool = False,
**kwargs: Unpack[GetOntologyKwargs],
) -> nx.DiGraph:
predicates, reverse_predicates = _get_predicate_sets(
extra_relations, include_part_of, include_has_member
)
rv = nx.DiGraph()
rv.add_nodes_from(get_references(prefix, **kwargs))
for s, p, o in get_edges(prefix, **kwargs):
if p in predicates:
rv.add_edge(s, o, relation=p)
elif p in reverse_predicates:
rv.add_edge(o, s, relation=p)
properties_ = set(properties)
for s, p, op in get_literal_properties(prefix, **kwargs):
if s in rv and p in properties_:
rv.nodes[s][p] = op.value
return rv
def _get_predicate_sets(
extra_relations: Iterable[Reference], include_part_of: bool, include_has_member: bool
) -> tuple[set[Reference], set[Reference]]:
predicates: set[Reference] = {is_a.reference, *extra_relations}
reverse_predicates: set[Reference] = set()
if include_part_of:
predicates.add(part_of.reference)
reverse_predicates.add(has_part.reference)
if include_has_member:
predicates.add(has_member.reference)
reverse_predicates.add(member_of.reference)
return predicates, reverse_predicates
[docs]
def is_descendent(
reference: str | Reference,
ancestor: str | Reference,
/,
**kwargs: Unpack[HierarchyKwargs],
) -> bool:
"""Check if the refeference is a descendant of the ancestor."""
warnings.warn("use has_ancestor() instead", DeprecationWarning, stacklevel=2)
return has_ancestor(reference, ancestor, direction="down", **kwargs)
[docs]
@lru_cache
def get_descendants(
prefix: str | Reference | ReferenceTuple,
identifier: str | None = None,
/,
**kwargs: Unpack[HierarchyKwargs],
) -> set[Reference] | None:
"""Get all the descendants (children) of the term as CURIEs."""
t = _get_pi(prefix, identifier)
hierarchy = get_hierarchy(prefix=t.prefix, **kwargs)
if t not in hierarchy:
return None
return cast(set[Reference], nx.ancestors(hierarchy, t)) # note this is backwards
[docs]
@lru_cache
def get_children(
prefix: str | Reference | ReferenceTuple,
identifier: str | None = None,
/,
**kwargs: Unpack[HierarchyKwargs],
) -> set[Reference] | None:
"""Get all the descendants (children) of the term as CURIEs."""
t = _get_pi(prefix, identifier)
hierarchy = get_hierarchy(prefix=t.prefix, **kwargs)
if t not in hierarchy:
return None
return set(hierarchy.predecessors(t))
[docs]
def has_ancestor(
reference: str | Reference,
ancestor: str | Reference,
/,
direction: Literal["up", "down"] = "up",
**kwargs: Unpack[HierarchyKwargs],
) -> bool:
"""Check that the first identifier has the second as an ancestor.
:param reference: The reference
:param ancestor: The ancestor
:param kwargs: Keyword arguments for :func:`get_hierarchy`
:returns: If the decendant has the given ancestor
Check that ``GO:0008219`` (cell death) is an ancestor of ``GO:0006915`` (apoptotic
process):
>>> apoptosis = Reference.from_curie("GO:0006915", name="apoptotic process")
>>> cell_death = Reference.from_curie("GO:0008219", name="cell death")
>>> assert has_ancestor(apoptosis, cell_death)
Check that ``GO:0070246`` (natural killer cell apoptotic process) is a descendant of
``GO:0006915`` (apoptotic process)
>>> nk_apoptosis = Reference.from_curie(
... "GO:0070246", name="natural killer cell apoptotic process"
... )
>>> apoptosis = Reference.from_curie("GO:0006915", name="apoptotic process")
>>> assert has_ancestor(nk_apoptosis, apoptosis)
"""
reference, ancestor = _get_double_reference(reference, ancestor)
if direction == "up":
ancestors = get_ancestors(reference, **kwargs)
return ancestors is not None and ancestor in ancestors
else:
descendants = get_descendants(ancestor, **kwargs)
return descendants is not None and reference in descendants
def _get_double_reference(
a: str | Reference, b: str | Reference, c: str | None = None, d: str | None = None
) -> tuple[Reference, Reference]:
if c is not None or d is not None:
raise NotImplementedError("passing strings is no longer supported")
if isinstance(a, str):
a = Reference.from_curie(a)
if isinstance(b, str):
b = Reference.from_curie(b)
return a, b
[docs]
@lru_cache
def get_ancestors(
prefix: str | Reference | ReferenceTuple,
identifier: str | None = None,
/,
**kwargs: Unpack[HierarchyKwargs],
) -> set[Reference] | None:
"""Get all the ancestors (parents) of the term as CURIEs."""
t = _get_pi(prefix, identifier)
hierarchy = get_hierarchy(prefix=t.prefix, **kwargs)
if t not in hierarchy:
return None
return cast(set[Reference], nx.descendants(hierarchy, t)) # note this is backwards
[docs]
def get_subhierarchy(
prefix: str | Reference | ReferenceTuple,
identifier: str | None = None,
/,
**kwargs: Unpack[HierarchyKwargs],
) -> nx.DiGraph:
"""Get the subhierarchy for a given node."""
t = _get_pi(prefix, identifier)
hierarchy = get_hierarchy(prefix=t.prefix, **kwargs)
logger.info("getting descendants of %s ! %s", t.curie, get_name(t))
descendants = set(nx.ancestors(hierarchy, t)) | {t} # note this is backwards
logger.info("inducing subgraph")
sg = hierarchy.subgraph(descendants).copy()
logger.info("subgraph has %d nodes/%d edges", sg.number_of_nodes(), sg.number_of_edges())
return sg