Source code for autogluon.timeseries.models.chronos.model

import logging
import os
from itertools import chain
from typing import Any, Dict, Literal, Optional, Union

import numpy as np
import pandas as pd

from autogluon.common.loaders import load_pkl
from autogluon.timeseries.dataset.ts_dataframe import TimeSeriesDataFrame
from autogluon.timeseries.models.abstract import AbstractTimeSeriesModel
from autogluon.timeseries.utils.forecast import get_forecast_horizon_index_ts_dataframe

logger = logging.getLogger(__name__)


# allowed HuggingFace model paths with custom parameter definitions
MODEL_CONFIGS = {
    "amazon/chronos-t5-tiny": {
        "num_gpus": 0,  # minimum number of required GPUs
    },
    "amazon/chronos-t5-mini": {"num_gpus": 0},
    "amazon/chronos-t5-small": {"num_gpus": 1},
    "amazon/chronos-t5-base": {"num_gpus": 1},
    "amazon/chronos-t5-large": {"num_gpus": 1},
}
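

# Illustrative sketch (not part of the original module): ``num_gpus`` marks the
# minimum number of GPUs a variant requires; ``get_minimum_resources`` below
# reads it via ``MODEL_CONFIGS.get(model_path, {}).get("num_gpus", 0)``.
def _example_minimum_gpus() -> None:
    assert MODEL_CONFIGS["amazon/chronos-t5-tiny"]["num_gpus"] == 0
    assert MODEL_CONFIGS["amazon/chronos-t5-large"]["num_gpus"] == 1
    # unknown models fall back to requiring zero GPUs
    assert MODEL_CONFIGS.get("my/custom-model", {}).get("num_gpus", 0) == 0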


MODEL_ALIASES = {
    "tiny": "amazon/chronos-t5-tiny",
    "mini": "amazon/chronos-t5-mini",
    "small": "amazon/chronos-t5-small",
    "base": "amazon/chronos-t5-base",
    "large": "amazon/chronos-t5-large",
}
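

# Illustrative sketch (not part of the original module): ``model_path``
# hyperparameter values are resolved with ``MODEL_ALIASES.get(value, value)``,
# exactly as ``ChronosModel.__init__`` does below, so unrecognized inputs such
# as local paths pass through unchanged.
def _example_resolve_model_path() -> None:
    assert MODEL_ALIASES.get("small", "small") == "amazon/chronos-t5-small"
    assert MODEL_ALIASES.get("/my/local/model", "/my/local/model") == "/my/local/model"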


class ChronosInferenceDataset:
    """A container for time series datasets that implements the ``torch.utils.data.Dataset`` interface"""

    def __init__(
        self,
        target_df: TimeSeriesDataFrame,
        context_length: int,
        target_column: str = "target",
    ):
        assert context_length > 0
        self.context_length = context_length
        self.target_array = target_df[target_column].to_numpy(dtype=np.float32)
        self.freq = target_df.freq

        # store pointer to start:end of each time series
        cum_sizes = target_df.num_timesteps_per_item().values.cumsum()
        self.indptr = np.append(0, cum_sizes).astype(np.int32)

    def __len__(self):
        return len(self.indptr) - 1  # noqa

    def _get_context(self, a: np.ndarray, pad_value=np.nan):
        """Take the last ``context_length`` values of ``a``, left-padding with ``pad_value`` if shorter."""
        a = a[-self.context_length :]
        pad_size = self.context_length - len(a)
        if pad_size > 0:
            pad = np.full(shape=(pad_size,), fill_value=pad_value)
            a = np.concatenate((pad, a))
        return a

    def __getitem__(self, idx) -> np.ndarray:
        start_idx = self.indptr[idx]
        end_idx = self.indptr[idx + 1]

        return self._get_context(self.target_array[start_idx:end_idx])
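

# Illustrative usage sketch (not part of the original module), assuming a
# TimeSeriesDataFrame indexed by "item_id" and "timestamp" with a "target"
# column. Each dataset entry is a fixed-length context window, left-padded
# with NaN when an item is shorter than ``context_length``.
def _example_inference_dataset() -> None:
    df = TimeSeriesDataFrame.from_data_frame(
        pd.DataFrame(
            {
                "item_id": ["A"] * 3 + ["B"] * 5,
                "timestamp": list(pd.date_range("2024-01-01", periods=3, freq="D"))
                + list(pd.date_range("2024-01-01", periods=5, freq="D")),
                "target": np.arange(8, dtype=np.float32),
            }
        )
    )
    dataset = ChronosInferenceDataset(target_df=df, context_length=4)
    assert len(dataset) == 2
    assert np.isnan(dataset[0][0])  # item "A" has 3 values, padded to length 4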


class ChronosModel(AbstractTimeSeriesModel):
    """Chronos pretrained time series forecasting models, based on the original
    `ChronosModel <https://github.com/amazon-science/chronos-forecasting>`_ implementation.

    Chronos is a family of pretrained models, based on the T5 family, with parameter counts
    ranging from 8M to 710M. The full collection of Chronos models is available on
    `Hugging Face <https://huggingface.co/collections/amazon/chronos-models-65f1791d630a8d57cb718444>`_.
    For the Chronos small, base, and large variants a GPU is required to perform inference efficiently.

    Chronos takes a minimalistic approach to pretraining time series models, by discretizing time series
    data directly into bins which are treated as tokens, effectively performing regression by
    classification. This results in a simple and flexible framework for using any language model in the
    context of time series forecasting. See [Ansari2024]_ for more information.

    References
    ----------
    .. [Ansari2024] Ansari, Abdul Fatir, Stella, Lorenzo et al.
        "Chronos: Learning the Language of Time Series."
        http://arxiv.org/abs/2403.07815

    Other Parameters
    ----------------
    model_path : str, default = "amazon/chronos-t5-small"
        Model path used for the model, i.e., a HuggingFace transformers ``name_or_path``. Can be a
        compatible model name on HuggingFace Hub or a local path to a model directory. Original
        Chronos models (i.e., ``amazon/chronos-t5-{model_size}``) can be specified with aliases
        ``tiny``, ``mini``, ``small``, ``base``, and ``large``.
    batch_size : int, default = 16
        Size of batches used during inference.
    num_samples : int, default = 20
        Number of samples used during inference.
    device : str, default = None
        Device to use for inference. If None, the model will use the GPU if available. For the larger
        model sizes ``small``, ``base``, and ``large``, inference will fail if no GPU is available.
    context_length : int or None, default = None
        The context length to use in the model. Shorter context lengths will decrease model accuracy,
        but result in faster inference. If None, the model will infer the context length from the
        data set length at inference time, capped at a maximum of 512.
    optimization_strategy : {None, "onnx", "openvino"}, default = None
        Optimization strategy to use for inference on CPUs. If None, the model will use the default
        implementation. If ``onnx``, the model will be converted to ONNX and inference will be
        performed using ONNX. If ``openvino``, inference will be performed with the model compiled to
        OpenVINO.
    torch_dtype : torch.dtype or str, default = "auto"
        Torch data type for model weights, provided to the ``from_pretrained`` method of Hugging Face
        AutoModels.
    data_loader_num_workers : int, default = 1
        Number of worker processes to be used in the data loader. See the documentation on
        ``torch.utils.data.DataLoader`` for more information.
    """

    # default number of samples for prediction
    default_num_samples: int = 20
    default_batch_size: int = 16
    default_model_path = "amazon/chronos-t5-small"
    maximum_context_length = 512

    def __init__(
        self,
        freq: Optional[str] = None,
        prediction_length: int = 1,
        path: Optional[str] = None,
        name: Optional[str] = None,
        eval_metric: Optional[str] = None,
        hyperparameters: Optional[Dict[str, Any]] = None,
        **kwargs,  # noqa
    ):
        hyperparameters = hyperparameters if hyperparameters is not None else {}

        model_path_input = hyperparameters.get("model_path", self.default_model_path)
        self.model_path = MODEL_ALIASES.get(model_path_input, model_path_input)

        # TODO: automatically determine batch size based on GPU / memory availability
        self.batch_size = hyperparameters.get("batch_size", self.default_batch_size)
        self.num_samples = hyperparameters.get("num_samples", self.default_num_samples)
        self.device = hyperparameters.get("device")
        self.torch_dtype = hyperparameters.get("torch_dtype", "auto")
        self.data_loader_num_workers = hyperparameters.get("data_loader_num_workers", 1)
        self.optimization_strategy: Optional[Literal["onnx", "openvino"]] = hyperparameters.get(
            "optimization_strategy", None
        )

        self.context_length = hyperparameters.get("context_length")
        if self.context_length is not None and self.context_length > self.maximum_context_length:
            logger.warning(
                f"Context length {self.context_length} exceeds maximum context length "
                f"{self.maximum_context_length}. Context length will be set to {self.maximum_context_length}."
            )
            self.context_length = self.maximum_context_length

        model_path_safe = str.replace(model_path_input, "/", "__")
        name = (name if name is not None else "Chronos") + f"[{model_path_safe}]"
        super().__init__(
            path=path,
            freq=freq,
            prediction_length=prediction_length,
            name=name,
            eval_metric=eval_metric,
            hyperparameters=hyperparameters,
            **kwargs,
        )

        self.model_pipeline: Optional[Any] = None  # of type OptimizedChronosPipeline

    def save(self, path: Optional[str] = None, verbose: bool = True) -> str:
        # the pipeline is not pickled; drop it before saving and restore it afterwards
        pipeline = self.model_pipeline
        self.model_pipeline = None
        path = super().save(path=path, verbose=verbose)
        self.model_pipeline = pipeline

        return str(path)

    @classmethod
    def load(cls, path: str, reset_paths: bool = True, verbose: bool = True) -> "ChronosModel":
        model = load_pkl.load(path=os.path.join(path, cls.model_file_name), verbose=verbose)
        if reset_paths:
            model.set_contexts(path)
        return model

    def _is_gpu_available(self) -> bool:
        import torch.cuda

        return torch.cuda.is_available()

    def get_minimum_resources(self, is_gpu_available: bool = False) -> Dict[str, Union[int, float]]:
        minimum_resources = {"num_cpus": 1}
        # if a GPU is available, request the minimum number of GPUs the model requires
        if is_gpu_available:
            minimum_resources["num_gpus"] = MODEL_CONFIGS.get(self.model_path, {}).get("num_gpus", 0)
        return minimum_resources

    def load_model_pipeline(self, context_length: Optional[int] = None):
        from .chronos import OptimizedChronosPipeline

        gpu_available = self._is_gpu_available()

        if not gpu_available and MODEL_CONFIGS.get(self.model_path, {}).get("num_gpus", 0) > 0:
            raise RuntimeError(
                f"{self.name} requires a GPU to run, but no GPU was detected. "
                "Please make sure that you are using a computer with a CUDA-compatible GPU and "
                "`import torch; torch.cuda.is_available()` returns `True`."
            )

        device = self.device or ("cuda" if gpu_available else "auto")

        pipeline = OptimizedChronosPipeline.from_pretrained(
            self.model_path,
            device_map=device,
            optimization_strategy=self.optimization_strategy,
            torch_dtype=self.torch_dtype,
            context_length=context_length or self.context_length,
        )

        self.model_pipeline = pipeline

    def _fit(
        self,
        train_data: TimeSeriesDataFrame,
        val_data: Optional[TimeSeriesDataFrame] = None,
        time_limit: Optional[int] = None,
        **kwargs,
    ) -> None:
        # Chronos is a pretrained (zero-shot) model, so no training is performed here
        self._check_fit_params()

    def _get_inference_data_loader(
        self,
        data: TimeSeriesDataFrame,
        context_length: int,
        num_workers: int = 1,
    ):
        import torch

        chronos_dataset = ChronosInferenceDataset(
            target_df=data,
            target_column=self.target,
            context_length=context_length,
        )

        return torch.utils.data.DataLoader(
            chronos_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=num_workers,
        )

    def _predict(
        self,
        data: TimeSeriesDataFrame,
        known_covariates: Optional[TimeSeriesDataFrame] = None,
        **kwargs,
    ) -> TimeSeriesDataFrame:
        import torch

        # We defer initialization of the model pipeline, i.e., the model is only loaded into device
        # memory during inference. We also infer the maximum length of the time series in the inference
        # data set and use that to determine the context length of the model. If the context length was
        # specified during initialization, it is always used. If not, the context length is set to the
        # longest item length. The context length is always capped by self.maximum_context_length.
        context_length = self.context_length or min(
            data.num_timesteps_per_item().max(),
            self.maximum_context_length,
        )

        # verbosity = kwargs.get("verbosity", 2)
        # for logger_name in logging.root.manager.loggerDict:
        #     if "lightning" in logger_name:
        #         pl_logger = logging.getLogger(logger_name)
        #         pl_logger.setLevel(logging.ERROR if verbosity <= 3 else logging.INFO)

        # load model pipeline to device memory
        self.load_model_pipeline(context_length=context_length)

        self.model_pipeline.model.eval()
        with torch.inference_mode():
            prediction_samples = (
                self.model_pipeline.predict(
                    batch,
                    prediction_length=self.prediction_length,
                    num_samples=self.num_samples,
                    limit_prediction_length=False,
                )
                .detach()
                .cpu()
                .numpy()
                for batch in self._get_inference_data_loader(
                    data=data,
                    num_workers=self.data_loader_num_workers,
                    context_length=context_length,
                )
            )
            samples = np.concatenate([c.T for c in chain.from_iterable(prediction_samples)], axis=0)

        mean = samples.mean(axis=-1, keepdims=True)
        quantiles = np.quantile(samples, self.quantile_levels, axis=-1).T

        df = pd.DataFrame(
            np.concatenate([mean, quantiles], axis=1),
            columns=["mean"] + [str(q) for q in self.quantile_levels],
            index=get_forecast_horizon_index_ts_dataframe(data, self.prediction_length),
        )

        return TimeSeriesDataFrame(df)
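

# Illustrative end-to-end sketch (not part of the original module): Chronos is
# typically used through ``TimeSeriesPredictor``, passing the hyperparameters
# documented above under the "Chronos" model key. ``train_data`` is assumed to
# be a TimeSeriesDataFrame; the exact model key and defaults may vary between
# AutoGluon versions.
def _example_predictor_usage(train_data: TimeSeriesDataFrame) -> None:
    from autogluon.timeseries import TimeSeriesPredictor

    predictor = TimeSeriesPredictor(prediction_length=24).fit(
        train_data,
        hyperparameters={"Chronos": {"model_path": "small", "batch_size": 16}},
    )
    # predictions contain a "mean" column plus one column per quantile level
    predictions = predictor.predict(train_data)
    assert "mean" in predictions.columns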