# Source code for pyobo.api.hierarchy

"""High-level API for hierarchies."""

import logging
import warnings
from collections.abc import Iterable
from functools import lru_cache
from typing import overload

import networkx as nx
from curies import ReferenceTuple
from typing_extensions import NotRequired, Unpack

from .edges import get_edges
from .names import get_name, get_references
from .properties import get_literal_properties
from .utils import _get_pi
from ..constants import GetOntologyKwargs
from ..struct import has_member, has_part, is_a, member_of, part_of
from ..struct.reference import Reference
from ..struct.struct_utils import ReferenceHint, _ensure_ref

# Names exported by ``from pyobo.api.hierarchy import *``; every entry is a
# function defined further down in this module.
__all__ = [
    "get_ancestors",
    "get_children",
    "get_descendants",
    "get_hierarchy",
    "get_subhierarchy",
    "has_ancestor",
    "is_descendent",
]

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class HierarchyKwargs(GetOntologyKwargs):
    """Keyword argument hints for hierarchy getter functions.

    Extends :class:`GetOntologyKwargs` with two optional switches that the
    hierarchy builder uses to decide which relations, in addition to ``is_a``,
    contribute edges to the graph.
    """

    # When true, "part of" / "has part" relations are added to the hierarchy.
    include_part_of: NotRequired[bool]
    # When true, "has member" / "member of" relations are added to the hierarchy.
    include_has_member: NotRequired[bool]


def get_hierarchy(
    prefix: str,
    *,
    extra_relations: Iterable[ReferenceHint] | None = None,
    properties: Iterable[ReferenceHint] | None = None,
    **kwargs: Unpack[HierarchyKwargs],
) -> nx.DiGraph:
    """Build the parent hierarchy of a resource as a directed graph.

    :param prefix: The name of the namespace.
    :param include_part_of: Add "part of" relations. Only works if the relations are
        properly defined using bfo:0000050 ! part of or bfo:0000051 ! has part
    :param include_has_member: Add "has member" relations. These aren't part of the
        BFO, but are hacked into PyOBO using :data:`pyobo.struct.typedef.has_member`
        for relationships like from protein families to their actual proteins.
    :param extra_relations: Other relations that you want to include in the
        hierarchy. For example, it might be useful to include the
        positively_regulates
    :param properties: Properties to include in the data part of each node. For
        example, might want to include SMILES strings with the ChEBI tree.
    :param force: should the resources be reloaded when extracting relations?

    :returns: A directional graph representing the hierarchy

    This function is a thin wrapper around :func:`_get_hierarchy_helper`. It only
    converts the two iterable arguments into sorted tuples of references, which
    makes them usable as arguments of the ``lru_cache``-decorated helper.
    """
    # Normalize in the same order as the arguments are declared.
    relation_key = _tp(prefix, extra_relations)
    property_key = _tp(prefix, properties)
    return _get_hierarchy_helper(
        prefix=prefix,
        extra_relations=relation_key,
        properties=property_key,
        **kwargs,
    )
def _tp(prefix: str, references: Iterable[ReferenceHint] | None) -> tuple[Reference, ...]:
    """Normalize reference hints into a sorted tuple of references.

    :param prefix: Passed to :func:`_ensure_ref` as ``ontology_prefix``.
    :param references: The hints to normalize; ``None`` is treated like an empty
        iterable.

    :returns: A sorted tuple, so equal inputs in any order produce the same
        value when used as an argument of the cached hierarchy helper.
    """
    return tuple(
        sorted(_ensure_ref(reference, ontology_prefix=prefix) for reference in references or [])
    )


@lru_cache
def _get_hierarchy_helper(
    prefix: str,
    *,
    extra_relations: tuple[Reference, ...],
    properties: tuple[Reference, ...],
    include_part_of: bool = False,
    include_has_member: bool = False,
    **kwargs: Unpack[GetOntologyKwargs],
) -> nx.DiGraph:
    """Build the hierarchy graph for a resource (cached).

    :param prefix: The name of the namespace.
    :param extra_relations: Additional predicates whose edges are added in the
        subject -> object direction, like ``is_a``.
    :param properties: Literal properties to copy onto the node data, keyed by
        the property reference.
    :param include_part_of: Add "part of" / "has part" edges.
    :param include_has_member: Add "member of" / "has member" edges.
    :param kwargs: Passed on to :func:`get_references`, :func:`get_edges`, and
        :func:`get_literal_properties`.

    :returns: A directed graph with one node per reference of the resource and
        edges pointing from the child to the parent. Each edge stores the
        predicate it came from under the ``relation`` key.

    Because of ``lru_cache``, a second call with identical arguments returns the
    very same graph object without rebuilding it; callers should treat the
    result as read-only.
    """
    predicates, reverse_predicates = _get_predicate_sets(
        extra_relations, include_part_of, include_has_member
    )
    rv = nx.DiGraph()
    rv.add_nodes_from(get_references(prefix, **kwargs))
    for s, p, o in get_edges(prefix, **kwargs):
        if p in predicates:
            rv.add_edge(s, o, relation=p)
        elif p in reverse_predicates:
            # inverse predicates are flipped so all edges point child -> parent
            rv.add_edge(o, s, relation=p)
    properties_ = set(properties)
    # Only scan the literal properties when some were requested; with an empty
    # set the loop body could never fire.
    if properties_:
        for s, p, op in get_literal_properties(prefix, **kwargs):
            if s in rv and p in properties_:
                rv.nodes[s][p] = op.value
    return rv


def _get_predicate_sets(
    extra_relations: Iterable[Reference], include_part_of: bool, include_has_member: bool
) -> tuple[set[Reference], set[Reference]]:
    """Get the predicates that define the hierarchy.

    :param extra_relations: Additional predicates to use in the forward direction.
    :param include_part_of: Include "part of" (forward) and "has part" (reverse).
    :param include_has_member: Include "member of" (forward) and "has member"
        (reverse).

    :returns: A pair of sets. Edges with a predicate in the first set are added
        as subject -> object; edges with a predicate in the second set are
        added as object -> subject.
    """
    predicates: set[Reference] = {is_a.reference, *extra_relations}
    reverse_predicates: set[Reference] = set()
    if include_part_of:
        predicates.add(part_of.reference)
        reverse_predicates.add(has_part.reference)
    if include_has_member:
        # "X member of Y" points from the child to the parent, just like
        # "X part of Y", so it is the forward predicate. "Y has member X" is
        # its inverse and must be flipped, just like "has part".
        predicates.add(member_of.reference)
        reverse_predicates.add(has_member.reference)
    return predicates, reverse_predicates


# Typing-only signature: deprecated call style with four positional strings.
# docstr-coverage:excused `overload`
@overload
def is_descendent(
    prefix: str,
    identifier: str,
    ancestor_prefix: str = ...,
    ancestor_identifier: str = ...,
    /,
    **kwargs: Unpack[HierarchyKwargs],
) -> bool: ...


# Typing-only signature: call style with two references.
# docstr-coverage:excused `overload`
@overload
def is_descendent(
    descendant: Reference,
    ancestor: Reference,
    _1: None = ...,
    _2: None = ...,
    /,
    **kwargs: Unpack[HierarchyKwargs],
) -> bool: ...
def is_descendent(
    prefix: str | Reference,
    identifier: str | Reference,
    ancestor_prefix: str | None = None,
    ancestor_identifier: str | None = None,
    /,
    **kwargs: Unpack[HierarchyKwargs],
) -> bool:
    """Check that the first term is a descendant of the second term.

    :param prefix: The prefix for the descendant
    :param identifier: The local unique identifier for the descendant
    :param ancestor_prefix: The prefix for the ancestor
    :param ancestor_identifier: The local unique identifier for the ancestor
    :param kwargs: Keyword arguments for :func:`get_hierarchy`

    :returns: If the descendant has the given ancestor. This is false when the
        ancestor is not in its hierarchy at all.

    Check that ``GO:0070246`` (natural killer cell apoptotic process) is a
    descendant of ``GO:0006915`` (apoptotic process)

    >>> nk_apoptosis = Reference.from_curie(
    ...     "GO:0070246", name="natural killer cell apoptotic process"
    ... )
    >>> apoptosis = Reference.from_curie("GO:0006915", name="apoptotic process")
    >>> assert is_descendent(nk_apoptosis, apoptosis)

    Using deprecated old-style arguments:

    >>> assert is_descendent("go", "0070246", "go", "0006915")
    """
    child, parent = _get_double_reference(
        prefix, identifier, ancestor_prefix, ancestor_identifier
    )
    below_parent = get_descendants(parent, **kwargs)
    if below_parent is None:
        # the ancestor is not a node of its hierarchy
        return False
    return child in below_parent
@lru_cache
def get_descendants(
    prefix: str | Reference | ReferenceTuple,
    identifier: str | None = None,
    /,
    **kwargs: Unpack[HierarchyKwargs],
) -> set[Reference] | None:
    """Get all the descendants (direct and indirect children) of the term.

    :param prefix: A prefix, or a reference / reference tuple for the term
    :param identifier: The local unique identifier, when ``prefix`` is a string
    :param kwargs: Keyword arguments for :func:`get_hierarchy`

    :returns: The set of descendants, or ``None`` if the term is not a node of
        the hierarchy. The result is memoized by ``lru_cache``, so the same set
        object is handed to every caller and should not be mutated.
    """
    node = _get_pi(prefix, identifier)
    graph = get_hierarchy(prefix=node.prefix, **kwargs)
    # note this is backwards: hierarchy edges point from child to parent, so the
    # graph-theoretic ancestors of a node are its ontological descendants
    return nx.ancestors(graph, node) if node in graph else None
@lru_cache
def get_children(
    prefix: str | Reference | ReferenceTuple,
    identifier: str | None = None,
    /,
    **kwargs: Unpack[HierarchyKwargs],
) -> set[Reference] | None:
    """Get the direct children of the term.

    :param prefix: A prefix, or a reference / reference tuple for the term
    :param identifier: The local unique identifier, when ``prefix`` is a string
    :param kwargs: Keyword arguments for :func:`get_hierarchy`

    :returns: The set of nodes with an edge pointing at the term, or ``None`` if
        the term is not a node of the hierarchy. The result is memoized by
        ``lru_cache``, so the same set object is handed to every caller and
        should not be mutated.
    """
    node = _get_pi(prefix, identifier)
    graph = get_hierarchy(prefix=node.prefix, **kwargs)
    if node in graph:
        # hierarchy edges point from child to parent, so predecessors are children
        return {*graph.predecessors(node)}
    return None
# docstr-coverage:excused `overload` @overload def has_ancestor( prefix: str, identifier: str, ancestor_prefix: str = ..., ancestor_identifier: str = ..., /, **kwargs: Unpack[HierarchyKwargs], ) -> bool: ... # docstr-coverage:excused `overload` @overload def has_ancestor( descendant: Reference, ancestor: Reference, _1: None = ..., _2: None = ..., /, **kwargs: Unpack[HierarchyKwargs], ) -> bool: ...
def has_ancestor(
    prefix: str | Reference,
    identifier: str | Reference,
    ancestor_prefix: str | None = None,
    ancestor_identifier: str | None = None,
    /,
    **kwargs: Unpack[HierarchyKwargs],
) -> bool:
    """Check that the first term has the second term as an ancestor.

    :param prefix: The prefix for the descendant
    :param identifier: The local unique identifier for the descendant
    :param ancestor_prefix: The prefix for the ancestor
    :param ancestor_identifier: The local unique identifier for the ancestor
    :param kwargs: Keyword arguments for :func:`get_hierarchy`

    :returns: If the descendant has the given ancestor. This is false when the
        descendant is not in its hierarchy at all.

    Check that ``GO:0008219`` (cell death) is an ancestor of ``GO:0006915``
    (apoptotic process):

    >>> apoptosis = Reference.from_curie("GO:0006915", name="apoptotic process")
    >>> cell_death = Reference.from_curie("GO:0008219", name="cell death")
    >>> assert has_ancestor(apoptosis, cell_death)

    The same, using the deprecated argument style:

    >>> assert has_ancestor("go", "0006915", "go", "0008219")
    """
    child, parent = _get_double_reference(
        prefix, identifier, ancestor_prefix, ancestor_identifier
    )
    above_child = get_ancestors(child, **kwargs)
    if above_child is None:
        # the descendant is not a node of its hierarchy
        return False
    return parent in above_child
def _get_double_reference(
    a: str | Reference, b: str | Reference, c: str | None, d: str | None
) -> tuple[Reference, Reference]:
    """Normalize the two supported call styles into a pair of references.

    :param a: The descendant reference, or (deprecated) the descendant's prefix
    :param b: The ancestor reference, or (deprecated) the descendant's identifier
    :param c: (deprecated) the ancestor's prefix; ignored when ``a`` and ``b``
        are references
    :param d: (deprecated) the ancestor's identifier; ignored when ``a`` and
        ``b`` are references

    :returns: A pair of (descendant, ancestor) references

    :raises TypeError: If the arguments are neither two references nor four
        strings
    """
    if isinstance(a, Reference) and isinstance(b, Reference):
        return a, b
    if all(isinstance(x, str) for x in (a, b, c, d)):
        # This helper is called directly by the public has_ancestor() and
        # is_descendent(), so stacklevel=3 attributes the warning to *their*
        # caller. With stacklevel=2 it was attributed to this library module,
        # and Python's default filters hide such DeprecationWarnings.
        warnings.warn("passing strings is deprecated", DeprecationWarning, stacklevel=3)
        return Reference(prefix=a, identifier=b), Reference(prefix=c, identifier=d)
    raise TypeError(
        "expected either two Reference objects or four strings "
        "(prefix, identifier, ancestor_prefix, ancestor_identifier), but got: "
        f"{type(a).__name__}, {type(b).__name__}, {type(c).__name__}, {type(d).__name__}"
    )
@lru_cache
def get_ancestors(
    prefix: str | Reference | ReferenceTuple,
    identifier: str | None = None,
    /,
    **kwargs: Unpack[HierarchyKwargs],
) -> set[Reference] | None:
    """Get all the ancestors (direct and indirect parents) of the term.

    :param prefix: A prefix, or a reference / reference tuple for the term
    :param identifier: The local unique identifier, when ``prefix`` is a string
    :param kwargs: Keyword arguments for :func:`get_hierarchy`

    :returns: The set of ancestors, or ``None`` if the term is not a node of the
        hierarchy. The result is memoized by ``lru_cache``, so the same set
        object is handed to every caller and should not be mutated.
    """
    node = _get_pi(prefix, identifier)
    graph = get_hierarchy(prefix=node.prefix, **kwargs)
    # note this is backwards: hierarchy edges point from child to parent, so the
    # graph-theoretic descendants of a node are its ontological ancestors
    return nx.descendants(graph, node) if node in graph else None
def get_subhierarchy(
    prefix: str | Reference | ReferenceTuple,
    identifier: str | None = None,
    /,
    **kwargs: Unpack[HierarchyKwargs],
) -> nx.DiGraph:
    """Get the part of the hierarchy at and below a given node.

    :param prefix: A prefix, or a reference / reference tuple for the term
    :param identifier: The local unique identifier, when ``prefix`` is a string
    :param kwargs: Keyword arguments for :func:`get_hierarchy`

    :returns: A new graph, copied from the hierarchy, induced by the term and
        all of its descendants.

    Unlike :func:`get_descendants`, there is no membership check before the
    traversal, so a term that is not in the hierarchy results in whatever error
    :func:`networkx.ancestors` raises for a missing node.
    """
    root = _get_pi(prefix, identifier)
    graph = get_hierarchy(prefix=root.prefix, **kwargs)
    logger.info("getting descendants of %s ! %s", root.curie, get_name(root))
    # note this is backwards: hierarchy edges point from child to parent, so the
    # graph-theoretic ancestors of the root are its ontological descendants
    members = {*nx.ancestors(graph, root), root}
    logger.info("inducing subgraph")
    # copy so the result is an independent graph rather than a view
    induced = graph.subgraph(members).copy()
    logger.info(
        "subgraph has %d nodes/%d edges", induced.number_of_nodes(), induced.number_of_edges()
    )
    return induced