Do you remember the last time you dived deep into a coding session, losing track of time, just for the sheer fun of it? It's not about deadlines, efficiency, or even perfection. It's the allure of experimentation, breaking, and rebuilding, all while discovering new paths.
Today, I'm going to take you on a thrilling coding adventure inspired by a LinkedIn code snippet, where I tangled with FastAPI, River, Watchdog, and Tenacity. Ready? Buckle up!
⚠️ Disclaimer:
For the adventurous spirit: don't attempt this at home `{home == production}`. For clarity, the code has been condensed into three files.
Once Upon a FastAPI...
It all started when my eyes caught a captivating LinkedIn post from MantisNLP. The post showcased FastAPI code snippets intended for educational purposes. What intrigued me was the swift model reloading: in the snippet, which carried out an ML prediction, the authors first loaded the model and then executed the prediction.
This stirred a question in me: could I modify an API's model state based on an event change? And if so, could we serve a machine learning model that starts as a shell, or semi-trained, and continues learning over time?
Using the penguins classification dataset as my canvas, I envisioned something like this:
from pydantic import BaseModel
from fastapi import FastAPI

app = FastAPI(title="😂 Pure Joy")


class Attributes(BaseModel):
    island: str
    bill_length_mm: float | None
    bill_depth_mm: float | None
    flipper_length_mm: float | None
    body_mass_g: float | None
    sex: str | None


class LearnAttributes(BaseModel):
    attributes: Attributes
    species: str


@app.on_event("startup")
def startup_event():
    # check for an existing ml model and use it, else create one
    ...


@app.post("/predict")
def predict(attributes: Attributes) -> dict:
    # predict
    ...


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict:
    # learn and update the ml model
    ...
Swimming the River of Learning
But just tweaking the API felt too straightforward. I wanted both the API and the machine learning model to be dynamic, evolving in tandem. Enter River, a library tailor-made for online machine learning: as new data streams in, the model continually refines itself.
Here's a basic model:
# ml.py
from river import compose
from river import preprocessing, stats
from river import naive_bayes


def penguins_model():
    island_transformation = compose.Select("island") | preprocessing.OneHotEncoder(
        drop_first=True
    )
    sex_transformation = (
        compose.Select("sex")
        | preprocessing.StatImputer(("sex", stats.Mode()))
        | preprocessing.OneHotEncoder(drop_first=True)
    )
    numeric_transformation = compose.Select(
        "bill_length_mm",
        "bill_depth_mm",
        "flipper_length_mm",
        "body_mass_g",
    ) | preprocessing.StatImputer(
        ("bill_length_mm", stats.Mean()),
        ("bill_depth_mm", stats.Mean()),
        ("flipper_length_mm", stats.Mean()),
        ("body_mass_g", stats.Mean()),
    )
    model = (
        island_transformation + sex_transformation + numeric_transformation
        | naive_bayes.MultinomialNB(alpha=1)
    )
    return model
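Before wiring this into the API, it's worth seeing what "online" buys us: the model learns one observation at a time, in place. Here's a minimal sketch (the sample values are illustrative):

from ml import penguins_model

model = penguins_model()

sample = {
    "island": "Biscoe",
    "bill_length_mm": 39.1,
    "bill_depth_mm": 18.7,
    "flipper_length_mm": 181.0,
    "body_mass_g": 3750.0,
    "sex": "male",
}

print(model.predict_one(sample))   # None: the model hasn't seen a single label yet
model.learn_one(sample, "Adelie")  # update the model in place with one observation
print(model.predict_one(sample))   # now predicts from what it has learned so far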
Now let's put the Lego pieces together into a complete, testable app:
from pathlib import Path
import pickle
...

MODEL_FILE = "model/naive.pickle"

app = FastAPI(title="😂 Pure Joy")
...


@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    if not model_file.exists():
        from ml import penguins_model

        app.state.ml = penguins_model()
    else:
        app.state.ml = pickle.loads(model_file.read_bytes())


@app.post("/predict")
def predict(attributes: Attributes) -> str | None:
    X = attributes.model_dump()
    return app.state.ml.predict_one(X)


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str]:
    X = learn_attributes.attributes.model_dump()
    y = learn_attributes.species
    y_pred = app.state.ml.predict_one(X)
    app.state.ml.learn_one(X, y)
    Path(MODEL_FILE).write_bytes(pickle.dumps(app.state.ml))
    return {"status": f"we learned {y}. We initially predicted {y_pred}"}
Boom! That was easy. Let's run
uvicorn app:app --reload
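With the server up, we can poke at both endpoints. Here's a rough smoke test, assuming uvicorn's default http://127.0.0.1:8000 and using requests purely as a convenient client:

# smoke_test.py
import requests

BASE = "http://127.0.0.1:8000"

attributes = {
    "island": "Biscoe",
    "bill_length_mm": 39.1,
    "bill_depth_mm": 18.7,
    "flipper_length_mm": 181.0,
    "body_mass_g": 3750.0,
    "sex": "male",
}

# teach the model one labelled example
print(requests.post(f"{BASE}/learn", json={"attributes": attributes, "species": "Adelie"}).json())

# ask for a prediction on the same attributes
print(requests.post(f"{BASE}/predict", json=attributes).json())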
🎉 Hurrah ... The code worked seamlessly. That is, until multiple workers entered the fray, leading to unexpected pandemonium. Why? The reason lies in uvicorn's workers: each operates independently, causing utter chaos, especially when executing:
uvicorn app:app --workers 4
To better illustrate the conundrum, let's introduce counters for both predictions and learnings:
...


@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    if not model_file.exists():
        ...
        ml.meta = {"predicted": 0, "learned": 0}
        app.state.ml = ml
    else:
        app.state.ml = pickle.loads(model_file.read_bytes())


@app.post("/predict")
def predict(attributes: Attributes) -> dict[str, str | int | None]:
    X = attributes.model_dump()
    y_pred = app.state.ml.predict_one(X)
    app.state.ml.meta["predicted"] += 1
    # use a distinct key so the "predicted" counter doesn't overwrite the prediction
    return {
        "species": y_pred,
        **app.state.ml.meta,
    }


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str | int]:
    ...
    app.state.ml.meta["learned"] += 1
    Path(MODEL_FILE).write_bytes(pickle.dumps(app.state.ml))
    return {
        "status": f"we learned {y}. We initially predicted {y_pred}",
        **app.state.ml.meta,
    }
The tally for predictions and learnings is inexplicably out of sync.
😭 My joy was short-lived. The core issue was that our ML model's knowledge wasn't uniformly distributed across workers. It felt like having several chefs in a kitchen, each adding their unique flavour, unaware of the others' contributions.
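If you want to watch the drift happen, hammer /predict and compare the counters between responses. A hypothetical helper, assuming the app is running with several workers:

# hammer.py (hypothetical)
import requests

attributes = {
    "island": "Dream",
    "bill_length_mm": 46.5,
    "bill_depth_mm": 17.9,
    "flipper_length_mm": 192.0,
    "body_mass_g": 3500.0,
    "sex": "female",
}

for _ in range(8):
    meta = requests.post("http://127.0.0.1:8000/predict", json=attributes).json()
    # each request may land on a different worker, each with its own in-memory
    # counter, so "predicted" does not grow monotonically across responses
    print(meta["predicted"], meta["learned"])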
The Watchdog's Bark and Bite
That's when Watchdog barked onto the scene. Its event-handling capabilities ensure that model updates propagate seamlessly across workers, triggered by file changes. Let's add the modification.
...
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

MODEL_FILE = "model/naive.pickle"

app = FastAPI(title="😂 Pure Joy")


class FileHandler(FileSystemEventHandler):
    def on_modified(self, event):
        print(f"path={event.src_path} event={event.event_type}")
        app.state.ml = pickle.loads(Path(MODEL_FILE).read_bytes())


...
handler = FileHandler()
observer = Observer()


@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    if not model_file.exists():
        ...
    else:
        ...
    observer.schedule(handler, path=MODEL_FILE, recursive=False)
    observer.start()


@app.on_event("shutdown")
def shutdown_event():
    observer.stop()
    observer.join()


@app.post("/predict")
def predict(attributes: Attributes) -> dict[str, str | int | None]:
    ...


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> dict[str, str | int]:
    ...
🎉 Hurrah! Again … it worked. But watchdog had its quirks. It bit back: workers still occasionally collided, each clamouring to update the model and sometimes hitting a file "in use" error. It was chaotic, to say the least. But isn't that what coding for fun is all about? Facing challenges and finding creative solutions.
Tenacity to the Rescue!
That's where Tenacity strutted in. Instead of letting workers clash and give up, Tenacity made them... well, tenacious! It equipped them with smart retries: if a worker faced an "in-use" error, it patiently waited and retried, ensuring that every update was eventually accounted for.
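If you haven't met Tenacity before, it's essentially a decorator. Here's a minimal sketch of the retry behaviour we're after, with illustrative wait and stop values:

import pickle
from pathlib import Path

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed


@retry(
    retry=retry_if_exception_type(EOFError),  # retry only the "mid-write" symptom
    wait=wait_fixed(0.1),                     # pause 100 ms between attempts
    stop=stop_after_attempt(5),               # don't retry forever
)
def load_model(path: Path):
    return pickle.loads(path.read_bytes())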
We are going to update our machine learning file to handle loading and saving, adding the tenaciousness there. Moreover, we can move some code around to clean up a little. The final code looks like this:
Getting the schemas out:
# schemas.py
from pydantic import BaseModel


class Attributes(BaseModel):
    island: str
    bill_length_mm: float | None
    bill_depth_mm: float | None
    flipper_length_mm: float | None
    body_mass_g: float | None
    sex: str | None


class LearnAttributes(BaseModel):
    attributes: Attributes
    species: str


class Predictions(BaseModel):
    species: str | None
    predicted: int
    learned: int


class Learnings(BaseModel):
    status: str
    predicted: int
    learned: int
Adding a tenacious model IO (input and output) function:
# ml.py
import pickle
from pathlib import Path
from typing import Literal
...
from tenacity import retry, retry_if_exception_type
...


@retry(retry=retry_if_exception_type(EOFError))
def ml_io(
    model_file: Path,
    mode: Literal["wb", "rb"] = "rb",
    ml_object: compose.Pipeline | None = None,
):
    if mode == "rb" and not model_file.exists():
        ml = penguins_model()
        ml.meta = {
            "predicted": 0,
            "learned": 0,
        }
        # save the first local copy (god like ...)
        model_file.write_bytes(pickle.dumps(ml))
        return ml
    elif mode == "rb" and model_file.exists():
        return pickle.loads(model_file.read_bytes())
    elif mode == "wb":
        return model_file.write_bytes(pickle.dumps(ml_object))
    else:
        raise NotImplementedError("mode can only be `wb` or `rb`")
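Why retry on EOFError? If one worker reads the pickle while another is mid-write, it can see a truncated byte stream, and unpickling a truncated stream raises EOFError, which is exactly what the decorator catches. A quick way to convince yourself:

import pickle

try:
    pickle.loads(b"")  # an empty/truncated stream is what a half-written file looks like
except EOFError as err:
    print(err)  # "Ran out of input"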
And for our application:
# app.py
from pathlib import Path

from fastapi import FastAPI
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from ml import ml_io
from schemas import Attributes, LearnAttributes, Predictions, Learnings

MODEL_FILE = "model/naive.pickle"

app = FastAPI(title="😂 Pure Joy")


class FileHandler(FileSystemEventHandler):
    def on_modified(self, event):
        # print(f"path={event.src_path} event={event.event_type}")
        app.state.ml = ml_io(model_file=Path(MODEL_FILE), mode="rb")


handler = FileHandler()
observer = Observer()


@app.on_event("startup")
def startup_event():
    model_file = Path(MODEL_FILE)
    app.state.ml = ml_io(model_file=model_file, mode="rb")
    observer.schedule(handler, path=MODEL_FILE, recursive=False)
    observer.start()


@app.on_event("shutdown")
def shutdown_event():
    observer.stop()
    observer.join()


@app.post("/predict")
def predict(attributes: Attributes) -> Predictions:
    X = attributes.model_dump()
    y_pred = app.state.ml.predict_one(X)
    app.state.ml.meta["predicted"] += 1
    return {
        "species": y_pred,
        **app.state.ml.meta,
    }


@app.post("/learn")
def learn(learn_attributes: LearnAttributes) -> Learnings:
    X = learn_attributes.attributes.model_dump()
    y = learn_attributes.species
    y_pred = app.state.ml.predict_one(X)
    app.state.ml.learn_one(X, y)
    app.state.ml.meta["learned"] += 1
    ml_io(model_file=Path(MODEL_FILE), mode="wb", ml_object=app.state.ml)
    return {
        "status": f"learned {y}. Initially predicted {y_pred}",
        **app.state.ml.meta,
    }
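With everything wired up, run uvicorn app:app --workers 4 and replay the earlier smoke test: /learn writes the pickle, Watchdog nudges every worker to reload it, and Tenacity retries any read that lands mid-write. A rough end-to-end check (values illustrative):

# verify.py
import requests

BASE = "http://127.0.0.1:8000"

attributes = {
    "island": "Torgersen",
    "bill_length_mm": 40.3,
    "bill_depth_mm": 18.0,
    "flipper_length_mm": 195.0,
    "body_mass_g": 3250.0,
    "sex": "female",
}

# teach once: this writes the pickle, and Watchdog reloads it in every worker
print(requests.post(f"{BASE}/learn", json={"attributes": attributes, "species": "Adelie"}).json())

# whichever worker answers, the species should now agree
for _ in range(4):
    print(requests.post(f"{BASE}/predict", json=attributes).json())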
The Sunset of Our Tale
This journey was never about writing production code. It was a thrilling adventure through challenges and solutions, a testament to the beauty of coding for fun. I experimented, learned, broke things, and eventually built something magnificent.
The biggest takeaway? Embrace the chaos and curiosity. Revel in the unpredictability. And remember, coding for fun should always be fun. And when in doubt, be tenacious!
Until next time, keep on coding curiously!