# UnionML
UnionML is an open source MLOps framework that reduces the boilerplate, complexity, and friction that comes with building models and deploying them to production. Taking inspiration from web protocols, UnionML asks the question:
Is it possible to define a standard set of functions/methods for machine learning that can be reused in many different contexts, from model training to prediction?
UnionML aims to unify the ever-evolving ecosystem of machine learning and data tools into a single interface for expressing microservices as Python functions.
You can create UnionML Apps by defining a few core methods that are automatically bundled into ML microservices, starting with model training and offline/online prediction.
Brought to you by the Union.ai team, UnionML is built on top of Flyte to provide a high-level interface for productionizing your ML models so that you can focus on curating a better dataset and improving your models.
## Installation

```shell
pip install unionml
```
## Quickstart

A UnionML app is composed of two core classes: a `Dataset` and a `Model`.

In this example, we'll build a minimal UnionML app that classifies images of handwritten digits into their corresponding digit labels using sklearn, pytorch, or keras.

Create a Python file called `app.py`, import app dependencies, and define `dataset` and `model` objects.
For the sklearn app:

```python
from typing import List

import pandas as pd
from sklearn.datasets import load_digits
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

from unionml import Dataset, Model

dataset = Dataset(name="digits_dataset", test_size=0.2, shuffle=True, targets=["target"])
model = Model(name="digits_classifier", init=LogisticRegression, dataset=dataset)
```
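As a point of reference, the `test_size=0.2, shuffle=True` configuration reads like scikit-learn's `train_test_split`. Here's a rough standalone sketch of the split this implies (this is an assumption about UnionML's splitting semantics, not a guarantee):

```python
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

# Load the same dataframe the reader will return: 64 pixel columns plus a "target" column.
frame = load_digits(as_frame=True).frame

# targets=["target"] designates the "target" column as the label;
# every other column is treated as a feature.
features = frame.drop(columns=["target"])
target = frame[["target"]]

# test_size=0.2, shuffle=True roughly corresponds to a shuffled 80/20 split.
X_train, X_test, y_train, y_test = train_test_split(
    features, target, test_size=0.2, shuffle=True, random_state=0
)
print(len(X_train), len(X_test), X_train.shape[1])
```

The `random_state` here is only for reproducibility of the illustration.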
For the pytorch app, first install pytorch:

```shell
pip install torch
```

```python
from typing import List

import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.datasets import load_digits
from sklearn.metrics import accuracy_score

from unionml import Dataset, Model


# define a simple pytorch module
class PytorchModel(nn.Module):
    def __init__(self, in_dims: int, hidden_dims: int, out_dims: int):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(in_dims, hidden_dims),
            nn.ReLU(),
            nn.Linear(hidden_dims, out_dims),
        )

    def forward(self, features):
        return F.softmax(self.layers(features), dim=1)


dataset = Dataset(name="digits_dataset", test_size=0.2, shuffle=True, targets=["target"])
model = Model(name="digits_classifier", init=PytorchModel, dataset=dataset)
```
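Before handing the module to UnionML, it can help to sanity-check its output shape directly. A minimal sketch (the digits dataset has 64 flattened pixel features and 10 classes; the class is restated so the snippet runs on its own):

```python
import torch
import torch.nn as nn
import torch.nn.functional as F


# Same architecture as PytorchModel above, restated for self-containment.
class PytorchModel(nn.Module):
    def __init__(self, in_dims: int, hidden_dims: int, out_dims: int):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(in_dims, hidden_dims),
            nn.ReLU(),
            nn.Linear(hidden_dims, out_dims),
        )

    def forward(self, features):
        return F.softmax(self.layers(features), dim=1)


module = PytorchModel(in_dims=64, hidden_dims=32, out_dims=10)
out = module(torch.randn(8, 64))  # batch of 8 fake flattened 8x8 digit images

print(out.shape)  # one probability distribution per example
# because of the softmax, each row sums to 1
assert torch.allclose(out.sum(dim=1), torch.ones(8), atol=1e-5)
```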
For the keras app, install keras via tensorflow:

```shell
pip install tensorflow
```

```python
from typing import List

import pandas as pd
from sklearn.datasets import load_digits
from sklearn.metrics import accuracy_score
from tensorflow import keras
from tensorflow.keras import Sequential

from unionml import Dataset, Model


# define a simple keras model
def build_keras_model(
    in_dims: int,
    hidden_dims: int,
    out_dims: int,
) -> Sequential:
    keras_model = Sequential()
    keras_model.add(keras.layers.Dense(hidden_dims, input_shape=(in_dims,), activation="relu"))
    keras_model.add(keras.layers.Dense(out_dims, activation="softmax"))
    return keras_model


dataset = Dataset(name="digits_dataset", test_size=0.2, shuffle=True, targets=["target"])
model = Model(name="digits_classifier", init=build_keras_model, dataset=dataset)
```
## Define App Methods

Specify the core functions for training and prediction with the decorators exposed by the `dataset` and `model` objects:
```python
@dataset.reader
def reader() -> pd.DataFrame:
    return load_digits(as_frame=True).frame


@model.trainer
def trainer(estimator: LogisticRegression, features: pd.DataFrame, target: pd.DataFrame) -> LogisticRegression:
    return estimator.fit(features, target.squeeze())


@model.predictor
def predictor(estimator: LogisticRegression, features: pd.DataFrame) -> List[float]:
    return [float(x) for x in estimator.predict(features)]


@model.evaluator
def evaluator(estimator: LogisticRegression, features: pd.DataFrame, target: pd.DataFrame) -> float:
    return float(accuracy_score(target.squeeze(), predictor(estimator, features)))
```
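Because the decorated functions remain plain Python callables, the same training/prediction/evaluation logic can be exercised without UnionML at all. A self-contained sketch of the sklearn variant (undecorated, for illustration only):

```python
from typing import List

import pandas as pd
from sklearn.datasets import load_digits
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score


def reader() -> pd.DataFrame:
    return load_digits(as_frame=True).frame


def trainer(estimator, features, target):
    return estimator.fit(features, target.squeeze())


def predictor(estimator, features) -> List[float]:
    return [float(x) for x in estimator.predict(features)]


def evaluator(estimator, features, target) -> float:
    return float(accuracy_score(target.squeeze(), predictor(estimator, features)))


frame = reader()
features, target = frame.drop(columns=["target"]), frame[["target"]]
estimator = trainer(LogisticRegression(max_iter=10000), features, target)
score = evaluator(estimator, features, target)
print(score)  # training-set accuracy; close to 1.0 on digits
```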
For the pytorch app, first we'll define some helper functions to convert dataframes to tensors:
```python
def process_features(features: pd.DataFrame) -> torch.Tensor:
    return torch.from_numpy(features.values).float()


def process_target(target: pd.DataFrame) -> torch.Tensor:
    return torch.from_numpy(target.squeeze().values).long()
```
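A quick check of the dtype conventions these helpers encode: float32 features for the linear layers, and int64 targets, which is what `F.cross_entropy` expects (the tiny dataframe below is a stand-in for the digits frame):

```python
import pandas as pd
import torch


def process_features(features: pd.DataFrame) -> torch.Tensor:
    return torch.from_numpy(features.values).float()


def process_target(target: pd.DataFrame) -> torch.Tensor:
    return torch.from_numpy(target.squeeze().values).long()


# stand-in for two rows of the digits feature/target frames
features = pd.DataFrame({"pixel_0_0": [0.0, 1.0], "pixel_0_1": [2.0, 3.0]})
target = pd.DataFrame({"target": [3, 7]})

X, y = process_features(features), process_target(target)
print(X.dtype, y.dtype)  # torch.float32 torch.int64
```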
Then let's define the app methods:
```python
@dataset.reader
def reader() -> pd.DataFrame:
    return load_digits(as_frame=True).frame


@model.trainer
def trainer(
    module: PytorchModel,
    features: pd.DataFrame,
    target: pd.DataFrame,
    *,
    # keyword-only arguments define trainer parameters
    batch_size: int,
    n_epochs: int,
    learning_rate: float,
) -> PytorchModel:
    opt = torch.optim.Adam(module.parameters(), lr=learning_rate)
    for _ in range(n_epochs):
        for (X, y) in zip(
            torch.split(process_features(features), batch_size),
            torch.split(process_target(target), batch_size),
        ):
            opt.zero_grad()
            loss = F.cross_entropy(module(X), y)
            loss.backward()
            opt.step()
    return module


@model.predictor
def predictor(module: PytorchModel, features: pd.DataFrame) -> List[float]:
    return [float(x) for x in module(process_features(features)).argmax(1)]


@model.evaluator
def evaluator(module: PytorchModel, features: pd.DataFrame, target: pd.DataFrame) -> float:
    return float(accuracy_score(target.squeeze(), predictor(module, features)))
```
The keras app methods follow the same pattern:

```python
@dataset.reader
def reader() -> pd.DataFrame:
    return load_digits(as_frame=True).frame


@model.trainer
def trainer(
    keras_model: Sequential,
    features: pd.DataFrame,
    target: pd.DataFrame,
    *,
    # keyword-only arguments define trainer parameters
    batch_size: int,
    n_epochs: int,
    learning_rate: float,
) -> Sequential:
    keras_model.compile(
        loss="categorical_crossentropy",
        optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
        metrics=["accuracy"],
        run_eagerly=True,
    )
    keras_model.fit(
        features.values,
        keras.utils.to_categorical(target.values),
        batch_size=batch_size,
        epochs=n_epochs,
    )
    return keras_model


@model.predictor
def predictor(keras_model: Sequential, features: pd.DataFrame) -> List[float]:
    return [float(x) for x in keras_model.predict(features).argmax(1)]


@model.evaluator
def evaluator(keras_model: Sequential, features: pd.DataFrame, target: pd.DataFrame) -> float:
    return float(accuracy_score(target.squeeze(), predictor(keras_model, features)))
```
## Train and Predict Locally

Invoke `model.train()` to train a model and `model.predict()` to generate predictions.
```python
if __name__ == "__main__":
    model_object, metrics = model.train(hyperparameters={"C": 1.0, "max_iter": 10000})
    predictions = model.predict(features=load_digits(as_frame=True).frame.sample(5, random_state=42))
    print(model_object, metrics, predictions, sep="\n")

    # save model to a file, using joblib as the default serialization format
    model.save("/tmp/model_object.joblib")
```
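Since joblib is the default serialization format for sklearn estimators, the underlying save/load round-trip can be sketched without UnionML (temporary path and hyperparameters chosen for illustration):

```python
import tempfile
from pathlib import Path

import joblib
from sklearn.datasets import load_digits
from sklearn.linear_model import LogisticRegression

digits = load_digits(as_frame=True)
features = digits.frame.drop(columns=["target"])
target = digits.frame["target"]

estimator = LogisticRegression(C=1.0, max_iter=10000).fit(features, target)

# round-trip the fitted estimator through joblib
path = Path(tempfile.mkdtemp()) / "model_object.joblib"
joblib.dump(estimator, path)
reloaded = joblib.load(path)

# the reloaded estimator produces identical predictions
sample = features.sample(5, random_state=42)
assert (estimator.predict(sample) == reloaded.predict(sample)).all()
```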
For the pytorch app:

```python
if __name__ == "__main__":
    model_object, metrics = model.train(
        hyperparameters={"in_dims": 64, "hidden_dims": 32, "out_dims": 10},
        trainer_kwargs={"batch_size": 512, "n_epochs": 100, "learning_rate": 0.0003},
    )
    predictions = model.predict(features=load_digits(as_frame=True).frame.sample(5, random_state=42))
    print(model_object, metrics, predictions, sep="\n")

    # save model to a file using torch.save
    model.save("/tmp/model_object.pt")
```
And for the keras app:

```python
if __name__ == "__main__":
    model_object, metrics = model.train(
        hyperparameters={"in_dims": 64, "hidden_dims": 32, "out_dims": 10},
        trainer_kwargs={"batch_size": 512, "n_epochs": 100, "learning_rate": 0.0003},
    )
    predictions = model.predict(features=load_digits(as_frame=True).frame.sample(5, random_state=42))
    print(model_object, metrics, predictions, sep="\n")

    # save model to a file in the keras HDF5 format
    model.save("/tmp/model_object.h5")
```
## Serve Seamlessly with FastAPI

UnionML integrates with FastAPI to automatically create `/train/` and `/predict/` endpoints. Install unionml with fastapi:

```shell
pip install unionml[fastapi]
```
Start a server with `unionml serve` and call the app endpoints with the `requests` library.

Bind a FastAPI `app` to the `model` object with `model.serve`:

```python
from fastapi import FastAPI

app = FastAPI()
model.serve(app)
```

Then start the server, assuming the UnionML app is in an `app.py` script:

```shell
unionml serve app:app --reload --model-path /tmp/model_object.joblib
```
For the pytorch app, the steps are the same, pointing at the saved `.pt` model:

```python
from fastapi import FastAPI

app = FastAPI()
model.serve(app)
```

```shell
unionml serve app:app --reload --model-path /tmp/model_object.pt
```
And for the keras app, pointing at the saved `.h5` model:

```python
from fastapi import FastAPI

app = FastAPI()
model.serve(app)
```

```shell
unionml serve app:app --reload --model-path /tmp/model_object.h5
```
**Important:** The first argument to `unionml serve` is a `:`-separated string where the first part is the module name of the app script and the second part is the variable name of the FastAPI app.

Then you can invoke the endpoints using the `requests` library, e.g. in a separate `client.py` script:
```python
import requests
from sklearn.datasets import load_digits

digits = load_digits(as_frame=True)
features = digits.frame[digits.feature_names]

prediction_response = requests.post(
    "http://127.0.0.1:8000/predict",
    json={"features": features.sample(5, random_state=42).to_dict(orient="records")},
)
print(prediction_response.text)
```
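The `to_dict(orient="records")` call determines the JSON payload shape the `/predict` endpoint receives: a list with one `{column: value}` mapping per row. A tiny illustration with two stand-in pixel columns:

```python
import pandas as pd

# a small stand-in for the digits feature frame (two of its 64 pixel columns)
features = pd.DataFrame({"pixel_0_0": [0.0, 5.0], "pixel_0_1": [13.0, 9.0]})

records = features.to_dict(orient="records")
print(records)
# [{'pixel_0_0': 0.0, 'pixel_0_1': 13.0}, {'pixel_0_0': 5.0, 'pixel_0_1': 9.0}]
```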
## What Next?

Learn how to leverage the full power of UnionML 🦾 in the Basics guide.

Want to contribute? Check out the Contributing Guide.