from pathlib import Path
import numpy as np
import pandas as pd
from asammdf import MDF
def pick_value_at_time(time_s: np.ndarray, value: np.ndarray, t: float, mode: str = "interp"):
    """Return the signal value at time ``t`` using the requested sampling mode.

    Args:
        time_s: Sample timestamps in seconds; expected to be sorted in
            increasing order (``np.searchsorted`` relies on that).
        value: Signal samples, same length as ``time_s``.
        t: Target time in seconds.
        mode: ``"interp"`` (linear interpolation between the two bracketing
            samples), ``"nearest"`` (closest sample, ties go to the earlier
            one), ``"left"`` (last sample at or before ``t``) or ``"right"``
            (first sample at or after ``t``). Any other string falls back to
            ``"interp"``.

    Returns:
        The picked sample or interpolated value, or ``np.nan`` when there are
        no samples, ``t`` is NaN, or ``t`` lies outside
        ``[time_s[0], time_s[-1]]``.
    """
    if len(time_s) == 0:
        return np.nan

    # Written as a negated chained comparison so that a NaN target (for which
    # every comparison is False) is rejected together with out-of-range times.
    if not (time_s[0] <= t <= time_s[-1]):
        return np.nan

    if mode == "left":
        # side="right" counts the samples with timestamp <= t, so an exact hit
        # on a sample selects that sample rather than the one before it.
        return value[np.searchsorted(time_s, t, side="right") - 1]

    # First index whose timestamp is >= t. The range check above guarantees
    # 0 <= idx <= len(time_s) - 1.
    idx = np.searchsorted(time_s, t, side="left")

    if mode == "right":
        return value[idx]

    # idx == 0 means t coincides with the first timestamp: there is no earlier
    # sample to compare or interpolate with.
    if idx == 0:
        return value[0]

    t0, t1 = time_s[idx - 1], time_s[idx]
    y0, y1 = value[idx - 1], value[idx]

    if mode == "nearest":
        return y0 if (t - t0) <= (t1 - t) else y1

    # Default: linear interpolation between the bracketing samples.
    if t1 == t0:
        # Duplicate timestamps: avoid dividing by zero.
        return y0
    return y0 + (y1 - y0) * (t - t0) / (t1 - t0)
def extract_signal_at_t(mf4_path: Path, signal_name: str, t_sec: float, mode: str = "interp"):
    """Read one signal from an MF4 file and pick its value at ``t_sec``.

    Args:
        mf4_path: Path of the MF4 file to open.
        signal_name: Channel name passed to ``MDF.get``.
        t_sec: Target time in seconds.
        mode: Sampling mode forwarded to ``pick_value_at_time``.

    Returns:
        A dict with ``file``, ``value`` and ``status``. When the signal could
        be read it also carries ``t_sec``, ``mode``, ``t_min``, ``t_max`` and
        ``sample_count`` as debug metadata. ``status`` is ``"ok"``,
        ``"out_of_range_or_empty"`` or ``"signal_not_found: <ExceptionName>"``.
    """
    mdf = MDF(str(mf4_path))
    try:
        # The channel name may not match exactly; catch the lookup failure and
        # report it in the row instead of aborting the whole batch.
        try:
            sig = mdf.get(signal_name)
        except Exception as e:
            return {
                "file": mf4_path.name,
                "value": np.nan,
                "status": f"signal_not_found: {e.__class__.__name__}",
            }

        time_s = np.asarray(sig.timestamps, dtype=float)
        values = np.asarray(sig.samples)
        v = pick_value_at_time(time_s, values, t_sec, mode=mode)
    finally:
        # Release the file handle on every path so that a batch run over many
        # files does not accumulate open files.
        mdf.close()

    try:
        found = bool(np.isfinite(v))
    except TypeError:
        # np.isfinite rejects non-numeric samples (e.g. text channels); the
        # miss sentinel is always a float NaN, so such a value is a real pick.
        found = True

    return {
        "file": mf4_path.name,
        "value": v,
        "status": "ok" if found else "out_of_range_or_empty",
        "t_sec": t_sec,
        "mode": mode,
        "t_min": float(time_s[0]) if len(time_s) else np.nan,
        "t_max": float(time_s[-1]) if len(time_s) else np.nan,
        "sample_count": int(len(time_s)),
    }
def batch_extract(folder: str, pattern: str, signal_name: str, t_sec: float, mode: str = "interp"):
    """Extract one signal value at ``t_sec`` from every matching file in a folder.

    Args:
        folder: Directory that is searched (non-recursively unless ``pattern``
            says otherwise) for measurement files.
        pattern: Glob pattern passed to ``Path.glob``, e.g. ``"*.mf4"``.
        signal_name: Channel name to read from each file.
        t_sec: Target time in seconds.
        mode: Sampling mode forwarded to ``extract_signal_at_t``.

    Returns:
        A ``pandas.DataFrame`` with one row per matched file, in sorted path
        order. When nothing matches, the frame is empty but still has the
        ``file``, ``value`` and ``status`` columns so callers can select them.
    """
    rows = [
        extract_signal_at_t(path, signal_name, t_sec, mode=mode)
        for path in sorted(Path(folder).glob(pattern))
    ]
    if not rows:
        # A DataFrame built from an empty list has no columns at all, which
        # makes any later column selection raise KeyError.
        return pd.DataFrame(columns=["file", "value", "status"])
    return pd.DataFrame(rows)
if __name__ == "__main__":
    # ===== User settings =====
    folder = r"./mf4"  # folder containing the MF4 files
    pattern = "*.mf4"  # change to "*.MF4" etc. if needed
    signal_name = "YourSignalName"  # e.g. "BMS_VBAT", "VehicleSpeed"
    t_sec = 12.345  # target time in seconds
    mode = "interp"  # interp | nearest | left | right
    # =========================

    df = batch_extract(folder, pattern, signal_name, t_sec, mode=mode)

    if df.empty:
        # Nothing matched: there is no result to save, and the summary columns
        # printed below may not exist on an empty frame.
        print("No files matched:", Path(folder) / pattern)
    else:
        # Save the result next to the input files.
        out_csv = Path(folder) / f"extract_{signal_name}_at_{t_sec:.3f}s_{mode}.csv"
        df.to_csv(out_csv, index=False, encoding="utf-8-sig")
        print("Saved:", out_csv)
        print(df[["file", "value", "status"]].to_string(index=False))