Source code for obnb.model_trainer.base

import time
from copy import deepcopy

import numpy as np
from tqdm.auto import tqdm

import obnb.metric
from obnb.alltypes import Any, Callable, Dict, LogLevel, Optional
from obnb.util.logger import attach_file_handler, get_logger


class BaseTrainer:
    """The BaseTrainer object.

    Abstract class for trainer objects, which serve as interfaces or shortcuts
    for training specific types of models.

    """

    def __init__(
        self,
        metrics: Optional[Dict[str, Callable[[np.ndarray, np.ndarray], float]]] = None,
        train_on: str = "train",
        log_level: LogLevel = "INFO",
        log_path: Optional[str] = None,
    ):
        """Initialize BaseTrainer.

        Note:
            The original documentation states that "dual" mode only works if
            the input features is MultiFeatureVec. No such mode is handled in
            this class; presumably this applies to a derived trainer (to be
            confirmed).

        Args:
            metrics: Dictionary of metrics used to train/evaluate the model,
                mapping a metric name to a metric function. If ``None`` or
                empty, the default selection of APOP and AUROC is used.
            train_on: Which mask to use for training.
            log_level: Log level.
            log_path: Log file path. If not set, then do not log to file.

        """
        # Time stamp of the previous ``_elapse`` call (None until first call).
        self._tic: Optional[float] = None

        if not metrics:
            metrics = {
                "apop": obnb.metric.log2_auprc_prior,
                "auroc": obnb.metric.auroc,
            }
        self.metrics = metrics
        self.train_on = train_on

        self.logger = get_logger(
            self.__class__.__name__,
            log_level=log_level,
            base_logger="obnb_brief",
        )
        if log_path:
            attach_file_handler(self.logger, log_path)

    def train(
        self,
        model: Any,
        dataset,
        split_idx: int = 0,
    ):
        """Train model and return metrics.

        This base implementation is a placeholder that always raises; derived
        classes are expected to override it.

        Args:
            model: Model to be trained.
            dataset: Dataset object holding the data to train and evaluate on
                (unused by this base implementation).
            split_idx: Which split to use for training and evaluation.

        Raises:
            NotImplementedError: Always, since the base class has no
                functional ``train`` method.

        """
        raise NotImplementedError(
            f"{self.__class__.__name__} does not have functional ``train`` "
            f"method, use a derived class instead.",
        )

    def _elapse(self) -> float:
        """Record the time difference between two consecutive calls.

        Note:
            The first call will return elapsed time of 0.

        Returns:
            Seconds elapsed since the previous call, or ``0.0`` on the first
            call.

        """
        # Use a monotonic, high-resolution clock: unlike ``time.time()``, it
        # cannot jump backwards/forwards when the system clock is adjusted,
        # so the measured interval is never negative or inflated.
        now = time.perf_counter()
        elapsed = 0.0 if self._tic is None else now - self._tic
        self._tic = now
        return elapsed
class StandardTrainer(BaseTrainer):
    """Trainer that delegates fitting and prediction to overridable hooks.

    Training goes through ``_model_train`` and prediction through
    ``_model_predict``; both raise ``NotImplementedError`` in this class, so
    a derived class has to provide them.

    """

    def train(
        self,
        model: Any,
        dataset,
        split_idx: int = 0,
    ) -> Dict[str, float]:
        """Train a supervised learning model.

        The ``model`` in this case is a supervised learning model that has a
        ``fit`` method for training the model, and a ``decision_function``
        that returns the predict confidence scores given some features. See
        ``sklearn.linear_model.LogisticRegression`` for example. The actual
        calls on the model are made by the ``_model_train`` and
        ``_model_predict`` hooks.

        Args:
            model: Model to be trained (in place).
            dataset: Object providing ``graph``, ``feature``, ``y`` and
                ``masks``.
            split_idx: Column of each mask to use as the split.

        Returns:
            Dictionary mapping ``"{mask_name}_{metric_name}"`` to the score.

        """
        graph = dataset.graph
        feature = dataset.feature
        features = feature.mat if feature is not None else None
        labels = dataset.y

        # TODO: log time and other useful stats (maybe use the decorator?)
        fit_mask = dataset.masks[self.train_on][:, split_idx]
        self._model_train(model, graph, features, labels, fit_mask)

        # Only the two closures are needed here; the y dictionaries they
        # share are kept alive through the closures themselves.
        get_predictions, compute_results = self._setup(dataset, split_idx)[2:]
        get_predictions(model, features, labels, dataset.masks)
        return compute_results(dataset.masks)

    def fit_and_eval(
        self,
        model: Any,
        dataset,
        split_idx: int = 0,
        consider_negative: bool = False,
        reduce: str = "none",
        progress: bool = True,
    ) -> Dict[str, float]:
        """Fit model and evaluate.

        Note:
            The original model is not trained. For each task, a deep copy of
            the model is created and it is evaluated via one-vs-rest.

        Args:
            model: Template model; a deep copy is trained for every label.
            dataset: Object providing ``graph``, ``feature``, ``label``,
                ``splitter``, ``idmap``, ``y`` and ``masks``.
            split_idx: Column of each mask to use as the split.
            consider_negative: Forwarded to ``dataset.label.split``.
            reduce: Forwarded to the metric functions for the final results.
            progress: Whether to show the progress bar.

        Returns:
            Dictionary mapping ``"{mask_name}_{metric_name}"`` to the score
            computed over all labels.

        """
        graph = dataset.graph
        feature = dataset.feature
        features = feature.mat if feature is not None else None

        get_predictions, compute_results = self._setup(dataset, split_idx)[2:]
        pbar = tqdm(dataset.label.label_ids, disable=not progress)
        for label_idx, label_id in enumerate(pbar):
            # Per-label targets and masks for this one-vs-rest task.
            label_y, label_masks = dataset.label.split(
                splitter=dataset.splitter,
                target_ids=tuple(dataset.idmap.lst),
                labelset_name=label_id,
                consider_negative=consider_negative,
            )

            fit_mask = label_masks[self.train_on][:, split_idx]
            task_model = deepcopy(model)
            self._model_train(task_model, graph, features, label_y, fit_mask)

            # Fill in the column of this label, then report its scores.
            get_predictions(task_model, features, label_y, label_masks, label_idx)
            intermediate_results = compute_results(label_masks, label_idx=label_idx)
            self.logger.info(f"{label_id}\t{intermediate_results}")

        return compute_results(dataset.masks, reduce=reduce)

    def _setup(self, dataset, split_idx: int):
        """Prepare prediction buffers and the closures operating on them.

        Args:
            dataset: Object providing ``y`` and ``masks``.
            split_idx: Column of each mask to use as the split.

        Returns:
            Tuple ``(y_true_dict, y_pred_dict, get_predictions,
            compute_results)``, where the two dictionaries map a mask name to
            a zero-initialized array and the two functions read from / write
            to those dictionaries.

        """
        y_shape = dataset.y.shape
        num_classes = y_shape[1] if len(y_shape) != 1 else 1

        # mask_name -> y_pred / y_true, one row per example selected by the
        # mask and one column per class.
        y_pred_dict: Dict[str, np.ndarray] = {}
        y_true_dict: Dict[str, np.ndarray] = {}
        for mask_name in dataset.masks:
            num_examples = dataset.masks[mask_name][:, split_idx].sum()
            buffer_shape = (num_examples, num_classes)
            y_pred_dict[mask_name] = np.zeros(buffer_shape)
            y_true_dict[mask_name] = np.zeros(buffer_shape)

        def compute_results(
            masks,
            label_idx: Optional[int] = None,
            reduce: str = "mean",
        ) -> Dict[str, float]:
            # Score every (metric, mask) pair from the stored y arrays; when
            # ``label_idx`` is given, only that column is scored.
            scores = {}
            for metric_name, metric_func in self.metrics.items():
                for mask_name in masks:
                    truth = y_true_dict[mask_name]
                    pred = y_pred_dict[mask_name]
                    if label_idx is not None:
                        truth = truth[:, label_idx]
                        pred = pred[:, label_idx]

                    key = f"{mask_name}_{metric_name}"
                    scores[key] = metric_func(truth, pred, reduce=reduce)  # type: ignore

            return scores

        def get_predictions(model, x, y, masks, label_idx: Optional[int] = None):
            # Store true and predicted values of a trained model per mask.
            for mask_name in masks:
                mask = masks[mask_name][:, split_idx]
                y_true = y[mask]
                y_pred = self._model_predict(model, x, mask)

                if label_idx is not None:
                    # only fill in the column that corresponds to the task
                    y_true_dict[mask_name][:, label_idx] = y_true
                    y_pred_dict[mask_name][:, label_idx] = y_pred
                else:
                    # no specific task: replace the stored arrays entirely
                    y_true_dict[mask_name] = y_true
                    y_pred_dict[mask_name] = y_pred

        return y_true_dict, y_pred_dict, get_predictions, compute_results

    @staticmethod
    def _model_predict(model, x, mask):
        """Predict with ``model`` for the examples in ``mask`` (abstract)."""
        raise NotImplementedError

    @staticmethod
    def _model_train(model, g, x, y, mask):
        """Train ``model`` on the examples selected by ``mask`` (abstract)."""
        raise NotImplementedError