# Source code for pyobo.xrefdb.canonicalizer

# -*- coding: utf-8 -*-

"""Tools for canonicalizing a CURIE based on a priority list."""

from dataclasses import dataclass, field
from functools import lru_cache
from typing import Iterable, List, Mapping, Optional, Set, Tuple

import networkx as nx
import pandas as pd
from more_itertools import pairwise
from tqdm.auto import tqdm

from .priority import DEFAULT_PRIORITY_LIST
from .xrefs_pipeline import get_graph_from_xref_df
from .. import resource_utils
from ..utils.io import get_reader, get_writer

# Public API of this module: the names exported by a star-import.
__all__ = [
    "Canonicalizer",
    "all_shortest_paths",
    "single_source_shortest_path",
    "get_equivalent",
    "get_priority_curie",
    "remap_file_stream",
]


@dataclass
class Canonicalizer:
    """Wraps a graph and priority list to allow getting the best identifier.

    Every node reachable from a query CURIE within :attr:`cutoff` hops is
    scored by the position of its prefix in :attr:`priority`, and the
    highest-scoring node is reported as the canonical CURIE.
    """

    #: A graph from :func:`get_graph_from_xref_df`
    graph: nx.Graph

    #: A list of prefixes. The ones with the lower index are higher priority
    priority: Optional[List[str]] = None

    #: Longest length paths allowed
    cutoff: int = 5

    #: Maps each prefix to its score; a larger score means a higher priority
    _priority: Mapping[str, int] = field(init=False)

    def __post_init__(self):
        """Initialize the priority map based on the priority list."""
        # Always store a private copy: the instance must not alias the shared
        # module-level default (or the caller's list), otherwise mutating
        # ``self.priority`` would leak into other canonicalizers and fall out
        # of sync with ``self._priority``.
        self.priority = list(
            DEFAULT_PRIORITY_LIST if self.priority is None else self.priority
        )
        size = len(self.priority)
        # Index 0 receives the largest score so ``max`` picks the best prefix.
        self._priority = {prefix: size - index for index, prefix in enumerate(self.priority)}

    def _key(self, curie: str) -> Optional[int]:
        """Get the score of the CURIE's prefix, or None if the prefix is not prioritized."""
        prefix = self.graph.nodes[curie]["prefix"]
        return self._priority.get(prefix)

    def _get_priority_dict(self, curie: str) -> Mapping[str, int]:
        """Map every CURIE within the cutoff of the given CURIE to its score."""
        return dict(self._iterate_priority_targets(curie))

    def _iterate_priority_targets(self, curie: str) -> Iterable[Tuple[str, int]]:
        """Yield pairs of reachable CURIEs (including the query itself) and their scores."""
        for target in nx.single_source_shortest_path(self.graph, curie, cutoff=self.cutoff):
            priority = self._key(target)
            if priority is not None:
                yield target, priority
            elif target == curie:
                # An unprioritized query still beats unprioritized neighbors
                yield target, 0
            else:
                yield target, -1

    def canonicalize(self, curie: str) -> str:
        """Get the best CURIE from the given CURIE.

        :param curie: The CURIE to canonicalize
        :returns: The highest-scoring CURIE within :attr:`cutoff` hops of the
            given one. The given CURIE is returned unchanged if it is not a
            node in the graph.
        """
        if curie not in self.graph:
            return curie
        priority_dict = self._get_priority_dict(curie)
        # Every key is present, so ``__getitem__`` behaves like ``get`` here.
        # On ties ``max`` keeps the first CURIE in iteration order.
        return max(priority_dict, key=priority_dict.__getitem__)

    @classmethod
    def get_default(cls, priority: Optional[Iterable[str]] = None) -> "Canonicalizer":
        """Get the default canonicalizer.

        :param priority: An optional iterable of prefixes, highest priority first
        :returns: A canonicalizer over the default graph. Instances are cached
            per priority, so repeated calls return the same object.
        """
        if priority is not None:
            # Tuples are hashable, which the cache on the helper requires
            priority = tuple(priority)
        return cls._get_default_helper(priority=priority)

    @classmethod
    @lru_cache()
    def _get_default_helper(cls, priority: Optional[Tuple[str, ...]] = None) -> "Canonicalizer":
        """Help get the default canonicalizer."""
        graph = cls._get_default_graph()
        return cls(graph=graph, priority=list(priority) if priority else None)

    @staticmethod
    @lru_cache()
    def _get_default_graph() -> nx.Graph:
        """Build the default graph once and cache it for later calls."""
        df = resource_utils.ensure_inspector_javert_df()
        graph = get_graph_from_xref_df(df)
        return graph

    def iterate_flat_mapping(self, use_tqdm: bool = True) -> Iterable[Tuple[str, str]]:
        """Iterate over the canonical mapping from all nodes to their canonical CURIEs.

        :param use_tqdm: Should a progress bar be shown?
        :returns: An iterable of pairs of a node and its canonical CURIE
        """
        nodes = self.graph.nodes()
        if use_tqdm:
            nodes = tqdm(
                nodes,
                total=self.graph.number_of_nodes(),
                desc="building flat mapping",
                unit_scale=True,
                unit="CURIE",
            )
        for node in nodes:
            yield node, self.canonicalize(node)

    def get_flat_mapping(self, use_tqdm: bool = True) -> Mapping[str, str]:
        """Get a canonical mapping from all nodes to their canonical CURIEs.

        :param use_tqdm: Should a progress bar be shown?
        :returns: A dictionary from each node to its canonical CURIE
        """
        return dict(self.iterate_flat_mapping(use_tqdm=use_tqdm))

    def single_source_shortest_path(
        self,
        curie: str,
        cutoff: Optional[int] = None,
    ) -> Optional[Mapping[str, List[Mapping[str, str]]]]:
        """Get all shortest paths between given entity and its equivalent entities.

        :param curie: The CURIE to start from
        :param cutoff: The cutoff handed to the module-level function of the
            same name. Note this is independent of :attr:`cutoff`.
        :returns: The result of the module-level function of the same name
        """
        return single_source_shortest_path(graph=self.graph, curie=curie, cutoff=cutoff)

    def all_shortest_paths(
        self, source_curie: str, target_curie: str
    ) -> List[List[Mapping[str, str]]]:
        """Get all shortest paths between the two entities.

        :param source_curie: The CURIE at which the paths start
        :param target_curie: The CURIE at which the paths end
        :returns: The result of the module-level function of the same name
        """
        return all_shortest_paths(
            graph=self.graph, source_curie=source_curie, target_curie=target_curie
        )

    @classmethod
    def from_df(cls, df: pd.DataFrame) -> "Canonicalizer":
        """Instantiate from a dataframe.

        :param df: A dataframe accepted by :func:`get_graph_from_xref_df`
        :returns: A canonicalizer using the default priority list and cutoff
        """
        return cls(graph=get_graph_from_xref_df(df))


def _edge_provenance(graph: nx.Graph, source: str, target: str) -> str:
    """Get the provenance annotation stored on the edge between two CURIEs."""
    data = graph[source][target]
    try:
        return data["provenance"]
    except KeyError:
        # NOTE(review): all_shortest_paths used to read the "source" edge
        # attribute while single_source_shortest_path read "provenance".
        # "provenance" is preferred since it matches the output key; "source"
        # is kept as a fallback so graphs that worked before keep working.
        # Confirm which attribute get_graph_from_xref_df actually sets.
        return data["source"]


def _annotate_path(graph: nx.Graph, path: List[str]) -> List[Mapping[str, str]]:
    """Convert a path of CURIEs into a list of annotated edge dictionaries."""
    return [
        dict(source=s, target=t, provenance=_edge_provenance(graph, s, t))
        for s, t in pairwise(path)
    ]


def all_shortest_paths(
    graph: nx.Graph, source_curie: str, target_curie: str
) -> List[List[Mapping[str, str]]]:
    """Get all shortest paths between the two CURIEs.

    :param graph: A graph whose edges carry a provenance annotation
    :param source_curie: The CURIE at which the paths start
    :param target_curie: The CURIE at which the paths end
    :returns: A list of paths. Each path is a list of dictionaries with the
        keys ``source``, ``target``, and ``provenance``, one per edge.

    Errors raised by :func:`networkx.all_shortest_paths` (e.g., when there is
    no path between the two CURIEs) are not handled here.
    """
    _paths = nx.all_shortest_paths(graph, source=source_curie, target=target_curie)
    return [_annotate_path(graph, _path) for _path in _paths]


def single_source_shortest_path(
    graph: nx.Graph,
    curie: str,
    cutoff: Optional[int] = None,
) -> Optional[Mapping[str, List[Mapping[str, str]]]]:
    """Get the shortest path from the CURIE to all elements of its equivalence class.

    :param graph: A graph whose edges carry a provenance annotation
    :param curie: The CURIE to start from
    :param cutoff: Passed through to :func:`networkx.single_source_shortest_path`
    :returns: None if the CURIE is not in the graph. Otherwise, a dictionary
        from each other reachable CURIE to the path leading to it, where a path
        is a list of dictionaries with the keys ``source``, ``target``, and
        ``provenance``. The CURIE itself is never a key.

    Things that didn't work:

    Unresponsive
    ------------
    .. code-block:: python

        for curies in tqdm(nx.connected_components(graph), desc='filling connected components', unit_scale=True):
            for c1, c2 in itt.combinations(curies, r=2):
                if not graph.has_edge(c1, c2):
                    graph.add_edge(c1, c2, inferred=True)

    Way too slow
    ------------
    .. code-block:: python

        for curie in tqdm(graph, total=graph.number_of_nodes(), desc='mapping connected components', unit_scale=True):
            for incident_curie in nx.node_connected_component(graph, curie):
                if not graph.has_edge(curie, incident_curie):
                    graph.add_edge(curie, incident_curie, inferred=True)

    Also consider the condensation of the graph:
    https://networkx.github.io/documentation/stable/reference/algorithms/generated/networkx.algorithms.components.condensation.html#networkx.algorithms.components.condensation
    """
    if curie not in graph:
        return None
    rv = nx.single_source_shortest_path(graph, curie, cutoff=cutoff)
    return {
        k: _annotate_path(graph, v)
        for k, v in rv.items()
        if k != curie  # don't map to self
    }


def get_equivalent(curie: str, cutoff: Optional[int] = None) -> Set[str]:
    """Get equivalent CURIEs.

    :param curie: The CURIE to look up in the default canonicalizer's graph
    :param cutoff: Passed through to the shortest path search
    :returns: The set of CURIEs for which a path was found. It is empty if
        the search returned nothing.
    """
    paths = Canonicalizer.get_default().single_source_shortest_path(
        curie=curie,
        cutoff=cutoff,
    )
    if not paths:
        # Covers both a missing result (None) and an empty mapping
        return set()
    return set(paths)


def get_priority_curie(curie: str) -> str:
    """Get the priority CURIE mapped to the best namespace.

    :param curie: The CURIE to canonicalize with the default canonicalizer
    :returns: The result of :meth:`Canonicalizer.canonicalize` for the CURIE
    """
    return Canonicalizer.get_default().canonicalize(curie)


def remap_file_stream(file_in, file_out, column: int, sep="\t") -> None:
    """Remap a file.

    Each row read from ``file_in`` has the CURIE in the given column replaced
    by its priority CURIE before the row is written to ``file_out``.

    :param file_in: The input handed to :func:`get_reader`
    :param file_out: The output handed to :func:`get_writer`
    :param column: The index of the column containing the CURIE to remap
    :param sep: The separator used for both reading and writing
    """
    rows = get_reader(file_in, sep=sep)
    sink = get_writer(file_out, sep=sep)
    for record in rows:
        original = record[column]
        record[column] = get_priority_curie(original)
        sink.writerow(record)