import os
import os.path as osp
import shutil
from datetime import datetime
from pprint import pformat
import yaml
import obnb
from obnb.typing import Any, Converter, Dict, List, LogLevel, Mapping, Optional
from obnb.util.checkers import checkConfig
from obnb.util.converter import MyGeneInfoConverter
from obnb.util.download import download_unzip, get_data_url
from obnb.util.logger import get_logger, log_file_context
from obnb.util.path import cleandir, hexdigest
class BaseData:
    """BaseData object.

    This is an abstract (mixin) class for constructing data objects. The main
    methods are _download and _process, which are wrappers that download the raw
    files and process the files into the final processed file if they are not
    yet available. Otherwise, directly load the previously processed file.

    Subclasses are expected to implement :meth:`download`, :meth:`process`,
    :meth:`save`, :meth:`load_processed_data`, and the :attr:`raw_files` and
    :attr:`processed_files` properties. ``apply_transform`` and ``stats`` are
    used by this class but are not defined here; they are presumably provided
    by the concrete data class this mixin is combined with.

    """

    # Attribute names that are exported into the data config by ``to_config``
    CONFIG_KEYS: List[str] = ["version", "gene_id_converter"]

    # Set to new data release name when preparing data for new release
    _new_data_release: Optional[str] = None

    def __init__(
        self,
        root: str,
        *,
        version: str = "latest",
        redownload: bool = False,
        reprocess: bool = False,
        retransform: bool = False,
        log_level: LogLevel = "INFO",
        pre_transform: Any = "default",
        transform: Optional[Any] = None,
        cache_transform: bool = True,
        download_cache: bool = True,
        gene_id_converter: Converter = "HumanEntrez",
        **kwargs,
    ):
        """Initialize BaseData object.

        Args:
            root: Root directory of the data files.
            version: Name of the version of the data to use, default setting
                'latest' will download and process the latest data from the
                source. The special value 'current' is resolved to
                ``obnb.__data_version__``. Any other value is treated as the
                name of an archived version to download.
            redownload: If set to True, redownload the data even if all raw
                files are downloaded. Implies ``reprocess``.
            reprocess: If set to True, reprocess the data even if the processed
                data is available. Implies ``retransform``.
            retransform: If set to True, retransform the data even if the
                cached transformation is available.
            log_level: Logging level used for the processing logger.
            pre_transform: Optional pre_transformation to be applied to the data
                object before saving as the final processed data object. If set
                to 'default', will use the default pre_transformation.
            transform: Optional transformation to be applied to the data
                object.
            cache_transform: Whether or not to cache the transformed data. The
                cached transformed data will be saved under
                `<data_root_directory>/processed/.cache/`.
            download_cache: If set to True, then check to see if <root>/.cache
                exists, and if not, pull the cache from versioned archive.
            gene_id_converter: A mapping object that maps a given node ID to a
                new node ID of interest. Or the name of a predefined
                MygeneInfoConverter object as a string.
            **kwargs: Forwarded to the next ``__init__`` in the MRO.

        Raises:
            ValueError: If ``pre_transform`` is a string other than 'default'.

        Note:
            A custom `pre_transform` is accepted for any `version`, but it is
            only applied when the data is actually (re)processed, i.e., when
            the processed files are missing or `reprocess` is set.

        """
        super().__init__(**kwargs)

        self.root = root
        self.version = obnb.__data_version__ if version == "current" else version
        self.log_level = log_level
        self.cache_transform = cache_transform
        self.download_cache = download_cache
        self.gene_id_converter = gene_id_converter
        self.pre_transform = pre_transform

        self._setup_redos(redownload, reprocess, retransform)
        self._setup_process_logger()

        if version == "latest":
            with log_file_context(self.plogger, self.info_log_path):
                self._download()
                self._process()
        else:
            self._download_archive()
            # No-op when the archive already provides every processed file and
            # ``reprocess`` is not set; otherwise the data is processed locally
            # (which is also when a custom pre_transform gets applied).
            self._process()

        self.load_processed_data()
        self._apply_transform(transform)

    def to_config(self) -> Dict[str, Any]:
        """Generate configuration dictionary from the data object.

        Returns:
            A dictionary with the package version, the current timestamp, the
            data class name, and the data module parameters collected from
            :attr:`CONFIG_KEYS` (plus ``pre_transform`` when it is set).

        Note:
            If a parameter of the data object is a dictionary, it cannot
            contain value that is another dictionary. The only exception
            currently is `pre_transform`.

        """
        params = {key: getattr(self, key) for key in self.CONFIG_KEYS}
        # Set version to new data release version if it is set
        params["version"] = self._new_data_release or params["version"]
        if self.pre_transform is not None:
            params["pre_transform"] = self.pre_transform.to_config()

        config = {
            "package_version": obnb.__version__,
            "processed_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "data_module": self.classname,
            "data_module_params": params,
        }
        checkConfig("Data config", config, max_depth=3, white_list=["pre_transform"])

        return config

    def _setup_redos(self, redownload: bool, reprocess: bool, retransform: bool):
        """Set the redo flags so that an earlier stage forces the later ones."""
        # Redownload > reprocess > retransform
        self.redownload = redownload
        self.reprocess = reprocess or self.redownload
        self.retransform = retransform or self.reprocess

    def _setup_process_logger(self):
        """Set up process logger and file handler for data processing steps."""
        os.makedirs(self.info_dir, exist_ok=True)
        self.plogger = get_logger(
            None,
            base_logger="obnb_precise",
            log_level=self.log_level,
        )

    def get_gene_id_converter(self) -> Mapping[str, str]:
        """Return the gene ID converter as a mapping.

        Returns:
            An empty dictionary if no converter is set, a constructed
            ``MyGeneInfoConverter`` if the converter was given by name, or the
            converter object itself otherwise.

        """
        if self.gene_id_converter is None:
            return {}
        elif isinstance(self.gene_id_converter, str):
            return MyGeneInfoConverter.construct(
                self.gene_id_converter,
                root=self.root,
                log_level=self.log_level,
            )
        else:
            return self.gene_id_converter

    @property
    def default_pre_transform(self) -> Any:
        """Return the pre_transform used when 'default' is requested."""
        return None

    @property
    def pre_transform(self) -> Any:
        """Return the pre_transformation applied before saving processed data."""
        return self._pre_transform

    @pre_transform.setter
    def pre_transform(self, pre_transform: Any):
        """Set the pre_transformation.

        The string 'default' selects :attr:`default_pre_transform`; any other
        string is rejected. Any non-string value is stored as is, regardless of
        the data version.

        Raises:
            ValueError: If ``pre_transform`` is a string other than 'default'.

        """
        if pre_transform == "default":
            self._pre_transform = self.default_pre_transform
        elif isinstance(pre_transform, str):
            raise ValueError(f"Unknown pre_transform option {pre_transform}")
        else:
            # Custom pre_transform is allowed for archived versions as well
            self._pre_transform = pre_transform

    @property
    def classname(self) -> str:
        """Return data object name."""
        return self.__class__.__name__

    @property
    def raw_dir(self) -> str:
        """Return raw file directory."""
        return cleandir(osp.join(self.root, self.classname, "raw"))

    @property
    def processed_dir(self) -> str:
        """Return processed file directory."""
        return cleandir(osp.join(self.root, self.classname, "processed"))

    @property
    def cache_dir(self) -> str:
        """Return transformed data cache directory."""
        return cleandir(osp.join(self.processed_dir, ".cache"))

    @property
    def info_dir(self) -> str:
        """Return info file directory."""
        return cleandir(osp.join(self.root, self.classname, "info"))

    @property
    def raw_files(self) -> List[str]:
        """Return a list of raw file names."""
        raise NotImplementedError

    @property
    def processed_files(self) -> List[str]:
        """Return a list of processed file names."""
        raise NotImplementedError

    @property
    def info_log_path(self) -> str:
        """Return path to the data processing information log file."""
        return osp.join(self.info_dir, "run.log")

    def raw_file_path(self, idx: int) -> str:
        """Return path to a raw file given its index."""
        return osp.join(self.raw_dir, self.raw_files[idx])

    def processed_file_path(self, idx) -> str:
        """Return path to a processed file given its index."""
        return osp.join(self.processed_dir, self.processed_files[idx])

    def download_completed(self) -> bool:
        """Check if all raw files are downloaded."""
        return all(
            osp.isfile(osp.join(self.raw_dir, raw_file)) for raw_file in self.raw_files
        )

    def process_completed(self) -> bool:
        """Check if all processed files are available."""
        return all(
            osp.isfile(osp.join(self.processed_dir, processed_file))
            for processed_file in self.processed_files
        )

    def load_processed_data(self, path: Optional[str] = None):
        """Load processed data into the data object.

        Args:
            path: Optional path to the processed data file to load. This class
                calls the method without a path to load the default processed
                file, and with a path to load a cached transformed file.

        Note:
            Any existing data must be purged upon calling this function. That
            is, the data object (self) will contain exactly the data loaded,
            but not anything else.

        """
        raise NotImplementedError

    def download(self):
        """Download raw files."""
        raise NotImplementedError

    def _download(self):
        """Check to see if files downloaded first before downloading."""
        os.makedirs(self.raw_dir, exist_ok=True)
        if self.redownload or not self.download_completed():
            self.plogger.info(f"Start downloading {self.classname}...")
            self.download()

    def process(self):
        """Process raw files and save processed data."""
        raise NotImplementedError

    def _process(self):
        """Check to see if processed file exist and process if not.

        After processing, the optional pre_transformation is applied to the
        freshly processed data and written back to the first processed file.
        Finally, the data config is saved to ``<info_dir>/config.yaml``.

        """
        os.makedirs(self.processed_dir, exist_ok=True)
        if not self.reprocess and self.process_completed():
            return

        # Process data
        self.plogger.info(f"Start processing {self.classname}...")
        self.process()

        # Pre-transform data
        if self.pre_transform is not None:
            self.load_processed_data()
            self.plogger.info(f"Applying pre-transformation {self.pre_transform}")
            self.apply_transform(self.pre_transform)
            outpath = self.processed_file_path(0)
            self.save(outpath)
            self.plogger.info(f"Saved pre-transformed file {outpath}")

        # Save data config file
        config_path = osp.join(self.info_dir, "config.yaml")
        with open(config_path, "w") as f:
            f.write(yaml.dump(self.to_config(), sort_keys=False))
        self.plogger.info(f"Config file saved to {config_path}")

    def save(self, path):
        """Save the data object to file.

        Args:
            path: Path to the data file to save.

        """
        raise NotImplementedError

    def _apply_transform(self, transform: Optional[Any]):
        """Check to see if cached transformed data exist and load if so.

        The cache location is ``<cache_dir>/<hash of the transform config>``.
        A cached file is reused only when its stored config matches the config
        of ``transform`` and ``retransform`` is not set. Otherwise the cache
        directory is wiped, the transformation is applied, and the result is
        saved to the cache when ``cache_transform`` is enabled.

        Args:
            transform: Transformation to apply; nothing is done if None.

        """
        # TODO: make this pretransform and add a transform version that do not save?
        if transform is None:
            return

        # Set up configs and cache related variables
        config = transform.to_config()
        config_dump = yaml.dump(config)
        hexhash = hexdigest(config_dump)
        cache_dir = osp.join(self.cache_dir, hexhash)
        cache_config_path = osp.join(cache_dir, "config.yaml")
        cache_file_path = osp.join(cache_dir, self.processed_files[0])

        # Check if transformed data cache is available and load directly if so
        if osp.isfile(cache_file_path):
            # TODO: option to further check if info matches (config.yaml)
            try:
                with open(cache_config_path) as f:
                    cache_config = yaml.safe_load(f)
            except FileNotFoundError:
                # A cached data file without its config cannot be validated,
                # so treat it as stale instead of crashing.
                self.plogger.warning(
                    f"Found transformed cache in {cache_dir} but its config "
                    "file is missing, overwriting now.",
                )
                force_retransform = True
            else:
                force_retransform = cache_config != config
                if force_retransform:
                    self.plogger.warning(
                        f"Found transformed cache in {cache_dir} but found "
                        "incompatible configs, overwriting now. Please report "
                        "to the GitHub issue if you saw this message, along "
                        "with the specific transformation you used.",
                    )
                    self.plogger.debug(f"config:\n{pformat(config)!s}")
                    self.plogger.debug(f"cache_config:\n{pformat(cache_config)!s}")

            if not self.retransform and not force_retransform:
                self.plogger.info(
                    f"Loading cached transformed data from {cache_file_path}",
                )
                self.load_processed_data(cache_file_path)
                return

            # Stale or unwanted cache: start over from a clean directory
            shutil.rmtree(cache_dir)

        os.makedirs(cache_dir, exist_ok=True)
        with open(cache_config_path, "w") as f:
            f.write(config_dump)

        # Apply transformation to the data
        # TODO: add option to disable saving option
        # FIX: implement stats for graph/feature data types
        with log_file_context(self.plogger, osp.join(cache_dir, "run.log")):
            self.plogger.info(f"Before transformation:\n{self.stats()}")  # type: ignore
            self.plogger.info(f"Applying transformation:\n{transform}")
            self.apply_transform(transform)
            self.plogger.info(f"After transformation:\n{self.stats()}")  # type: ignore

        # Optionally, save transformed data to cache
        if self.cache_transform:
            self.save(cache_file_path)
            self.plogger.info(f"Saved cache transformation to {cache_file_path}")
        else:
            # Nothing to keep: drop the config and log written above as well
            shutil.rmtree(cache_dir)

    def download_archive(self, version: str):
        """Load data from archived version that ensures reproducibility.

        Note:
            The downloaded data is assumed to be a zip file, which will be
            unzipped and saved to the :attr:`root` directory.

        Args:
            version: Archival version.

        """
        self.plogger.info(f"Loading {self.classname} ({version=})...")
        data_url = get_data_url(version, self.classname, logger=self.plogger)
        download_unzip(data_url, self.root, logger=self.plogger)

        # Pull the shared cache only when requested and not yet present
        if self.download_cache and not osp.isdir(osp.join(self.root, ".cache")):
            cache_url = get_data_url(version, ".cache", logger=self.plogger)
            download_unzip(cache_url, self.root, logger=self.plogger)

    def _download_archive(self):
        """Check if files data set up and download the archive if not."""
        # TODO: check version in the config file to see if matches
        if (
            self.redownload
            or not self.download_completed()
            or not self.process_completed()
        ):
            self.download_archive(self.version)