# -*- coding: utf-8 -*-
"""Data structures for OBO."""
import gzip
import json
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from operator import attrgetter
from pathlib import Path
from textwrap import dedent
from typing import (
Any,
ClassVar,
Collection,
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Set,
TextIO,
Tuple,
Union,
)
import bioregistry
import click
import networkx as nx
import pandas as pd
from more_click import force_option, verbose_option
from networkx.utils import open_file
from tqdm.auto import tqdm
from typing_extensions import Literal
from .reference import Reference, Referenced
from .typedef import (
RelationHint,
TypeDef,
default_typedefs,
from_species,
get_reference_tuple,
has_part,
is_a,
orthologous,
part_of,
)
from .utils import comma_separate, obo_escape_slim
from ..constants import (
DATE_FORMAT,
NCBITAXON_PREFIX,
RELATION_ID,
RELATION_PREFIX,
TARGET_ID,
TARGET_PREFIX,
)
from ..identifier_utils import normalize_curie
from ..utils.io import multidict, write_iterable_tsv
from ..utils.misc import obo_to_obograph, obo_to_owl
from ..utils.path import get_prefix_obo_path, prefix_directory_join
# The public names of this module.
__all__ = [
"Synonym",
"SynonymTypeDef",
"SynonymSpecificity",
"SynonymSpecificities",
"Term",
"Obo",
"make_ad_hoc_ontology",
]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
#: The keywords allowed as the specificity of a synonym (static type)
SynonymSpecificity = Literal["EXACT", "NARROW", "BROAD", "RELATED"]
#: The same keywords as a runtime sequence
SynonymSpecificities: Sequence[SynonymSpecificity] = ("EXACT", "NARROW", "BROAD", "RELATED")
@dataclass
class Synonym:
    """A synonym of a term, with a specificity, an optional type, and provenance."""

    #: The text of the synonym
    name: str

    #: How specific the synonym is (exact unless stated otherwise)
    specificity: SynonymSpecificity = "EXACT"

    #: The synonym type, if any. It must be declared in the OBO document!
    type: Optional["SynonymTypeDef"] = None

    #: References to the articles in which the synonym is used
    provenance: List[Reference] = field(default_factory=list)

    def to_obo(self) -> str:
        """Render this synonym as a ``synonym:`` line for a [Term] stanza."""
        return "synonym: " + self._fp()

    def _fp(self) -> str:
        """Build everything that follows the ``synonym:`` tag."""
        quoted_name = f'"{self._escape(self.name)}"'
        # The type identifier is only written when a type is set
        type_part = f" {self.type.id}" if self.type else ""
        return f"{quoted_name} {self.specificity}{type_part} [{comma_separate(self.provenance)}]"

    @staticmethod
    def _escape(s: str) -> str:
        """Backslash-escape the double quotes in the given text."""
        return '\\"'.join(s.split('"'))
@dataclass
class SynonymTypeDef:
    """A synonym type declaration (``synonymtypedef``) for an OBO document."""

    id: str
    name: str
    specificity: Optional[SynonymSpecificity] = None

    def to_obo(self) -> str:
        """Render this declaration as a ``synonymtypedef:`` line."""
        line = f'synonymtypedef: {self.id} "{self.name}"'
        if not self.specificity:
            return line
        return f"{line} {self.specificity}"

    @classmethod
    def from_text(
        cls, text: str, specificity: Optional[SynonymSpecificity] = None
    ) -> "SynonymTypeDef":
        """Create a declaration from free text, deriving a normalized identifier.

        The identifier is the lowercased text with dashes and spaces turned into
        underscores and with double quotes and parentheses removed. The name is the
        text with double quotes removed.
        """
        # One pass over the text instead of a chain of single-character replacements
        id_table = str.maketrans({"-": "_", " ": "_", '"': None, ")": None, "(": None})
        identifier = text.lower().translate(id_table)
        label = text.replace('"', "")
        return cls(id=identifier, name=label, specificity=specificity)
#: The kinds of objects accepted wherever a reference is expected: a reference, a term,
#: a tuple, or a CURIE string
ReferenceHint = Union[Reference, "Term", Tuple[str, str], str]
def _ensure_ref(reference: ReferenceHint) -> Reference:
    """Coerce a reference-like object into a :class:`Reference`.

    :param reference: A reference (returned as is), a term (its own reference is returned),
        a CURIE string (parsed with :meth:`Reference.from_curie`), or a tuple (unpacked
        positionally into :class:`Reference`).
    :returns: The corresponding reference
    :raises ValueError: If the reference is None or the CURIE string can't be parsed
    :raises TypeError: If the object is of none of the supported types
    """
    if reference is None:
        raise ValueError("can not append null reference")
    if isinstance(reference, Term):
        return reference.reference
    if isinstance(reference, str):
        _rv = Reference.from_curie(reference)
        if _rv is None:
            raise ValueError(f"could not parse a reference from CURIE: {reference!r}")
        return _rv
    if isinstance(reference, tuple):
        return Reference(*reference)
    if isinstance(reference, Reference):
        return reference
    raise TypeError(
        f"can not make a reference from {type(reference).__name__}: {reference!r}"
    )
@dataclass
class Term(Referenced):
    """A term in OBO."""

    #: The primary reference for the entity
    reference: Reference

    #: A description of the entity
    definition: Optional[str] = None

    #: References to articles in which the term appears
    provenance: List[Reference] = field(default_factory=list)

    #: Relationships defined by [Typedef] stanzas
    relationships: Dict[TypeDef, List[Reference]] = field(default_factory=lambda: defaultdict(list))

    #: Properties, which are not defined with Typedef and have scalar values instead of references.
    properties: Dict[str, List[str]] = field(default_factory=lambda: defaultdict(list))

    #: Relationships with the default "is_a"
    parents: List[Reference] = field(default_factory=list)

    #: Synonyms of this term
    synonyms: List[Synonym] = field(default_factory=list)

    #: Equivalent references
    xrefs: List[Reference] = field(default_factory=list)
    #: NOTE(review): presumably the type of each xref; nothing in this class reads or writes it
    xref_types: List[Reference] = field(default_factory=list)

    #: Alternate Identifiers
    alt_ids: List[Reference] = field(default_factory=list)

    #: The sub-namespace within the ontology
    namespace: Optional[str] = None

    #: An annotation for obsolescence. By default, is None, but this means that it is not obsolete.
    is_obsolete: Optional[bool] = None

    def __hash__(self):  # noqa: D105
        return hash((self.__class__, self.prefix, self.identifier))

    @classmethod
    def from_triple(
        cls,
        prefix: str,
        identifier: str,
        name: Optional[str] = None,
        definition: Optional[str] = None,
    ) -> "Term":
        """Create a term from a prefix, an identifier, and optionally a name and a definition."""
        return cls(
            reference=Reference(prefix=prefix, identifier=identifier, name=name),
            definition=definition,
        )

    @classmethod
    def auto(
        cls,
        prefix: str,
        identifier: str,
    ) -> "Term":
        """Create a term whose reference and definition are looked up automatically.

        The reference comes from :meth:`Reference.auto` and the definition from
        ``get_definition``, which is imported lazily inside this method.
        """
        from ..api import get_definition

        return cls(
            reference=Reference.auto(prefix=prefix, identifier=identifier),
            definition=get_definition(prefix, identifier),
        )

    @classmethod
    def from_curie(cls, curie: str, name: Optional[str] = None) -> "Term":
        """Create a term directly from a CURIE and optional name.

        :raises ValueError: If the CURIE can't be normalized into a prefix and an identifier
        """
        prefix, identifier = normalize_curie(curie)
        if prefix is None or identifier is None:
            raise ValueError
        return cls.from_triple(prefix=prefix, identifier=identifier, name=name)

    def get_url(self) -> Optional[str]:
        """Return a URL for this term's reference, if possible."""
        return self.reference.get_url()

    def append_provenance(self, reference: ReferenceHint) -> None:
        """Add a provenance reference."""
        self.provenance.append(_ensure_ref(reference))

    def append_synonym(
        self, synonym: Union[str, Synonym], type: Optional[SynonymTypeDef] = None
    ) -> None:
        """Add a synonym.

        :param synonym: A synonym object, or a string to wrap in a new synonym
        :param type: The synonym type, only used when a string is given
        """
        if isinstance(synonym, str):
            synonym = Synonym(synonym, type=type)
        self.synonyms.append(synonym)

    def append_alt(self, alt: Union[str, Reference]) -> None:
        """Add an alternative identifier.

        A string is taken as an identifier under this term's own prefix.
        """
        if isinstance(alt, str):
            alt = Reference(prefix=self.prefix, identifier=alt)
        self.alt_ids.append(alt)

    def append_parent(self, reference: ReferenceHint) -> "Term":
        """Add a parent to this entity and return this term, so calls can be chained."""
        self.parents.append(_ensure_ref(reference))
        return self

    def extend_parents(self, references: Collection[Reference]) -> None:
        """Add a collection of parents to this entity.

        :raises ValueError: If any of the parents is None (nothing is added in that case)
        """
        # Materialize first so a one-shot iterable survives the validation pass
        references = list(references)
        if any(x is None for x in references):
            raise ValueError("can not append a collection of parents containing a null parent")
        self.parents.extend(references)

    def get_properties(self, prop) -> List[str]:
        """Get properties from the given key."""
        return self.properties[prop]

    def get_property(self, prop) -> Optional[str]:
        """Get a single property of the given key.

        :returns: The value, or None if there are no values for the key
        :raises ValueError: If there are several values for the key
        """
        r = self.get_properties(prop)
        if not r:
            return None
        if len(r) != 1:
            raise ValueError
        return r[0]

    def get_relationship(self, typedef: TypeDef) -> Optional[Reference]:
        """Get a single relationship of the given type.

        :returns: The target, or None if there are no targets for the type
        :raises ValueError: If there are several targets for the type
        """
        r = self.get_relationships(typedef)
        if not r:
            return None
        if len(r) != 1:
            raise ValueError
        return r[0]

    def get_relationships(self, typedef: TypeDef) -> List[Reference]:
        """Get relationships from the given type."""
        return self.relationships[typedef]

    def append_xref(self, reference: ReferenceHint) -> None:
        """Append an xref."""
        self.xrefs.append(_ensure_ref(reference))

    def append_relationship(self, typedef: TypeDef, reference: ReferenceHint) -> None:
        """Append a relationship."""
        self.relationships[typedef].append(_ensure_ref(reference))

    def set_species(self, identifier: str, name: Optional[str] = None):
        """Append the from_species relation.

        :param identifier: The taxonomy identifier of the species
        :param name: The name of the species. If not given, it is looked up with
            ``pyobo.get_name``.
        """
        if name is None:
            import pyobo

            name = pyobo.get_name(NCBITAXON_PREFIX, identifier)
        self.append_relationship(
            from_species, Reference(prefix=NCBITAXON_PREFIX, identifier=identifier, name=name)
        )

    def get_species(self, prefix: str = NCBITAXON_PREFIX) -> Optional[Reference]:
        """Get the species if it exists.

        :param prefix: The prefix to use in case the term has several species annotations.
        :returns: The first species annotation with the given prefix, or None
        """
        for species in self.relationships.get(from_species, []):
            if species.prefix == prefix:
                return species
        return None

    def extend_relationship(self, typedef: TypeDef, references: Iterable[Reference]) -> None:
        """Append several relationships.

        :raises ValueError: If any of the references is None (nothing is added in that case)
        """
        # Materialize first: validating a generator would otherwise exhaust it,
        # and the extend below would silently add nothing.
        references = list(references)
        if any(x is None for x in references):
            raise ValueError("can not extend a collection that includes a null reference")
        self.relationships[typedef].extend(references)

    def append_property(self, prop: str, value: str) -> None:
        """Append a property."""
        self.properties[prop].append(value)

    def _definition_fp(self) -> str:
        """Build the quoted definition followed by the bracketed provenance."""
        assert self.definition is not None
        return f'"{obo_escape_slim(self.definition)}" [{comma_separate(self.provenance)}]'

    def iterate_relations(self) -> Iterable[Tuple[TypeDef, Reference]]:
        """Iterate over pairs of typedefs and targets."""
        for typedef, targets in self.relationships.items():
            for target in targets:
                yield typedef, target

    def iterate_properties(self) -> Iterable[Tuple[str, str]]:
        """Iterate over pairs of property and values."""
        for prop, values in self.properties.items():
            for value in values:
                yield prop, value

    def iterate_obo_lines(self, write_relation_comments: bool = True) -> Iterable[str]:
        """Iterate over the lines to write in an OBO file.

        :param write_relation_comments: If true, follow each ``relationship:`` line with a
            ``!`` comment giving the names of the typedef and of the target, when available.
        """
        yield "\n[Term]"
        yield f"id: {self.curie}"
        if self.name:
            yield f"name: {obo_escape_slim(self.name)}"
        if self.namespace and self.namespace != "?":
            namespace_normalized = (
                self.namespace.replace(" ", "_").replace("-", "_").replace("(", "").replace(")", "")
            )
            yield f"namespace: {namespace_normalized}"

        if self.definition:
            yield f"def: {self._definition_fp()}"

        for xref in sorted(self.xrefs, key=attrgetter("prefix", "identifier")):
            yield f"xref: {xref}"

        for parent in sorted(self.parents, key=attrgetter("prefix", "identifier")):
            yield f"is_a: {parent}"

        for typedef, references in sorted(self.relationships.items(), key=_sort_relations):
            for reference in sorted(references, key=attrgetter("prefix", "identifier")):
                s = f"relationship: {typedef.curie} {reference.curie}"
                if write_relation_comments and (typedef.name or reference.name):
                    s += " !"
                    if typedef.name:
                        s += f" {typedef.name}"
                    if reference.name:
                        s += f" {reference.name}"
                yield s

        for prop, value in sorted(self.iterate_properties()):
            yield f'property_value: {prop} "{value}" xsd:string'  # TODO deal with types later

        for synonym in sorted(self.synonyms, key=attrgetter("name")):
            yield synonym.to_obo()

    @staticmethod
    def _escape(s) -> str:
        """Escape newlines and double quotes in the given text."""
        return s.replace("\n", "\\n").replace('"', '\\"')
def _sort_relations(r):
typedef, _references = r
return typedef.reference.name or typedef.reference.identifier
class BioregistryError(ValueError):
    """Raised when the ``Obo.ontology`` field is not a canonical Bioregistry prefix.

    The offending value is expected as the first positional argument.
    """

    def __str__(self) -> str:
        """Explain the problem and point to where valid prefixes can be found."""
        given = self.args[0]
        message = f"""
        The value you gave for Obo.ontology field ({given}) is not a canonical
        Bioregistry prefix in the Obo.ontology field.
        Please see https://bioregistry.io for valid prefixes or feel free to open an issue
        on the PyOBO issue tracker for support.
        """
        return dedent(message)
[docs]@dataclass
class Obo:
"""An OBO document."""
# The ClassVar fields below are per-ontology configuration; ``__post_init__`` may
# shadow ``name`` and ``auto_generated_by`` on the instance.
#: The prefix for the ontology
ontology: ClassVar[str]
#: The name of the ontology. If not given, tries looking up with the Bioregistry.
name: ClassVar[Optional[str]] = None
#: The OBO format
format_version: ClassVar[str] = "1.2"
#: Type definitions
typedefs: ClassVar[Optional[List[TypeDef]]] = None
#: Synonym type definitions
synonym_typedefs: ClassVar[Optional[List[SynonymTypeDef]]] = None
#: An annotation about how an ontology was generated
auto_generated_by: ClassVar[Optional[str]] = None
#: The idspaces used in the document
idspaces: ClassVar[Optional[Mapping[str, str]]] = None
#: For super-sized datasets that shouldn't be read into memory
iter_only: ClassVar[bool] = False
#: Set to true for resources that are unversioned/very dynamic, like HGNC
dynamic_version: ClassVar[bool] = False
#: Set to a static version for the resource (i.e., the resource is not itself versioned)
static_version: ClassVar[Optional[str]] = None
#: The key used to look up the version with bioversions when no version is given
bioversions_key: ClassVar[Optional[str]] = None
#: The date the ontology was generated
date: Optional[datetime] = field(default_factory=datetime.today)
#: The ontology version
data_version: Optional[str] = None
#: Should this ontology be reloaded?
force: bool = False
#: The hierarchy of terms, built on first access of the ``hierarchy`` property
_hierarchy: Optional[nx.DiGraph] = field(init=False, default=None, repr=False)
#: A cache of terms, filled on first access of ``_items_accessor``
_items: Optional[List[Term]] = field(init=False, default=None, repr=False)
def __post_init__(self):
    """Validate the prefix, then fill in the name, version, and generator annotation.

    :raises BioregistryError: If the ontology prefix is not its own Bioregistry-normalized form
    :raises ValueError: If the version is required but missing, or contains a slash
    """
    canonical_prefix = bioregistry.normalize_prefix(self.ontology)
    if self.ontology != canonical_prefix:
        raise BioregistryError(self.ontology)

    # Assigning on the instance overrides the class variables of the same
    # name, hence the type ignores.
    if self.name is None:
        self.name = bioregistry.get_name(self.ontology)  # type:ignore

    if not self.data_version:
        # A static version wins; otherwise the version is looked up
        self.data_version = self.static_version or self._get_version()

    if not self.dynamic_version:
        version = self.data_version
        if version is None:
            raise ValueError(f"{self.ontology} is missing data_version")
        if "/" in version:
            raise ValueError(f"{self.ontology} has a slash in version: {self.data_version}")

    if self.auto_generated_by is None:
        self.auto_generated_by = f"bio2obo:{self.ontology}"  # type:ignore
def _get_version(self) -> Optional[str]:
if self.bioversions_key:
import bioversions
try:
return bioversions.get_version(self.bioversions_key)
except KeyError:
logger.warning(f"[{self.bioversions_key}] bioversions doesn't list this resource ")
except IOError:
logger.warning(f"[{self.bioversions_key}] error while looking up version")
return None
@property
def _version_or_raise(self) -> str:
if not self.data_version:
raise ValueError(f"There is no version available for {self.ontology}")
return self.data_version
def iter_terms(self, force: bool = False) -> Iterable[Term]:
    """Iterate over the terms in this ontology.

    This base implementation always raises ``NotImplementedError``.
    """
    raise NotImplementedError
@classmethod
def cli(cls) -> None:
    """Build the command line interface for this class and run it."""
    cls.get_cls_cli()()
@classmethod
def get_cls_cli(cls) -> click.Command:
    """Build a click command that instantiates this class and writes its default outputs.

    The OBO file is always requested; OWL and OBO Graph JSON are requested with
    the ``--owl`` and ``--graph`` flags.
    """

    # The inner function's name and (absent) docstring are left untouched so
    # the generated command is unchanged.
    @click.command()
    @verbose_option
    @force_option
    @click.option("--owl", is_flag=True, help="Write OWL via ROBOT")
    @click.option("--graph", is_flag=True, help="Write OBO Graph JSON via ROBOT")
    @click.option(
        "--version", help="Specify data version to get. Use this if bioversions is acting up."
    )
    def _main(force: bool, owl: bool, graph: bool, version: Optional[str]):
        ontology = cls(force=force, data_version=version)
        ontology.write_default(
            use_tqdm=True,
            force=force,
            write_obo=True,
            write_obograph=graph,
            write_owl=owl,
        )

    return _main
@property
def date_formatted(self) -> str:
    """Get the date, or the current time if no date is set, formatted with ``DATE_FORMAT``."""
    moment = self.date or datetime.now()
    return moment.strftime(DATE_FORMAT)
def _iter_terms(self, use_tqdm: bool = False, desc: str = "terms") -> Iterable[Term]:
    """Iterate over the terms, optionally behind a progress bar.

    :param use_tqdm: Should a progress bar be shown?
    :param desc: The description shown on the progress bar
    """
    if not use_tqdm:
        yield from self
        return

    # Counting the terms means loading them all through the in-memory cache,
    # which is exactly what iter-only ontologies must avoid. For those, the
    # progress bar runs without a total.
    total: Optional[int] = None if self.iter_only else len(self._items_accessor)
    yield from tqdm(self, desc=desc, unit_scale=True, unit="term", total=total)
def iterate_obo_lines(self) -> Iterable[str]:
    """Yield the OBO document line by line: header, typedef stanzas, then term stanzas.

    :raises ValueError: If the ontology has no name. Since this is a generator, that
        only happens once the header lines before the ``remark`` line were consumed.
    """
    yield f"format-version: {self.format_version}"
    yield f"date: {self.date_formatted}"

    if self.auto_generated_by is not None:
        yield f"auto-generated-by: {self.auto_generated_by}"
    if self.data_version is not None:
        yield f"data-version: {self.data_version}"

    idspaces = self.idspaces or {}
    for idspace_prefix, idspace_url in sorted(idspaces.items()):
        yield f"idspace: {idspace_prefix} {idspace_url}"

    for declaration in sorted(self.synonym_typedefs or [], key=attrgetter("id")):
        yield declaration.to_obo()

    yield f"ontology: {self.ontology}"

    if self.name is None:
        raise ValueError("ontology is missing name")
    yield f"remark: {self.name}"

    for relation in sorted(self.typedefs or [], key=attrgetter("curie")):
        yield from relation.iterate_obo_lines()

    for entry in self:
        yield from entry.iterate_obo_lines()
@open_file(1, mode="w")
def write_obo(
    self, file: Union[None, str, TextIO, Path] = None, use_tqdm: bool = False
) -> None:
    """Write the OBO document to a file.

    :param file: Where to write; passed through the ``open_file`` decorator
    :param use_tqdm: Should a progress bar over the lines be shown?
    """
    lines = self.iterate_obo_lines()
    if use_tqdm:
        lines = tqdm(lines, desc=f"Writing {self.ontology}", unit_scale=True, unit="line")
    self._write_lines(lines, file)
@staticmethod
def _write_lines(it, file):
for line in it:
print(line, file=file) # noqa: T201
def write_obonet_gz(self, path: Union[str, Path]) -> None:
    """Dump the obonet-style graph of this ontology as gzipped node-link JSON.

    :param path: The path of the gzipped JSON file to write
    """
    obonet_graph = self.to_obonet()
    with gzip.open(path, "wt") as gz_file:
        node_link = nx.node_link_data(obonet_graph)
        json.dump(node_link, gz_file)
def _path(self, *parts: str, name: Optional[str] = None) -> Path:
    """Build a path with ``prefix_directory_join`` for this ontology and its data version."""
    return prefix_directory_join(
        self.ontology,
        *parts,
        name=name,
        version=self.data_version,
    )
def _cache(self, *parts: str, name: Optional[str] = None) -> Path:
return self._path("cache", *parts, name=name)
@property
def _names_path(self) -> Path:
return self._cache(name="names.tsv")
@property
def _definitions_path(self) -> Path:
return self._cache(name="definitions.tsv")
@property
def _species_path(self) -> Path:
return self._cache(name="species.tsv")
@property
def _synonyms_path(self) -> Path:
return self._cache(name="synonyms.tsv")
@property
def _alts_path(self):
return self._cache(name="alt_ids.tsv")
@property
def _typedefs_path(self) -> Path:
return self._cache(name="typedefs.tsv")
@property
def _xrefs_path(self) -> Path:
return self._cache(name="xrefs.tsv")
@property
def _relations_path(self) -> Path:
return self._cache(name="relations.tsv")
@property
def _properties_path(self) -> Path:
return self._cache(name="properties.tsv")
@property
def _root_metadata_path(self) -> Path:
return prefix_directory_join(self.ontology, name="metadata.json")
@property
def _versioned_metadata_path(self) -> Path:
return self._cache(name="metadata.json")
@property
def _obo_path(self) -> Path:
return get_prefix_obo_path(self.ontology, version=self.data_version)
@property
def _obograph_path(self) -> Path:
return self._path(name=f"{self.ontology}.json.gz")
@property
def _owl_path(self) -> Path:
return self._path(name=f"{self.ontology}.owl")
@property
def _obonet_gz_path(self) -> Path:
return self._path(name=f"{self.ontology}.obonet.json.gz")
[docs] def write_default(
self,
use_tqdm: bool = False,
force: bool = False,
write_obo: bool = False,
write_obonet: bool = False,
write_obograph: bool = False,
write_owl: bool = False,
) -> None:
"""Write the OBO to the default path.

Caches the metadata, the typedefs, the per-aspect tables, and the tables of a few
common relations, then writes the requested exports.

:param use_tqdm: Passed on when writing the OBO file
:param force: Rewrite tables and exports even if their files already exist
:param write_obo: Write the OBO file
:param write_obonet: Write the gzipped obonet JSON file
:param write_obograph: Write the OBO Graph JSON file (converted from the OBO file)
:param write_owl: Write the OWL file (converted from the OBO file)
"""
# The metadata is written to both locations on every call, regardless of ``force``
metadata = self.get_metadata()
for path in (self._root_metadata_path, self._versioned_metadata_path):
logger.debug("[%s v%s] caching metadata to %s", self.ontology, self.data_version, path)
with path.open("w") as file:
json.dump(metadata, file, indent=2)
logger.debug(
"[%s v%s] caching typedefs to %s", self.ontology, self.data_version, self._typedefs_path
)
# The typedefs table is also rewritten on every call
typedef_df: pd.DataFrame = self.get_typedef_df()
typedef_df.sort_values(list(typedef_df.columns), inplace=True)
typedef_df.to_csv(self._typedefs_path, sep="\t", index=False)
# Each entry is a label for logging, the output path, the header, and the row iterator
for label, path, header, fn in [
("names", self._names_path, [f"{self.ontology}_id", "name"], self.iterate_id_name),
(
"definitions",
self._definitions_path,
[f"{self.ontology}_id", "definition"],
self.iterate_id_definition,
),
(
"species",
self._species_path,
[f"{self.ontology}_id", "taxonomy_id"],
self.iterate_id_species,
),
(
"synonyms",
self._synonyms_path,
[f"{self.ontology}_id", "synonym"],
self.iterate_synonym_rows,
),
("alts", self._alts_path, [f"{self.ontology}_id", "alt_id"], self.iterate_alt_rows),
("xrefs", self._xrefs_path, self.xrefs_header, self.iterate_xref_rows),
("relations", self._relations_path, self.relations_header, self.iter_relation_rows),
("properties", self._properties_path, self.properties_header, self.iter_property_rows),
]:
# Existing tables are kept unless forced
if path.exists() and not force:
continue
logger.debug("[%s v%s] caching %s to %s", self.ontology, self.data_version, label, path)
write_iterable_tsv(
path=path,
header=header,
it=fn(), # type:ignore
)
for relation in (is_a, has_part, part_of, from_species, orthologous):
# is_a is always considered; the others are skipped when typedefs are
# declared and the relation is not among them
if relation is not is_a and self.typedefs is not None and relation not in self.typedefs:
continue
relations_path = self._cache("relations", name=f"{relation.curie}.tsv")
if relations_path.exists() and not force:
continue
logger.debug(
"[%s v%s] caching relation %s ! %s",
self.ontology,
self.data_version,
relation.curie,
relation.name,
)
relation_df = self.get_filtered_relations_df(relation)
# No file is written for a relation without any rows
if not len(relation_df.index):
continue
relation_df.sort_values(list(relation_df.columns), inplace=True)
relation_df.to_csv(relations_path, sep="\t", index=False)
# The OBO file is the input of the OBO Graph and OWL conversions, so it is
# written when any of the three is requested
if (write_obo or write_obograph or write_owl) and (not self._obo_path.exists() or force):
self.write_obo(self._obo_path, use_tqdm=use_tqdm)
if write_obograph:
obo_to_obograph(self._obo_path, self._obograph_path)
if write_owl:
obo_to_owl(self._obo_path, self._owl_path)
if write_obonet and (not self._obonet_gz_path.exists() or force):
logger.debug("writing obonet to %s", self._obonet_gz_path)
self.write_obonet_gz(self._obonet_gz_path)
@property
def _items_accessor(self):
if self._items is None:
self._items = sorted(self.iter_terms(force=self.force), key=attrgetter("curie"))
return self._items
def __iter__(self) -> Iterator["Term"]: # noqa: D105
if self.iter_only:
return iter(self.iter_terms(force=self.force))
return iter(self._items_accessor)
def ancestors(self, identifier: str) -> Set[str]:
    """Return the identifiers of all ancestors of the given identifier."""
    # Edges in the hierarchy point from a term to its parent, so the graph's
    # descendants are the ontology's ancestors.
    graph = self.hierarchy
    return nx.descendants(graph, identifier)
def descendants(self, identifier: str) -> Set[str]:
    """Return the identifiers of all descendants of the given identifier."""
    # Edges in the hierarchy point from a term to its parent, so the graph's
    # ancestors are the ontology's descendants.
    graph = self.hierarchy
    return nx.ancestors(graph, identifier)
def is_descendant(self, descendant: str, ancestor: str) -> bool:
    """Check whether the first identifier is a descendant of the second.

    .. code-block:: python

        from pyobo import get_obo
        obo = get_obo('go')

        interleukin_10_complex = '1905571'  # interleukin-10 receptor complex
        all_complexes = '0032991'
        assert obo.is_descendant('1905571', '0032991')
    """
    known_ancestors = self.ancestors(descendant)
    return ancestor in known_ancestors
@property
def hierarchy(self) -> nx.DiGraph:
    """A directed graph with an edge from each term's identifier to each parent's identifier.

    The graph is built on first access and reused afterwards. Since edges point
    towards parents, the parents of an entity are among its graph descendants:

    .. code-block:: python

        from pyobo import get_obo
        obo = get_obo('go')

        identifier = '1905571'  # interleukin-10 receptor complex
        is_complex = '0032991' in nx.descendants(obo.hierarchy, identifier)  # should be true
    """  # noqa:D401
    if self._hierarchy is not None:
        return self._hierarchy

    graph = nx.DiGraph()
    # Stored before it is filled, like the graph has always been
    self._hierarchy = graph
    for child in self._iter_terms(desc=f"[{self.ontology}] getting hierarchy"):
        for parent in child.parents:
            graph.add_edge(child.identifier, parent.identifier)
    return self._hierarchy
[docs] def to_obonet(self: "Obo", *, use_tqdm: bool = False) -> nx.MultiDiGraph:
"""Export as a :mod:`obonet` style graph.

:param use_tqdm: Should a progress bar over the terms be shown?
:returns: A multi-digraph with one node per term, keyed by CURIE, and one edge per
    parent and per relationship, keyed by ``is_a`` or the typedef's CURIE
:raises ValueError: If a term has a null parent or a null relationship target
"""
rv = nx.MultiDiGraph()
# Document-level metadata goes on the graph itself
rv.graph.update(
{
"name": self.name,
"ontology": self.ontology,
"auto-generated-by": self.auto_generated_by,
"typedefs": _convert_typedefs(self.typedefs),
"format-version": self.format_version,
"data-version": self.data_version,
"synonymtypedef": _convert_synonym_typedefs(self.synonym_typedefs),
"date": self.date_formatted,
}
)
# Node data by CURIE, and (source, key, target) triples for the edges
nodes = {}
links = []
for term in self._iter_terms(use_tqdm=use_tqdm):
parents = []
for parent in term.parents:
if parent is None:
raise ValueError("parent should not be none!")
links.append((term.curie, "is_a", parent.curie))
parents.append(parent.curie)
relations = []
for typedef, target in term.iterate_relations():
if target is None:
raise ValueError("target should not be none!")
relations.append(f"{typedef.curie} {target.curie}")
links.append((term.curie, typedef.curie, target.curie))
d = {
"id": term.curie,
"name": term.name,
"def": term.definition and term._definition_fp(),
"xref": [xref.curie for xref in term.xrefs],
"is_a": parents,
"relationship": relations,
"synonym": [synonym._fp() for synonym in term.synonyms],
"property_value": [
f"{prop} {value}"
for prop, values in term.properties.items()
for value in values
],
}
# Entries with empty values are left out of the node data
nodes[term.curie] = {k: v for k, v in d.items() if v}
# All nodes are added before any edge
rv.add_nodes_from(nodes.items())
for _source, _key, _target in links:
rv.add_edge(_source, _target, key=_key)
logger.info(
"[%s v%s] exported graph with %d nodes",
self.ontology,
self.data_version,
rv.number_of_nodes(),
)
return rv
def iterate_ids(self, *, use_tqdm: bool = False) -> Iterable[str]:
    """Iterate over the identifiers of the terms whose prefix is this ontology's prefix.

    :param use_tqdm: Should a progress bar be shown?
    """
    # The progress label used to say "getting names", copied from the names iterator
    for term in self._iter_terms(
        use_tqdm=use_tqdm, desc=f"[{self.ontology}] getting identifiers"
    ):
        if term.prefix == self.ontology:
            yield term.identifier
def get_ids(self, *, use_tqdm: bool = False) -> Set[str]:
    """Collect the identifiers from :meth:`iterate_ids` into a set."""
    return {identifier for identifier in self.iterate_ids(use_tqdm=use_tqdm)}
def iterate_id_name(self, *, use_tqdm: bool = False) -> Iterable[Tuple[str, str]]:
    """Iterate over pairs of identifier and name, skipping the terms without a name."""
    progress_label = f"[{self.ontology}] getting names"
    for term in self._iter_terms(use_tqdm=use_tqdm, desc=progress_label):
        if not term.name:
            continue
        yield term.identifier, term.name
def get_id_name_mapping(self, *, use_tqdm: bool = False) -> Mapping[str, str]:
    """Build a dictionary from identifiers to names."""
    pairs = self.iterate_id_name(use_tqdm=use_tqdm)
    return {identifier: name for identifier, name in pairs}
def iterate_id_definition(self, *, use_tqdm: bool = False) -> Iterable[Tuple[str, str]]:
    """Iterate over pairs of terms' identifiers and their respective definitions.

    Terms without an identifier or a definition are skipped. Surrounding double quotes
    are stripped from the definition and every run of whitespace (including newlines
    and tabs) is collapsed into a single space, so the text fits in one TSV cell.

    :param use_tqdm: Should a progress bar be shown?
    """
    for term in self._iter_terms(
        use_tqdm=use_tqdm, desc=f"[{self.ontology}] getting definitions"
    ):
        if term.identifier and term.definition:
            # The previous chain ended with a replacement of a space by a
            # space, which left runs of spaces in place.
            yield term.identifier, " ".join(term.definition.strip('"').split())
def get_id_definition_mapping(self, *, use_tqdm: bool = False) -> Mapping[str, str]:
    """Build a dictionary from identifiers to definitions."""
    pairs = self.iterate_id_definition(use_tqdm=use_tqdm)
    return {identifier: definition for identifier, definition in pairs}
###########
# SPECIES #
###########
def iterate_id_species(
    self, *, prefix: Optional[str] = None, use_tqdm: bool = False
) -> Iterable[Tuple[str, str]]:
    """Iterate over pairs of a term's identifier and its species' identifier.

    :param prefix: The prefix of the species annotations to use. Defaults to
        ``NCBITAXON_PREFIX``.
    :param use_tqdm: Should a progress bar be shown?
    """
    species_prefix = NCBITAXON_PREFIX if prefix is None else prefix
    progress_label = f"[{self.ontology}] getting species"
    for term in self._iter_terms(use_tqdm=use_tqdm, desc=progress_label):
        taxon = term.get_species(prefix=species_prefix)
        if not taxon:
            continue
        yield term.identifier, taxon.identifier
def get_id_species_mapping(
    self, *, prefix: Optional[str] = None, use_tqdm: bool = False
) -> Mapping[str, str]:
    """Build a dictionary from terms' identifiers to their species' identifiers."""
    pairs = self.iterate_id_species(prefix=prefix, use_tqdm=use_tqdm)
    return {identifier: species for identifier, species in pairs}
############
# TYPEDEFS #
############
def get_typedef_df(self, use_tqdm: bool = False) -> pd.DataFrame:
    """Get a dataframe with the prefix, identifier, and name of each typedef."""
    rows = []
    for typedef in tqdm(self.typedefs or [], disable=not use_tqdm):
        rows.append((typedef.prefix, typedef.identifier, typedef.name))
    return pd.DataFrame(rows, columns=["prefix", "identifier", "name"])
def iter_typedef_id_name(self) -> Iterable[Tuple[str, str]]:
    """Iterate over pairs of a typedef's identifier and its name."""
    declared = self.typedefs or []
    yield from ((typedef.identifier, typedef.name) for typedef in declared)
def get_typedef_id_name_mapping(self) -> Mapping[str, str]:
    """Build a dictionary from typedefs' identifiers to their names."""
    return {identifier: name for identifier, name in self.iter_typedef_id_name()}
#########
# PROPS #
#########
def iterate_properties(self, *, use_tqdm: bool = False) -> Iterable[Tuple[Term, str, str]]:
    """Iterate over triples of a term, one of its properties, and one of its values."""
    # TODO if property_prefix is set, try removing that as a prefix from all prop strings.
    progress_label = f"[{self.ontology}] getting properties"
    for term in self._iter_terms(use_tqdm=use_tqdm, desc=progress_label):
        yield from ((term, prop, value) for prop, value in term.iterate_properties())
@property
def properties_header(self):
    """Property dataframe header."""  # noqa:D401
    id_column = f"{self.ontology}_id"
    return [id_column, "property", "value"]
def iter_property_rows(self, *, use_tqdm: bool = False) -> Iterable[Tuple[str, str, str]]:
    """Iterate over rows of a term's identifier, a property, and a value."""
    triples = self.iterate_properties(use_tqdm=use_tqdm)
    yield from ((term.identifier, prop, value) for term, prop, value in triples)
def get_properties_df(self, *, use_tqdm: bool = False) -> pd.DataFrame:
    """Get a dataframe with one row per property value of each term."""
    rows = list(self.iter_property_rows(use_tqdm=use_tqdm))
    return pd.DataFrame(rows, columns=self.properties_header)
def iterate_filtered_properties(
    self, prop: str, *, use_tqdm: bool = False
) -> Iterable[Tuple[Term, str]]:
    """Iterate over pairs of a term and one of its values for the given property."""
    for term in self._iter_terms(use_tqdm=use_tqdm):
        for key, value in term.iterate_properties():
            if key != prop:
                continue
            yield term, value
def get_filtered_properties_df(self, prop: str, *, use_tqdm: bool = False) -> pd.DataFrame:
    """Get a dataframe from terms' identifiers to their value of the given property."""
    mapping = self.get_filtered_properties_mapping(prop, use_tqdm=use_tqdm)
    id_column = f"{self.ontology}_id"
    return pd.DataFrame(list(mapping.items()), columns=[id_column, prop])
def get_filtered_properties_mapping(
    self, prop: str, *, use_tqdm: bool = False
) -> Mapping[str, str]:
    """Build a dictionary from terms' identifiers to their value of the given property.

    .. warning:: Assumes there's only one version of the property for each term.
        If there are several, the last one is kept.
    """
    mapping = {}
    for term, value in self.iterate_filtered_properties(prop, use_tqdm=use_tqdm):
        mapping[term.identifier] = value
    return mapping
def get_filtered_properties_multimapping(
    self, prop: str, *, use_tqdm: bool = False
) -> Mapping[str, List[str]]:
    """Group the values of the given property by term identifier with ``multidict``."""
    pairs = self.iterate_filtered_properties(prop, use_tqdm=use_tqdm)
    return multidict((term.identifier, value) for term, value in pairs)
#############
# RELATIONS #
#############
def iterate_relations(
    self, *, use_tqdm: bool = False
) -> Iterable[Tuple[Term, TypeDef, Reference]]:
    """Iterate over triples of a term, a relation, and a target.

    The parents of each term come first, as ``is_a`` relations, followed by its
    other relationships.

    :raises ValueError: If a relationship uses a typedef that is neither declared in
        this ontology's typedefs nor one of the default typedefs
    """
    progress_label = f"[{self.ontology}] getting relations"
    for term in self._iter_terms(use_tqdm=use_tqdm, desc=progress_label):
        for parent in term.parents:
            yield term, is_a, parent
        for typedef, reference in term.iterate_relations():
            declared = self.typedefs is not None and typedef in self.typedefs
            if not declared and (typedef.prefix, typedef.identifier) not in default_typedefs:
                raise ValueError(f"Undefined typedef: {typedef.curie} ! {typedef.name}")
            yield term, typedef, reference
def iter_relation_rows(
    self, use_tqdm: bool = False
) -> Iterable[Tuple[str, str, str, str, str]]:
    """Iterate over the rows of the relations table.

    Each row is the term's identifier, then the relation's prefix and identifier,
    then the target's prefix and identifier.
    """
    for term, typedef, reference in self.iterate_relations(use_tqdm=use_tqdm):
        row = (
            term.identifier,
            typedef.prefix,
            typedef.identifier,
            reference.prefix,
            reference.identifier,
        )
        yield row
def iterate_filtered_relations(
    self,
    relation: RelationHint,
    *,
    use_tqdm: bool = False,
) -> Iterable[Tuple[Term, Reference]]:
    """Iterate over pairs of a term and a target for the given relation."""
    wanted = get_reference_tuple(relation)
    wanted_prefix, wanted_identifier = wanted
    for term, typedef, reference in self.iterate_relations(use_tqdm=use_tqdm):
        if typedef.prefix != wanted_prefix or typedef.identifier != wanted_identifier:
            continue
        yield term, reference
@property
def relations_header(self) -> Sequence[str]:
    """Header for the relations dataframe."""  # noqa:D401
    id_column = f"{self.ontology}_id"
    return [id_column, RELATION_PREFIX, RELATION_ID, TARGET_PREFIX, TARGET_ID]
def get_relations_df(self, *, use_tqdm: bool = False) -> pd.DataFrame:
    """Build a dataframe with one row per relation in the ontology.

    Rows come from ``iter_relation_rows`` and column names from ``relations_header``.

    :param use_tqdm: Forwarded to ``iter_relation_rows``.
    """
    rows = list(self.iter_relation_rows(use_tqdm=use_tqdm))
    return pd.DataFrame(rows, columns=self.relations_header)
def get_filtered_relations_df(
    self,
    relation: RelationHint,
    *,
    use_tqdm: bool = False,
) -> pd.DataFrame:
    """Build a dataframe of the targets of a single relation.

    :param relation: The relation to keep; passed to ``iterate_filtered_relations``.
    :param use_tqdm: Forwarded to ``iterate_filtered_relations``.
    :returns: A three-column dataframe: the term identifier, then the target
        reference's prefix and identifier.
    """
    rows = [
        (term.identifier, reference.prefix, reference.identifier)
        for term, reference in self.iterate_filtered_relations(relation, use_tqdm=use_tqdm)
    ]
    header = [f"{self.ontology}_id", TARGET_PREFIX, TARGET_ID]
    return pd.DataFrame(rows, columns=header)
def iterate_filtered_relations_filtered_targets(
    self,
    relation: RelationHint,
    target_prefix: str,
    *,
    use_tqdm: bool = False,
) -> Iterable[Tuple[Term, Reference]]:
    """Iterate over one relation's (term, target) pairs whose target has a given prefix.

    :param relation: The relation to keep; passed to ``iterate_filtered_relations``.
    :param target_prefix: Only targets whose ``prefix`` equals this value are yielded.
    :param use_tqdm: Forwarded to ``iterate_filtered_relations``.
    """
    candidates = self.iterate_filtered_relations(relation=relation, use_tqdm=use_tqdm)
    for term, reference in candidates:
        if reference.prefix != target_prefix:
            continue
        yield term, reference
def get_relation_mapping(
    self,
    relation: RelationHint,
    target_prefix: str,
    *,
    use_tqdm: bool = False,
) -> Mapping[str, str]:
    """Map each term's identifier to the identifier of its target for one relation.

    .. warning::

        Only one target is kept per term: if a term appears more than once,
        the pair seen last overwrites the earlier ones.

    Example usage: get homology between HGNC and MGI:

    >>> from pyobo.sources.hgnc import get_obo
    >>> obo = get_obo()
    >>> human_mapt_hgnc_id = '6893'
    >>> mouse_mapt_mgi_id = '97180'
    >>> hgnc_mgi_orthology_mapping = obo.get_relation_mapping('ro:HOM0000017', 'mgi')
    >>> assert mouse_mapt_mgi_id == hgnc_mgi_orthology_mapping[human_mapt_hgnc_id]

    :param relation: The relation to keep.
    :param target_prefix: The prefix that target references must have.
    :param use_tqdm: Forwarded to ``iterate_filtered_relations_filtered_targets``.
    """
    pairs = self.iterate_filtered_relations_filtered_targets(
        relation=relation,
        target_prefix=target_prefix,
        use_tqdm=use_tqdm,
    )
    mapping: Dict[str, str] = {}
    for term, reference in pairs:
        mapping[term.identifier] = reference.identifier
    return mapping
def get_relation(
    self,
    source_identifier: str,
    relation: RelationHint,
    target_prefix: str,
    *,
    use_tqdm: bool = False,
) -> Optional[str]:
    """Look up one term's target identifier for a relation to another resource.

    The full mapping is built with ``get_relation_mapping`` on every call and
    then queried for ``source_identifier``.

    >>> from pyobo.sources.hgnc import get_obo
    >>> obo = get_obo()
    >>> human_mapt_hgnc_id = '6893'
    >>> mouse_mapt_mgi_id = '97180'
    >>> assert mouse_mapt_mgi_id == obo.get_relation(human_mapt_hgnc_id, 'ro:HOM0000017', 'mgi')

    :param source_identifier: The identifier of the term in this resource.
    :param relation: The relation to follow.
    :param target_prefix: The prefix of the target resource.
    :param use_tqdm: Forwarded to ``get_relation_mapping``.
    :returns: The target identifier, or ``None`` if the term is not in the mapping.
    """
    return self.get_relation_mapping(
        relation=relation,
        target_prefix=target_prefix,
        use_tqdm=use_tqdm,
    ).get(source_identifier)
def get_relation_multimapping(
    self,
    relation: RelationHint,
    target_prefix: str,
    *,
    use_tqdm: bool = False,
) -> Mapping[str, List[str]]:
    """Group the target identifiers of one relation by the identifier of the source term.

    :param relation: The relation to keep.
    :param target_prefix: The prefix that target references must have.
    :param use_tqdm: Forwarded to ``iterate_filtered_relations_filtered_targets``.
    :returns: The result of ``multidict`` over (term identifier, target identifier) pairs.
    """
    pairs = self.iterate_filtered_relations_filtered_targets(
        relation=relation,
        target_prefix=target_prefix,
        use_tqdm=use_tqdm,
    )
    return multidict((term.identifier, reference.identifier) for term, reference in pairs)
def get_id_multirelations_mapping(
    self,
    typedef: TypeDef,
    *,
    use_tqdm: bool = False,
) -> Mapping[str, List[Reference]]:
    """Group each term's references for the given relation by the term's identifier.

    :param typedef: The relation to collect; passed to each term's ``get_relationships``.
    :param use_tqdm: Forwarded to ``_iter_terms``.
    :returns: The result of ``multidict`` over (term identifier, reference) pairs.
    """
    terms = self._iter_terms(
        use_tqdm=use_tqdm, desc=f"[{self.ontology}] getting {typedef.curie}"
    )
    return multidict(
        (term.identifier, reference)
        for term in terms
        for reference in term.get_relationships(typedef)
    )
############
# SYNONYMS #
############
def iterate_synonyms(self, *, use_tqdm: bool = False) -> Iterable[Tuple[Term, Synonym]]:
    """Yield (term, synonym) pairs, with each term's synonyms ordered by their ``name``.

    :param use_tqdm: Forwarded to ``_iter_terms``.
    """
    by_name = attrgetter("name")
    terms = self._iter_terms(use_tqdm=use_tqdm, desc=f"[{self.ontology}] getting synonyms")
    for term in terms:
        for synonym in sorted(term.synonyms, key=by_name):
            yield term, synonym
def iterate_synonym_rows(self, *, use_tqdm: bool = False) -> Iterable[Tuple[str, str]]:
    """Yield (term identifier, synonym name) rows built from ``iterate_synonyms``.

    :param use_tqdm: Forwarded to ``iterate_synonyms``.
    """
    yield from (
        (term.identifier, synonym.name)
        for term, synonym in self.iterate_synonyms(use_tqdm=use_tqdm)
    )
def get_id_synonyms_mapping(self, *, use_tqdm: bool = False) -> Mapping[str, List[str]]:
    """Group synonym strings by the identifier of the term they belong to.

    :param use_tqdm: Forwarded to ``iterate_synonym_rows``.
    :returns: The result of ``multidict`` over the rows from ``iterate_synonym_rows``.
    """
    rows = self.iterate_synonym_rows(use_tqdm=use_tqdm)
    return multidict(rows)
#########
# XREFS #
#########
def iterate_xrefs(self, *, use_tqdm: bool = False) -> Iterable[Tuple[Term, Reference]]:
    """Yield a (term, xref) pair for every entry in each term's ``xrefs``.

    :param use_tqdm: Forwarded to ``_iter_terms``.
    """
    progress_label = f"[{self.ontology}] getting xrefs"
    for term in self._iter_terms(use_tqdm=use_tqdm, desc=progress_label):
        yield from ((term, xref) for xref in term.xrefs)
def iterate_filtered_xrefs(
    self, prefix: str, *, use_tqdm: bool = False
) -> Iterable[Tuple[Term, Reference]]:
    """Yield only the (term, xref) pairs whose xref has the given prefix.

    :param prefix: Xrefs whose ``prefix`` differs from this value are skipped.
    :param use_tqdm: Forwarded to ``iterate_xrefs``.
    """
    for term, xref in self.iterate_xrefs(use_tqdm=use_tqdm):
        if xref.prefix != prefix:
            continue
        yield term, xref
def iterate_xref_rows(self, *, use_tqdm: bool = False) -> Iterable[Tuple[str, str, str]]:
    """Yield (term identifier, xref prefix, xref identifier) rows from ``iterate_xrefs``.

    :param use_tqdm: Forwarded to ``iterate_xrefs``.
    """
    yield from (
        (term.identifier, xref.prefix, xref.identifier)
        for term, xref in self.iterate_xrefs(use_tqdm=use_tqdm)
    )
@property
def xrefs_header(self) -> Sequence[str]:
    """The header for the xref dataframe."""  # noqa:D401
    # Annotated like the sibling ``relations_header`` property; the first column
    # is named after this ontology, e.g. "<ontology>_id".
    return [f"{self.ontology}_id", TARGET_PREFIX, TARGET_ID]
def get_xrefs_df(self, *, use_tqdm: bool = False) -> pd.DataFrame:
    """Get a dataframe of all xrefs extracted from the OBO document.

    Rows come from ``iterate_xref_rows``; rows that are exact duplicates of an
    earlier row are dropped.

    :param use_tqdm: Forwarded to ``iterate_xref_rows``.
    :returns: A dataframe whose columns are given by ``xrefs_header``.
    """
    rows = list(self.iterate_xref_rows(use_tqdm=use_tqdm))
    # Take the column names from the shared property instead of re-spelling them,
    # the same way ``get_relations_df`` relies on ``relations_header``.
    return pd.DataFrame(rows, columns=self.xrefs_header).drop_duplicates()
def get_filtered_xrefs_mapping(
    self, prefix: str, *, use_tqdm: bool = False
) -> Mapping[str, str]:
    """Map each term's identifier to the identifier of its xref with the given prefix.

    Only one xref is kept per term: if a term has several xrefs with this
    prefix, the one seen last overwrites the earlier ones.

    :param prefix: The xref prefix to keep; passed to ``iterate_filtered_xrefs``.
    :param use_tqdm: Forwarded to ``iterate_filtered_xrefs``.
    """
    mapping = {}
    for term, xref in self.iterate_filtered_xrefs(prefix, use_tqdm=use_tqdm):
        mapping[term.identifier] = xref.identifier
    return mapping
def get_filtered_multixrefs_mapping(
    self, prefix: str, *, use_tqdm: bool = False
) -> Mapping[str, List[str]]:
    """Group the identifiers of xrefs with the given prefix by the term's identifier.

    :param prefix: The xref prefix to keep; passed to ``iterate_filtered_xrefs``.
    :param use_tqdm: Forwarded to ``iterate_filtered_xrefs``.
    :returns: The result of ``multidict`` over (term identifier, xref identifier) pairs.
    """
    matches = self.iterate_filtered_xrefs(prefix, use_tqdm=use_tqdm)
    return multidict((term.identifier, xref.identifier) for term, xref in matches)
########
# ALTS #
########
def iterate_alts(self) -> Iterable[Tuple[Term, Reference]]:
    """Yield a (term, alternative identifier) pair for every entry in each term's ``alt_ids``.

    Terms are obtained by iterating over ``self`` directly.
    """
    for term in self:
        yield from ((term, alt) for alt in term.alt_ids)
def iterate_alt_rows(self) -> Iterable[Tuple[str, str]]:
    """Yield (primary identifier, alternate identifier) rows built from ``iterate_alts``."""
    yield from ((term.identifier, alt.identifier) for term, alt in self.iterate_alts())
def get_id_alts_mapping(self) -> Mapping[str, List[str]]:
    """Group alternative identifiers by the primary identifier of their term.

    :returns: The result of ``multidict`` over (term identifier, alt identifier) pairs
        derived from ``iterate_alts``.
    """
    identifier_pairs = ((term.identifier, alt.identifier) for term, alt in self.iterate_alts())
    return multidict(identifier_pairs)
def make_ad_hoc_ontology(
    _ontology: str,
    _name: str,
    _auto_generated_by: Optional[str] = None,
    _format_version: str = "1.2",
    _typedefs: Optional[List[TypeDef]] = None,
    _synonym_typedefs: Optional[List[SynonymTypeDef]] = None,
    _date: Optional[datetime] = None,
    _data_version: Optional[str] = None,
    *,
    terms: List[Term],
) -> "Obo":
    """Define a one-off :class:`Obo` subclass around the given terms and instantiate it.

    The underscore-prefixed arguments become the class attributes of the same
    name without the underscore, except ``_date`` and ``_data_version``, which
    are set on the instance in ``__post_init__``.

    :param terms: The object handed back, as-is, by the subclass's ``iter_terms``.
    :returns: An instance of the locally defined subclass.
    """
    # The leading underscores keep the argument names distinct from the class
    # attributes assigned below: a class body cannot read an enclosing function's
    # variable under the very name it is assigning.

    class AdHocOntology(Obo):
        """An ontology assembled in memory from the arguments of the factory."""

        ontology = _ontology
        name = _name
        auto_generated_by = _auto_generated_by
        format_version = _format_version
        typedefs = _typedefs
        synonym_typedefs = _synonym_typedefs

        def __post_init__(self):
            self.date, self.data_version = _date, _data_version

        def iter_terms(self, force: bool = False) -> Iterable[Term]:
            """Return the terms given to the factory; ``force`` is not used."""
            return terms

    ad_hoc = AdHocOntology()
    return ad_hoc
def _convert_typedefs(typedefs: Optional[Iterable[TypeDef]]) -> List[Mapping[str, Any]]:
"""Convert the type defs."""
if not typedefs:
return []
return [_convert_typedef(typedef) for typedef in typedefs]
def _convert_typedef(typedef: TypeDef) -> Mapping[str, Any]:
"""Convert a type def."""
# TODO add more later
return typedef.reference.to_dict()
def _convert_synonym_typedefs(synonym_typedefs: Optional[Iterable[SynonymTypeDef]]) -> List[str]:
"""Convert the synonym type defs."""
if not synonym_typedefs:
return []
return [_convert_synonym_typedef(synonym_typedef) for synonym_typedef in synonym_typedefs]
def _convert_synonym_typedef(synonym_typedef: SynonymTypeDef) -> str:
return f'{synonym_typedef.id} "{synonym_typedef.name}"'