# Source code for pyobo.api.hierarchy

# -*- coding: utf-8 -*-

"""High-level API for hierarchies."""

import logging
from functools import lru_cache
from typing import Iterable, Optional, Set, Tuple

import networkx as nx

from .names import get_name
from .properties import get_filtered_properties_mapping
from .relations import get_filtered_relations_df
from ..identifier_utils import wrap_norm_prefix
from ..struct import TypeDef, has_member, has_part, is_a, part_of

# Public API of this module (the names exported by ``from ... import *``).
__all__ = [
    "get_hierarchy",
    "get_subhierarchy",
    "get_descendants",
    "get_ancestors",
    "has_ancestor",
    "is_descendent",
]

from ..struct.reference import Reference

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


def get_hierarchy(
    prefix: str,
    *,
    include_part_of: bool = True,
    include_has_member: bool = False,
    extra_relations: Optional[Iterable[TypeDef]] = None,
    properties: Optional[Iterable[str]] = None,
    use_tqdm: bool = False,
    force: bool = False,
    version: Optional[str] = None,
) -> nx.DiGraph:
    """Build the hierarchy of a namespace as a directed graph.

    This is a thin, cache-friendly front end for :func:`_get_hierarchy_helper`:
    the iterable arguments are converted into sorted tuples so they are
    hashable (and order-independent) before they reach the ``lru_cache``
    that decorates the helper.

    :param prefix: The name of the namespace.
    :param include_part_of: Add "part of" relations. Only works if the relations
        are properly defined using ``bfo:0000050 ! part of`` or
        ``bfo:0000051 ! has part``.
    :param include_has_member: Add "has member" relations. These aren't part of
        the BFO, but are hacked into PyOBO using
        :data:`pyobo.struct.typedef.has_member` for relationships like from
        protein families to their actual proteins.
    :param extra_relations: Other relations that you want to include in the
        hierarchy. For example, it might be useful to include the
        positively_regulates relation.
    :param properties: Properties to include in the data part of each node.
        For example, you might want to include SMILES strings with the
        ChEBI tree.
    :param use_tqdm: Show a progress bar.
    :param force: Should the resources be reloaded when extracting relations?
    :param version: Forwarded unchanged to the helper's relation lookups
        (presumably the version of the resource to use).
    :returns: A directional graph representing the hierarchy.
    """
    # Sort so that the same set of relations/properties always produces the
    # same cache key, whatever order the caller supplied them in.
    relations = list(extra_relations or [])
    relations.sort(key=lambda typedef: typedef.curie)
    property_names = sorted(properties or [])

    return _get_hierarchy_helper(
        prefix=prefix,
        include_part_of=include_part_of,
        include_has_member=include_has_member,
        extra_relations=tuple(relations),
        properties=tuple(property_names),
        use_tqdm=use_tqdm,
        force=force,
        version=version,
    )
@lru_cache()
@wrap_norm_prefix
def _get_hierarchy_helper(
    prefix: str,
    *,
    extra_relations: Tuple[TypeDef, ...],
    properties: Tuple[str, ...],
    include_part_of: bool,
    include_has_member: bool,
    use_tqdm: bool,
    force: bool = False,
    version: Optional[str] = None,
) -> nx.DiGraph:
    """Build the (cached) hierarchy graph for a namespace.

    Nodes are CURIE strings. Every edge points from the more specific term to
    the more general one and carries a ``relation`` attribute. All arguments
    must be hashable because the result is memoized with ``lru_cache``.

    :param prefix: The name of the namespace.
    :param extra_relations: Additional relations whose rows are added as edges,
        labelled with the relation's ``identifier``.
    :param properties: Property names whose values are stored on the nodes
        that are already in the graph.
    :param include_part_of: Add edges for "part of" and (inverted) "has part".
    :param include_has_member: Add (inverted) edges for "has member". These
        edges are labelled ``"is_a"``.
    :param use_tqdm: Forwarded to the relation/property lookups.
    :param force: Forwarded to the relation/property lookups.
    :param version: Forwarded to the relation lookups.
    :returns: A directed graph representing the hierarchy.
    :raises TypeError: If an entry of ``extra_relations`` is neither a
        :class:`TypeDef` nor a :class:`Reference`.
    """
    rv = nx.DiGraph()

    is_a_df = get_filtered_relations_df(
        prefix=prefix,
        relation=is_a,
        use_tqdm=use_tqdm,
        force=force,
        version=version,
    )
    for source_id, target_ns, target_id in is_a_df.values:
        rv.add_edge(f"{prefix}:{source_id}", f"{target_ns}:{target_id}", relation="is_a")

    if include_has_member:
        has_member_df = get_filtered_relations_df(
            prefix=prefix,
            relation=has_member,
            use_tqdm=use_tqdm,
            force=force,
            version=version,
        )
        # "has member" is stated from the group's side, so the edge is flipped
        # to keep the member -> group direction used by the rest of the graph.
        for target_id, source_ns, source_id in has_member_df.values:
            rv.add_edge(f"{source_ns}:{source_id}", f"{prefix}:{target_id}", relation="is_a")

    if include_part_of:
        part_of_df = get_filtered_relations_df(
            prefix=prefix,
            relation=part_of,
            use_tqdm=use_tqdm,
            force=force,
            version=version,
        )
        for source_id, target_ns, target_id in part_of_df.values:
            rv.add_edge(f"{prefix}:{source_id}", f"{target_ns}:{target_id}", relation="part_of")

        # "has part" is the inverse of "part of": query the inverse relation
        # and flip each row. Querying ``part_of`` again here would add a
        # reversed copy of every part_of edge and create 2-cycles.
        has_part_df = get_filtered_relations_df(
            prefix=prefix,
            relation=has_part,
            use_tqdm=use_tqdm,
            force=force,
            version=version,
        )
        for target_id, source_ns, source_id in has_part_df.values:
            rv.add_edge(f"{source_ns}:{source_id}", f"{prefix}:{target_id}", relation="part_of")

    for relation in extra_relations:
        if not isinstance(relation, (TypeDef, Reference)):
            raise TypeError(
                f"extra relations must be TypeDef or Reference instances, got {type(relation)!r}"
            )
        relation_df = get_filtered_relations_df(
            prefix=prefix,
            relation=relation,
            use_tqdm=use_tqdm,
            force=force,
            version=version,
        )
        for source_id, target_ns, target_id in relation_df.values:
            rv.add_edge(
                f"{prefix}:{source_id}", f"{target_ns}:{target_id}", relation=relation.identifier
            )

    for prop in properties:
        # NOTE(review): ``version`` is not forwarded here, unlike the relation
        # lookups above - confirm whether the properties lookup accepts it.
        props = get_filtered_properties_mapping(
            prefix=prefix, prop=prop, use_tqdm=use_tqdm, force=force
        )
        for identifier, value in props.items():
            curie = f"{prefix}:{identifier}"
            # Only annotate terms that take part in the hierarchy; never add
            # isolated nodes just because they have a property value.
            if curie in rv:
                rv.nodes[curie][prop] = value

    return rv
def is_descendent(prefix, identifier, ancestor_prefix, ancestor_identifier) -> bool:
    """Check that the first identifier is a descendant of the second.

    The descendants of ``ancestor_prefix:ancestor_identifier`` are looked up
    and ``prefix:identifier`` is tested for membership. If the ancestor term
    is not in its hierarchy at all, the answer is ``False``.

    Check that go:0070246 ! natural killer cell apoptotic process is a
    descendant of go:0006915 ! apoptotic process::

    >>> assert is_descendent('go', '0070246', 'go', '0006915')
    """
    descendants = get_descendants(ancestor_prefix, ancestor_identifier)
    if descendants is None:
        return False
    return f"{prefix}:{identifier}" in descendants
@lru_cache()
def get_descendants(
    prefix: str,
    identifier: str,
    include_part_of: bool = True,
    include_has_member: bool = False,
    use_tqdm: bool = False,
    force: bool = False,
    **kwargs,
) -> Optional[Set[str]]:
    """Get all of the descendants (children) of the term as CURIEs.

    :param prefix: The namespace of the term.
    :param identifier: The local identifier of the term.
    :param include_part_of: Forwarded to :func:`get_hierarchy`.
    :param include_has_member: Forwarded to :func:`get_hierarchy`.
    :param use_tqdm: Forwarded to :func:`get_hierarchy`.
    :param force: Forwarded to :func:`get_hierarchy`.
    :param kwargs: Extra keyword arguments forwarded to :func:`get_hierarchy`.
        They must be hashable because this function is wrapped in
        ``lru_cache``.
    :returns: The set of descendant CURIEs, or ``None`` if the term is not a
        node of the hierarchy.
    """
    graph = get_hierarchy(
        prefix=prefix,
        include_part_of=include_part_of,
        include_has_member=include_has_member,
        use_tqdm=use_tqdm,
        force=force,
        **kwargs,
    )
    node = f"{prefix}:{identifier}"
    # The lookup is deliberately "backwards": the terms that can reach this
    # node in the graph are its descendants in the ontology.
    return nx.ancestors(graph, node) if node in graph else None
def has_ancestor(prefix, identifier, ancestor_prefix, ancestor_identifier) -> bool:
    """Check that the first identifier has the second as an ancestor.

    The ancestors of ``prefix:identifier`` are looked up and
    ``ancestor_prefix:ancestor_identifier`` is tested for membership. If the
    first term is not in its hierarchy at all, the answer is ``False``.

    Check that go:0008219 ! cell death is an ancestor of
    go:0006915 ! apoptotic process::

    >>> assert has_ancestor('go', '0006915', 'go', '0008219')
    """
    ancestors = get_ancestors(prefix, identifier)
    if ancestors is None:
        return False
    return f"{ancestor_prefix}:{ancestor_identifier}" in ancestors
@lru_cache()
def get_ancestors(
    prefix: str,
    identifier: str,
    include_part_of: bool = True,
    include_has_member: bool = False,
    use_tqdm: bool = False,
    force: bool = False,
    **kwargs,
) -> Optional[Set[str]]:
    """Get all of the ancestors (parents) of the term as CURIEs.

    :param prefix: The namespace of the term.
    :param identifier: The local identifier of the term.
    :param include_part_of: Forwarded to :func:`get_hierarchy`.
    :param include_has_member: Forwarded to :func:`get_hierarchy`.
    :param use_tqdm: Forwarded to :func:`get_hierarchy`.
    :param force: Forwarded to :func:`get_hierarchy`.
    :param kwargs: Extra keyword arguments forwarded to :func:`get_hierarchy`.
        They must be hashable because this function is wrapped in
        ``lru_cache``.
    :returns: The set of ancestor CURIEs, or ``None`` if the term is not a
        node of the hierarchy.
    """
    graph = get_hierarchy(
        prefix=prefix,
        include_part_of=include_part_of,
        include_has_member=include_has_member,
        use_tqdm=use_tqdm,
        force=force,
        **kwargs,
    )
    node = f"{prefix}:{identifier}"
    # The lookup is deliberately "backwards": the terms reachable from this
    # node in the graph are its ancestors in the ontology.
    return nx.descendants(graph, node) if node in graph else None
def get_subhierarchy(
    prefix: str,
    identifier: str,
    include_part_of: bool = True,
    include_has_member: bool = False,
    use_tqdm: bool = False,
    force: bool = False,
    **kwargs,
) -> nx.DiGraph:
    """Get the subhierarchy for a given node.

    The result is an independent copy of the hierarchy induced on the
    descendants of ``prefix:identifier``. The node set comes from
    :func:`networkx.ancestors`, which does not include the queried node itself.

    :param prefix: The namespace of the term.
    :param identifier: The local identifier of the term.
    :param include_part_of: Forwarded to :func:`get_hierarchy`.
    :param include_has_member: Forwarded to :func:`get_hierarchy`.
    :param use_tqdm: Forwarded to :func:`get_hierarchy`.
    :param force: Forwarded to :func:`get_hierarchy`.
    :param kwargs: Extra keyword arguments forwarded to :func:`get_hierarchy`.
    :returns: A new directed graph containing the induced subgraph.
    """
    graph = get_hierarchy(
        prefix=prefix,
        include_part_of=include_part_of,
        include_has_member=include_has_member,
        use_tqdm=use_tqdm,
        force=force,
        **kwargs,
    )
    logger.info(
        "getting descendants of %s:%s ! %s", prefix, identifier, get_name(prefix, identifier)
    )
    root = f"{prefix}:{identifier}"
    # The lookup is deliberately "backwards": the terms that can reach the root
    # in the graph are its descendants in the ontology.
    members = nx.ancestors(graph, root)
    logger.info("inducing subgraph")
    # ``subgraph`` returns a view; copy it so the caller gets its own graph.
    subgraph = graph.subgraph(members).copy()
    logger.info(
        "subgraph has %d nodes/%d edges",
        subgraph.number_of_nodes(),
        subgraph.number_of_edges(),
    )
    return subgraph