# Source code for pyobo.reader

# -*- coding: utf-8 -*-

"""OBO Readers."""

import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, List, Mapping, Optional, Tuple, Union

import bioregistry
import networkx as nx
from more_itertools import pairwise
from tqdm.auto import tqdm

from .constants import DATE_FORMAT, PROVENANCE_PREFIXES
from .identifier_utils import MissingPrefix, normalize_curie
from .registries import curie_has_blacklisted_prefix, curie_is_blacklisted, remap_prefix
from .struct import (
    Obo,
    Reference,
    Synonym,
    SynonymSpecificities,
    SynonymSpecificity,
    SynonymTypeDef,
    Term,
    TypeDef,
    make_ad_hoc_ontology,
)
from .struct.typedef import default_typedefs, develops_from, has_part, part_of
from .utils.misc import cleanup_version

__all__ = [
    "from_obo_path",
    "from_obonet",
]

logger = logging.getLogger(__name__)

# FIXME use bioontologies
# RELATION_REMAPPINGS: Mapping[str, Tuple[str, str]] = bioontologies.upgrade.load()
# Hand-curated remappings from unqualified/legacy relation names that appear in
# OBO "relationship" lines to canonical (prefix, identifier) pairs. Consulted
# before CURIE normalization in iterate_node_relationships.
RELATION_REMAPPINGS: Mapping[str, Tuple[str, str]] = {
    "part_of": part_of.pair,
    "has_part": has_part.pair,
    "develops_from": develops_from.pair,
    "seeAlso": ("rdf", "seeAlso"),
    "dc-contributor": ("dc", "contributor"),
    "dc-creator": ("dc", "creator"),
}


def from_obo_path(
    path: Union[str, Path], prefix: Optional[str] = None, *, strict: bool = True
) -> "Obo":
    """Get the OBO graph from a path.

    :param path: Path to a plain-text OBO file.
    :param prefix: If given, used to normalize the graph's ``ontology`` metadata entry.
    :param strict: Should CURIE parsing failures raise errors?
    :return: The ontology parsed from the file.
    """
    import obonet

    logger.info("[%s] parsing with obonet from %s", prefix or "", path)
    # The OBO format specifies UTF-8; be explicit instead of relying on the
    # locale-dependent default encoding (see PEP 597).
    with open(path, encoding="utf-8") as file:
        graph = obonet.read_obo(
            tqdm(
                file,
                unit_scale=True,
                desc=f'[{prefix or ""}] parsing obo',
                disable=None,
                leave=False,
            )
        )

    if prefix:
        # Make sure the graph is named properly
        _clean_graph_ontology(graph, prefix)

    # Convert to an Obo instance and return
    return from_obonet(graph, strict=strict)
def from_obonet(graph: "nx.MultiDiGraph", *, strict: bool = True) -> "Obo":  # noqa:C901
    """Get all of the terms from a OBO graph.

    :param graph: A graph parsed by :mod:`obonet`.
    :param strict: Should CURIE parsing failures raise errors?
    :return: An ad-hoc ontology built from the graph's nodes and metadata.
    :raises ValueError: If the ontology prefix can't be normalized with the
        Bioregistry, or if the resolved data version contains a slash.
    """
    _ontology = graph.graph["ontology"]
    ontology = bioregistry.normalize_prefix(_ontology)  # probably always okay
    if ontology is None:
        raise ValueError(f"unknown prefix: {_ontology}")
    logger.info("[%s] extracting OBO using obonet", ontology)

    date = _get_date(graph=graph, ontology=ontology)
    name = _get_name(graph=graph, ontology=ontology)

    # Resolve a data version: prefer the header, clean it up, then fall back
    # to the date when the header is absent or unrecognized.
    data_version = graph.graph.get("data-version")
    if not data_version:
        if date is not None:
            data_version = date.strftime("%Y-%m-%d")
            logger.info(
                "[%s] does not report a version. falling back to date: %s",
                ontology,
                data_version,
            )
        else:
            logger.warning("[%s] does not report a version nor a date", ontology)
    else:
        data_version = cleanup_version(data_version=data_version, prefix=ontology)
        if data_version is not None:
            logger.info("[%s] using version %s", ontology, data_version)
        elif date is not None:
            # FIX: assign before logging so the message shows the date that is
            # actually used (previously it logged the cleaned-up None value).
            data_version = date.strftime("%Y-%m-%d")
            logger.info(
                "[%s] unrecognized version format, falling back to date: %s",
                ontology,
                data_version,
            )
        else:
            logger.warning(
                "[%s] UNRECOGNIZED VERSION FORMAT AND MISSING DATE: %s", ontology, data_version
            )

    if data_version and "/" in data_version:
        raise ValueError(f"[{ontology}] will not accept slash in data version: {data_version}")

    #: Parsed CURIEs to references (even external ones)
    references: Mapping[Tuple[str, str], Reference] = {
        (prefix, identifier): Reference(
            prefix=prefix,
            identifier=identifier,
            # if name isn't available, it means its external to this ontology
            name=data.get("name"),
        )
        for prefix, identifier, data in _iter_obo_graph(graph=graph)
    }

    #: CURIEs to typedefs
    typedefs: Mapping[Tuple[str, str], TypeDef] = {
        (typedef.prefix, typedef.identifier): typedef
        for typedef in iterate_graph_typedefs(graph, ontology)
    }

    synonym_typedefs: Mapping[str, SynonymTypeDef] = {
        synonym_typedef.id: synonym_typedef
        for synonym_typedef in iterate_graph_synonym_typedefs(graph)
    }

    missing_typedefs = set()
    terms = []
    n_alt_ids, n_parents, n_synonyms, n_relations, n_properties, n_xrefs = 0, 0, 0, 0, 0, 0
    for prefix, identifier, data in _iter_obo_graph(graph=graph):
        # Only build terms for nodes belonging to this ontology; external
        # nodes were already captured in ``references``.
        if prefix != ontology or not data:
            continue
        reference = references[ontology, identifier]

        try:
            node_xrefs = list(iterate_node_xrefs(prefix=prefix, data=data, strict=strict))
        except MissingPrefix as e:
            e.reference = reference
            raise e
        # Split xrefs into provenance (literature-like prefixes) vs. true xrefs.
        xrefs, provenance = [], []
        for node_xref in node_xrefs:
            if node_xref.prefix in PROVENANCE_PREFIXES:
                provenance.append(node_xref)
            else:
                xrefs.append(node_xref)
        n_xrefs += len(xrefs)

        definition, definition_references = get_definition(
            data, prefix=prefix, identifier=identifier
        )
        if definition_references:
            provenance.extend(definition_references)

        try:
            alt_ids = list(iterate_node_alt_ids(data, strict=strict))
        except MissingPrefix as e:
            e.reference = reference
            raise e
        n_alt_ids += len(alt_ids)

        try:
            parents = list(
                iterate_node_parents(
                    data,
                    prefix=prefix,
                    identifier=identifier,
                    strict=strict,
                )
            )
        except MissingPrefix as e:
            e.reference = reference
            raise e
        n_parents += len(parents)

        synonyms = list(
            iterate_node_synonyms(
                data,
                synonym_typedefs,
                prefix=prefix,
                identifier=identifier,
                strict=strict,
            )
        )
        n_synonyms += len(synonyms)

        term = Term(
            reference=reference,
            definition=definition,
            parents=parents,
            synonyms=synonyms,
            xrefs=xrefs,
            provenance=provenance,
            alt_ids=alt_ids,
        )

        try:
            relations_references = list(
                iterate_node_relationships(
                    data,
                    prefix=ontology,
                    identifier=identifier,
                    strict=strict,
                )
            )
        except MissingPrefix as e:
            e.reference = reference
            raise e
        # FIX: the loop target is named ``target`` so it no longer shadows
        # ``reference`` (the term's own reference) from the enclosing scope.
        for relation, target in relations_references:
            if (relation.prefix, relation.identifier) in typedefs:
                typedef = typedefs[relation.prefix, relation.identifier]
            elif (relation.prefix, relation.identifier) in default_typedefs:
                typedef = default_typedefs[relation.prefix, relation.identifier]
            else:
                # Warn only once per unknown relation type.
                if (relation.prefix, relation.identifier) not in missing_typedefs:
                    missing_typedefs.add((relation.prefix, relation.identifier))
                    logger.warning("[%s] has no typedef for %s", ontology, relation)
                    logger.debug("[%s] available typedefs: %s", ontology, set(typedefs))
                continue
            n_relations += 1
            term.append_relationship(typedef, target)

        for prop, value in iterate_node_properties(data, term=term):
            n_properties += 1
            term.append_property(prop, value)
        terms.append(term)

    logger.info(
        f"[{ontology}] got {len(references):,} references, {len(typedefs):,} typedefs, {len(terms):,} terms,"
        f" {n_alt_ids:,} alt ids, {n_parents:,} parents, {n_synonyms:,} synonyms, {n_xrefs:,} xrefs,"
        f" {n_relations:,} relations, and {n_properties:,} properties",
    )

    return make_ad_hoc_ontology(
        _ontology=ontology,
        _name=name,
        _auto_generated_by=graph.graph.get("auto-generated-by"),
        _format_version=graph.graph.get("format-version"),
        _typedefs=list(typedefs.values()),
        _synonym_typedefs=list(synonym_typedefs.values()),
        _date=date,
        _data_version=data_version,
        terms=terms,
    )
def _clean_graph_ontology(graph, prefix: str) -> None: """Update the ontology entry in the graph's metadata, if necessary.""" if "ontology" not in graph.graph: logger.warning('[%s] missing "ontology" key', prefix) graph.graph["ontology"] = prefix elif not graph.graph["ontology"].isalpha(): logger.warning( "[%s] ontology=%s has a strange format. replacing with prefix", prefix, graph.graph["ontology"], ) graph.graph["ontology"] = prefix def _iter_obo_graph( graph: nx.MultiDiGraph, *, strict: bool = True, ) -> Iterable[Tuple[str, str, Mapping[str, Any]]]: """Iterate over the nodes in the graph with the prefix stripped (if it's there).""" for node, data in graph.nodes(data=True): prefix, identifier = normalize_curie(node, strict=strict) if prefix is None or identifier is None: continue yield prefix, identifier, data def _get_date(graph, ontology: str) -> Optional[datetime]: try: rv = datetime.strptime(graph.graph["date"], DATE_FORMAT) except KeyError: logger.info("[%s] does not report a date", ontology) return None except ValueError: logger.info("[%s] reports a date that can't be parsed: %s", ontology, graph.graph["date"]) return None else: return rv def _get_name(graph, ontology: str) -> str: try: rv = graph.graph["name"] except KeyError: logger.info("[%s] does not report a name", ontology) rv = ontology return rv def iterate_graph_synonym_typedefs(graph: nx.MultiDiGraph) -> Iterable[SynonymTypeDef]: """Get synonym type definitions from an :mod:`obonet` graph.""" for s in graph.graph.get("synonymtypedef", []): sid, name = s.split(" ", 1) name = name.strip().strip('"') yield SynonymTypeDef(id=sid, name=name) def iterate_graph_typedefs( graph: nx.MultiDiGraph, default_prefix: str, *, strict: bool = True ) -> Iterable[TypeDef]: """Get type definitions from an :mod:`obonet` graph.""" for typedef in graph.graph.get("typedefs", []): if "id" in typedef: curie = typedef["id"] elif "identifier" in typedef: curie = typedef["identifier"] else: raise KeyError name = 
typedef.get("name") if name is None: logger.warning("[%s] typedef %s is missing a name", graph.graph["ontology"], curie) if ":" in curie: reference = Reference.from_curie(curie, name=name, strict=strict) else: reference = Reference(prefix=graph.graph["ontology"], identifier=curie, name=name) if reference is None: logger.warning("[%s] unable to parse typedef CURIE %s", graph.graph["ontology"], curie) continue xrefs = [] for curie in typedef.get("xref", []): _xref = Reference.from_curie(curie, strict=strict) if _xref: xrefs.append(_xref) yield TypeDef(reference=reference, xrefs=xrefs) def get_definition( data, *, prefix: str, identifier: str ) -> Union[Tuple[None, None], Tuple[str, List[Reference]]]: definition = data.get("def") # it's allowed not to have a definition if not definition: return None, None return _extract_definition(definition, prefix=prefix, identifier=identifier) def _extract_definition( s: str, *, prefix: str, identifier: str, strict: bool = False, ) -> Union[Tuple[None, None], Tuple[str, List[Reference]]]: """Extract the definitions.""" if not s.startswith('"'): raise ValueError("definition does not start with a quote") try: definition, rest = _quote_split(s) except ValueError: logger.warning("[%s:%s] could not parse definition: %s", prefix, identifier, s) return None, None if not rest.startswith("[") or not rest.endswith("]"): logger.warning("[%s:%s] problem with definition: %s", prefix, identifier, s) provenance = [] else: provenance = _parse_trailing_ref_list(rest, strict=strict) return definition, provenance def _get_first_nonquoted(s: str) -> Optional[int]: for i, (a, b) in enumerate(pairwise(s), start=1): if b == '"' and a != "\\": return i return None def _quote_split(s: str) -> Tuple[str, str]: s = s.lstrip('"') i = _get_first_nonquoted(s) if i is None: raise ValueError return _clean_definition(s[:i].strip()), s[i + 1 :].strip() def _clean_definition(s: str) -> str: # if '\t' in s: # logger.warning('has tab') return ( s.replace('\\"', 
'"').replace("\n", " ").replace("\t", " ").replace("\d", "") # noqa:W605 ) def _extract_synonym( s: str, synonym_typedefs: Mapping[str, SynonymTypeDef], *, prefix: str, identifier: str, strict: bool = True, ) -> Optional[Synonym]: # TODO check if the synonym is written like a CURIE... it shouldn't but I've seen it happen try: name, rest = _quote_split(s) except ValueError: logger.warning("[%s:%s] invalid synonym: %s", prefix, identifier, s) return None specificity: Optional[SynonymSpecificity] = None for _specificity in SynonymSpecificities: if rest.startswith(_specificity): specificity = _specificity rest = rest[len(_specificity) :].strip() break stype: Optional[SynonymTypeDef] = None if specificity is not None: # go fishing for a synonym type definition for std in synonym_typedefs: if rest.startswith(std): stype = synonym_typedefs[std] rest = rest[len(std) :].strip() break if not rest.startswith("[") or not rest.endswith("]"): logger.warning("[%s:%s] problem with synonym: %s", prefix, identifier, s) return None provenance = _parse_trailing_ref_list(rest, strict=strict) return Synonym(name=name, specificity=specificity or "EXACT", type=stype, provenance=provenance) def _parse_trailing_ref_list(rest, *, strict: bool = True): rest = rest.lstrip("[").rstrip("]") return [ Reference.from_curie(curie.strip(), strict=strict) for curie in rest.split(",") if curie.strip() ] def iterate_node_synonyms( data: Mapping[str, Any], synonym_typedefs: Mapping[str, SynonymTypeDef], *, prefix: str, identifier: str, strict: bool = False, ) -> Iterable[Synonym]: """Extract synonyms from a :mod:`obonet` node's data. 
Example strings: - "LTEC I" EXACT [Orphanet:93938,DOI:xxxx] - "LTEC I" EXACT [Orphanet:93938] - "LTEC I" [Orphanet:93938] - "LTEC I" [] """ for s in data.get("synonym", []): s = _extract_synonym( s, synonym_typedefs, prefix=prefix, identifier=identifier, strict=strict ) if s is not None: yield s HANDLED_PROPERTY_TYPES = { "xsd:string": str, "xsd:dateTime": datetime, } def iterate_node_properties( data: Mapping[str, Any], *, property_prefix: Optional[str] = None, term=None ) -> Iterable[Tuple[str, str]]: """Extract properties from a :mod:`obonet` node's data.""" for prop_value_type in data.get("property_value", []): try: prop, value_type = prop_value_type.split(" ", 1) except ValueError: logger.info("malformed property: %s on %s", prop_value_type, term and term.curie) continue if property_prefix is not None and prop.startswith(property_prefix): prop = prop[len(property_prefix) :] try: value, _ = value_type.rsplit(" ", 1) # second entry is the value type except ValueError: # logger.debug(f'property missing datatype. 
defaulting to string - {prop_value_type}') value = value_type # could assign type to be 'xsd:string' by default value = value.strip('"') yield prop, value def iterate_node_parents( data: Mapping[str, Any], *, prefix: str, identifier: str, strict: bool = True, ) -> Iterable[Reference]: """Extract parents from a :mod:`obonet` node's data.""" for parent_curie in data.get("is_a", []): reference = Reference.from_curie(parent_curie, strict=strict) if reference is None: logger.warning( "[%s:%s] could not parse parent curie: %s", prefix, identifier, parent_curie ) continue yield reference def iterate_node_alt_ids(data: Mapping[str, Any], *, strict: bool = True) -> Iterable[Reference]: """Extract alternate identifiers from a :mod:`obonet` node's data.""" for curie in data.get("alt_id", []): reference = Reference.from_curie(curie, strict=strict) if reference is not None: yield reference def iterate_node_relationships( data: Mapping[str, Any], *, prefix: str, identifier: str, strict: bool = True, ) -> Iterable[Tuple[Reference, Reference]]: """Extract relationships from a :mod:`obonet` node's data.""" for s in data.get("relationship", []): relation_curie, target_curie = s.split(" ") relation_prefix: Optional[str] relation_identifier: Optional[str] if relation_curie in RELATION_REMAPPINGS: relation_prefix, relation_identifier = RELATION_REMAPPINGS[relation_curie] else: relation_prefix, relation_identifier = normalize_curie(relation_curie) if relation_prefix is not None and relation_identifier is not None: relation = Reference(prefix=relation_prefix, identifier=relation_identifier) elif prefix is not None: relation = Reference(prefix=prefix, identifier=relation_curie) else: logger.debug("unhandled relation: %s", relation_curie) relation = Reference(prefix="obo", identifier=relation_curie) # TODO replace with omni-parser from :mod:`curies` target = Reference.from_curie(target_curie, strict=strict) if target is None: logger.warning( "[%s:%s] %s could not parse target %s", prefix, 
identifier, relation, target_curie ) continue yield relation, target def iterate_node_xrefs( *, prefix: str, data: Mapping[str, Any], strict: bool = True ) -> Iterable[Reference]: """Extract xrefs from a :mod:`obonet` node's data.""" for xref in data.get("xref", []): xref = xref.strip() if curie_has_blacklisted_prefix(xref) or curie_is_blacklisted(xref) or ":" not in xref: continue # sometimes xref to self... weird xref = remap_prefix(xref) split_space = " " in xref if split_space: _xref_split = xref.split(" ", 1) if _xref_split[1][0] not in {'"', "("}: logger.debug("[%s] Problem with space in xref %s", prefix, xref) continue xref = _xref_split[0] yv = Reference.from_curie(xref, strict=strict) if yv is not None: yield yv