# Source code for pyobo.struct.obo.reader

"""OBO Readers."""

from __future__ import annotations

import logging
import typing as t
from collections import Counter
from collections.abc import Iterable, Mapping
from datetime import datetime
from io import StringIO
from pathlib import Path
from textwrap import dedent
from typing import Any, TypeAlias

import bioregistry
import networkx as nx
from bioregistry import NormalizedNamableReference as Reference
from curies import ReferenceTuple
from curies.preprocessing import BlocklistError
from curies.vocabulary import SynonymScope
from more_itertools import pairwise
from pystow.utils import open_zipfile, safe_open
from tqdm.auto import tqdm

from .reader_utils import (
    _chomp_axioms,
    _chomp_references,
    _chomp_specificity,
    _chomp_typedef,
    _parse_provenance_list,
)
from .. import vocabulary as v
from ..reference import OBOLiteral, _obo_parse_identifier, default_reference
from ..struct import (
    Obo,
    Synonym,
    SynonymTypeDef,
    Term,
    TypeDef,
    build_ontology,
)
from ..struct_utils import Annotation, Stanza
from ..typedef import default_typedefs, has_comment, has_ontology_root_term
from ...constants import DATE_FORMAT, PROVENANCE_PREFIXES
from ...identifier_utils import (
    NotCURIEError,
    ParseError,
    UnparsableIRIError,
    _is_valid_identifier,
    _parse_str_or_curie_or_uri_helper,
    get_rules,
)
from ...utils.cache import write_gzipped_graph
from ...utils.misc import _prioritize_version

__all__ = [
    "from_obo_path",
    "from_obonet",
    "from_str",
]

logger = logging.getLogger(__name__)


def from_obo_path(
    path: str | Path,
    prefix: str | None = None,
    *,
    strict: bool = False,
    version: str | None,
    upgrade: bool = True,
    use_tqdm: bool = False,
    ignore_obsolete: bool = False,
    _cache_path: Path | None = None,
) -> Obo:
    """Read an OBO file from disk with :mod:`obonet` and convert it to an ontology.

    :param path: Location of the OBO file. If its suffix ends with ``.zip``, the
        member named like the archive without the ``.zip`` suffix is read.
    :param prefix: If given, used for log messages and to fix up the graph's
        ``ontology`` metadata entry before conversion.
    :param strict: Passed through to :func:`from_obonet`.
    :param version: Passed through to :func:`from_obonet`.
    :param upgrade: Passed through to :func:`from_obonet`.
    :param use_tqdm: Whether progress bars are shown while reading and converting.
    :param ignore_obsolete: Passed through to the :mod:`obonet` reader.
    :param _cache_path: If given, the parsed graph is written to this path
        before conversion.
    :returns: The ontology built by :func:`from_obonet`.
    """
    resolved = Path(path).expanduser().resolve()
    label = prefix or "<unknown>"

    # pick the right opener first so the actual read only has to be written once
    if resolved.suffix.endswith(".zip"):
        logger.info("[%s] parsing zipped OBO with obonet from %s", label, resolved)
        opener = open_zipfile(resolved, resolved.name.removesuffix(".zip"))
    else:
        logger.info("[%s] parsing OBO with obonet from %s", label, resolved)
        opener = safe_open(resolved, operation="read")

    with opener as file:
        graph = _read_obo(file, prefix, ignore_obsolete=ignore_obsolete, use_tqdm=use_tqdm)

    if prefix:
        # Make sure the graph is named properly
        _clean_graph_ontology(graph, prefix)

    if _cache_path:
        logger.info("[%s] writing obonet cache to %s", prefix, _cache_path)
        write_gzipped_graph(path=_cache_path, graph=graph)

    # Convert to an Obo instance and return
    return from_obonet(
        graph,
        strict=strict,
        version=version,
        upgrade=upgrade,
        use_tqdm=use_tqdm,
    )
def _read_obo(
    lines: Iterable[str],
    prefix: str | None,
    ignore_obsolete: bool,
    use_tqdm: bool = True,
) -> nx.MultiDiGraph:
    """Parse an iterable of OBO lines into an :mod:`obonet` graph.

    The lines are wrapped in a progress bar, which is disabled
    when ``use_tqdm`` is false.
    """
    import obonet

    progress = tqdm(
        lines,
        disable=not use_tqdm,
        unit_scale=True,
        desc=f"[{prefix or ''}] parsing OBO",
        leave=True,
    )
    return obonet.read_obo(progress, ignore_obsolete=ignore_obsolete)


def _normalize_prefix_strict(prefix: str) -> str:
    """Normalize a prefix with the Bioregistry, raising if it isn't recognized.

    :raises ValueError: If :func:`bioregistry.normalize_prefix` returns ``None``.
    """
    normalized = bioregistry.normalize_prefix(prefix)
    if normalized is not None:
        return normalized
    raise ValueError(f"unknown prefix: {prefix}")


def from_str(
    text: str,
    *,
    strict: bool = False,
    version: str | None = None,
    upgrade: bool = True,
    ignore_obsolete: bool = False,
    use_tqdm: bool = False,
) -> Obo:
    """Read an ontology from a string representation.

    The text is dedented and stripped before being handed to :mod:`obonet`,
    then the resulting graph is converted with :func:`from_obonet`.
    """
    import obonet

    buffer = StringIO(dedent(text).strip())
    graph = obonet.read_obo(buffer, ignore_obsolete=ignore_obsolete)
    return from_obonet(
        graph,
        strict=strict,
        version=version,
        upgrade=upgrade,
        use_tqdm=use_tqdm,
    )
[docs] def from_obonet( graph: nx.MultiDiGraph, *, strict: bool = False, version: str | None = None, upgrade: bool = True, use_tqdm: bool = False, ) -> Obo: """Get all the terms from a OBO graph.""" ontology_prefix_raw = graph.graph["ontology"] ontology_prefix = _normalize_prefix_strict(ontology_prefix_raw) logger.info("[%s] extracting OBO using obonet", ontology_prefix) date = _get_date(graph=graph, ontology_prefix=ontology_prefix) name = _get_name(graph=graph, ontology_prefix=ontology_prefix) imports = graph.graph.get("import") macro_config = MacroConfig(graph.graph, strict=strict, ontology_prefix=ontology_prefix) data_version = _prioritize_version( data_version=graph.graph.get("data-version") or None, ontology_prefix=ontology_prefix, version=version, date=date, ) if data_version and "/" in data_version: raise ValueError( f"[{ontology_prefix}] slashes not allowed in data versions because of filesystem usage: {data_version}" ) missing_typedefs: set[ReferenceTuple] = set() subset_typedefs = _get_subsetdefs(graph.graph, ontology_prefix=ontology_prefix, strict=strict) root_terms: list[Reference] = [] property_values: list[Annotation] = [] for ann in iterate_node_properties( graph.graph, ontology_prefix=ontology_prefix, upgrade=upgrade, node=Reference(prefix="obo", identifier=ontology_prefix), strict=strict, context="graph property", ): if ann.predicate.pair == has_ontology_root_term.pair: match ann.value: case OBOLiteral(): logger.warning( "[%s] tried to use a literal as an ontology root: %s", ontology_prefix, ann.value.value, ) continue case Reference(): root_terms.append(ann.value) else: property_values.append(ann) for remark in graph.graph.get("remark", []): property_values.append(Annotation(has_comment.reference, OBOLiteral.string(remark))) idspaces: dict[str, str] = {} for x in graph.graph.get("idspace", []): prefix, uri_prefix, *_ = (y.strip() for y in x.split(" ", 2)) idspaces[prefix] = uri_prefix #: CURIEs to typedefs typedefs: Mapping[ReferenceTuple, 
TypeDef] = { typedef.pair: typedef for typedef in iterate_typedefs( graph, ontology_prefix=ontology_prefix, strict=strict, upgrade=upgrade, macro_config=macro_config, ) } synonym_typedefs: Mapping[ReferenceTuple, SynonymTypeDef] = { synonym_typedef.pair: synonym_typedef for synonym_typedef in iterate_graph_synonym_typedefs( graph, ontology_prefix=ontology_prefix, strict=strict, upgrade=upgrade, ) } terms = _get_terms( graph, strict=strict, ontology_prefix=ontology_prefix, upgrade=upgrade, typedefs=typedefs, missing_typedefs=missing_typedefs, synonym_typedefs=synonym_typedefs, subset_typedefs=subset_typedefs, macro_config=macro_config, use_tqdm=use_tqdm, ) return build_ontology( prefix=ontology_prefix, name=name, auto_generated_by=graph.graph.get("auto-generated-by"), typedefs=list(typedefs.values()), synonym_typedefs=list(synonym_typedefs.values()), date=date, version=data_version, idspaces=idspaces, root_terms=root_terms, subsetdefs=subset_typedefs, properties=property_values, imports=imports, # ontology_iri # ontology_version_iri terms=terms, )
def _get_terms( graph: nx.MultiDiGraph, *, strict: bool, ontology_prefix: str, upgrade: bool, typedefs: Mapping[ReferenceTuple, TypeDef], synonym_typedefs: Mapping[ReferenceTuple, SynonymTypeDef], subset_typedefs: SubsetTypeDefs, missing_typedefs: set[ReferenceTuple], macro_config: MacroConfig, use_tqdm: bool = False, ) -> list[Term]: terms = [] for reference, data in _iter_obo_graph( graph=graph, strict=strict, ontology_prefix=ontology_prefix, use_tqdm=use_tqdm, upgrade=upgrade, ): if reference.prefix != ontology_prefix: continue if not data: # this allows us to skip anything that isn't really defined # caveat: this misses terms that are just defined with an ID continue term = Term( reference=reference, builtin=_get_boolean(data, "builtin"), is_anonymous=_get_boolean(data, "is_anonymous"), is_obsolete=_get_boolean(data, "is_obsolete"), namespace=data.get("namespace"), ) _process_alts(term, data, ontology_prefix=ontology_prefix, strict=strict) _process_parents(term, data, ontology_prefix=ontology_prefix, strict=strict) _process_synonyms( term, data, ontology_prefix=ontology_prefix, strict=strict, upgrade=upgrade, synonym_typedefs=synonym_typedefs, ) _process_xrefs( term, data, ontology_prefix=ontology_prefix, strict=strict, macro_config=macro_config, upgrade=upgrade, ) _process_properties( term, data, ontology_prefix=ontology_prefix, strict=strict, upgrade=upgrade, typedefs=typedefs, ) _process_relations( term, data, ontology_prefix=ontology_prefix, strict=strict, upgrade=upgrade, typedefs=typedefs, missing_typedefs=missing_typedefs, ) _process_replaced_by(term, data, ontology_prefix=ontology_prefix, strict=strict) _process_subsets( term, data, ontology_prefix=ontology_prefix, strict=strict, subset_typedefs=subset_typedefs, ) _process_intersection_of(term, data, ontology_prefix=ontology_prefix, strict=strict) _process_union_of(term, data, ontology_prefix=ontology_prefix, strict=strict) _process_equivalent_to(term, data, ontology_prefix=ontology_prefix, 
strict=strict) _process_disjoint_from(term, data, ontology_prefix=ontology_prefix, strict=strict) _process_consider(term, data, ontology_prefix=ontology_prefix, strict=strict) _process_comment(term, data) _process_description(term, data, ontology_prefix=ontology_prefix, strict=strict) _process_creation_date(term, data) terms.append(term) return terms def _process_description( term: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool ) -> None: definition, definition_references = get_definition( data, node=term.reference, strict=strict, ontology_prefix=ontology_prefix ) term.definition = definition if term.definition: for definition_reference in definition_references: term._append_annotation( v.has_description, OBOLiteral.string(term.definition), Annotation(v.has_dbxref, definition_reference), ) def _process_comment(term: Stanza, data: dict[str, Any]) -> None: if comment := data.get("comment"): term.append_comment(comment) def _process_creation_date(term: Stanza, data: dict[str, Any]) -> None: date_str = data.get("creation_date") if not date_str: return if isinstance(date_str, list): date_str = date_str[0] try: term.append_creation_date(date_str) except ValueError: logger.warning("[%s] failed to parse creation_date: %s", term.reference.curie, date_str) def _process_union_of( term: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool ) -> None: for reference in iterate_node_reference_tag( term, "union_of", data=data, ontology_prefix=ontology_prefix, strict=strict, node=term.reference, ): term.append_union_of(reference) def _process_equivalent_to( term: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool ) -> None: for reference in iterate_node_reference_tag( term, "equivalent_to", data=data, ontology_prefix=ontology_prefix, strict=strict, node=term.reference, ): term.append_equivalent_to(reference) def _process_disjoint_from( term: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool ) -> None: for 
reference in iterate_node_reference_tag( term, "disjoint_from", data=data, ontology_prefix=ontology_prefix, strict=strict, node=term.reference, ): term.append_disjoint_from(reference) def _process_alts( term: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool ) -> None: for alt_reference in iterate_node_reference_tag( term, "alt_id", data, node=term.reference, strict=strict, ontology_prefix=ontology_prefix ): term.append_alt(alt_reference) def _process_parents( term: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool ) -> None: for tag in ["is_a", "instance_of"]: for parent in iterate_node_reference_tag( term, tag, data, node=term.reference, strict=strict, ontology_prefix=ontology_prefix ): term.append_parent(parent) def _process_synonyms( term: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool, upgrade: bool, synonym_typedefs: Mapping[ReferenceTuple, SynonymTypeDef], ) -> None: synonyms = list( iterate_node_synonyms( data, synonym_typedefs, node=term.reference, strict=strict, ontology_prefix=ontology_prefix, upgrade=upgrade, ) ) for synonym in synonyms: term.append_synonym(synonym) def _process_xrefs( term: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool, macro_config: MacroConfig, upgrade: bool, ) -> None: for reference, provenance in iterate_node_xrefs( data=data, strict=strict, ontology_prefix=ontology_prefix, node=term.reference, upgrade=upgrade, ): _handle_xref(term, reference, provenance=provenance, macro_config=macro_config) def _process_properties( term: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool, upgrade: bool, typedefs: Mapping[ReferenceTuple, TypeDef], ) -> None: for ann in iterate_node_properties( data, node=term.reference, strict=strict, ontology_prefix=ontology_prefix, upgrade=upgrade, context="stanza property", ): if ann.predicate.pair not in typedefs: pass # TODO logging # TODO parse axioms term.append_property(ann) def _process_relations( term: Stanza, 
data: dict[str, Any], *, ontology_prefix: str, strict: bool, upgrade: bool, typedefs: Mapping[ReferenceTuple, TypeDef], missing_typedefs: set[ReferenceTuple], ) -> None: relations_references = list( iterate_node_relationships( data, node=term.reference, strict=strict, ontology_prefix=ontology_prefix, upgrade=upgrade, ) ) for relation, reference in relations_references: if ( relation.pair not in typedefs and relation.pair not in default_typedefs and relation.pair not in missing_typedefs ): missing_typedefs.add(relation.pair) logger.warning("[%s] has no typedef for %s", ontology_prefix, relation.curie) logger.debug("[%s] available typedefs: %s", ontology_prefix, set(typedefs)) # TODO parse axioms term.append_relationship(relation, reference) def _process_replaced_by( stanza: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool ) -> None: for reference in iterate_node_reference_tag( stanza, "replaced_by", data, node=stanza.reference, strict=strict, ontology_prefix=ontology_prefix, ): stanza.append_replaced_by(reference) UNDEFINED_SUBSETS: set[Reference] = set() def _process_subsets( stanza: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool, subset_typedefs: SubsetTypeDefs, ) -> None: for reference in iterate_node_reference_tag( stanza, "subset", data, node=stanza.reference, strict=strict, ontology_prefix=ontology_prefix, counter=SUBSET_ERROR_COUNTER, ): if reference not in subset_typedefs: if reference not in UNDEFINED_SUBSETS: logger.debug("[%s] undefined subset: %s", stanza.curie, reference) UNDEFINED_SUBSETS.add(reference) stanza.append_subset(reference) # needed to parse OPMI _BOOLEAN_TRUE_VALUES = {"true", "1", 1} _BOOLEAN_FALSE_VALUES = {"false", "0", 0} def _get_boolean(data: dict[str, Any], tag: str) -> bool | None: value = data.get(tag) if value is None: return None if isinstance(value, list): value = value[0] if value in _BOOLEAN_FALSE_VALUES: return False if value in _BOOLEAN_TRUE_VALUES: return True raise 
ValueError(f"unhandled value for boolean: ({type(value)}) {value}") def _get_reference( data: dict[str, Any], tag: str, *, ontology_prefix: str, strict: bool, **kwargs: Any ) -> Reference | None: value = data.get(tag) if value is None: return None if isinstance(value, list): value = value[0] return _obo_parse_identifier( value, ontology_prefix=ontology_prefix, strict=strict, context=tag, **kwargs ) class MacroConfig: """A configuration data class for reader macros.""" def __init__( self, data: Mapping[str, list[str]] | None = None, *, strict: bool, ontology_prefix: str ) -> None: """Instantiate the configuration from obonet graph metadata.""" if data is None: data = {} self.treat_xrefs_as_equivalent: set[str] = set() for prefix in data.get("treat-xrefs-as-equivalent", []): prefix_norm = bioregistry.normalize_prefix(prefix) if prefix_norm is None: continue self.treat_xrefs_as_equivalent.add(prefix_norm) self.treat_xrefs_as_genus_differentia: dict[str, tuple[Reference, Reference]] = {} for line in data.get("treat-xrefs-as-genus-differentia", []): try: gd_prefix, gd_predicate, gd_target = line.split() except ValueError: # this happens in `plana`, where there's an incorrectly written # line `CARO part_of NCBITaxon:79327; CL part_of NCBITaxon:79327` tqdm.write( f"[{ontology_prefix}] failed to parse treat-xrefs-as-genus-differentia: {line}" ) continue gd_prefix_norm = bioregistry.normalize_prefix(gd_prefix) if gd_prefix_norm is None: continue gd_predicate_re = _obo_parse_identifier( gd_predicate, ontology_prefix=ontology_prefix, strict=strict ) if gd_predicate_re is None: continue gd_target_re = _obo_parse_identifier( gd_target, ontology_prefix=ontology_prefix, strict=strict ) if gd_target_re is None: continue self.treat_xrefs_as_genus_differentia[gd_prefix_norm] = (gd_predicate_re, gd_target_re) self.treat_xrefs_as_relationship: dict[str, Reference] = {} for line in data.get("treat-xrefs-as-relationship", []): try: gd_prefix, gd_predicate = line.split() except 
ValueError: tqdm.write( f"[{ontology_prefix}] failed to parse treat-xrefs-as-relationship: {line}" ) continue gd_prefix_norm = bioregistry.normalize_prefix(gd_prefix) if gd_prefix_norm is None: continue gd_predicate_re = _obo_parse_identifier( gd_predicate, ontology_prefix=ontology_prefix, strict=strict ) if gd_predicate_re is None: continue self.treat_xrefs_as_relationship[gd_prefix_norm] = gd_predicate_re self.treat_xrefs_as_is_a: set[str] = set() for prefix in data.get("treat-xrefs-as-is_a", []): gd_prefix_norm = bioregistry.normalize_prefix(prefix) if gd_prefix_norm is None: continue self.treat_xrefs_as_is_a.add(gd_prefix_norm) def _handle_xref( term: Stanza, xref: Reference, *, provenance: list[Reference | OBOLiteral], macro_config: MacroConfig | None = None, ) -> Stanza: annotations = [Annotation(v.has_dbxref, p) for p in provenance] if macro_config is not None: if xref.prefix in macro_config.treat_xrefs_as_equivalent: return term.append_equivalent(xref, annotations=annotations) elif object_property := macro_config.treat_xrefs_as_genus_differentia.get(xref.prefix): # TODO how to add annotations here? 
if annotations: logger.warning( "[%s] unable to add provenance to xref upgraded to intersection_of: %s", term.reference.curie, xref, ) return term.append_intersection_of(xref).append_intersection_of(object_property) elif predicate := macro_config.treat_xrefs_as_relationship.get(xref.prefix): return term.append_relationship(predicate, xref, annotations=annotations) elif xref.prefix in macro_config.treat_xrefs_as_is_a: return term.append_parent(xref, annotations=annotations) # TODO this is not what spec calls for, maybe # need a flag in macro config for this if xref.prefix in PROVENANCE_PREFIXES: return term.append_mentioned_by(xref, annotations=annotations) return term.append_xref(xref, annotations=annotations) SUBSET_ERROR_COUNTER: Counter[tuple[str, str]] = Counter() SubsetTypeDefs: TypeAlias = dict[Reference, str] def _get_subsetdefs( graph: nx.MultiDiGraph, ontology_prefix: str, *, strict: bool = False ) -> SubsetTypeDefs: rv = {} for subsetdef in graph.get("subsetdef", []): left, _, right = subsetdef.partition(" ") if not right: logger.warning("[%s] subsetdef did not have two parts", ontology_prefix, subsetdef) continue left_ref = _obo_parse_identifier( left, ontology_prefix=ontology_prefix, name=right, line=subsetdef, counter=SUBSET_ERROR_COUNTER, strict=strict, ) if left_ref is None: continue right = right.strip('"') rv[left_ref] = right return rv def _clean_graph_ontology(graph: nx.MultiDiGraph, prefix: str) -> None: """Update the ontology entry in the graph's metadata, if necessary.""" if "ontology" not in graph.graph: logger.debug('[%s] missing "ontology" key', prefix) graph.graph["ontology"] = prefix elif not graph.graph["ontology"].isalpha(): logger.debug( "[%s] ontology prefix `%s` has a strange format. 
replacing with prefix", prefix, graph.graph["ontology"], ) graph.graph["ontology"] = prefix def _iter_obo_graph( graph: nx.MultiDiGraph, *, strict: bool = False, ontology_prefix: str, use_tqdm: bool = False, upgrade: bool, ) -> Iterable[tuple[Reference, dict[str, Any]]]: """Iterate over the nodes in the graph with the prefix stripped (if it's there).""" for node, data in tqdm( graph.nodes(data=True), disable=not use_tqdm, unit_scale=True, desc=f"[{ontology_prefix}]" ): name = data.get("name") match _parse_str_or_curie_or_uri_helper( node, ontology_prefix=ontology_prefix, name=name, upgrade=upgrade, context="stanza ID", ): case Reference() as reference: yield reference, data case NotCURIEError() as exc: if _is_valid_identifier(node): yield default_reference(ontology_prefix, node, name=name), data elif strict: raise exc else: logger.warning(str(exc)) case ParseError() as exc: if strict: raise exc else: logger.warning(str(exc)) # if blacklisted, just skip it with no warning def _get_date(graph: nx.MultiDiGraph, ontology_prefix: str) -> datetime | None: try: rv = datetime.strptime(graph.graph["date"], DATE_FORMAT) except KeyError: logger.info("[%s] does not report a date", ontology_prefix) return None except ValueError: logger.info( "[%s] reports a date that can't be parsed: %s", ontology_prefix, graph.graph["date"] ) return None else: return rv def _get_name(graph: nx.MultiDiGraph, ontology_prefix: str) -> str: try: rv = t.cast(str, graph.graph["name"]) except KeyError: logger.info("[%s] does not report a name", ontology_prefix) return ontology_prefix else: return rv def iterate_graph_synonym_typedefs( graph: nx.MultiDiGraph, *, ontology_prefix: str, strict: bool = False, upgrade: bool ) -> Iterable[SynonymTypeDef]: """Get synonym type definitions from an :mod:`obonet` graph.""" for line in graph.graph.get("synonymtypedef", []): # TODO handle trailing comments line, _, specificity = (x.strip() for x in line.rpartition('"')) specificity = specificity.upper() if not 
specificity: specificity = None elif specificity not in t.get_args(SynonymScope): if strict: raise ValueError(f"invalid synonym specificty: {specificity}") logger.warning("[%s] invalid synonym specificty: %s", ontology_prefix, specificity) specificity = None curie, name = line.split(" ", 1) # the name should be in quotes, so strip them out name = name.strip().strip('"') # TODO unquote the string? reference = _obo_parse_identifier( curie, ontology_prefix=ontology_prefix, name=name, upgrade=upgrade, strict=strict, ) if reference is None: logger.warning("[%s] unable to parse synonym typedef ID %s", ontology_prefix, curie) continue yield SynonymTypeDef(reference=reference, specificity=specificity) def iterate_typedefs( graph: nx.MultiDiGraph, *, ontology_prefix: str, strict: bool = False, upgrade: bool, macro_config: MacroConfig | None = None, ) -> Iterable[TypeDef]: """Get type definitions from an :mod:`obonet` graph.""" if macro_config is None: macro_config = MacroConfig(strict=strict, ontology_prefix=ontology_prefix) # can't really have a pre-defined set of synonym typedefs here! 
synonym_typedefs: Mapping[ReferenceTuple, SynonymTypeDef] = {} typedefs: Mapping[ReferenceTuple, TypeDef] = {} subset_typedefs: SubsetTypeDefs = {} # FIXME missing_typedefs: set[ReferenceTuple] = set() for data in graph.graph.get("typedefs", []): if "id" in data: typedef_id = data["id"] elif "identifier" in data: typedef_id = data["identifier"] else: raise KeyError("typedef is missing an `id`") name = data.get("name") if name is None: logger.debug("[%s] typedef %s is missing a name", ontology_prefix, typedef_id) reference = _obo_parse_identifier( typedef_id, strict=strict, ontology_prefix=ontology_prefix, name=name, upgrade=upgrade ) if reference is None: logger.warning("[%s] unable to parse typedef ID %s", ontology_prefix, typedef_id) continue typedef = TypeDef( reference=reference, namespace=data.get("namespace"), is_metadata_tag=_get_boolean(data, "is_metadata_tag"), is_class_level=_get_boolean(data, "is_class_level"), builtin=_get_boolean(data, "builtin"), is_obsolete=_get_boolean(data, "is_obsolete"), is_anonymous=_get_boolean(data, "is_anonymous"), is_anti_symmetric=_get_boolean(data, "is_anti_symmetric"), is_symmetric=_get_boolean(data, "is_symmetric"), is_reflexive=_get_boolean(data, "is_reflexive"), is_cyclic=_get_boolean(data, "is_cyclic"), is_transitive=_get_boolean(data, "is_transitive"), is_functional=_get_boolean(data, "is_functional"), is_inverse_functional=_get_boolean(data, "is_inverse_functional"), domain=_get_reference(data, "domain", ontology_prefix=ontology_prefix, strict=strict), range=_get_reference(data, "range", ontology_prefix=ontology_prefix, strict=strict), inverse=_get_reference( data, "inverse_of", ontology_prefix=ontology_prefix, strict=strict ), ) _process_alts(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_parents(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_synonyms( typedef, data, ontology_prefix=ontology_prefix, strict=strict, upgrade=upgrade, synonym_typedefs=synonym_typedefs, 
) _process_xrefs( typedef, data, ontology_prefix=ontology_prefix, strict=strict, macro_config=macro_config, upgrade=upgrade, ) _process_properties( typedef, data, ontology_prefix=ontology_prefix, strict=strict, upgrade=upgrade, typedefs=typedefs, ) _process_relations( typedef, data, ontology_prefix=ontology_prefix, strict=strict, upgrade=upgrade, typedefs=typedefs, missing_typedefs=missing_typedefs, ) _process_replaced_by(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_subsets( typedef, data, ontology_prefix=ontology_prefix, strict=strict, subset_typedefs=subset_typedefs, ) _process_intersection_of(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_union_of(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_equivalent_to(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_disjoint_from(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_consider(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_comment(typedef, data) _process_description(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_creation_date(typedef, data) # the next 4 are typedef-specific _process_equivalent_to_chain(typedef, data, ontology_prefix=ontology_prefix, strict=strict) _process_holds_over_chain(typedef, data, ontology_prefix=ontology_prefix, strict=strict) typedef.disjoint_over.extend( iterate_node_reference_tag( typedef, "disjoint_over", data, node=typedef.reference, ontology_prefix=ontology_prefix, strict=strict, ) ) typedef.transitive_over.extend( iterate_node_reference_tag( typedef, "transitive_over", data, node=typedef.reference, ontology_prefix=ontology_prefix, strict=strict, ) ) yield typedef def _process_consider( stanza: Stanza, data: dict[str, Any], *, ontology_prefix: str, strict: bool = False ) -> None: for reference in iterate_node_reference_tag( stanza, "consider", data, node=stanza.reference, ontology_prefix=ontology_prefix, 
strict=strict, ): stanza.append_see_also(reference) def _process_equivalent_to_chain( typedef: TypeDef, data: dict[str, Any], *, ontology_prefix: str, strict: bool = False ) -> None: for chain in _iterate_chain( "equivalent_to_chain", typedef, data, ontology_prefix=ontology_prefix, strict=strict ): typedef.equivalent_to_chain.append(chain) def _process_holds_over_chain( typedef: TypeDef, data: dict[str, Any], *, ontology_prefix: str, strict: bool = False ) -> None: for chain in _iterate_chain( "holds_over_chain", typedef, data, ontology_prefix=ontology_prefix, strict=strict ): typedef.holds_over_chain.append(chain) def _iterate_chain( tag: str, typedef: TypeDef, data: dict[str, Any], *, ontology_prefix: str, strict: bool = False ) -> Iterable[list[Reference]]: for chain in data.get(tag, []): # chain is a list of CURIEs predicate_chain = _process_chain_helper(typedef, chain, ontology_prefix=ontology_prefix) if predicate_chain is None: logger.warning( "[%s - %s] could not parse line: %s: %s", ontology_prefix, typedef.curie, tag, chain, ) else: yield predicate_chain def _process_chain_helper( term: Stanza, chain: str, ontology_prefix: str, strict: bool = False ) -> list[Reference] | None: rv = [] for curie in chain.split(): curie = curie.strip() r = _obo_parse_identifier( curie, ontology_prefix=ontology_prefix, strict=strict, node=term.reference ) if r is None: return None rv.append(r) return rv def get_definition( data: dict[str, Any], *, node: Reference, ontology_prefix: str, strict: bool = False ) -> tuple[None | str, list[Reference | OBOLiteral]]: """Extract the definition from the data.""" definition = data.get("def") # it's allowed not to have a definition if not definition: return None, [] return _extract_definition( definition, node=node, strict=strict, ontology_prefix=ontology_prefix ) def _extract_definition( s: str, *, node: Reference, strict: bool = False, ontology_prefix: str, ) -> tuple[None | str, list[Reference | OBOLiteral]]: """Extract the 
definitions.""" if not s.startswith('"'): logger.warning(f"[{node.curie}] definition does not start with a quote") return None, [] try: definition, rest = _quote_split(s) except ValueError as e: logger.warning("[%s] failed to parse definition quotes: %s", node.curie, str(e)) return None, [] if not rest.startswith("["): logger.debug("[%s] no square brackets for provenance on line: %s", node.curie, s) provenance = [] else: rest = rest.lstrip("[").rstrip("]") # FIXME this doesn't account for trailing annotations provenance = _parse_provenance_list( rest, node=node, ontology_prefix=ontology_prefix, counter=DEFINITION_PROVENANCE_COUNTER, scope_text="definition provenance", line=s, strict=strict, ) return definition or None, provenance def get_first_nonescaped_quote(s: str) -> int | None: """Get the first non-escaped quote.""" if not s: return None if s[0] == '"': # special case first position return 0 for i, (a, b) in enumerate(pairwise(s), start=1): if b == '"' and a != "\\": return i return None def _quote_split(s: str) -> tuple[str, str]: if not s.startswith('"'): raise ValueError(f"'{s}' does not start with a quote") s = s.removeprefix('"') i = get_first_nonescaped_quote(s) if i is None: raise ValueError(f"no closing quote found in `{s}`") return _clean_definition(s[:i].strip()), s[i + 1 :].strip() def _clean_definition(s: str) -> str: # if '\t' in s: # logger.warning('has tab') return s.replace('\\"', '"').replace("\n", " ").replace("\t", " ").replace(r"\d", "") def _extract_synonym( s: str, synonym_typedefs: Mapping[ReferenceTuple, SynonymTypeDef], *, node: Reference, strict: bool = False, ontology_prefix: str, upgrade: bool, ) -> Synonym | None: # TODO check if the synonym is written like a CURIE... 
it shouldn't but I've seen it happen try: name, rest = _quote_split(s) except ValueError: logger.warning("[%s] invalid synonym: %s", node.curie, s) return None specificity, rest = _chomp_specificity(rest) synonym_typedef, rest = _chomp_typedef( rest, synonym_typedefs=synonym_typedefs, strict=strict, node=node, ontology_prefix=ontology_prefix, upgrade=upgrade, ) provenance, rest = _chomp_references( rest, strict=strict, node=node, ontology_prefix=ontology_prefix, line=s, ) annotations = _chomp_axioms(rest, node=node, strict=strict) return Synonym( name=name, specificity=specificity, type=synonym_typedef.reference if synonym_typedef else None, provenance=list(provenance or []), annotations=annotations, ) #: A counter for errors in parsing provenance DEFINITION_PROVENANCE_COUNTER: Counter[tuple[str, str]] = Counter() def iterate_node_synonyms( data: dict[str, Any], synonym_typedefs: Mapping[ReferenceTuple, SynonymTypeDef], *, node: Reference, strict: bool = False, ontology_prefix: str, upgrade: bool, ) -> Iterable[Synonym]: """Extract synonyms from a :mod:`obonet` node's data. 
Example strings - "LTEC I" EXACT [Orphanet:93938,DOI:xxxx] - "LTEC I" EXACT [Orphanet:93938] - "LTEC I" [Orphanet:93938] - "LTEC I" [] """ for s in data.get("synonym", []): s = _extract_synonym( s, synonym_typedefs, node=node, strict=strict, ontology_prefix=ontology_prefix, upgrade=upgrade, ) if s is not None: yield s def iterate_node_properties( data: dict[str, Any], *, node: Reference, strict: bool = False, ontology_prefix: str, upgrade: bool, context: str, ) -> Iterable[Annotation]: """Extract properties from a :mod:`obonet` node's data.""" for prop_value_type in data.get("property_value", []): if yv := _handle_prop( prop_value_type, node=node, strict=strict, ontology_prefix=ontology_prefix, upgrade=upgrade, context=context, ): yield yv #: Keep track of property-value pairs for which the value couldn't be parsed, #: such as `dc:conformsTo autoimmune:inflammation.yaml` in MONDO UNHANDLED_PROP_OBJECTS: Counter[tuple[str, str]] = Counter() UNHANDLED_PROPS: Counter[tuple[str, str]] = Counter() def _handle_prop( prop_value_type: str, *, node: Reference, strict: bool = False, ontology_prefix: str, upgrade: bool, context: str | None, ) -> Annotation | None: try: prop, value_type = prop_value_type.split(" ", 1) except ValueError: logger.warning("[%s] property_value is missing a space: %s", node.curie, prop_value_type) return None prop_reference = _get_prop( prop, node=node, strict=strict, ontology_prefix=ontology_prefix, upgrade=upgrade, line=prop_value_type, counter=UNHANDLED_PROPS, context=context, ) if prop_reference is None: return None value_type = value_type.strip() datatype: Reference | None if " " not in value_type: value, datatype = value_type, None else: value, datatype_raw = (s.strip() for s in value_type.rsplit(" ", 1)) match _parse_str_or_curie_or_uri_helper( datatype_raw, ontology_prefix=ontology_prefix, node=node, predicate=prop_reference, line=prop_value_type, upgrade=upgrade, context="property datatype", ): case Reference() as datatype_: datatype = 
datatype_ case BlocklistError(): return None case ParseError() as exc: if strict: raise exc else: logger.warning(str(exc)) return None # if it's an empty string, like the ones removed in https://github.com/oborel/obo-relations/pull/830, # just quit if value == '""': return None quoted = value.startswith('"') and value.endswith('"') value = value.strip('"').strip() # first, special case datetimes. Whether it's quoted or not, # we always deal with this first if datatype and datatype.curie == "xsd:dateTime": try: obo_literal = OBOLiteral.datetime(value) except ValueError: logger.warning( "[%s - %s] could not parse date: %s", node.curie, prop_reference.curie, value ) return None else: return Annotation(prop_reference, obo_literal) if datatype and datatype.curie == "xsd:anyURI": match _parse_str_or_curie_or_uri_helper( value, node=node, predicate=prop_reference, ontology_prefix=ontology_prefix, line=prop_value_type, upgrade=upgrade, context="property object", ): case Reference() as obj_reference: return Annotation(prop_reference, obj_reference) case BlocklistError(): return None case UnparsableIRIError(): return Annotation(prop_reference, OBOLiteral.uri(value)) case ParseError() as exc: if strict: raise exc else: logger.warning(str(exc)) return None # if it's quoted and there's a data try parsing as a CURIE/URI anyway (this is a bit # aggressive, but more useful than spec). 
if quoted: # give a try parsing it anyway, just in case ;) match _parse_str_or_curie_or_uri_helper( value, ontology_prefix=ontology_prefix, node=node, line=prop_value_type, upgrade=upgrade, predicate=prop_reference, context="property object", ): case Reference() as obj_reference: return Annotation(prop_reference, obj_reference) case BlocklistError(): return None case ParseError(): if datatype: return Annotation(prop_reference, OBOLiteral(value, datatype, None)) else: return Annotation(prop_reference, OBOLiteral.string(value)) else: if datatype: logger.debug( "[%s] throwing away datatype since no quotes were used: %s", node.curie, value_type ) # if it wasn't quoted and there was no datatype, go for parsing as an object match _obo_parse_identifier( value, strict=strict, ontology_prefix=ontology_prefix, node=node, predicate=prop_reference, line=prop_value_type, context="property object", counter=UNHANDLED_PROP_OBJECTS, ): case Reference() as obj_reference: return Annotation(prop_reference, obj_reference) case None: return None return None def _get_prop( property_id: str, *, node: Reference, strict: bool, ontology_prefix: str, upgrade: bool, line: str, counter: Counter[tuple[str, str]] | None = None, context: str | None = None, ) -> Reference | None: if rv := _parse_default_prop(property_id, ontology_prefix): return rv return _obo_parse_identifier( property_id, strict=strict, node=node, ontology_prefix=ontology_prefix, upgrade=upgrade, counter=counter, context=context, line=line, ) def _parse_default_prop(property_id: str, ontology_prefix: str) -> Reference | None: for delim in "#/": sw = f"http://purl.obolibrary.org/obo/{ontology_prefix}{delim}" if property_id.startswith(sw): identifier = property_id.removeprefix(sw) return default_reference(ontology_prefix, identifier) return None def iterate_node_reference_tag( stanza: Stanza, tag: str, data: dict[str, Any], *, node: Reference, strict: bool = False, ontology_prefix: str, upgrade: bool = True, counter: 
Counter[tuple[str, str]] | None = None, ) -> Iterable[Reference]: """Extract a list of CURIEs from the data.""" for str_or_curie_or_uri in data.get(tag, []): reference = _obo_parse_identifier( str_or_curie_or_uri, strict=strict, node=node, ontology_prefix=ontology_prefix, upgrade=upgrade, counter=counter, ) if reference is not None: yield reference elif tag == "subset": # this is to avoid the millions of 2:STAR and 3:STAR errors when parsing ChEBI that makes # it take forever. In general, most of the subset identifiers are totally borked. if str_or_curie_or_uri not in SUBSET_INVALIDS: logger.warning( "[%s] %s - could not parse subset identifier: %s", stanza.curie, tag, str_or_curie_or_uri, ) SUBSET_INVALIDS.add(str_or_curie_or_uri) else: logger.warning( "[%s] %s - could not parse identifier: %s", stanza.curie, tag, str_or_curie_or_uri ) SUBSET_INVALIDS: set[str] = set() def _process_intersection_of( term: Stanza, data: dict[str, Any], *, strict: bool = False, ontology_prefix: str, upgrade: bool = True, ) -> None: """Extract a list of CURIEs from the data.""" for line in data.get("intersection_of", []): predicate_id, _, target_id = line.partition(" ") predicate = _obo_parse_identifier( predicate_id, strict=strict, node=term.reference, ontology_prefix=ontology_prefix, upgrade=upgrade, ) if predicate is None: logger.warning("[%s] - could not parse intersection_of: %s", ontology_prefix, line) continue if target_id: # this means that there's a second part, so let's try parsing it target = _obo_parse_identifier( target_id, strict=strict, node=term.reference, predicate=predicate, ontology_prefix=ontology_prefix, upgrade=upgrade, ) if target is None: logger.warning( "[%s] could not parse intersection_of target: %s", ontology_prefix, line ) continue term.append_intersection_of(predicate, target) else: term.append_intersection_of(predicate) def iterate_node_relationships( data: dict[str, Any], *, node: Reference, strict: bool = False, ontology_prefix: str, upgrade: bool, ) 
-> Iterable[tuple[Reference, Reference]]: """Extract relationships from a :mod:`obonet` node's data.""" for line in data.get("relationship", []): relation_curie, target_curie = line.split(" ") predicate = _obo_parse_identifier( relation_curie, strict=strict, ontology_prefix=ontology_prefix, node=node, upgrade=upgrade, line=line, context="relationship predicate", ) match predicate: # TODO extend with other exception handling case None: logger.warning("[%s] could not parse relation %s", node.curie, relation_curie) continue match _parse_str_or_curie_or_uri_helper( target_curie, ontology_prefix=ontology_prefix, node=node, predicate=predicate, line=line, context="relationship target", upgrade=upgrade, ): case Reference() as target: yield predicate, target case ParseError() as exc: if strict: raise exc else: logger.warning(str(exc)) def iterate_node_xrefs( *, data: dict[str, Any], strict: bool = False, ontology_prefix: str, node: Reference, upgrade: bool, ) -> Iterable[tuple[Reference, list[Reference | OBOLiteral]]]: """Extract xrefs from a :mod:`obonet` node's data.""" for line in data.get("xref", []): line = line.strip() if pair := _parse_xref_line( line.strip(), strict=strict, node=node, ontology_prefix=ontology_prefix, upgrade=upgrade, ): yield pair def _parse_xref_line( line: str, *, strict: bool = False, ontology_prefix: str, node: Reference, upgrade: bool ) -> tuple[Reference, list[Reference | OBOLiteral]] | None: xref, _, rest = line.partition(" [") rules = get_rules() if rules.str_is_blocked(xref, context=ontology_prefix) or ":" not in xref: return None # sometimes xref to self... 
weird xref = rules.remap_prefix(xref, context=ontology_prefix) split_space = " " in xref if split_space: _xref_split = xref.split(" ", 1) if _xref_split[1][0] not in {'"', "("}: logger.debug("[%s] Problem with space in xref %s", node.curie, xref) return None xref = _xref_split[0] xref_ref = _parse_str_or_curie_or_uri_helper( xref, ontology_prefix=ontology_prefix, node=node, line=line, context="xref", upgrade=upgrade ) match xref_ref: case BlocklistError(): return None case ParseError() as exc: if strict: raise exc else: if not XREF_PROVENANCE_COUNTER[ontology_prefix, xref]: logger.warning(str(exc)) XREF_PROVENANCE_COUNTER[ontology_prefix, xref] += 1 return None if rest: rest_front, _, _rest_rest = rest.partition("]") provenance = _parse_provenance_list( rest_front, node=node, ontology_prefix=ontology_prefix, counter=XREF_PROVENANCE_COUNTER, scope_text="xref provenance", line=line, strict=strict, ) else: provenance = [] return xref_ref, provenance XREF_PROVENANCE_COUNTER: Counter[tuple[str, str]] = Counter()