test

H_wall·2025년 12월 24일
from pathlib import Path
import numpy as np
import pandas as pd
from asammdf import MDF


def pick_value_at_time(time_s: np.ndarray, value: np.ndarray, t: float, mode: str = "interp"):
    """
    time_s: seconds array (monotonic increasing expected)
    value : signal samples (same length as time_s)
    t     : target time in seconds
    mode  : interp | nearest | left | right
    """
    if len(time_s) == 0:
        return np.nan

    # out of range
    if t < time_s[0] or t > time_s[-1]:
        return np.nan

    idx = np.searchsorted(time_s, t, side="left")

    if mode == "right":
        return value[min(idx, len(value) - 1)]

    if mode == "left":
        return value[max(idx - 1, 0)]

    if mode == "nearest":
        if idx == 0:
            return value[0]
        if idx >= len(time_s):
            return value[-1]
        left_i = idx - 1
        right_i = idx
        return value[left_i] if (t - time_s[left_i]) <= (time_s[right_i] - t) else value[right_i]

    # default: linear interpolation
    if idx == 0:
        return value[0]
    if idx >= len(time_s):
        return value[-1]
    t0, t1 = time_s[idx - 1], time_s[idx]
    y0, y1 = value[idx - 1], value[idx]
    if t1 == t0:
        return y0
    return y0 + (y1 - y0) * (t - t0) / (t1 - t0)


def extract_signal_at_t(mf4_path: Path, signal_name: str, t_sec: float, mode: str = "interp"):
    """
    Returns:
      dict with file, value, and some debug meta.
    """
    mdf = MDF(str(mf4_path))

    # 채널명이 완전 일치하지 않을 수 있어, 정확히 안 맞으면 예외를 잡아서 알려줌
    try:
        sig = mdf.get(signal_name)
    except Exception as e:
        return {
            "file": mf4_path.name,
            "value": np.nan,
            "status": f"signal_not_found: {e.__class__.__name__}",
        }

    time_s = np.asarray(sig.timestamps, dtype=float)
    values = np.asarray(sig.samples)

    v = pick_value_at_time(time_s, values, t_sec, mode=mode)

    return {
        "file": mf4_path.name,
        "value": v,
        "status": "ok" if np.isfinite(v) else "out_of_range_or_empty",
        "t_sec": t_sec,
        "mode": mode,
        "t_min": float(time_s[0]) if len(time_s) else np.nan,
        "t_max": float(time_s[-1]) if len(time_s) else np.nan,
        "sample_count": int(len(time_s)),
    }


def batch_extract(folder: str, pattern: str, signal_name: str, t_sec: float, mode: str = "interp"):
    folder_path = Path(folder)
    mf4_files = sorted(folder_path.glob(pattern))

    rows = []
    for f in mf4_files:
        rows.append(extract_signal_at_t(f, signal_name, t_sec, mode=mode))

    df = pd.DataFrame(rows)
    return df


if __name__ == "__main__":
    # ===== 사용자 설정 =====
    folder = r"./mf4"              # MF4들이 있는 폴더
    pattern = "*.mf4"              # 필요하면 "*.MF4" 등으로 변경
    signal_name = "YourSignalName" # 예: "BMS_VBAT", "VehicleSpeed" 등
    t_sec = 12.345                 # n초
    mode = "interp"                # interp | nearest | left | right
    # ======================

    df = batch_extract(folder, pattern, signal_name, t_sec, mode=mode)

    # 결과 저장
    out_csv = Path(folder) / f"extract_{signal_name}_at_{t_sec:.3f}s_{mode}.csv"
    df.to_csv(out_csv, index=False, encoding="utf-8-sig")

    print("Saved:", out_csv)
    print(df[["file", "value", "status"]].to_string(index=False))

0개의 댓글