Time-series anomaly detection with ExponentialSmoothing + IsolationForest + FastAPI

A FastAPI service that fits a Holt-Winters baseline to an uploaded sensor CSV and flags anomalous residuals with an Isolation Forest.
Project structure:

```text
project/
  app/
    main.py
    schemas.py
    services/
      preprocessing.py
      training.py
      inference.py
    utils/
      file_io.py
      converters.py
    models/
      trend_model.pkl
      anomaly_model.pkl
  data/
    raw/
    processed/
    results/
  requirements.txt
```
app/main.py:

```python
from fastapi import FastAPI, File, Form, UploadFile

from app.schemas import AnomalyRequest
from app.services.preprocessing import load_and_preprocess
from app.services.training import train_models
from app.services.inference import run_inference
from app.utils.file_io import save_raw_file, save_result_file
from app.utils.converters import csv_to_json

app = FastAPI()


@app.post("/detect-anomaly")
async def detect_anomaly(
    sensor_name: str = Form(...),
    sampling_rate: int = Form(...),
    file: UploadFile = File(...),
):
    # A JSON body cannot be combined with a multipart file upload in a
    # single request, so the metadata arrives as form fields and is
    # validated through the Pydantic schema here.
    req = AnomalyRequest(sensor_name=sensor_name, sampling_rate=sampling_rate)

    raw_path = save_raw_file(file)
    df = load_and_preprocess(raw_path)
    ts_model, anomaly_model = train_models(df)
    result_df = run_inference(df, ts_model, anomaly_model)
    output_path = save_result_file(result_df)
    json_result = csv_to_json(output_path)

    return {
        "status": "success",
        "results": json_result,
    }
```
app/schemas.py:

```python
from pydantic import BaseModel


class AnomalyRequest(BaseModel):
    sensor_name: str
    sampling_rate: int
```
app/utils/file_io.py:

```python
import os
import shutil


def save_raw_file(file):
    os.makedirs("data/raw", exist_ok=True)
    # Keep only the base name so a crafted filename cannot escape data/raw.
    filename = os.path.basename(file.filename)
    path = f"data/raw/{filename}"
    with open(path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    return path


def save_result_file(df):
    os.makedirs("data/results", exist_ok=True)
    path = "data/results/anomaly_result.csv"
    df.to_csv(path, index=False)
    return path
```
app/utils/converters.py:

```python
import pandas as pd


def csv_to_json(path):
    df = pd.read_csv(path)
    return df.to_dict(orient="records")
```
app/services/preprocessing.py:

```python
import pandas as pd


def load_and_preprocess(path):
    df = pd.read_csv(path)
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df = df.set_index("timestamp").sort_index()
    # Fill missing readings by linear interpolation between neighbours.
    df["value"] = df["value"].interpolate()
    return df
```
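The pipeline expects a CSV with `timestamp` and `value` columns. A minimal illustrative example (the timestamps and values are made up, assuming hourly readings):

```csv
timestamp,value
2024-01-01 00:00:00,12.1
2024-01-01 01:00:00,12.4
2024-01-01 02:00:00,11.9
```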
app/services/training.py:

```python
import os
import pickle

from sklearn.ensemble import IsolationForest
from statsmodels.tsa.holtwinters import ExponentialSmoothing


def train_models(df):
    y = df["value"]

    # Holt-Winters baseline with additive trend and seasonality.
    # seasonal_periods=24 assumes hourly data with a daily cycle; the
    # series must cover at least two full cycles for the fit to succeed.
    ts_model = ExponentialSmoothing(
        y,
        trend="add",
        seasonal="add",
        seasonal_periods=24,
    ).fit()

    df["baseline"] = ts_model.fittedvalues
    df["residual"] = df["value"] - df["baseline"]

    # Isolation Forest flags roughly the 3% most unusual residuals.
    anomaly_model = IsolationForest(contamination=0.03)
    anomaly_model.fit(df[["residual"]])

    os.makedirs("app/models", exist_ok=True)
    with open("app/models/trend_model.pkl", "wb") as f:
        pickle.dump(ts_model, f)
    with open("app/models/anomaly_model.pkl", "wb") as f:
        pickle.dump(anomaly_model, f)

    return ts_model, anomaly_model
```
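The fitted Holt-Winters model is not limited to in-sample baselines; it can also project the baseline forward. A minimal sketch (the 24-step horizon is an arbitrary choice, not part of the service above):

```python
# Project the seasonal baseline 24 steps past the end of the training series.
future_baseline = ts_model.forecast(24)
```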
app/services/inference.py:

```python
def run_inference(df, ts_model, anomaly_model):
    # The baseline comes from the in-sample fit, so this scores the
    # same series the models were just trained on.
    df["baseline"] = ts_model.fittedvalues
    df["residual"] = df["value"] - df["baseline"]
    df["score"] = anomaly_model.decision_function(df[["residual"]])
    # IsolationForest returns 1 for inliers and -1 for outliers;
    # remap to 0 = normal, 1 = anomaly.
    df["anomaly"] = anomaly_model.predict(df[["residual"]])
    df["anomaly"] = df["anomaly"].map({1: 0, -1: 1})
    return df.reset_index()
```
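Because train_models pickles both models, an inference-only path could reload them instead of retraining on every request. A minimal sketch, assuming the paths used in training.py (`load_models` is a hypothetical helper, not part of the project above):

```python
import pickle


def load_models(trend_path="app/models/trend_model.pkl",
                anomaly_path="app/models/anomaly_model.pkl"):
    # Hypothetical helper: reload the pickled models saved by train_models.
    with open(trend_path, "rb") as f:
        ts_model = pickle.load(f)
    with open(anomaly_path, "rb") as f:
        anomaly_model = pickle.load(f)
    return ts_model, anomaly_model
```

Note that `run_inference` relies on `fittedvalues`, which only exist for the series the model was fit on; scoring a new series with a reloaded Holt-Winters model would require forecasting the baseline instead.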
requirements.txt:

```text
fastapi
uvicorn
pandas
scikit-learn
statsmodels
python-multipart
```
Install the dependencies and start the server:

```bash
pip install -r requirements.txt
uvicorn app.main:app --reload
```
You can then send sensor_name, sampling_rate, and a CSV file to POST /detect-anomaly.
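For example, using the `requests` package (an extra dependency, not in requirements.txt; the sensor values and the sample.csv file name are illustrative):

```python
import requests

# Send the form fields and the CSV as a multipart request.
response = requests.post(
    "http://127.0.0.1:8000/detect-anomaly",
    data={"sensor_name": "temp-01", "sampling_rate": 60},
    files={"file": ("sample.csv", open("sample.csv", "rb"), "text/csv")},
)
print(response.json()["status"])
```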