Scheduling

UnionML enables you to schedule training and prediction jobs on a Flyte cluster so that they run at a particular time or on a particular cadence.

Prerequisites

  • Understand the basics of how to create a UnionML app.

  • Spin up a Flyte Cluster, which will execute the scheduled jobs.

Trainer Scheduling

Scheduling a training job is as easy as invoking the schedule_training() method after you’ve defined all of your UnionML app components. For example, say that you’re training a LogisticRegression model and have the following requirements for your schedule:

  • The model must be retrained at 12am every day.

  • The model must use the latest data snapshot, stored as a CSV file in a blob store such as AWS S3.

First, we define the minimum set of components that a UnionML app requires (assuming you’re working with pandas.DataFrame objects): the reader(), trainer(), predictor(), and evaluator() functions.

from datetime import datetime
from typing import List

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

from unionml import Dataset, Model

dataset = Dataset(targets=["target"])
model = Model(dataset=dataset, init=LogisticRegression)


@dataset.reader
def reader(time: datetime) -> pd.DataFrame:
    return pd.read_csv(
        f"s3://bucket/path/to/dataset/{time.strftime('%Y%m%d')}/dataset.csv"
    )

@model.trainer
def trainer(
    estimator: LogisticRegression,
    features: pd.DataFrame,
    target: pd.DataFrame,
) -> LogisticRegression:
    return estimator.fit(features, target.squeeze())


@model.predictor
def predictor(
    estimator: LogisticRegression,
    features: pd.DataFrame,
) -> List[int]:
    return [int(x) for x in estimator.predict(features)]


@model.evaluator
def evaluator(
    estimator: LogisticRegression,
    features: pd.DataFrame,
    target: pd.DataFrame,
) -> float:
    return float(accuracy_score(target.squeeze(), predictor(estimator, features)))

In the reader function above, we read the CSV file from S3 using pandas. Note that the S3 path follows a date-partitioned format determined by the reader’s time argument (a datetime), so each kickoff time maps to a specific data snapshot.
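Because the kickoff time decides which file gets read, the date formatting in the path matters. The sketch below isolates the path construction using only the standard library; the bucket and prefix are the placeholder values from the example, and dataset_uri is a hypothetical helper, not part of UnionML.

```python
from datetime import datetime

# Placeholder bucket/prefix, mirroring the reader() example above.
URI_TEMPLATE = "s3://bucket/path/to/dataset/{date}/dataset.csv"


def dataset_uri(time: datetime) -> str:
    """Build the date-partitioned dataset URI for a given kickoff time."""
    # strftime only substitutes %-directives: '%Y%m%d' renders e.g. 20230105,
    # whereas a literal pattern like 'YYYYMMDD' is returned unchanged.
    return URI_TEMPLATE.format(date=time.strftime("%Y%m%d"))


kickoff = datetime(2023, 1, 5, 0, 0)
print(dataset_uri(kickoff))  # s3://bucket/path/to/dataset/20230105/dataset.csv
print(kickoff.strftime("YYYYMMDD"))  # YYYYMMDD
```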

Next, we use the schedule_training() method to lazily configure a training schedule for the app. The schedule is deployed to the remote Flyte Cluster in the app deployment step (we’ll get to that later in this guide 👇).

model.schedule_training(
    name="cron_daily_training",
    expression="0 0 * * *",  # train at 12am every day
    reader_time_arg="time",  # feed the schedule kickoff-time `time` to the dataset.reader function
    hyperparameters={"C": 0.1},
)

We’ve named the schedule cron_daily_training and scheduled it to run at 12am every day. The reader_time_arg="time" argument indicates which argument of the reader function receives the scheduled kickoff time, which in turn determines which data is pulled. We also pass in the hyperparameters that we want to use to train the model.
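To make "0 0 * * *" concrete, here is a deliberately simplified, standard-library sketch of how a five-field cron expression (minute, hour, day of month, month, day of week) maps to kickoff times. Flyte computes kickoff times itself; cron_matches and next_kickoff are hypothetical helpers for illustration only, and they support just * and single integers in each field.

```python
from datetime import datetime, timedelta


def _field_matches(field: str, value: int) -> bool:
    # Only "*" and single integers are supported; ranges ("1-5"),
    # lists ("1,15"), and steps ("*/5") are deliberately left out.
    return field == "*" or int(field) == value


def cron_matches(expression: str, t: datetime) -> bool:
    """Check whether time t satisfies "minute hour day month weekday"."""
    minute, hour, day, month, weekday = expression.split()
    # Cron numbers weekdays 0-6 from Sunday; datetime.weekday() starts at Monday=0.
    cron_weekday = (t.weekday() + 1) % 7
    day_ok = _field_matches(day, t.day)
    weekday_ok = _field_matches(weekday, cron_weekday)
    # Standard cron: if both day fields are restricted, either one may match.
    if day != "*" and weekday != "*":
        days_ok = day_ok or weekday_ok
    else:
        days_ok = day_ok and weekday_ok
    return (
        _field_matches(minute, t.minute)
        and _field_matches(hour, t.hour)
        and _field_matches(month, t.month)
        and days_ok
    )


def next_kickoff(expression: str, after: datetime, horizon_days: int = 366) -> datetime:
    """Return the first matching minute strictly after `after` (brute-force scan)."""
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    end = after + timedelta(days=horizon_days)
    while t <= end:
        if cron_matches(expression, t):
            return t
        t += timedelta(minutes=1)
    raise ValueError(f"{expression!r} does not fire within {horizon_days} days")


kickoff = next_kickoff("0 0 * * *", datetime(2023, 1, 5, 13, 45))
print(kickoff)  # 2023-01-06 00:00:00
# With reader_time_arg="time", this kickoff is what the reader receives as `time`.
reader_kwargs = {"time": kickoff}
```

A real scheduler computes the next match arithmetically rather than scanning minute by minute; the brute-force scan just keeps the semantics easy to read.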

Note

UnionML doesn’t have any opinions about how the data got to the specified s3 path. This is outside the scope of UnionML, which assumes that some other process or application handles the ETL that produces the data.

Specifying Multiple Schedules

You can also specify multiple schedules of different kinds, as long as their names are unique:

from datetime import timedelta

# equivalent to the above schedule, except here use the non-standard croniter syntax.
model.schedule_training(
    name="non_standard_training_schedule",
    expression="@daily",
    reader_time_arg="time",
    hyperparameters={"C": 0.1},
)

# run the schedule with a fixed rate, if you don't care about the schedule running at
# a particular time of day.
model.schedule_training(
    name="fixed_rate_training_schedule",
    fixed_rate=timedelta(days=1),
    reader_time_arg="time",
    hyperparameters={"C": 0.1},
)

Across the two code snippets above, we specify daily schedules in three different ways:

  1. cron_daily_training: standard cron syntax

  2. non_standard_training_schedule: non-standard croniter syntax

  3. fixed_rate_training_schedule: a timedelta object specifying the job cadence.
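The sketch below shows why the three forms are interchangeable for a daily cadence, and where they differ: @daily expands to the same 0 0 * * * expression, while a fixed-rate schedule fires at fixed intervals from whenever it starts. For illustration it assumes that a fixed-rate schedule’s first run is anchored at its start time; CRON_ALIASES, expand_alias, and fixed_rate_kickoffs are hypothetical helpers, and which aliases your Flyte deployment accepts is worth verifying.

```python
from datetime import datetime, timedelta
from typing import List

# Common non-standard cron aliases and their standard 5-field equivalents.
# Assumption: your scheduler accepts these aliases; verify against the Flyte docs.
CRON_ALIASES = {
    "@yearly": "0 0 1 1 *",
    "@annually": "0 0 1 1 *",
    "@monthly": "0 0 1 * *",
    "@weekly": "0 0 * * 0",
    "@daily": "0 0 * * *",
    "@hourly": "0 * * * *",
}


def expand_alias(expression: str) -> str:
    """Return the standard form of a cron alias; other expressions pass through."""
    return CRON_ALIASES.get(expression, expression)


def fixed_rate_kickoffs(start: datetime, rate: timedelta, n: int) -> List[datetime]:
    """First n kickoff times of a fixed-rate schedule anchored at `start`."""
    return [start + i * rate for i in range(n)]


print(expand_alias("@daily"))  # 0 0 * * *
# Anchored at 14:30, a daily fixed-rate schedule keeps firing at 14:30, not midnight.
runs = fixed_rate_kickoffs(datetime(2023, 1, 5, 14, 30), timedelta(days=1), 3)
print([t.isoformat() for t in runs])
# ['2023-01-05T14:30:00', '2023-01-06T14:30:00', '2023-01-07T14:30:00']
```

This is why the fixed-rate form suits jobs that only need a cadence, while the cron forms pin the run to a time of day.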

Predictor Scheduling

The syntax for specifying batch prediction jobs is similar.

When using batch prediction schedules, you need to ensure that your reader() component is correctly factored to account for labeled data (for training and backtesting) and unlabeled data (for prediction). For example, you can specify a flag that allows you to differentiate between training and prediction settings:

@dataset.reader
def reader(time: datetime, labeled: bool = True) -> pd.DataFrame:
    uri_prefix = "labeled_datasets" if labeled else "unlabeled_datasets"
    return pd.read_csv(f"s3://bucket/path/to/{uri_prefix}/{time.strftime('%Y%m%d')}/dataset.csv")

We’ve added a labeled argument to the reader function, which determines the s3 prefix that we use to fetch the data. Next, we make use of this new argument in the batch prediction schedule via the schedule_prediction() method.

First, however, we need to train a model locally so that the schedule can use a specific model object. Calling train() will assign the artifact property, which schedule_prediction() will use under the hood.

from datetime import datetime
from typing import List

# train a model locally
model.train(hyperparameters={"C": 0.1, "max_iter": 5000}, time=datetime.now(), labeled=True)

# this will use the model object in the model.artifact property
model.schedule_prediction(
    name="daily_predictions",
    expression="0 0 * * *",
    reader_time_arg="time",
    labeled=False,
)

Note that we pass labeled=False to indicate that we’re working with unlabeled data. Just like the train(), predict(), remote_train(), and remote_predict() methods, the scheduling methods take the reader function’s arguments as keyword arguments.
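The sketch below illustrates that idea with a stand-in reader: the schedule’s kickoff time is supplied under the name given by reader_time_arg, and every other keyword argument (here labeled=False) is passed through unchanged. build_reader_kwargs is a hypothetical helper, not UnionML’s implementation, and checking the kwargs against the reader’s signature is our own addition.

```python
import inspect
from datetime import datetime
from typing import Any, Callable, Dict


def reader(time: datetime, labeled: bool = True) -> str:
    # Stand-in for the reader() above: it returns the URI it would read
    # instead of calling pd.read_csv, so the sketch runs without S3 access.
    uri_prefix = "labeled_datasets" if labeled else "unlabeled_datasets"
    return f"s3://bucket/path/to/{uri_prefix}/{time.strftime('%Y%m%d')}/dataset.csv"


def build_reader_kwargs(
    reader_fn: Callable[..., Any],
    reader_time_arg: str,
    kickoff_time: datetime,
    **reader_kwargs: Any,
) -> Dict[str, Any]:
    """Combine a schedule's kickoff time with its static reader keyword arguments."""
    signature = inspect.signature(reader_fn)
    if reader_time_arg not in signature.parameters:
        raise ValueError(f"reader has no argument named {reader_time_arg!r}")
    kwargs = {**reader_kwargs, reader_time_arg: kickoff_time}
    # bind() raises TypeError on unknown or missing arguments, e.g. a typo like label=False.
    signature.bind(**kwargs)
    return kwargs


kwargs = build_reader_kwargs(reader, "time", datetime(2023, 1, 5), labeled=False)
print(reader(**kwargs))  # s3://bucket/path/to/unlabeled_datasets/20230105/dataset.csv
```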

Specifying Models in Multiple Ways

You can specify the model you want to use for batch prediction in three other ways.

Explicitly Passing in a model_object

model_object, _ = model.train(hyperparameters={"C": 0.1}, time=datetime.now(), labeled=True)

model.schedule_prediction(
    name="daily_predictions_with_model_object",
    expression="0 0 * * *",
    reader_time_arg="time",
    model_object=model_object,
    labeled=False,
)

Note that model_object could have originated from anywhere, not just from model.train, as long as the model type is consistent with the type specified in trainer().
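One way to picture this consistency requirement is as an isinstance check against trainer()’s return annotation. This is a sketch under that assumption only: UnionML may validate the type differently, or the mismatch may only surface when the prediction job runs. HypotheticalEstimator, OtherEstimator, and check_model_object are hypothetical stand-ins.

```python
import inspect
from typing import Any, Callable


class HypotheticalEstimator:
    """Stand-in for the model type returned by trainer(), e.g. LogisticRegression."""


class OtherEstimator:
    """An unrelated model type that should be rejected."""


def trainer(estimator: HypotheticalEstimator, features, target) -> HypotheticalEstimator:
    # Only the return annotation matters for this check.
    return estimator


def check_model_object(model_object: Any, trainer_fn: Callable[..., Any]) -> None:
    """Raise TypeError unless model_object is an instance of trainer_fn's return type."""
    expected = inspect.signature(trainer_fn).return_annotation
    if expected is inspect.Signature.empty:
        raise ValueError("trainer() has no return annotation to compare against")
    if not isinstance(model_object, expected):
        raise TypeError(
            f"expected {expected.__name__}, got {type(model_object).__name__}"
        )


check_model_object(HypotheticalEstimator(), trainer)  # passes silently
```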

Passing in a filepath to a serialized model object

If you have a serialized model object in a file, you can pass in the file path through the model_file argument. As with the model_object option, the underlying model object in the serialized file must be the same model type as defined in the trainer().

You may also pass in additional keyword arguments via loader_kwargs, which will be forwarded to that model type’s loader() component. See the _default_loader() and _default_saver() methods for more details.

from tempfile import NamedTemporaryFile


with NamedTemporaryFile() as f:
    # saves model.artifact.model_object to a file, forwarding kwargs to joblib.dump
    model.save(f.name, compress=3)

    # schedule prediction using the model object in the file, forward kwargs to joblib.load
    model.schedule_prediction(
        name="daily_predictions_with_model_file",
        expression="0 0 * * *",
        reader_time_arg="time",
        model_file=f.name,
        loader_kwargs={"mmap_mode": None},
        labeled=False,
    )

Note

By default, the only supported model types are sklearn, pytorch, and keras models. If you’re working with a different model type, you’ll need to implement the saver() and loader() components.
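For a model type without built-in support, a saver/loader pair boils down to a symmetric serialize/deserialize step that forwards keyword arguments, much like the default saver and loader forward kwargs to joblib in the example above. The sketch below uses pickle and plain functions; the (model_obj, file, **kwargs) and (file, **kwargs) signatures are assumptions, so check the UnionML API reference for the exact signatures its saver() and loader() components expect before registering them. Only unpickle files from sources you trust.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any


def saver(model_obj: Any, file: str, **kwargs: Any) -> str:
    """Serialize model_obj to file; extra kwargs are forwarded to pickle.dump."""
    with open(file, "wb") as f:
        pickle.dump(model_obj, f, **kwargs)
    return file


def loader(file: str, **kwargs: Any) -> Any:
    """Deserialize a model from file; extra kwargs (cf. loader_kwargs) go to pickle.load."""
    with open(file, "rb") as f:
        return pickle.load(f, **kwargs)


# A plain dict stands in for a model type without built-in support.
model_obj = {"weights": [0.5, -1.2], "bias": 0.1}

with tempfile.TemporaryDirectory() as tmp:
    path = str(Path(tmp) / "model.pkl")
    saver(model_obj, path, protocol=pickle.HIGHEST_PROTOCOL)
    restored = loader(path)

print(restored == model_obj)  # True
```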

Referencing a model_version

You can use the remote_list_model_versions() method to get the version string identifiers of models that you previously trained on a Flyte Cluster. If your model version isn’t associated with the latest version of your UnionML app, you’ll also need to pass in an app_version argument, which is the output of invoking remote_deploy(). You can also find the app version in the Flyte console UI: it’s the version string associated with any of the workflows that UnionML registers for you when you deploy your app.

model.schedule_prediction(
    name="daily_predictions_with_remote_model",
    expression="0 0 * * *",
    reader_time_arg="time",
    model_version="<MODEL_VERSION>",
    app_version="<APP_VERSION>",
    loader_kwargs={},
    labeled=False,
)

Deploying Schedules to Flyte

Once we’re happy with the schedule definitions, we can simply deploy them with unionml.model.Model.remote_deploy() or the unionml deploy CLI tool.

model.remote(project="my-unionml-app", domain="development")
model.remote_deploy(schedule=True)

Note

Make sure you follow the Deploying to Flyte guide to understand how deployment works.

Both of these options provide a schedule flag, which is True by default. Specifying schedule=False in the programmatic API or --no-schedule in the CLI will disable the deployment and activation of these scheduled jobs.

Once the schedules are successfully deployed, you can go to the Flyte Console UI to check out their execution status and progress. You can also inspect and fetch them programmatically. For the training schedules, you can do something like:

from unionml import ModelArtifact

# get the latest FlyteWorkflowExecutions associated with training schedule runs
latest_training_execution, *_ = model.remote_list_scheduled_training_runs("cron_daily_training")

# fetch the latest model artifact
model_artifact: ModelArtifact = model.remote_fetch_model(latest_training_execution)
model_object = model_artifact.model_object

# use the model_object for some downstream purpose
...

And similarly for prediction schedules:

# get the latest FlyteWorkflowExecution associated with prediction schedule runs
latest_prediction_execution, *_ = model.remote_list_scheduled_prediction_runs("daily_predictions")

# fetch the latest prediction
predictions = model.remote_fetch_predictions(latest_prediction_execution)

# use the predictions for some downstream purpose
...
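The examples above take the first element of the returned list, which appears to assume that the newest execution comes first, and they raise a ValueError if a schedule hasn’t produced any runs yet (unpacking an empty list with latest, *_ = ... fails). If you’d rather handle both cases explicitly, a sketch like the following works. Run is a hypothetical record type, not a FlyteWorkflowExecution, so map its fields onto whatever metadata your executions actually expose.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Run:
    """Hypothetical stand-in for an execution record with a start timestamp."""

    execution_id: str
    started_at: datetime


def latest_run(runs: List[Run]) -> Optional[Run]:
    """Return the most recently started run, or None if the schedule hasn't fired yet."""
    # max() with a key doesn't rely on list order, and default=None avoids the
    # ValueError that `latest, *_ = runs` raises on an empty list.
    return max(runs, key=lambda r: r.started_at, default=None)


runs = [
    Run("exec-a", datetime(2023, 1, 5)),
    Run("exec-b", datetime(2023, 1, 6)),
]
print(latest_run(runs).execution_id)  # exec-b
print(latest_run([]))  # None
```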

Manually Activating and Deactivating Schedules

By default, deploying the UnionML App will deploy and activate all the declared schedules. You can manually deactivate and re-activate them by using the programmatic API:

Or the CLI tool:

Summary

In this guide, you learned how to:

  • Schedule training and prediction jobs.

  • Deploy them to a Flyte cluster.

  • Get the metadata and outputs of executed schedules.

  • Deactivate and re-activate them manually.