# -*- coding: utf-8 -*-
"""Utilities for OBO files."""
import datetime
import gzip
import json
import logging
import pathlib
import subprocess
import typing
import urllib.error
from collections import Counter
from typing import (
Callable,
Iterable,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
)
import bioregistry
from tqdm.auto import tqdm
from .constants import DATABASE_DIRECTORY
from .identifier_utils import MissingPrefix, wrap_norm_prefix
from .plugins import has_nomenclature_plugin, run_nomenclature_plugin
from .struct import Obo
from .utils.io import get_writer
from .utils.path import ensure_path, prefix_directory_join
from .version import get_git_hash, get_version
__all__ = [
"get_ontology",
"NoBuild",
]
logger = logging.getLogger(__name__)
class NoBuild(RuntimeError):
"""Base exception for being unable to build."""
class UnhandledFormat(NoBuild):
"""Only OWL is available."""
@wrap_norm_prefix
def get_ontology(
prefix: str,
*,
force: bool = False,
rewrite: bool = False,
strict: bool = True,
version: Optional[str] = None,
) -> Obo:
"""Get the OBO for a given graph.
:param prefix: The prefix of the ontology to look up
:param version: The pre-looked-up version of the ontology
:param force: Download the data again
:param rewrite: Should the OBO cache be rewritten? Automatically set to true if ``force`` is true
    :param strict: Should CURIEs be treated strictly? If true, raises exceptions on invalid/malformed CURIEs
:returns: An OBO object
    :raises NoBuild: If the resource could not be downloaded in any usable format
    :raises UnhandledFormat: If the resource is only available in a format that can not be converted to OBO
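
    Example usage, assuming the ``chebi`` prefix has an OBO download registered
    in the Bioregistry::

        >>> from pyobo import get_ontology
        >>> obo = get_ontology("chebi")
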
    Alternate usage if you have a custom URL::
>>> from pystow.utils import download
>>> from pyobo import Obo, from_obo_path
>>> url = ...
>>> obo_path = ...
        >>> download(url=url, path=obo_path)
        >>> obo = from_obo_path(obo_path)
"""
if force:
rewrite = True
if prefix == "uberon":
logger.info("UBERON has so much garbage in it that defaulting to non-strict parsing")
strict = False
obonet_json_gz_path = prefix_directory_join(
prefix, name=f"{prefix}.obonet.json.gz", ensure_exists=False, version=version
)
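    # reuse the cached obonet graph unless a fresh download was forced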
if obonet_json_gz_path.exists() and not force:
from .reader import from_obonet
from .utils.cache import get_gzipped_graph
logger.debug("[%s] using obonet cache at %s", prefix, obonet_json_gz_path)
return from_obonet(get_gzipped_graph(obonet_json_gz_path))
if has_nomenclature_plugin(prefix):
obo = run_nomenclature_plugin(prefix)
logger.info("[%s] caching nomenclature plugin", prefix)
obo.write_default(force=rewrite)
return obo
logger.debug("[%s] no obonet cache found at %s", prefix, obonet_json_gz_path)
ontology_format, path = _ensure_ontology_path(prefix, force=force, version=version)
if path is None:
raise NoBuild
elif ontology_format == "obo":
        pass  # already in OBO format; nothing to convert
elif ontology_format == "owl":
from bioontologies import robot
_converted_obo_path = path.with_suffix(".obo")
robot.convert(path, _converted_obo_path)
path = _converted_obo_path
else:
raise UnhandledFormat(f"[{prefix}] unhandled ontology file format: {path.suffix}")
from .reader import from_obo_path
obo = from_obo_path(path, prefix=prefix, strict=strict)
if version is not None:
if obo.data_version is None:
logger.warning("[%s] did not have a version, overriding with %s", obo.ontology, version)
obo.data_version = version
elif obo.data_version != version:
logger.warning(
"[%s] had version %s, overriding with %s", obo.ontology, obo.data_version, version
)
obo.data_version = version
obo.write_default(force=rewrite)
return obo
def _ensure_ontology_path(
    prefix: str, force: bool, version: Optional[str]
) -> Union[Tuple[str, pathlib.Path], Tuple[None, None]]:
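    """Download the ontology in the first available format, trying OBO, then OWL,
    then OBO Graph JSON, and return the format together with the local path, or
    ``(None, None)`` if no download is registered.
    """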
for ontology_format, url in [ # noqa:B007
("obo", bioregistry.get_obo_download(prefix)),
("owl", bioregistry.get_owl_download(prefix)),
("json", bioregistry.get_json_download(prefix)),
]:
if url is not None:
return ontology_format, pathlib.Path(
ensure_path(prefix, url=url, force=force, version=version)
)
return None, None
#: Obonet/Pronto can't parse these (consider converting to OBO with ROBOT?)
CANT_PARSE = {
"agro",
"aro",
"bco",
"caro",
"cco",
"chmo",
"cido",
"covoc",
"cto",
"cvdo",
"dicom",
"dinto",
"emap",
"epso",
"eupath",
"fbbi",
"fma",
"fobi",
"foodon",
"genepio",
"hancestro",
"hom",
"hso",
"htn", # Unknown string format: creation: 16MAY2017
"ico",
"idocovid19",
"labo",
"mamo",
"mfmo",
"mfo",
"mfomd",
"miapa",
"mo",
"oae",
"ogms", # Unknown string format: creation: 16MAY2017
"ohd",
"ons",
"oostt",
"opmi",
"ornaseq",
"orth",
"pdro",
"probonto",
"psdo",
"reo",
"rex",
"rnao",
"sepio",
"sio",
"spd",
"sweetrealm",
"txpo",
"vido",
"vt",
"xl",
}
SKIP = {
"ncbigene", # too big, refs acquired from other dbs
"pubchem.compound", # to big, can't deal with this now
"gaz", # Gazetteer is irrelevant for biology
"ma", # yanked
"bila", # yanked
# FIXME below
"emapa", # recently changed with EMAP... not sure what the difference is anymore
"kegg.genes",
"kegg.genome",
"kegg.pathway",
# URL is wrong
"ensemblglossary",
# Too much junk
"biolink",
}
X = TypeVar("X")
def iter_helper(
f: Callable[[str], Mapping[str, X]],
leave: bool = False,
strict: bool = True,
**kwargs,
) -> Iterable[Tuple[str, str, X]]:
"""Yield all mappings extracted from each database given."""
for prefix, mapping in iter_helper_helper(f, strict=strict, **kwargs):
it = tqdm(
mapping.items(),
desc=f"iterating {prefix}",
leave=leave,
unit_scale=True,
disable=None,
)
for key, value in it:
            # normalize whitespace: newlines/tabs become spaces, then collapse doubles
            value = value.strip('"').replace("\n", " ").replace("\t", " ").replace("  ", " ")
if value:
yield prefix, key, value
def _prefixes(
skip_below: Optional[str] = None,
skip_below_inclusive: bool = True,
skip_pyobo: bool = False,
skip_set: Optional[Set[str]] = None,
) -> Iterable[str]:
for prefix, resource in sorted(bioregistry.read_registry().items()):
if resource.no_own_terms:
continue
if prefix in SKIP:
tqdm.write(f"skipping {prefix} because in default skip set")
continue
if skip_set and prefix in skip_set:
tqdm.write(f"skipping {prefix} because in skip set")
continue
if skip_below is not None:
if skip_below_inclusive:
if prefix < skip_below:
continue
else:
if prefix <= skip_below:
continue
has_pyobo = has_nomenclature_plugin(prefix)
has_download = resource.has_download()
if skip_pyobo and has_pyobo:
continue
if not has_pyobo and not has_download:
continue
yield prefix
def iter_helper_helper(
f: Callable[[str], X],
use_tqdm: bool = True,
skip_below: Optional[str] = None,
skip_below_inclusive: bool = True,
skip_pyobo: bool = False,
skip_set: Optional[Set[str]] = None,
strict: bool = True,
**kwargs,
) -> Iterable[Tuple[str, X]]:
"""Yield all mappings extracted from each database given.
:param f: A function that takes a prefix and gives back something that will be used by an outer function.
:param use_tqdm: If true, use the tqdm progress bar
    :param skip_below: If given, skip sources whose prefixes sort before this one (used for iterative curation)
    :param skip_below_inclusive: If true, the prefix given by ``skip_below`` is itself still included
:param skip_pyobo: If true, skip sources implemented in PyOBO
:param skip_set: A pre-defined blacklist to skip
:param strict: If true, will raise exceptions and crash the program instead of logging them.
:param kwargs: Keyword arguments passed to ``f``.
:yields: A prefix and the result of the callable ``f``
    :raises TypeError: If a type error is raised during parsing and ``strict`` is enabled, it gets re-raised
:raises urllib.error.HTTPError: If the resource could not be downloaded
:raises urllib.error.URLError: If another problem was encountered during download
:raises ValueError: If the data was not in the format that was expected (e.g., OWL)
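
    A sketch of typical usage, assuming ``f`` maps a prefix to some result, such
    as :func:`pyobo.get_id_name_mapping`::

        >>> from pyobo import get_id_name_mapping
        >>> for prefix, id_to_name in iter_helper_helper(get_id_name_mapping):
        ...     print(prefix, len(id_to_name))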
"""
prefixes = list(
_prefixes(
skip_set=skip_set,
skip_below=skip_below,
skip_pyobo=skip_pyobo,
skip_below_inclusive=skip_below_inclusive,
)
)
prefix_it = tqdm(
prefixes, disable=not use_tqdm, desc=f"Building with {f.__name__}()", unit="resource"
)
for prefix in prefix_it:
prefix_it.set_postfix(prefix=prefix)
try:
yv = f(prefix, **kwargs) # type:ignore
except urllib.error.HTTPError as e:
logger.warning("[%s] HTTP %s: unable to download %s", prefix, e.getcode(), e.geturl())
if strict and not bioregistry.is_deprecated(prefix):
raise
except urllib.error.URLError:
logger.warning("[%s] unable to download", prefix)
if strict and not bioregistry.is_deprecated(prefix):
raise
except MissingPrefix as e:
logger.warning("[%s] missing prefix: %s", prefix, e)
if strict and not bioregistry.is_deprecated(prefix):
raise e
except subprocess.CalledProcessError:
logger.warning("[%s] ROBOT was unable to convert OWL to OBO", prefix)
except UnhandledFormat as e:
logger.warning("[%s] %s", prefix, e)
except ValueError as e:
if _is_xml(e):
# this means that it tried doing parsing on an xml page
logger.info(
"no resource available for %s. See http://www.obofoundry.org/ontology/%s",
prefix,
prefix,
)
else:
logger.exception(
"[%s] got exception %s while parsing", prefix, e.__class__.__name__
)
except TypeError as e:
logger.exception("[%s] got exception %s while parsing", prefix, e.__class__.__name__)
if strict:
raise e
else:
yield prefix, yv
def _is_xml(e) -> bool:
    # this error message means an XML (e.g., OWL) document was parsed as if it were OBO
    return str(e).startswith("Tag-value pair parsing failed for:")
def _prep_dir(directory: Union[None, str, pathlib.Path]) -> pathlib.Path:
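    """Normalize the output directory, defaulting to :data:`DATABASE_DIRECTORY`, and ensure it exists."""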
if directory is None:
rv = DATABASE_DIRECTORY
elif isinstance(directory, str):
rv = pathlib.Path(directory)
elif isinstance(directory, pathlib.Path):
rv = directory
else:
raise TypeError
rv.mkdir(parents=True, exist_ok=True)
return rv
def db_output_helper(
f: Callable[..., Iterable[Tuple[str, ...]]],
db_name: str,
columns: Sequence[str],
*,
directory: Union[None, str, pathlib.Path] = None,
strict: bool = True,
use_gzip: bool = True,
summary_detailed: Optional[Sequence[int]] = None,
**kwargs,
) -> List[pathlib.Path]:
"""Help output database builds.
    :param f: A function that yields rows (tuples of strings) to be written to the database file
:param db_name: name of the output resource (e.g., "alts", "names")
:param columns: The names of the columns
:param directory: The directory to output everything, or defaults to :data:`pyobo.constants.DATABASE_DIRECTORY`.
    :param strict: Passed to ``f`` by keyword
    :param use_gzip: Should the main output file be gzipped?
    :param summary_detailed: The indexes of columns to aggregate for the detailed summary, if given
    :param kwargs: Passed to ``f`` by splat
:returns: A sequence of paths that got created.
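
    A hypothetical invocation, where ``iter_alt_triples`` is a made-up generator
    function yielding ``(prefix, identifier, alt)`` rows::

        >>> paths = db_output_helper(
        ...     iter_alt_triples, "alts", ("prefix", "identifier", "alt"),
        ... )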
"""
directory = _prep_dir(directory)
c: typing.Counter[str] = Counter()
c_detailed: typing.Counter[Tuple[str, ...]] = Counter()
if use_gzip:
db_path = directory.joinpath(f"{db_name}.tsv.gz")
else:
db_path = directory.joinpath(f"{db_name}.tsv")
db_sample_path = directory.joinpath(f"{db_name}_sample.tsv")
db_summary_path = directory.joinpath(f"{db_name}_summary.tsv")
db_summary_detailed_path = directory.joinpath(f"{db_name}_summary_detailed.tsv")
logger.info("writing %s to %s", db_name, db_path)
logger.info("writing %s sample to %s", db_name, db_sample_path)
it = f(strict=strict, **kwargs)
    with gzip.open(db_path, mode="wt") if use_gzip else open(db_path, "w") as db_file:
        writer = get_writer(db_file)
        # also write the first 10 rows to a sample file
with open(db_sample_path, "w") as sample_file:
sample_writer = get_writer(sample_file)
# write header
writer.writerow(columns)
sample_writer.writerow(columns)
for row, _ in zip(it, range(10)):
c[row[0]] += 1
if summary_detailed is not None:
c_detailed[tuple(row[i] for i in summary_detailed)] += 1
writer.writerow(row)
sample_writer.writerow(row)
# continue just in the gzipped one
for row in it:
c[row[0]] += 1
if summary_detailed is not None:
c_detailed[tuple(row[i] for i in summary_detailed)] += 1
writer.writerow(row)
logger.info(f"writing {db_name} summary to {db_summary_path}")
with open(db_summary_path, "w") as file:
writer = get_writer(file)
writer.writerows(c.most_common())
if summary_detailed is not None:
logger.info(f"writing {db_name} detailed summary to {db_summary_detailed_path}")
with open(db_summary_detailed_path, "w") as file:
writer = get_writer(file)
writer.writerows((*keys, v) for keys, v in c_detailed.most_common())
db_metadata_path = directory.joinpath(f"{db_name}_metadata.json")
with open(db_metadata_path, "w") as file:
json.dump(
{
"version": get_version(),
"git_hash": get_git_hash(),
"date": datetime.datetime.now().strftime("%Y-%m-%d-%H-%M"),
"count": sum(c.values()),
},
file,
indent=2,
)
rv: List[pathlib.Path] = [
db_metadata_path,
db_path,
db_sample_path,
db_summary_path,
]
    if summary_detailed is not None:
rv.append(db_summary_detailed_path)
return rv