"""Utilities for OBO files."""
from __future__ import annotations
import datetime
import json
import logging
import pathlib
import subprocess
import time
import typing
import urllib.error
import zipfile
from collections import Counter
from collections.abc import Callable, Iterable, Mapping, Sequence
from pathlib import Path
from textwrap import indent
from typing import Any, TypeVar
import bioregistry
import click
import pystow.utils
from tabulate import tabulate
from tqdm.auto import tqdm
from typing_extensions import Unpack
from .constants import (
BUILD_SUBDIRECTORY_NAME,
DATABASE_DIRECTORY,
GetOntologyKwargs,
IterHelperHelperDict,
SlimGetOntologyKwargs,
)
from .identifier_utils import ParseError, wrap_norm_prefix
from .plugins import has_nomenclature_plugin, run_nomenclature_plugin
from .struct import Obo
from .struct.obo import from_obo_path, from_obonet
from .utils.io import safe_open_writer
from .utils.path import ensure_path, prefix_directory_join
from .version import get_git_hash, get_version
__all__ = [
"NoBuildError",
"get_ontology",
]
logger = logging.getLogger(__name__)
class NoBuildError(RuntimeError):
"""Base exception for being unable to build."""
class UnhandledFormatError(NoBuildError):
"""Only OWL is available."""
#: The following prefixes cannot be loaded through ROBOT without
#: turning off integrity checks
REQUIRES_NO_ROBOT_CHECK = {
"clo",
"vo",
"orphanet.ordo",
"orphanet",
"foodon",
"caloha",
}
@wrap_norm_prefix
def get_ontology(
prefix: str,
*,
force: bool = False,
force_process: bool = False,
strict: bool = False,
version: str | None = None,
robot_check: bool = True,
upgrade: bool = True,
cache: bool = True,
use_tqdm: bool = True,
) -> Obo:
"""Get the OBO for a given graph.
:param prefix: The prefix of the ontology to look up
:param version: The pre-looked-up version of the ontology
:param force: Download the data again
:param force_process: Should the OBO cache be rewritten? Automatically set to true
if ``force`` is true
:param strict: Should CURIEs be treated strictly? If true, raises exceptions on
invalid/malformed
:param robot_check: If set to false, will send the ``--check=false`` command to
ROBOT to disregard malformed ontology components. Necessary to load some
ontologies like VO.
:param upgrade: If set to true, will automatically upgrade relationships, such as
``obo:chebi#part_of`` to ``BFO:0000051``
:param cache: Should cached objects be written? defaults to True
:returns: An OBO object
:raises OnlyOWLError: If the OBO foundry only has an OWL document for this resource.
Alternate usage if you have a custom url
.. code-block:: python
from pystow.utils import download
from pyobo import Obo, from_obo_path
url = ...
obo_path = ...
download(url=url, path=path)
obo = from_obo_path(path)
"""
if force:
force_process = True
if prefix == "uberon":
logger.info("UBERON has so much garbage in it that defaulting to non-strict parsing")
strict = False
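    # Decide where (if anywhere) to look for and write a cached obonet graph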
if force_process:
obonet_json_gz_path = None
elif not cache:
logger.debug("[%s] caching was turned off, so dont look for an obonet file", prefix)
obonet_json_gz_path = None
else:
obonet_json_gz_path = prefix_directory_join(
prefix, BUILD_SUBDIRECTORY_NAME, name=f"{prefix}.obonet.json.gz", version=version
)
logger.debug(
"[%s] caching is turned on, so look for an obonet file at %s",
prefix,
obonet_json_gz_path,
)
if obonet_json_gz_path.is_file() and not force:
from .utils.cache import get_gzipped_graph
logger.debug("[%s] using obonet cache at %s", prefix, obonet_json_gz_path)
return from_obonet(
get_gzipped_graph(obonet_json_gz_path),
strict=strict,
version=version,
upgrade=upgrade,
use_tqdm=use_tqdm,
)
else:
logger.debug("[%s] no obonet cache found at %s", prefix, obonet_json_gz_path)
if has_nomenclature_plugin(prefix):
obo = run_nomenclature_plugin(prefix, version=version)
if cache:
logger.debug("[%s] caching nomenclature plugin", prefix)
obo.write_default(force=force_process)
return obo
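    # Otherwise, download the ontology in the best available format: OBO is preferred,
    # then OWL (converted to OBO with ROBOT), then OBO Graph JSON (parsed directly)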
ontology_format, path = _ensure_ontology_path(prefix, force=force, version=version)
if path is None:
raise NoBuildError(prefix)
elif ontology_format == "obo":
        pass  # already in OBO format, no conversion needed
elif ontology_format == "owl":
import bioontologies.robot
_converted_obo_path = path.with_suffix(".obo")
if prefix in REQUIRES_NO_ROBOT_CHECK:
robot_check = False
bioontologies.robot.convert(path, _converted_obo_path, check=robot_check)
path = _converted_obo_path
elif ontology_format == "json":
from .struct.obograph import read_obograph
obo = read_obograph(prefix=prefix, path=path)
if cache:
obo.write_default(force=force_process)
return obo
else:
raise UnhandledFormatError(f"[{prefix}] unhandled ontology file format: {path.suffix}")
obo = from_obo_path(
path,
prefix=prefix,
strict=strict,
version=version,
upgrade=upgrade,
use_tqdm=use_tqdm,
_cache_path=obonet_json_gz_path,
)
if cache:
obo.write_default(force=force_process)
return obo
def _ensure_ontology_path(
prefix: str, force: bool, version: str | None
) -> tuple[str, Path] | tuple[None, None]:
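    """Find the best available artifact for a prefix and ensure it's downloaded.

    Tries the OBO, OWL, and OBO Graph JSON download URLs registered in the Bioregistry,
    in that order, and returns a pair of the format name and local path for the first
    one that can be downloaded, or ``(None, None)`` if none of them can.
    """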
for ontology_format, url in [
("obo", bioregistry.get_obo_download(prefix)),
("owl", bioregistry.get_owl_download(prefix)),
("json", bioregistry.get_json_download(prefix)),
]:
if url is not None:
try:
path = ensure_path(prefix, url=url, force=force, version=version)
except (urllib.error.HTTPError, pystow.utils.DownloadError):
continue
else:
return ontology_format, path
return None, None
SKIP = {
"ncbigene": "too big, refs acquired from other dbs",
"pubchem.compound": "top big, can't deal with this now",
"gaz": "Gazetteer is irrelevant for biology",
"ma": "yanked",
"bila": "yanked",
    # Can't download
"afpo": "unable to download",
"atol": "unable to download",
"eol": "unable to download, same source as atol",
"hog": "unable to download",
"vhog": "unable to download",
"gorel": "unable to download",
"dinto": "unable to download",
"gainesville.core": "unable to download",
"ato": "can't process",
"emapa": "recently changed with EMAP... not sure what the difference is anymore",
"kegg.genes": "needs fix", # FIXME
"kegg.genome": "needs fix", # FIXME
"kegg.pathway": "needs fix", # FIXME
"ensemblglossary": "URI is self-referential to data in OLS, extract from there",
"epio": "content from fraunhofer is unreliable",
"epso": "content from fraunhofer is unreliable",
"gwascentral.phenotype": "website is down? or API changed?", # FIXME
"gwascentral.study": "website is down? or API changed?", # FIXME
"snomedct": "dead source",
}
X = TypeVar("X")
def iter_helper(
f: Callable[[str, Unpack[GetOntologyKwargs]], Mapping[str, X]],
leave: bool = False,
**kwargs: Unpack[IterHelperHelperDict],
) -> Iterable[tuple[str, str, X]]:
"""Yield all mappings extracted from each database given."""
for prefix, mapping in iter_helper_helper(f, **kwargs):
it = tqdm(
mapping.items(),
desc=f"iterating {prefix}",
leave=leave,
unit_scale=True,
disable=None,
)
for key, value in it:
if isinstance(value, str):
                value = value.strip('"').replace("\n", " ").replace("\t", " ").replace("  ", " ")
# TODO deal with when this is not a string?
if value:
yield prefix, key, value
def _prefixes(
skip_below: str | None = None,
skip_below_inclusive: bool = True,
skip_pyobo: bool = False,
skip_set: set[str] | None = None,
) -> Iterable[str]:
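    """Iterate over prefixes for resources that can be built, applying the skip rules."""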
for prefix, resource in sorted(bioregistry.read_registry().items()):
if resource.no_own_terms:
continue
if prefix in SKIP:
tqdm.write(f"skipping {prefix} because {SKIP[prefix]}")
continue
if skip_set and prefix in skip_set:
tqdm.write(f"skipping {prefix} because in skip set")
continue
if skip_below is not None:
if skip_below_inclusive:
if prefix < skip_below:
continue
else:
if prefix <= skip_below:
continue
has_pyobo = has_nomenclature_plugin(prefix)
has_download = resource.has_download()
if skip_pyobo and has_pyobo:
continue
if not has_pyobo and not has_download:
continue
yield prefix
def iter_helper_helper(
f: Callable[[str, Unpack[GetOntologyKwargs]], X],
use_tqdm: bool = True,
skip_below: str | None = None,
skip_pyobo: bool = False,
skip_set: set[str] | None = None,
**kwargs: Unpack[SlimGetOntologyKwargs],
) -> Iterable[tuple[str, X]]:
"""Yield all mappings extracted from each database given.
:param f: A function that takes a prefix and gives back something that will be used
by an outer function.
:param use_tqdm: If true, use the tqdm progress bar
    :param skip_below: If given, skip sources whose prefixes sort before this one
        (used for iterative curation)
:param skip_pyobo: If true, skip sources implemented in PyOBO
:param skip_set: A pre-defined blacklist to skip
:param strict: If true, will raise exceptions and crash the program instead of
logging them.
:param kwargs: Keyword arguments passed to ``f``.
:raises TypeError: If a type error is raised, it gets re-raised
:raises urllib.error.HTTPError: If the resource could not be downloaded
:raises urllib.error.URLError: If another problem was encountered during download
:raises ValueError: If the data was not in the format that was expected (e.g., OWL)
:yields: A prefix and the result of the callable ``f``
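
    A minimal usage sketch, using :func:`pyobo.get_id_name_mapping` as an illustrative
    callable that takes a prefix and returns a mapping:

    .. code-block:: python

        import pyobo

        for prefix, id_to_name in iter_helper_helper(pyobo.get_id_name_mapping):
            print(prefix, len(id_to_name))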
"""
strict = kwargs.get("strict", True)
prefixes = list(
_prefixes(
skip_set=skip_set,
skip_below=skip_below,
skip_pyobo=skip_pyobo,
)
)
prefix_it = tqdm(
prefixes, disable=not use_tqdm, desc=f"Building with {f.__name__}()", unit="resource"
)
for prefix in prefix_it:
prefix_it.set_postfix(prefix=prefix)
tqdm.write(
click.style(f"\n{prefix} - {bioregistry.get_name(prefix)}", fg="green", bold=True)
)
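        # Build the resource; most failures are logged and skipped, but in strict mode
        # errors on non-deprecated resources are re-raised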
try:
yv = f(prefix, **kwargs) # type:ignore
except (UnhandledFormatError, NoBuildError) as e:
            # make sure this comes before the RuntimeError catch below
logger.warning("[%s] %s", prefix, e)
except urllib.error.HTTPError as e:
logger.warning("[%s] HTTP %s: unable to download %s", prefix, e.getcode(), e.geturl())
if strict and not bioregistry.is_deprecated(prefix):
raise
except urllib.error.URLError as e:
logger.warning("[%s] unable to download - %s", prefix, e.reason)
if strict and not bioregistry.is_deprecated(prefix):
raise
except ParseError as e:
if not e.node:
logger.warning("[%s] %s", prefix, e)
else:
logger.warning(str(e))
if strict and not bioregistry.is_deprecated(prefix):
raise e
except RuntimeError as e:
if "DrugBank" not in str(e):
raise
logger.warning("[drugbank] invalid credentials")
except subprocess.CalledProcessError:
logger.warning("[%s] ROBOT was unable to convert OWL to OBO", prefix)
except ValueError as e:
if _is_xml(e):
# this means that it tried doing parsing on an xml page
logger.warning(
"no resource available for %s. See http://www.obofoundry.org/ontology/%s",
prefix,
prefix,
)
else:
logger.exception(
"[%s] got exception %s while parsing", prefix, e.__class__.__name__
)
except zipfile.BadZipFile as e:
# This can happen if there's an error on UMLS
logger.exception("[%s] got exception %s while parsing", prefix, e.__class__.__name__)
except TypeError as e:
logger.exception("[%s] got exception %s while parsing", prefix, e.__class__.__name__)
if strict:
raise e
else:
yield prefix, yv
def _is_xml(e: Exception) -> bool:
    """Check if a parse error was caused by getting an XML (e.g., OWL) document instead of OBO."""
    return str(e).startswith("Tag-value pair parsing failed for:") or str(e).startswith(
        'Tag-value pair parsing failed for:\n<?xml version="1.0" encoding="UTF-8"?>'
    )
def _prep_dir(directory: None | str | pathlib.Path) -> pathlib.Path:
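    """Resolve the output directory, defaulting to the database directory, and ensure it exists."""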
if directory is None:
rv = DATABASE_DIRECTORY
elif isinstance(directory, str):
rv = pathlib.Path(directory)
elif isinstance(directory, pathlib.Path):
rv = directory
else:
raise TypeError
rv.mkdir(parents=True, exist_ok=True)
return rv
def db_output_helper(
it: Iterable[tuple[Any, ...]],
db_name: str,
columns: Sequence[str],
*,
directory: None | str | pathlib.Path = None,
strict: bool = False,
use_gzip: bool = True,
summary_detailed: Sequence[int] | None = None,
) -> list[pathlib.Path]:
"""Help output database builds.

    :param it: An iterable of rows (tuples) to write; the first element of each row is
        counted to build the summary
    :param db_name: The name of the output resource (e.g., "alts", "names")
    :param columns: The names of the columns
    :param directory: The directory to output everything, or defaults to
        :data:`pyobo.constants.DATABASE_DIRECTORY`.
    :param strict: Currently unused
    :param use_gzip: Should the main output TSV file be gzipped? Defaults to true.
    :param summary_detailed: Optional indices of columns to group by for an additional,
        more detailed summary file
:returns: A sequence of paths that got created.
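
    A minimal usage sketch, where the rows, column names, and output directory are
    purely illustrative:

    .. code-block:: python

        rows = [
            ("chebi", "CHEBI:1234", "example name"),
            ("chebi", "CHEBI:5678", "another name"),
        ]
        paths = db_output_helper(
            iter(rows),
            "names",
            ["prefix", "identifier", "name"],
            directory="example_output",
        )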
"""
start = time.time()
directory = _prep_dir(directory)
c: typing.Counter[str] = Counter()
c_detailed: typing.Counter[tuple[str, ...]] = Counter()
if use_gzip:
db_path = directory.joinpath(f"{db_name}.tsv.gz")
else:
db_path = directory.joinpath(f"{db_name}.tsv")
db_sample_path = directory.joinpath(f"{db_name}_sample.tsv")
db_summary_path = directory.joinpath(f"{db_name}_summary.tsv")
db_summary_detailed_path = directory.joinpath(f"{db_name}_summary_detailed.tsv")
db_metadata_path = directory.joinpath(f"{db_name}_metadata.json")
rv: list[tuple[str, pathlib.Path]] = [
("Metadata", db_metadata_path),
("Data", db_path),
("Sample", db_sample_path),
("Summary", db_summary_path),
]
logger.info("writing %s to %s", db_name, db_path)
logger.info("writing %s sample to %s", db_name, db_sample_path)
sample_rows = []
with safe_open_writer(db_path) as writer:
# for the first 10 rows, put it in a sample file too
with safe_open_writer(db_sample_path) as sample_writer:
# write header
writer.writerow(columns)
sample_writer.writerow(columns)
for row, _ in zip(it, range(10), strict=False):
c[row[0]] += 1
if summary_detailed is not None:
c_detailed[tuple(row[i] for i in summary_detailed)] += 1
writer.writerow(row)
sample_writer.writerow(row)
sample_rows.append(row)
# continue just in the gzipped one
for row in it:
c[row[0]] += 1
if summary_detailed is not None:
c_detailed[tuple(row[i] for i in summary_detailed)] += 1
writer.writerow(row)
with safe_open_writer(db_summary_path) as summary_writer:
summary_writer.writerows(c.most_common())
if summary_detailed is not None:
logger.info(f"writing {db_name} detailed summary to {db_summary_detailed_path}")
with safe_open_writer(db_summary_detailed_path) as detailed_summary_writer:
detailed_summary_writer.writerows((*keys, v) for keys, v in c_detailed.most_common())
rv.append(("Summary (Detailed)", db_summary_detailed_path))
with open(db_metadata_path, "w") as file:
json.dump(
{
"version": get_version(),
"git_hash": get_git_hash(),
"date": datetime.datetime.now().strftime("%Y-%m-%d-%H-%M"),
"count": sum(c.values()),
},
file,
indent=2,
)
elapsed = time.time() - start
click.secho(f"\nWrote the following files in {elapsed:.1f} seconds\n", fg="green")
click.secho(indent(tabulate(rv), " "), fg="green")
click.secho("\nSample rows:\n", fg="green")
click.secho(indent(tabulate(sample_rows, headers=columns), " "), fg="green")
click.echo()
return [path for _, path in rv]