dasfdsaf

ad_official·2026년 1월 24일

<위 너의 지적>
2) Ego 과거(ego_agent_past): 보통 OK(가정 의존)
✅/⚠️ ego_agent_past

ego는 “항상 존재” 전제라서 보통 무효점 자체가 없다고 가정합니다.

work()에서는 ego_agent_past에 대해 별도의 “무효 프레임 정리”를 추가로 하진 않습니다.
➡️ nuPlan이 past를 항상 정상 길이/정상 값으로 준다는 전제에서는 OK
➡️ 만약 어떤 시나리오에서 past가 부족해 “실제로 없는 시점”을 0으로 채워야 한다면, 현재 work()는 그걸 따로 보장하진 않습니다(그런 케이스가 실제로 발생하는지에 따라 판단).

그러므로, "어떤 시나리오에서 past가 부족해 “실제로 없는 시점”을 0으로 채우는 로직"을 추가합니다. ego_agent_past 는 과거~현재 순서대로 저장하는 데이터입니다. 그래서, 애초에 데이터셋에서, 과거에 무효점이었다가 유효점으로 채워진 패턴만 가능합니다. (유효점 사이에 무효점이 존재하는 경우는 애초에 없습니다.)

[GPT에 요청] <위 너의 지적>을 아래 코드에 직접 구현 반영해줄래? 위 목적을 완벽하게 달성하면서도 변경점을 최소화하는게 가장 훌륭한 코딩이야.

<diffusion_planner/data_process/data_processor.py>
import numpy as np
from tqdm import tqdm
import matplotlib
import time

matplotlib.use('Agg') # GUI 백엔드 사용 안함 (메모리 절약)
import matplotlib.pyplot as plt
import contextlib

from nuplan.planning.scenario_builder.nuplan_db.nuplan_scenario import NuPlanScenario
from nuplan.common.maps.nuplan_map.nuplan_map import NuPlanMap
from nuplan.planning.simulation.history.simulation_history_buffer import SimulationHistoryBuffer
import copy
from typing import Deque
from nuplan.planning.simulation.observation.observation_type import Observation
import os
import torch
from nuplan.common.maps.maps_datatypes import TrafficLightStatusData
from nuplan.common.actor_state.tracked_objects import TrackedObjects
from typing import Dict, Tuple, Union, List, Optional
from nuplan.common.actor_state.state_representation import Point2D
from nuplan.common.actor_state.ego_state import EgoState
import draw_machine

matplotlib 설정 추가

plt.rcParams['figure.max_open_warning'] = 0 # 경고 메시지 비활성화
matplotlib.rcParams['figure.max_open_warning'] = 0
from nuplan.planning.training.preprocessing.feature_builders.vector_builder_utils import (
MapObjectPolylines, LaneSegmentTrafficLightData)
from diffusion_planner.data_process.roadblock_utils import route_roadblock_correction
from diffusion_planner.data_process.agent_process import (
build_ego_past_feature,
build_neighbor_past_feature,
build_static_feature,
sampled_tracked_objects_to_array_list,
sampled_ego_objects_to_array_list,
sampled_static_objects_to_array_list,
agent_future_all_process,
)
from diffusion_planner.data_process.map_process import get_neighbor_vector_set_map, map_process
from diffusion_planner.data_process.ego_process import get_ego_past_array_from_scenario, get_ego_future_array_from_scenario, calculate_additional_ego_states
from diffusion_planner.data_process.utils import convert_data_dict_to_device_tensors, get_npc_route_roadblock_ids, get_neighbor_track_tokens

[ADDED] 통계 저장용

import json
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType # 타입 판정용
from diffusion_planner.data_process.road_safety_process import extract_stop_sign_points, extract_crosswalk_points

class DataProcessor(object):

def __init__(self, config):

    self._save_dir = getattr(config, "save_path", None)
    self.config = config
    self.past_time_horizon = 2  # [seconds]
    self.num_past_poses = 10 * self.past_time_horizon
    self.future_time_horizon = 8  # [seconds]
    self.num_future_poses = 10 * self.future_time_horizon
    self.set_coord_as_center = config.set_coord_as_center
    self.caching_max_agent_num = config.caching_max_agent_num
    self.max_agent_num = config.max_agent_num
    self._use_filter_radius, self._filter_radius = self._read_filter_radius_settings_from_config(
        config)
    self.caching_max_static_num = config.caching_max_static_num
    self.max_static_num = config.max_static_num
    # [변경] 타입별 상한 신설: 보행자/자전거
    self.max_pedestrians = None  #getattr(config, "max_pedestrians", 7)  #128)
    self.max_bicycles = None  #getattr(config, "max_bicycles", 3)  #64)
    self.all_car_token_to_rr_ids: Optional[Dict[str,
                                                Optional[List[str]]]] = None
    self.init_cur_fut_agents_world_8_list: Optional[List[np.ndarray]] = None
    self._map_elements = [
        'LANE', 'LEFT_BOUNDARY', 'RIGHT_BOUNDARY', 'ROUTE_LANES'
    ]  # name of map features to be extracted.
    self._caching_max_map_elements = {
        'LANE': config.caching_max_lane_num,
        'LEFT_BOUNDARY': config.caching_max_lane_num,
        'RIGHT_BOUNDARY': config.caching_max_lane_num,
        'ROUTE_LANES': config.caching_max_lane_num
    }  # maximum number of elements to extract per feature layer.
    self._max_map_elements = {
        'LANE': config.max_lane_num,
        'LEFT_BOUNDARY': config.max_lane_num,
        'RIGHT_BOUNDARY': config.max_lane_num,
        'ROUTE_LANES': config.max_lane_num
    }  # maximum number of elements to extract per feature layer.
    self._map_points_num = {
        'LANE': config.lane_len,
        'LEFT_BOUNDARY': config.lane_len,
        'RIGHT_BOUNDARY': config.lane_len,
        'ROUTE_LANES': config.lane_len
    }  # maximum number of points per feature to extract per feature layer.

@staticmethod
def _build_origin_world_pose(
        ego_cur_pose_np: np.
    ndarray,  # shape: (3,) = [x_world, y_world, yaw_world]
) -> np.ndarray:
    """현재 샘플의 “ego 기준 좌표계 원점”이 세계좌표계에서 어디인지 (x, y, cos, sin)으로 만든다.

    우리가 저장하는 대부분의 값은
    “현재 ego 위치를 (0,0) 원점으로 둔 좌표계(ego 기준 좌표계)”에서 표현됩니다.

    그런데 나중에 이 값을 다시 세계좌표계로 복원하려면,
    “그 원점이 세계좌표계에서는 어디였는지”가 반드시 필요합니다.

    이 함수는 그 정보를 아래 형태로 만들어 줍니다.

    - origin_world_pose = [x_world, y_world, cos(yaw_world), sin(yaw_world)]
      shape: (4,)

    여기서 (x_world, y_world, yaw_world)는 입력 `ego_cur_pose_np`에서 가져옵니다.
    `ego_cur_pose_np`는 이미 코드에서 ego 기준 좌표계 변환의 기준점(원점)으로 쓰는 값이므로,
    이 값을 그대로 저장하면 “데이터를 만들 때 사용한 기준점”과 완전히 일치합니다.

    Args:
        ego_cur_pose_np (np.ndarray):
            shape: (3,)
            - [x_world, y_world, yaw_world]
            - 세계좌표계에서의 현재 ego 위치/방향(라디안)

    Returns:
        np.ndarray:
            shape: (4,)
            - [x_world, y_world, cos(yaw_world), sin(yaw_world)]
            - dtype: float32
    """
    if not isinstance(ego_cur_pose_np, np.ndarray):
        raise TypeError(
            f"`ego_cur_pose_np`는 np.ndarray 여야 합니다. got {type(ego_cur_pose_np)}"
        )
    if ego_cur_pose_np.shape != (3,):
        raise ValueError(
            f"`ego_cur_pose_np` shape는 (3,) 이어야 합니다. got {ego_cur_pose_np.shape}"
        )

    x_world: float = float(ego_cur_pose_np[0])
    y_world: float = float(ego_cur_pose_np[1])
    yaw_world: float = float(ego_cur_pose_np[2])

    # origin_world_pose: shape (4,) = [x, y, cos(yaw), sin(yaw)]
    origin_world_pose: np.ndarray = np.array(
        [x_world, y_world,
         np.cos(yaw_world),
         np.sin(yaw_world)],
        dtype=np.float32,
    )
    return origin_world_pose

@staticmethod
def _slice_neighbor_cur_fut_horizon_11dim(
    neighbor_cur_fut_all_gt_11_dim: np.ndarray,  # shape: (N, T_all, 11)
    iteration: int,
    future_len: int,
) -> np.ndarray:
    """neighbor의 (현재~미래) 전체 시퀀스에서, 특정 iteration 기준으로 (현재+미래) 구간만 고정 길이로 뽑는다.

    이 함수가 필요한 이유
    ---------------------
    observation_adapter()에서는 scenario 전체 길이만큼의 (현재~미래) 데이터를 미리 만들어두고,
    매 step마다 iteration 위치에서 앞으로 future_len 만큼을 잘라서 씁니다.

    그런데 “유효점과 유효점 사이에 무효점이 있으면 안 된다” 규칙을 적용할 때,
    **현재 step에서 필요한 101개(=past_len + future_len)**만 보고 처리해야 합니다.
    (멀리 뒤의 미래 프레임이 섞이면, 그 정보 때문에 앞 구간이 잘못 채워질 수 있습니다.)

    그래서 이 함수는:
    - neighbor_cur_fut_all_gt_11_dim (N, T_all, 11) 에서
    - [iteration ... iteration + future_len] (총 1+future_len 프레임)
      을 뽑아서 (N, 1+future_len, 11) 로 반환합니다.
    - 범위를 벗어나는 프레임은 0으로 패딩합니다.

    Args:
        neighbor_cur_fut_all_gt_11_dim (np.ndarray):
            shape: (N, T_all, 11)
            - N: agent 수
            - T_all: scenario 전체 타임 길이(현재 포함)
            - 11: [x, y, cos, sin, vx, vy, width, length, onehot(3)]
        iteration (int):
            현재 step 인덱스(0 기반).
            이 값이 “현재 프레임” 위치라고 가정합니다.
        future_len (int):
            “현재 이후”로 몇 프레임을 쓸지 (현재 제외).
            예: 80 이면 출력 길이는 81(=현재1 + 미래80)

    Returns:
        np.ndarray:
            shape: (N, 1 + future_len, 11)
            - index 0: 현재 프레임
            - index 1..future_len: 미래 프레임
            - 부족한 구간은 0으로 채워짐
    """
    if neighbor_cur_fut_all_gt_11_dim.ndim != 3 or neighbor_cur_fut_all_gt_11_dim.shape[
            -1] != 11:
        raise ValueError(
            f"`neighbor_cur_fut_all_gt_11_dim` shape는 (N, T_all, 11)이어야 합니다. "
            f"got {neighbor_cur_fut_all_gt_11_dim.shape}")
    if iteration < 0:
        raise ValueError(f"`iteration`은 0 이상이어야 합니다. got {iteration}")
    if future_len < 0:
        raise ValueError(f"`future_len`은 0 이상이어야 합니다. got {future_len}")

    N: int = int(neighbor_cur_fut_all_gt_11_dim.shape[0])
    T_all: int = int(neighbor_cur_fut_all_gt_11_dim.shape[1])
    out_len: int = int(1 + future_len)

    # out: (N, 1+future_len, 11)
    out: np.ndarray = np.zeros((N, out_len, 11),
                               dtype=neighbor_cur_fut_all_gt_11_dim.dtype)

    start: int = int(iteration)
    end: int = int(iteration + out_len)

    if start >= T_all:
        return out

    copy_end: int = int(min(T_all, end))
    copy_len: int = int(copy_end - start)  # 복사 가능한 길이

    out[:, :copy_len, :] = neighbor_cur_fut_all_gt_11_dim[:,
                                                          start:copy_end, :]
    return out

def _build_neighbor_future_gt_from_past_and_cur_fut_11dim(
    self,
    neighbor_agents_past: np.ndarray,  # shape: (N, Tp, 11)
    neighbor_cur_fut_gt_11_dim: np.
    ndarray,  # shape: (N, 1+Tf, 11)  (0번이 현재)
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """neighbor의 past와 (현재+미래)를 합쳐서, 규칙을 만족하도록 정리한 뒤 최종 출력들을 만든다.

    반드시 만족해야 하는 규칙(한 agent 기준)
    --------------------------------------
    1) 현재 점(past의 마지막)이 무효면:
       - 101개 전체(과거~현재~미래)가 전부 무효(=전부 0)여야 합니다.

    2) 현재 점이 유효면:
       - 101개 전체에서 “유효점과 유효점 사이에 무효점”이 있으면 안 됩니다.
       - 즉, 유효한 프레임들이 중간에 끊기면,
         그 끊긴 구간을 x/y/cos/sin/vx/vy의 “직선 중간값”으로 채워서
         유효 구간이 한 덩어리로 이어지게 만듭니다.
       - 유효 구간 밖(prefix/suffix)은 0으로 둡니다.

    Args:
        neighbor_agents_past (np.ndarray):
            shape: (N, Tp, 11)
            - Tp = time_len (예: 21)
            - 마지막 index (Tp-1)이 “현재 프레임”
        neighbor_cur_fut_gt_11_dim (np.ndarray):
            shape: (N, 1+Tf, 11)
            - index 0이 “현재 프레임”
            - index 1..Tf 가 “미래”
            - Tf = future_len (예: 80)

    Returns:
        Tuple[np.ndarray, np.ndarray, np.ndarray]:
            - fixed_neighbor_agents_past: shape (N, Tp, 11)
            - neighbor_future_gt_11_dim:  shape (N, Tf, 11)  (현재 제외)
            - neighbor_future_gt_3_dim:   shape (N, Tf, 3)   [x, y, yaw]
    """
    fixed_neighbor_agents_past, neighbor_future_gt_11_dim = self._merge_and_interpolate_neighbor_11dim(
        neighbor_agents_past=neighbor_agents_past,
        neighbor_cur_fut_gt_11_dim=neighbor_cur_fut_gt_11_dim,
    )

    # 11 -> 3 (yaw는 cos/sin으로 계산)
    neighbor_future_gt_3_dim: np.ndarray = self._traj11_to_traj3_yaw(
        neighbor_future_gt_11_dim)

    return fixed_neighbor_agents_past, neighbor_future_gt_11_dim, neighbor_future_gt_3_dim

def _enforce_no_invalid_between_valid_in_ego_past(
    self,
    ego_agent_past: np.ndarray,  # shape: (Tp, 11)
    *,
    eps: float = 1e-8,
) -> np.ndarray:
    """ego_agent_past(과거~현재)만 가지고도 “유효-무효-유효”가 생기지 않게 정리한다.

    observation_adapter()에서는 ego 미래 GT를 출력하지 않기 때문에,
    past(과거~현재)만으로 규칙을 강제합니다.

    규칙
    ----
    - 현재 프레임(=past의 마지막)이 유효일 때:
      · past 안에서 유효점과 유효점 사이에 무효점(0)이 끼면 안 됩니다.
      · 그래서 past에서 유효한 프레임들의 첫/마지막 위치를 찾고,
        그 사이에 비어 있는 프레임이 있으면 x/y/cos/sin/vx/vy를 “직선 중간값”으로 채웁니다.
      · 유효 구간 밖은 0으로 둡니다.

    - 현재 프레임이 무효면:
      · 안전하게 past 전체를 0으로 만듭니다.
        (ego는 보통 항상 존재하지만, 이상 케이스를 막기 위해서입니다.)

    Args:
        ego_agent_past (np.ndarray):
            shape: (Tp, 11)
            Tp는 보통 21 (2초 과거 + 현재)
        eps (float):
            0과 아주 가까운 값을 “0”처럼 볼 때 쓰는 기준

    Returns:
        np.ndarray:
            shape: (Tp, 11)
            규칙을 만족하도록 정리된 ego_agent_past (새 배열)
    """
    if ego_agent_past.ndim != 2 or ego_agent_past.shape[-1] != 11:
        raise ValueError(
            f"`ego_agent_past` shape는 (Tp, 11)이어야 합니다. got {ego_agent_past.shape}"
        )

    Tp: int = int(ego_agent_past.shape[0])
    if Tp == 0:
        return ego_agent_past

    traj: np.ndarray = ego_agent_past.astype(np.float32,
                                             copy=True)  # (Tp, 11)
    current_index: int = Tp - 1

    # valid_mask_1d: (Tp,)
    valid_mask_1d: np.ndarray = (np.abs(traj[:, :8]) > eps).any(axis=1)

    # 현재가 무효면 past 전체를 0으로
    if not bool(valid_mask_1d[current_index]):
        return np.zeros_like(traj)

    valid_idx: np.ndarray = np.nonzero(valid_mask_1d)[0]
    if valid_idx.size == 0:
        return np.zeros_like(traj)

    region_mask: np.ndarray = np.zeros((Tp,), dtype=bool)

    if valid_idx.size == 1:
        region_mask[int(valid_idx[0])] = True
    else:
        first_valid: int = int(valid_idx[0])
        last_valid: int = int(valid_idx[-1])
        region_mask[first_valid:last_valid + 1] = True

        # 중간 구멍이 있으면 x/y/cos/sin/vx/vy를 채움 (0~5)
        if last_valid - first_valid + 1 > valid_idx.size:
            xs: np.ndarray = valid_idx.astype(np.float64)  # (K,)
            seg_idx: np.ndarray = np.arange(first_valid,
                                            last_valid + 1,
                                            dtype=np.float64)

            for dim_idx in range(6):  # 0~5
                ys: np.ndarray = traj[valid_idx, dim_idx].astype(np.float64,
                                                                 copy=False)
                interp_vals: np.ndarray = np.interp(seg_idx, xs, ys)
                traj[first_valid:last_valid + 1,
                     dim_idx] = interp_vals.astype(np.float32, copy=False)

    # 타입/크기는 “현재 프레임 값”을 대표로 씀
    type_vec: np.ndarray = traj[current_index,
                                8:11].astype(np.float32, copy=False)  # (3,)
    rep_size: np.ndarray = traj[current_index,
                                6:8].astype(np.float32, copy=False)  # (2,)

    # one-hot은 유효 구간에만
    traj[:, 8:11] = 0.0
    traj[region_mask, 8:11] = type_vec

    # 유효 구간 밖은 완전 0
    traj[~region_mask, :] = 0.0

    # width/length 채우기 + cos/sin 정리
    traj_b: np.ndarray = traj[None, :, :]  # (1, Tp, 11)
    region_b: np.ndarray = region_mask[None, :]  # (1, Tp)
    rep_size_b: np.ndarray = rep_size[None, :]  # (1, 2)

    traj_b = self._fill_width_length_with_representative_size(
        traj_11=traj_b,
        valid_mask=region_b,
        rep_size=rep_size_b,
    )
    traj_b = self._normalize_cos_sin_in_traj_11(
        traj_11=traj_b,
        valid_mask=region_b,
    )
    return traj_b[0]  # (Tp, 11)

def _get_map_query_radius_m(self) -> float:
    """지도/도로시설(정지표지, 횡단보도 등)을 조회할 때 사용할 반경(m)을 반환합니다.

    이 반경은 '필터링을 켜고/끄는 옵션(use_filter_radius)'과 성격이 다릅니다.

    - use_filter_radius 는 '에이전트/정적 객체를 거리로 잘라낼지'를 제어합니다.
    - 하지만 지도/도로시설은 반경이 없으면 가져오는 데이터가 너무 커져서
      속도/메모리 문제가 생길 수 있습니다.

    그래서 지도/도로시설 조회 반경은 use_filter_radius 와 무관하게
    항상 filter_radius 값을 그대로 사용하도록 분리합니다.

    Returns:
        float:
            meter 단위 지도/도로시설 조회 반경.
    """
    return float(self._filter_radius)

@staticmethod
def _read_filter_radius_settings_from_config(
        config: object) -> Tuple[bool, float]:
    """config에서 필터링 설정(use_filter_radius, filter_radius)을 읽습니다.

    이 클래스는 오직 config에 아래 두 값이 "명시적으로 존재"할 때만 동작하도록 강제합니다.
      - use_filter_radius (bool)
      - filter_radius (float, meter)

    둘 중 하나라도 없으면, 잘못된 설정으로 조용히 다른 기본값을 쓰는 일을 막기 위해
    즉시 에러를 내고 중단합니다.

    Args:
        config: args_util.get_args()가 반환한 인자 객체(보통 argparse.Namespace)

    Returns:
        Tuple[bool, float]:
            - use_filter_radius: bool
            - filter_radius_m: float (meter)

    Raises:
        RuntimeError: use_filter_radius 또는 filter_radius가 config에 없을 때
    """
    missing: List[str] = []
    if not hasattr(config, "use_filter_radius"):
        missing.append("use_filter_radius")
    if not hasattr(config, "filter_radius"):
        missing.append("filter_radius")

    if len(missing) > 0:
        raise RuntimeError(
            "필수 설정이 config에 없습니다. "
            "args_util.py에 `--use_filter_radius`와 `--filter_radius`를 반드시 선언해야 합니다. "
            f"missing={missing}")

    use_filter_radius = bool(getattr(config, "use_filter_radius"))
    filter_radius_m = float(getattr(config, "filter_radius"))
    return use_filter_radius, filter_radius_m

def _get_effective_filter_radius_m(self) -> Optional[float]:
    """현재 설정(use_filter_radius)에 따라 실제로 사용할 반경을 반환합니다.

    - use_filter_radius=True: filter_radius(m)를 반환합니다.
    - use_filter_radius=False: None을 반환해서, 호출 측에서 필터링을 건너뛰게 합니다.

    Returns:
        Optional[float]:
            - float: meter 단위 반경
            - None: 필터링을 하지 않음
    """
    if bool(self._use_filter_radius):
        return float(self._filter_radius)
    return None
@staticmethod
def _adjust_ego_future_outputs_to_center_frame(
    ego_state: EgoState,
    ego_future_gt_3_dim: np.ndarray,  # shape: (T, 3)
    ego_future_gt_11_dim: np.ndarray,  # shape: (T, 11)
    *,
    set_coord_as_center: bool,
) -> Tuple[np.ndarray, np.ndarray]:
    """ego 미래 궤적 출력이 rear axle 기준일 때, center 기준으로 x,y만 보정한다.

    주의(이번 수정의 핵심)
    ---------------------
    - 무효 프레임(앞 8차원이 전부 0인 프레임)은 (0,0,0,0,...) 상태를 유지해야 한다.
    - 따라서 set_coord_as_center=True 여도 무효 프레임에는 x,y 이동(offset)을 적용하지 않는다.
      (유효 프레임에만 적용)

    Args:
        ego_state (EgoState):
            현재 ego 상태.
        ego_future_gt_3_dim (np.ndarray):
            shape: (T, 3)
            [x, y, yaw] 형태의 ego 미래 궤적(ego 로컬 좌표계).
        ego_future_gt_11_dim (np.ndarray):
            shape: (T, 11)
            [x, y, cos, sin, vx, vy, width, length, onehot(3)] 형태의 ego 미래 궤적.
        set_coord_as_center (bool):
            True면 center 기준으로 보정, False면 입력 그대로 반환.

    Returns:
        Tuple[np.ndarray, np.ndarray]:
            (ego_future_gt_3_dim, ego_future_gt_11_dim)
            - 둘 다 입력 배열을 **in-place로** 수정한 뒤 그대로 반환합니다.
    """
    if not set_coord_as_center:
        return ego_future_gt_3_dim, ego_future_gt_11_dim

    # 미래 길이가 0이면 할 게 없음
    if ego_future_gt_11_dim.size == 0:
        return ego_future_gt_3_dim, ego_future_gt_11_dim

    # (1) 월드 좌표에서 rear_axle -> center 변위
    dx_world: float = float(ego_state.center.x - ego_state.rear_axle.x)
    dy_world: float = float(ego_state.center.y - ego_state.rear_axle.y)

    # (2) 현재 heading 기준 로컬 좌표로 회전 (world -> ego heading frame)
    heading: float = float(ego_state.rear_axle.heading)
    c: float = float(np.cos(heading))
    s: float = float(np.sin(heading))

    # local = R^T * world,  R = [[c, -s],[s, c]]
    offset_x_local: float = dx_world * c + dy_world * s
    offset_y_local: float = -dx_world * s + dy_world * c

    # ✅ (핵심) 유효/무효 마스크
    # - 무효 프레임 정의: [x, y, cos, sin, vx, vy, width, length] 8개가 전부 0이면 무효
    eps: float = 1e-8
    # valid_mask: shape (T,)
    valid_mask: np.ndarray = (np.abs(ego_future_gt_11_dim[:, :8]) > eps).any(axis=1)

    # 유효 프레임에만 원점 이동 적용
    ego_future_gt_3_dim[valid_mask, 0] -= offset_x_local
    ego_future_gt_3_dim[valid_mask, 1] -= offset_y_local
    ego_future_gt_11_dim[valid_mask, 0] -= offset_x_local
    ego_future_gt_11_dim[valid_mask, 1] -= offset_y_local

    return ego_future_gt_3_dim, ego_future_gt_11_dim


@staticmethod
def _normalize_cos_sin_in_traj_11(
    traj_11: np.ndarray,  # shape: (N, T, 11)
    valid_mask: np.ndarray,  # shape: (N, T), True면 유효 프레임
    *,
    eps: float = 1e-6,
    cos_index: int = 2,
    sin_index: int = 3,
) -> np.ndarray:
    """(cos, sin) 채널을 "진짜 cos/sin"처럼 보이도록 길이를 1로 맞춥니다.

    왜 필요한가
    ----------
    중간 프레임을 채우는 과정에서 cos/sin을 숫자 그대로 선형으로 섞으면,
    (cos, sin)이 단위원 위(길이 1)에 있지 않을 수 있습니다.
    그러면 모델 입장에서 "방향 정보"가 애매해질 수 있습니다.

    이 함수는 다음 규칙으로 정리합니다.
    1) valid_mask=True 인 프레임만 처리합니다.
    2) (cos^2 + sin^2)의 제곱근이 eps보다 크면,
       cos와 sin을 그 길이로 나눠서 길이를 1로 맞춥니다.
    3) 길이가 너무 작으면(=방향 정보가 사실상 없는 경우),
       해당 프레임의 cos/sin을 0으로 둡니다.
    4) valid_mask=False 인 프레임은 cos/sin을 0으로 둡니다.

    Args:
        traj_11 (np.ndarray):
            shape (N, T, 11)
        valid_mask (np.ndarray):
            shape (N, T)
        eps (float):
            0 나누기 방지용 작은 값
        cos_index (int):
            cos 채널 인덱스(기본 2)
        sin_index (int):
            sin 채널 인덱스(기본 3)

    Returns:
        np.ndarray:
            shape (N, T, 11)
            traj_11을 직접 수정한 뒤 그대로 반환합니다.
    """
    if traj_11.ndim != 3 or traj_11.shape[-1] != 11:
        raise ValueError(
            f"`traj_11` shape는 (N, T, 11)이어야 합니다. got {traj_11.shape}")
    if valid_mask.shape != traj_11.shape[:2]:
        raise ValueError(
            f"`valid_mask` shape는 (N, T)이어야 합니다. got {valid_mask.shape}, expected {traj_11.shape[:2]}"
        )

    cos_v = traj_11[:, :, cos_index]  # (N, T)
    sin_v = traj_11[:, :, sin_index]  # (N, T)

    # norm: (N, T)
    norm = np.sqrt(cos_v * cos_v + sin_v * sin_v)

    # 나눗셈 안전장치
    norm_safe = np.where(norm > eps, norm, 1.0)

    cos_unit = cos_v / norm_safe
    sin_unit = sin_v / norm_safe

    # valid_mask=True 이고 norm>eps 인 곳만 (cos_unit, sin_unit) 사용
    good = valid_mask & (norm > eps)

    traj_11[:, :, cos_index] = np.where(good, cos_unit,
                                        0.0).astype(traj_11.dtype,
                                                    copy=False)
    traj_11[:, :, sin_index] = np.where(good, sin_unit,
                                        0.0).astype(traj_11.dtype,
                                                    copy=False)
    return traj_11

# [ADDED] 통계 유틸 함수들
# =========================
@staticmethod
def _count_valid_neighbors_by_type(
        neighbor_agents_past: np.ndarray,  # (N, Tp, 11)
) -> Tuple[int, int, int]:
    """마지막 시점의 에이전트 상태로 유효/타입을 판정해 수를 셉니다.

    규칙:
      - 유효성: 마지막 시점의 앞 8차원(kinematics/size)이 모두 0이면 무효로 간주
        · 즉, valid = any(|state_last[:8]| > eps)
      - 타입: 마지막 3차원(one-hot) = [vehicle, pedestrian, bicycle]
        · 임계값 0.5 초과를 1로 해석(부동소수 오차 대비)

    Args:
        neighbor_agents_past (np.ndarray):
            - shape: (N, Tp, 11)
            - 마지막 차원 11 = [x, y, cos, sin, vx, vy, width, length, onehot_vehicle, onehot_ped, onehot_bike]

    Returns:
        Tuple[int, int, int]: (vehicle_count, pedestrian_count, bicycle_count)

    Raises:
        ValueError: 입력의 마지막 차원 크기가 11이 아닌 경우.
    """
    if neighbor_agents_past.ndim != 3 or neighbor_agents_past.shape[
            -1] != 11:
        raise ValueError(
            f"`neighbor_agents_past` shape는 (N, Tp, 11)이어야 합니다. "
            f"got {neighbor_agents_past.shape}")

    # 마지막 시점만 사용
    last: np.ndarray = neighbor_agents_past[:, -1, :]  # (N, 11)

    # 유효성 마스크: 앞 8차원 중 하나라도 |.| > eps 이면 유효
    eps = 1e-8
    valid_mask: np.ndarray = (np.abs(last[:, :8]) > eps).any(axis=1)  # (N,)

    # 타입 one-hot (vehicle, pedestrian, bicycle)
    type_oh: np.ndarray = last[:, 8:11]  # (N, 3)
    veh_mask = type_oh[:, 0] > 0.5
    ped_mask = type_oh[:, 1] > 0.5
    bik_mask = type_oh[:, 2] > 0.5

    vehicle_count = int(np.sum(valid_mask & veh_mask))
    pedestrian_count = int(np.sum(valid_mask & ped_mask))
    bicycle_count = int(np.sum(valid_mask & bik_mask))

    return vehicle_count, pedestrian_count, bicycle_count

@staticmethod
def _compute_lane_speed_stats(
    vector_map_output: Dict[str,
                            np.ndarray],) -> Tuple[float, Optional[float]]:
    """차선 관련 통계를 계산한다.

    분모는 '유효 차선' 개수:
        - vector_map_output['lanes'] 의 각 차선 텐서 합(|.|) > 0

    통계:
        - 속도제한 차선 비율(%):
            100 * (#(유효 ∧ has_speed_limit True)) / (#유효)
        - 속도제한 차선들의 평균 제한속도(km/h):
            mean(lanes_speed_limit[유효 ∧ True]) * 3.6
            (없으면 None 반환)

    Args:
        vector_map_output: map_process(...) 가 반환한 dict

    Returns:
        Tuple[float, Optional[float]]: (ratio_percent, mean_speed_kmh or None)
    """
    lanes: np.ndarray = vector_map_output[
        'lanes']  # (lane_num, lane_len, 12)
    has_speed: np.ndarray = vector_map_output[
        'lanes_has_speed_limit']  # (lane_num, 1) bool
    speed_mps: np.ndarray = vector_map_output[
        'lanes_speed_limit']  # (lane_num, 1) float

    # 유효 차선 판정:
    #   - 앞 8채널(x, y, vec, left/right 등)이 전부 0이면 패딩으로 간주
    #   - 즉, lanes[..., :8]의 모든 값이 0인 lane 은 무시
    lanes_front8: np.ndarray = lanes[..., :8]  # (lane_num, lane_len, 8)
    lanes_valid_mask = (np.abs(lanes_front8).sum(axis=(1, 2))
                        > 0)  # (lane_num,)
    if lanes_valid_mask.sum() == 0:
        return 0.0, None

    has_speed_mask = (has_speed.reshape(-1).astype(bool)
                     ) & lanes_valid_mask  # (lane_num,)
    ratio_percent = float(100.0 * has_speed_mask.sum() /
                          lanes_valid_mask.sum())

    mean_speed_kmh: Optional[float] = None
    if has_speed_mask.any():
        mean_speed_kmh = float(
            speed_mps.reshape(-1)[has_speed_mask].mean() * 3.6)

    return ratio_percent, mean_speed_kmh

def _save_sample_stats_json(
    self,
    map_name: str,
    token: str,
    stats: Dict[str, Union[int, float, None]],
) -> None:
    """샘플별 통계를 `<save_path>/<map>_<token>.stats.json` 으로 저장한다.

    원자적 저장을 위해 `.tmp`로 쓴 뒤 최종 파일명으로 교체한다.

    Args:
        map_name: 맵 이름
        token: 시나리오 토큰
        stats: 저장할 통계 딕셔너리
    """
    if not self._save_dir:
        return
    os.makedirs(self._save_dir, exist_ok=True)
    json_temp_folder = os.path.join(self._save_dir, "json_temp")
    os.makedirs(json_temp_folder, exist_ok=True)
    out_path = os.path.join(json_temp_folder,
                            f"{map_name}_{token}.stats.json")
    tmp_path = out_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump(stats, f, indent=2)
    os.replace(tmp_path, out_path)

def _get_car_token_to_rr_ids(
    self,
    all_car_token_to_rr_ids: Dict[str, Optional[List[str]]],
    neighbor_track_token: List[str]  # len = chosen_agent_num
) -> Dict[str, List[str]]:  # len = chosen_car_num
    car_token_to_rr_ids: Dict[str, Optional[List[str]]] = {}
    for token in neighbor_track_token:
        if token in all_car_token_to_rr_ids:
            car_token_to_rr_ids[token] = all_car_token_to_rr_ids[token]
    return car_token_to_rr_ids

def _get_past_cur_ego_feature(
    self,
    scenario: Optional[NuPlanScenario] = None,
    history_buffer: Optional[SimulationHistoryBuffer] = None,
    *,
    set_coord_as_center: bool = False,
) -> Tuple[EgoState, Point2D, float, np.ndarray, np.ndarray,
           Optional[np.ndarray]]:
    """시나리오 또는 history buffer 에서 ego 궤적을 공통 포맷으로 추출한다.

    (중간 설명은 기존 docstring 유지하되, 아래 한 줄만 추가 개념으로 보면 됩니다)
    - set_coord_as_center=True 이면:
      ego 기준 좌표 변환의 기준점을 rear axle이 아니라 ego center로 잡습니다.
      즉, ego_point2d / ego_heading / ego_cur_pose_np 가 center 기준으로 설정됩니다.
    """
    if (scenario is None and history_buffer is None) or \
       (scenario is not None and history_buffer is not None):
        raise ValueError("scenario 또는 history_buffer 중 정확히 하나만 전달해야 합니다.")

    if scenario is not None:
        ego_state: EgoState = scenario.initial_ego_state
    else:
        ego_state = history_buffer.current_state[
            0]  # type: ignore[union-attr]

    # ✅ 기준점 선택: rear_axle(default) vs center
    if set_coord_as_center:
        ref = ego_state.center
    else:
        ref = ego_state.rear_axle

    ego_point2d = Point2D(ref.x, ref.y)
    ego_heading: float = float(ref.heading)
    ego_cur_pose_np = np.array([ref.x, ref.y, ref.heading],
                               dtype=np.float64)  # shape: (3,)

    if scenario is not None:
        (past_cur_ego_world_10,
         past_cur_time_np) = get_ego_past_array_from_scenario(
             scenario,
             self.num_past_poses,
             self.past_time_horizon,
         )
    else:
        ego_state_buffer: Deque[EgoState] = history_buffer.ego_state_buffer
        past_cur_ego_world_10 = sampled_ego_objects_to_array_list(
            ego_state_buffer)
        past_cur_time_np = None

    assert past_cur_ego_world_10.shape[0] == self.num_past_poses + 1, \
        f"Expected past_cur_ego_world_10 shape[0] == {self.num_past_poses + 1}, got {past_cur_ego_world_10.shape[0]}"

    return (
        ego_state,
        ego_point2d,
        ego_heading,
        ego_cur_pose_np,
        past_cur_ego_world_10,
        past_cur_time_np,
    )

def _prepare_car_token_to_rr_ids(
    self,
    scenario: NuPlanScenario,
    use_route_lanes: bool = False,
    neighbor_track_token: Optional[List[str]] = None,
) -> Dict[str, List[str]]:
    if use_route_lanes and self.all_car_token_to_rr_ids is None:
        present_tracked_objects: TrackedObjects \
            = scenario.initial_tracked_objects.tracked_objects
        past_tracked_objects: List[TrackedObjects] = [
            tracked_objects.tracked_objects
            for tracked_objects in scenario.get_past_tracked_objects(
                iteration=0,
                time_horizon=self.past_time_horizon,
                num_samples=self.num_past_poses)
        ]
        past_cur_tracked_objects = past_tracked_objects + [
            present_tracked_objects
        ]
        self.all_car_token_to_rr_ids: Dict[
            str, List[str]] = get_npc_route_roadblock_ids(
                scenario,
                past_cur_tracked_objects,
                neighbor_track_token=None)
    elif not use_route_lanes:
        self.all_car_token_to_rr_ids = {}
    # len = chosen_car_num
    car_token_to_rr_ids: Dict[str,
                              List[str]] = self._get_car_token_to_rr_ids(
                                  self.all_car_token_to_rr_ids,
                                  neighbor_track_token)
    return car_token_to_rr_ids

def _get_cur_fut_agents_world_8_list(
    self,
    scenario: NuPlanScenario,
    token_to_id: Dict[str, int],
    do_inference: bool,
):
    if do_inference:
        if self.init_cur_fut_agents_world_8_list is None:
            scenario_duration: float = scenario.duration_s.time_s + self.future_time_horizon
            num_samples = int(scenario_duration * 10.0)
            """
            self.init_cur_fut_agents_world_8_list: List[np.ndarray]
                - 길이: 1 + num_samples
                - 각 원소 shape: (frame_agents_num_t, 8)
            """
            (self.init_cur_fut_agents_world_8_list,
             _) = self._get_future_tracked_objects_array_list(
                 scenario,
                 token_to_id=token_to_id,
                 iteration=0,
                 future_time_horizon=scenario_duration,
                 num_samples=num_samples)

        # 깊은 복사 후, 선택 에이전트들만 뽑아서 ego 기준으로 변환
        cur_fut_agents_world_8_list = copy.deepcopy(
            self.init_cur_fut_agents_world_8_list)
    else:
        (cur_fut_agents_world_8_list,
         _) = self._get_future_tracked_objects_array_list(
             scenario, token_to_id=token_to_id, iteration=0)
    return cur_fut_agents_world_8_list

# Use for inference
def observation_adapter(
    self,
    iteration: int,
    history_buffer: SimulationHistoryBuffer,
    traffic_light_data: List[TrafficLightStatusData],
    map_api: NuPlanMap,
    device='cpu',
    scenario: Optional[NuPlanScenario] = None,
    use_route_lanes: bool = False,
    squeeze: bool = False,
) -> Dict[str, torch.Tensor]:

    (ego_state, ego_point2d, ego_heading, ego_cur_pose_np,
     past_cur_ego_world_10, _) = self._get_past_cur_ego_feature(
         history_buffer=history_buffer,
         set_coord_as_center=self.set_coord_as_center,
     )

    # ✅ 추가: ego 기준 좌표계 원점의 세계좌표 포즈 저장
    # origin_world_pose: shape (4,) = [x_world, y_world, cos(yaw), sin(yaw)]
    origin_world_pose: np.ndarray = self._build_origin_world_pose(
        ego_cur_pose_np)

    ego_agent_past = build_ego_past_feature(
        past_cur_ego_world_10=past_cur_ego_world_10,
        ego_cur_pose_np=ego_cur_pose_np,
    )
    # Past observations including the current
    observation_buffer: Deque[
        Observation] = history_buffer.observation_buffer

    (
        past_cur_agents_world_8_list,
        past_cur_agents_types_list,
        present_static_feat_5,
        static_types_list,
        token_to_id,
        _,
        _,
    ) = self._get_past_cur_agents_feature(
        observation_buffer=observation_buffer)

    (neighbor_agents_past, agents_cur_frame_indices, neighbors_id,
     neighbor_track_token) = build_neighbor_past_feature(
         past_cur_agents_world_8_list=past_cur_agents_world_8_list,
         past_cur_agents_types_list=past_cur_agents_types_list,
         max_agent_num=self.max_agent_num,
         ego_cur_pose_np=ego_cur_pose_np,
         max_pedestrians=self.max_pedestrians,
         max_bicycles=self.max_bicycles,
         token_to_id=token_to_id,
         filter_radius=None,
     )

    ego_time_len = ego_agent_past.shape[0]
    neighbor_time_len = neighbor_agents_past.shape[1]
    assert ego_time_len == neighbor_time_len == self.num_past_poses + 1, \
        f"Expected time length {self.num_past_poses + 1}, got ego {ego_time_len}, neighbor {neighbor_time_len}"

    cur_fut_agents_world_8_list = self._get_cur_fut_agents_world_8_list(
        scenario, token_to_id, do_inference=True)

    # (N, 1 + Tf_all, 11)
    neighbor_cur_fut_all_gt_11_dim = agent_future_all_process(
        ego_cur_pose_np=ego_cur_pose_np,
        cur_fut_agents_world_8_list=cur_fut_agents_world_8_list,
        neighbor_token_id=neighbors_id,
        neighbor_agents_past=neighbor_agents_past,
    )

    # ✅ (요구조건 a) “현재 iteration 기준”으로 (현재+미래 80)만 뽑아서
    #    (past 21)과 합친 101개 기준으로 규칙을 강제
    neighbor_cur_fut_horizon_gt_11_dim = self._slice_neighbor_cur_fut_horizon_11dim(
        neighbor_cur_fut_all_gt_11_dim=neighbor_cur_fut_all_gt_11_dim,
        iteration=iteration,
        future_len=self.num_future_poses,
    )

    # ✅ 규칙 적용 + 최종 neighbor_future_gt_3_dim / neighbor_future_gt_11_dim 생성
    neighbor_agents_past, neighbor_future_gt_11_dim, neighbor_future_gt_3_dim = \
        self._build_neighbor_future_gt_from_past_and_cur_fut_11dim(
            neighbor_agents_past=neighbor_agents_past,
            neighbor_cur_fut_gt_11_dim=neighbor_cur_fut_horizon_gt_11_dim,
        )

    # (선택) 기존처럼 “전체 길이” future_all도 계속 내보내고 싶다면:
    # - 여기서는 추가적인 101 규칙 적용이 아니라, raw 변환만 제공합니다.
    # - shape: (N, Tf_all, 3)
    neighbor_future_all_gt_11_dim = neighbor_cur_fut_all_gt_11_dim[:,
                                                                   1:, :]  # 현재(0) 제외
    neighbor_future_all_gt_3_dim = self._traj11_to_traj3_yaw(
        neighbor_future_all_gt_11_dim)

    static_objects = build_static_feature(
        present_static_feat_5=present_static_feat_5,
        static_types_list=static_types_list,
        max_static_num=self.max_static_num,
        ego_cur_pose_np=ego_cur_pose_np,
        filter_radius=self._get_effective_filter_radius_m(),
    )
    key_to_array = {
        "origin_world_pose": origin_world_pose,  # (4,)
        "ego_agent_past": ego_agent_past,  # (time_len, 11)
        "neighbor_agents_past": neighbor_agents_past,
        # (chosen_agent_num, time_len, 11)

        # ✅ (요구조건 a) 최종 출력(규칙 적용된) future GT
        "neighbor_future_gt_3_dim": neighbor_future_gt_3_dim,
        # (chosen_agent_num, future_len, 3)
        "neighbor_future_gt_11_dim": neighbor_future_gt_11_dim,
        # (chosen_agent_num, future_len, 11)

        # 기존 키 유지(필요시):
        "neighbor_future_all_gt_3_dim": neighbor_future_all_gt_3_dim,
        # (chosen_agent_num, future_all_len, 3)
        "static_objects": static_objects,  # (chosen_static_num, 10)
    }

    key_to_road_safety = self._get_road_safety_features(
        scenario=scenario,
        ego_cur_pose_np=ego_cur_pose_np,
    )
    key_to_array.update(key_to_road_safety)

    (
        route_roadblock_ids,
        elements_to_obj_polylines,
        elements_to_traffic_light,
        speed_limit_dict,
        lanes_roadblock_id_list,
    ) = self._prepare_map(
        scenario=scenario,
        ego_state=ego_state,
        ego_point2d=ego_point2d,
        ego_heading=ego_heading,
        map_api=map_api,
        traffic_light_data=traffic_light_data,
    )

    car_token_to_rr_ids: Dict[
        str, List[str]] = self._prepare_car_token_to_rr_ids(
            scenario=scenario,
            use_route_lanes=use_route_lanes,
            neighbor_track_token=neighbor_track_token,
        )

    neighbor_agents_current = neighbor_agents_past[:, -1, :]
    map_key_to_array = map_process(
        route_roadblock_ids, car_token_to_rr_ids, neighbor_track_token,
        neighbor_agents_current, ego_cur_pose_np, elements_to_obj_polylines,
        elements_to_traffic_light, speed_limit_dict,
        lanes_roadblock_id_list, self._map_elements, self._max_map_elements,
        self._map_points_num)
    key_to_array.update(map_key_to_array)

    key_to_array = convert_data_dict_to_device_tensors(
        key_to_array, device, squeeze)

    key_to_array["neighbor_track_token"] = neighbor_track_token

    return key_to_array

@staticmethod
def zero_out_random_time_prefix(
        neighbor_agents_past: np.ndarray) -> np.ndarray:
    """주어진 neighbor_agents_past 텐서에서
    (max_agent_num, time_len, feature_dim) 형태를 가정하고,
    0 ~ time_len-1 사이에서 랜덤 target을 뽑아
    neighbor_agents_past[:, :target, :8] 구간을 0으로 만드는 함수.

    Args:
        neighbor_agents_past (np.ndarray):
            입력 텐서. shape = (num_agents, time_len, 11)

    Returns:
        np.ndarray:
            특정 시간 구간을 0으로 채운 텐서. shape 동일.
    """
    num_agents, time_len, feature_dim = neighbor_agents_past.shape

    # 0부터 time_len-1 사이 랜덤 target 선택
    target: int = np.random.randint(0, time_len // 2)
    print("target:", target)

    # 복사본을 만들어 수정 (원본을 바꾸고 싶으면 copy 제거)
    modified_past: np.ndarray = neighbor_agents_past.copy()

    # 첫 8개 feature만 0으로 세팅
    modified_past[:, :target, :8] = 0.0

    return modified_past

def _merge_and_interpolate_ego_11dim(
    self,
    ego_agent_past: np.ndarray,  # shape: (Tp, 11)
    ego_future_gt_11_dim: np.ndarray,  # shape: (Tf, 11)
    *,
    eps: float = 1e-8,
) -> Tuple[np.ndarray, np.ndarray]:
    """ego 과거~현재 + 미래를 합친 뒤, 유효~유효 사이에 무효(0)가 끼지 않게 만든다.

    - ego는 원래 대부분 "항상 존재"하지만,
      시나리오 길이가 짧아 미래 끝이 0으로 패딩되는 경우 등이 있어서
      안전하게 한 번 더 규칙을 강제한다.

    규칙
    ----
    - 현재 프레임(=past의 마지막)이 유효일 때만 처리한다.
    - 전체 101개를 과거→미래로 봤을 때,
      유효 프레임과 유효 프레임 사이에 무효 프레임이 끼면
      그 중간을 x/y/cos/sin/vx/vy의 "직선 중간값"으로 채워서
      연속 유효 구간을 만든다.
    - 구간 밖(prefix/suffix)은 0으로 둔다.

    Args:
        ego_agent_past (np.ndarray):
            shape: (Tp, 11)  # Tp=21
        ego_future_gt_11_dim (np.ndarray):
            shape: (Tf, 11)  # Tf=80

    Returns:
        Tuple[np.ndarray, np.ndarray]:
            - new_ego_agent_past: shape (Tp, 11)
            - new_ego_future_11:  shape (Tf, 11)
    """
    if ego_agent_past.ndim != 2 or ego_agent_past.shape[-1] != 11:
        raise ValueError(
            f"`ego_agent_past` shape는 (Tp, 11)이어야 합니다. got {ego_agent_past.shape}"
        )
    if ego_future_gt_11_dim.ndim != 2 or ego_future_gt_11_dim.shape[
            -1] != 11:
        raise ValueError(
            f"`ego_future_gt_11_dim` shape는 (Tf, 11)이어야 합니다. got {ego_future_gt_11_dim.shape}"
        )

    Tp: int = int(ego_agent_past.shape[0])
    Tf: int = int(ego_future_gt_11_dim.shape[0])
    if Tp == 0 or Tf == 0:
        return ego_agent_past, ego_future_gt_11_dim

    # full: (Tp+Tf, 11) == (101, 11)
    full: np.ndarray = np.concatenate(
        [ego_agent_past, ego_future_gt_11_dim], axis=0).astype(np.float32,
                                                               copy=True)
    T_full: int = int(full.shape[0])
    current_index: int = Tp - 1

    # 유효 프레임: 앞 8개 중 하나라도 0이 아니면 유효
    valid_mask_1d: np.ndarray = (np.abs(full[:, :8])
                                 > eps).any(axis=1)  # (T_full,)

    # 현재가 유효일 때만 규칙(b)을 강제
    if not bool(valid_mask_1d[current_index]):
        return ego_agent_past, ego_future_gt_11_dim

    valid_idx: np.ndarray = np.nonzero(valid_mask_1d)[0]  # (K,)
    if valid_idx.size == 0:
        return ego_agent_past, ego_future_gt_11_dim

    region_mask: np.ndarray = np.zeros((T_full,), dtype=bool)

    if valid_idx.size == 1:
        region_mask[int(valid_idx[0])] = True
    else:
        first_valid: int = int(valid_idx[0])
        last_valid: int = int(valid_idx[-1])
        region_mask[first_valid:last_valid + 1] = True

        # 중간 구멍이 있으면 x/y/cos/sin/vx/vy를 채움
        if last_valid - first_valid + 1 > valid_idx.size:
            xs: np.ndarray = valid_idx.astype(np.float64)
            seg_idx: np.ndarray = np.arange(first_valid,
                                            last_valid + 1,
                                            dtype=np.float64)

            for dim_idx in range(6):
                ys: np.ndarray = full[valid_idx, dim_idx].astype(np.float64,
                                                                 copy=False)
                interp_vals: np.ndarray = np.interp(seg_idx, xs, ys)
                full[first_valid:last_valid + 1,
                     dim_idx] = interp_vals.astype(np.float32, copy=False)

    # type/size는 현재 프레임 값을 대표값으로 쓴다.
    type_vec: np.ndarray = ego_agent_past[-1,
                                          8:11].astype(np.float32,
                                                       copy=False)  # (3,)
    rep_size: np.ndarray = ego_agent_past[-1,
                                          6:8].astype(np.float32,
                                                      copy=False)  # (2,)

    full[:, 8:11] = 0.0
    full[region_mask, 8:11] = type_vec

    # 구간 밖은 완전히 0으로
    full[~region_mask, :] = 0.0

    # width/length 채우기 + cos/sin 정리 (배치 함수 재사용을 위해 (1,T,11)로 바꿈)
    full_b: np.ndarray = full[None, :, :]  # (1, T_full, 11)
    region_b: np.ndarray = region_mask[None, :]  # (1, T_full)
    rep_size_b: np.ndarray = rep_size[None, :]  # (1, 2)

    full_b = self._fill_width_length_with_representative_size(
        traj_11=full_b,
        valid_mask=region_b,
        rep_size=rep_size_b,
    )
    full_b = self._normalize_cos_sin_in_traj_11(
        traj_11=full_b,
        valid_mask=region_b,
    )
    full = full_b[0]  # (T_full, 11)

    new_ego_agent_past: np.ndarray = full[:Tp, :]
    new_ego_future_11: np.ndarray = full[Tp:, :]
    return new_ego_agent_past, new_ego_future_11

@staticmethod
def _traj11_to_traj3_yaw(traj_11: np.ndarray) -> np.ndarray:
    """11차원 궤적을 [x, y, yaw] 3차원으로 바꾼다.

    Args:
        traj_11 (np.ndarray):
            - (T, 11) 또는 (N, T, 11)
            - 11차원 = [x, y, cos, sin, vx, vy, width, length, onehot(3)]

    Returns:
        np.ndarray:
            - (T, 3) 또는 (N, T, 3)
            - 3차원 = [x, y, yaw]
    """
    if traj_11.ndim == 2:
        yaw = np.arctan2(traj_11[:, 3], traj_11[:, 2])
        return np.stack([traj_11[:, 0], traj_11[:, 1], yaw], axis=-1)
    if traj_11.ndim == 3:
        yaw = np.arctan2(traj_11[:, :, 3], traj_11[:, :, 2])
        return np.stack([traj_11[:, :, 0], traj_11[:, :, 1], yaw], axis=-1)
    raise ValueError(
        f"`traj_11`은 (T,11) 또는 (N,T,11) 이어야 합니다. got {traj_11.shape}")

def _merge_and_interpolate_neighbor_11dim(
    self,
    neighbor_agents_past: np.ndarray,  # (max_agent_num, Tp, 11)
    neighbor_cur_fut_gt_11_dim: np.ndarray,
    # (max_agent_num, Tf, 11)  # 0번이 현재
) -> Tuple[np.ndarray, np.ndarray]:
    """neighbor 과거/현재와 현재/미래를 이어 붙인 뒤, "중간에 비었다가 다시 살아나는" 문제를 막는다.

    이 함수가 보장하는 규칙(한 agent 기준)
    -----------------------------------
    (a) 현재 프레임(=past의 마지막)이 무효라면:
        - 과거~미래 전체 프레임을 전부 0으로 만든다.
        - 즉, "현재는 없는데 미래에 갑자기 나타나는" 케이스를 없앤다.

    (b) 현재 프레임이 유효라면:
        - 전체 시퀀스를 과거→미래로 봤을 때,
          유효 프레임과 유효 프레임 사이에 무효(0) 프레임이 끼지 않게 만든다.
        - 방법:
          1) 전체 시퀀스에서 유효 프레임들의 첫 index(first_valid)와 마지막 index(last_valid)를 찾는다.
          2) first_valid~last_valid 구간은 "연속 유효 구간"으로 만들고,
             그 안에 비어 있던 프레임은 x/y/cos/sin/vx/vy를 직선 중간값으로 채운다.
          3) 구간 밖(prefix/suffix)은 그대로 0으로 둔다.

    Args:
        neighbor_agents_past (np.ndarray):
            shape: (N, Tp, 11)
        neighbor_cur_fut_gt_11_dim (np.ndarray):
            shape: (N, Tf, 11)
            index 0이 "현재"라고 가정한다.

    Returns:
        Tuple[np.ndarray, np.ndarray]:
            - new_neighbor_agents_past: shape (N, Tp, 11)
            - new_neighbor_future_11:  shape (N, Tf-1, 11)  # 현재 제외
    """
    # 기본 shape 검사
    if neighbor_agents_past.ndim != 3 or neighbor_cur_fut_gt_11_dim.ndim != 3:
        raise ValueError(
            f"`neighbor_agents_past` / `neighbor_cur_fut_gt_11_dim`는 "
            f"(max_agent_num, time_len, 11) 형태여야 합니다. "
            f"got {neighbor_agents_past.shape}, {neighbor_cur_fut_gt_11_dim.shape}"
        )
    if neighbor_agents_past.shape[-1] != 11 or \
            neighbor_cur_fut_gt_11_dim.shape[-1] != 11:
        raise ValueError(
            f"두 입력의 마지막 차원은 11이어야 합니다. "
            f"got {neighbor_agents_past.shape[-1]}, {neighbor_cur_fut_gt_11_dim.shape[-1]}"
        )
    if neighbor_agents_past.shape[0] != neighbor_cur_fut_gt_11_dim.shape[0]:
        raise ValueError(
            "neighbor_agents_past 와 neighbor_cur_fut_gt_11_dim 의 agent 축 크기가 다릅니다."
        )

    max_agent_num: int = int(neighbor_agents_past.shape[0])
    past_len: int = int(neighbor_agents_past.shape[1])  # Tp
    fut_len_with_current: int = int(
        neighbor_cur_fut_gt_11_dim.shape[1])  # Tf (0번 포함)

    # future 프레임이 아예 없으면 그대로 반환
    if fut_len_with_current == 0:
        return neighbor_agents_past, neighbor_cur_fut_gt_11_dim

    # current(0번) 프레임은 항상 한 번 제거
    # neighbor_future_wo_current: (N, Tf-1, 11)
    neighbor_future_wo_current: np.ndarray = neighbor_cur_fut_gt_11_dim[:,
                                                                        1:, :]

    # 에이전트가 0명이면 보간 없이 바로 반환
    if max_agent_num == 0:
        return neighbor_agents_past, neighbor_future_wo_current

    # full_traj_11: (N, Tp + (Tf-1), 11)
    full_traj_11: np.ndarray = np.concatenate(
        [neighbor_agents_past, neighbor_future_wo_current], axis=1)
    T_full: int = int(full_traj_11.shape[1])
    current_index: int = past_len - 1  # full_traj에서 현재는 past의 마지막

    # (1) size 대표값 계산 (기본: 과거~현재만 사용)
    # stable_size: (N, 2) = [width_rep, length_rep]
    stable_size: np.ndarray = self._estimate_stable_neighbor_sizes(
        full_traj_11=full_traj_11,
        past_len=past_len,
        use_future=False,
    )

    # (2) "유효 프레임" 마스크 계산 (앞 8개 값 중 하나라도 0이 아니면 유효)
    full_off_p_mask, _ = self._get_agents_past_cur_mask_np(full_traj_11)
    full_valid_mask: np.ndarray = ~full_off_p_mask  # (N, T_full)

    # (3) rule (a): 현재 프레임이 무효면 전체 0
    current_valid_mask: np.ndarray = full_valid_mask[:,
                                                     current_index]  # (N,)
    invalid_agents: np.ndarray = ~current_valid_mask

    # 결과 버퍼
    full_traj_interp: np.ndarray = full_traj_11.astype(np.float32,
                                                       copy=True)

    # 각 agent별 "연속 유효 구간" 마스크
    # region_mask_all: (N, T_full)
    region_mask_all: np.ndarray = np.zeros((max_agent_num, T_full),
                                           dtype=bool)

    # 현재가 무효인 agent는 전부 0으로 만들고 끝
    if np.any(invalid_agents):
        full_traj_interp[invalid_agents, :, :] = 0.0
        # region_mask_all은 그대로 False

    # (4) rule (b): 현재가 유효인 agent는 유효~유효 사이 구멍을 채움
    for agent_idx in range(max_agent_num):
        if invalid_agents[agent_idx]:
            continue

        agent_valid_idx: np.ndarray = \
        np.nonzero(full_valid_mask[agent_idx])[0]
        if agent_valid_idx.size == 0:
            # (현재는 유효인데 valid_idx가 0인 경우는 거의 없지만, 안전하게 0 처리)
            full_traj_interp[agent_idx, :, :] = 0.0
            continue

        first_valid: int = int(agent_valid_idx[0])
        last_valid: int = int(agent_valid_idx[-1])

        if agent_valid_idx.size == 1:
            # 유효 프레임이 1개면 "유효-무효-유효" 자체가 성립하지 않으므로 그대로 둠
            region_mask_all[agent_idx, first_valid] = True
        else:
            # first~last를 "연속 유효 구간"으로 선언
            region_mask_all[agent_idx, first_valid:last_valid + 1] = True

            # 중간에 빈 프레임이 있을 때만 x/y/cos/sin/vx/vy를 직선 중간값으로 채움
            if last_valid - first_valid + 1 > agent_valid_idx.size:
                xs: np.ndarray = agent_valid_idx.astype(np.float64)  # (K,)
                seg_idx: np.ndarray = np.arange(first_valid,
                                                last_valid + 1,
                                                dtype=np.float64)

                for dim_idx in range(6):  # 0~5: [x, y, cos, sin, vx, vy]
                    ys: np.ndarray = full_traj_11[agent_idx,
                                                  agent_valid_idx,
                                                  dim_idx].astype(
                                                      np.float64,
                                                      copy=False)
                    interp_vals: np.ndarray = np.interp(seg_idx, xs, ys)
                    full_traj_interp[agent_idx, first_valid:last_valid + 1,
                                     dim_idx] = interp_vals.astype(
                                         np.float32, copy=False)

        # 타입(one-hot)은 agent당 하나로 고정해서 "연속 유효 구간"에만 채움
        type_candidates: np.ndarray = full_traj_11[agent_idx, :,
                                                   8:11]  # (T_full, 3)
        type_valid_mask: np.ndarray = (np.abs(type_candidates).sum(axis=1)
                                       > 0)
        if np.any(type_valid_mask):
            type_vec: np.ndarray = type_candidates[type_valid_mask][
                0].astype(np.float32, copy=False)  # (3,)
        else:
            type_vec = np.zeros((3,), dtype=np.float32)

        full_traj_interp[agent_idx, :, 8:11] = 0.0
        full_traj_interp[agent_idx, region_mask_all[agent_idx],
                         8:11] = type_vec

        # 연속 유효 구간 밖은 완전히 0으로
        full_traj_interp[agent_idx, ~region_mask_all[agent_idx], :] = 0.0

    # (5) width/length를 대표값으로 통일해서 "연속 유효 구간"에만 채움
    full_traj_interp = self._fill_width_length_with_representative_size(
        traj_11=full_traj_interp,
        valid_mask=region_mask_all,  # (N, T_full)
        rep_size=stable_size,  # (N, 2)
    )

    # (6) cos/sin 길이를 1로 정리 (연속 유효 구간에만 적용)
    full_traj_interp = self._normalize_cos_sin_in_traj_11(
        traj_11=full_traj_interp,
        valid_mask=region_mask_all,
    )

    # 과거/현재와 미래로 다시 분리
    new_neighbor_agents_past: np.ndarray = full_traj_interp[:, :past_len, :]
    new_neighbor_future_11: np.ndarray = full_traj_interp[:,
                                                          past_len:, :]  # 현재 제외된 미래

    return new_neighbor_agents_past, new_neighbor_future_11

@staticmethod
def _get_agents_past_cur_mask_np(
        neighbor_agents_past: np.ndarray,  # (agents_num, time_len, 11)
) -> Tuple[np.ndarray, np.ndarray]:
    """에이전트 과거/현재 시퀀스에서 유효성 마스크를 **NumPy 입출력**으로 계산한다.

    정의
    ----
    - 프레임 유효(on/off) 판정(포인트 단위):
      마지막 차원 앞 8개([x, y, cos, sin, vx, vy, width, length]) 값 중
      하나라도 0이 아니면 **유효(True)**, 모두 0이면 **무효(False)**.
    - 에이전트 유효(on/off) 판정(에이전트 단위):
      해당 에이전트의 모든 프레임이 무효이면 **무효(True)**.

    Args:
        neighbor_agents_past (np.ndarray):
            에이전트 과거/현재 시퀀스. shape = (agents_num, time_len, 11)

    Returns:
        Tuple[np.ndarray, np.ndarray]:
            - agents_past_cur_off_p_mask (np.ndarray): shape = (agents_num, time_len), dtype=bool
              각 프레임이 **무효(True)** 인지 여부(포인트 단위 마스크).
            - agents_past_cur_off_mask (np.ndarray): shape = (agents_num,), dtype=bool
              에이전트 전체가 **무효(True)** 인지 여부(에이전트 단위 마스크).

    Raises:
        ValueError: 입력이 (N, T, 11) 형태가 아니거나 마지막 차원(<8)일 때.
    """
    if neighbor_agents_past.ndim != 3 or neighbor_agents_past.shape[-1] < 8:
        raise ValueError(
            f"`neighbor_agents_past`는 (agents_num, time_len, 11) 형태여야 하며 "
            f"마지막 차원은 최소 8이어야 합니다. got {neighbor_agents_past.shape}")

    # (agents_num, time_len, 8)  — 0이 아니면 True
    agents_past_current_is_not_zero = (neighbor_agents_past[..., :8] != 0)

    # (agents_num, time_len) — 8개 값 중 하나라도 0이 아니면 유효
    agents_past_current_not_zero_num = agents_past_current_is_not_zero.sum(
        axis=-1)
    agents_past_cur_off_p_mask = (agents_past_current_not_zero_num == 0
                                 )  # 무효(True)

    # (agents_num) — 에이전트 단위: 유효 프레임 수가 0이면 무효(True)
    agents_past_cur_on_p_mask = ~agents_past_cur_off_p_mask
    agents_past_cur_off_mask = (agents_past_cur_on_p_mask.sum(axis=-1) == 0)

    return agents_past_cur_off_p_mask.astype(
        bool), agents_past_cur_off_mask.astype(bool)

def _get_past_cur_agents_feature(
    self,
    scenario: Optional[NuPlanScenario] = None,
    observation_buffer: Optional[Deque[Observation]] = None,
) -> Tuple[
        List[np.ndarray],  # past_cur_agents_world_8_list
        List[List[TrackedObjectType]],  # past_cur_agents_types_list
        np.ndarray,  # present_static_feat_5
        List[TrackedObjectType],  # static_types_list
        Dict[str, int],  # token_to_id
        Optional[TrackedObjects],  # present_tracked_objects
        Optional[List[TrackedObjects]],  # past_cur_tracked_objects
]:
    """과거+현재 에이전트 / 정적 객체 정보를 공통 포맷으로 추출하는 함수.

    두 가지 입력 경로를 지원합니다.

    1) 오프라인 전처리 (scenario 기반)
        - nuPlanScenario 에서
          · 과거+현재 프레임의 동적 객체(차량/보행자/자전거) 배열
          · 현재 프레임의 정적 객체(표지판, 배리어 등) 배열
          을 추출합니다.

    2) 온라인 시뮬레이션 (observation_buffer 기반)
        - 시뮬레이터의 observation_buffer(연속 관측치)에서
          같은 형태의 정보를 뽑아냅니다.

    두 입력을 동시에 쓰거나, 둘 다 안 주면 오류를 발생시킵니다.

    Args:
        scenario (Optional[NuPlanScenario]):
            - 오프라인 전처리용 nuPlan 시나리오.
            - 과거/현재의 TrackedObjects 를 직접 얻을 때 사용.
        observation_buffer (Optional[Deque[Observation]]):
            - 시뮬레이션 중 관측 버퍼(과거 → 현재 순서).
            - 각 원소는 보통 `DetectionsTracks` 타입이며,
              그 안에 `.tracked_objects` 가 들어 있습니다.

    Returns:
            - past_cur_agents_world_8_list:
                · 길이: num_frames
                · 각 원소: (frame_agents_num, 8) float 배열
                · 각 행 = 한 에이전트, 열 = ID/속도/방향/크기/위치 등
            - past_cur_agents_types_list:
                · 길이: num_frames
                · 각 프레임에서 에이전트 타입(차량/보행자/자전거) 리스트
            - present_static_feat_5:
                · 모양: (cur_static_num, 5)
                · [x, y, heading, width, length] (현재 프레임의 정적 객체)
            - static_types_list:
                · 길이: cur_static_num
                · 각 정적 객체의 타입 리스트
            - token_to_id: Dict[str, int]:
                · 현재 프레임에 등장하는 에이전트 토큰 → 정수 ID
                - (딕셔너리)
            - present_tracked_objects:
                · scenario 경로일 때만 유효(현재 프레임의 TrackedObjects)
                · observation_buffer 경로에서는 None
            - past_cur_tracked_objects:
                · scenario 경로일 때만 유효(과거+현재 TrackedObjects 리스트)
                · observation_buffer 경로에서는 None
    """
    # 입력 유효성 검사
    if (scenario is None and observation_buffer is None) or \
       (scenario is not None and observation_buffer is not None):
        raise ValueError(
            "scenario 또는 observation_buffer 중 정확히 하나만 전달해야 합니다.")
    # --------------------------------------------------
    # 1) scenario 기반 (오프라인 전처리 / work()에서 사용)
    # --------------------------------------------------
    if scenario is not None:
        # 현재 프레임의 동적 객체
        present_tracked_objects = scenario.initial_tracked_objects.tracked_objects

        # 과거 프레임의 동적 객체들
        past_tracked_objects: List[TrackedObjects] = [
            tracked_objects.tracked_objects
            for tracked_objects in scenario.get_past_tracked_objects(
                iteration=0,
                time_horizon=self.past_time_horizon,
                num_samples=self.num_past_poses,
            )
        ]

        # 과거 + 현재를 시간순으로 이어붙인 리스트
        past_cur_tracked_objects = past_tracked_objects + [
            present_tracked_objects
        ]

        agents_source_seq = past_cur_tracked_objects  # List[TrackedObjects]
        static_source = present_tracked_objects  # TrackedObjects
    else:
        # --------------------------------------------------
        # 2) observation_buffer 기반 (온라인 inference / observation_adapter)
        # --------------------------------------------------
        assert observation_buffer is not None  # 타입 체커용

        present_tracked_objects = None
        past_cur_tracked_objects = None

        agents_source_seq = observation_buffer  # Deque[Observation]
        static_source = observation_buffer[-1]  # 가장 최근 프레임, Observation

    # 공통 로직: 에이전트 시퀀스 → 프레임별 에이전트 배열/타입
    past_cur_agents_world_8_list, past_cur_agents_types_list, token_to_id = \
        sampled_tracked_objects_to_array_list(agents_source_seq)

    # 공통 로직: 현재 프레임의 정적 객체 배열/타입
    present_static_feat_5, static_types_list = \
        sampled_static_objects_to_array_list(static_source)

    # 하나의 return 지점
    return (
        past_cur_agents_world_8_list,  # List[np.ndarray], #  (frame_agents_num, 8)
        past_cur_agents_types_list,  # List[List[TrackedObjectType]],
        present_static_feat_5,  # np.ndarray, (len(static_obj), 5)
        static_types_list,  # List[TrackedObjectType],
        token_to_id,  # Dict[str, int],
        present_tracked_objects,  # Optional[TrackedObjects],
        past_cur_tracked_objects,  # Optional[List[TrackedObjects]]
    )

def _prepare_map(
    self,
    scenario: NuPlanScenario,
    ego_state: EgoState,
    ego_point2d: Point2D,
    ego_heading: float,
    map_api: NuPlanMap,
    traffic_light_data: Optional[List[TrafficLightStatusData]] = None,
) -> Tuple[List[str], Dict[str, MapObjectPolylines], Dict[
        str, LaneSegmentTrafficLightData], Dict[str, np.ndarray],
           List[str]]:
    """지도 관련 입력(route/차선/신호/속도제한)을 한 번에 준비하는 공통 유틸.

    공통 흐름:
      1) 시나리오의 route_roadblock_ids 를 가져와 끊어진 구간을 보정한다.
      2) ego 주변의 차선/경계/신호/속도제한 정보를 get_neighbor_vector_set_map 으로 뽑는다.
         - 온라인(inference) 경로: 외부에서 넘어온 traffic_light_data 사용
         - 오프라인(work) 경로: traffic_light_data 가 None 이므로 iteration=0 기준으로 자체 조회

    Args:
        scenario: nuPlan 시나리오 객체.
        ego_state: 현재 ego 상태 (rear_axle 기준).
        ego_point2d: ego 위치 (x, y).
        ego_heading: ego 진행 방향(rad).
        map_api: NuPlanMap 인스턴스.
        traffic_light_data:
            - observation_adapter 경로: 현재 시점의 신호등 리스트를 그대로 전달
            - work 경로: None → 시나리오 0번 iteration 에서 조회
    """
    # 1) route roadblock 보정
    route_roadblock_ids = scenario.get_route_roadblock_ids()
    if route_roadblock_ids != ['']:
        route_roadblock_ids = route_roadblock_correction(
            ego_state, map_api, list(route_roadblock_ids))
    else:
        route_roadblock_ids = []

    # 2) 신호등 데이터 준비
    if traffic_light_data is None:
        traffic_light_data = list(
            scenario.get_traffic_light_status_at_iteration(0))

    # 3) ego 주변 차선/경계/신호/속도제한 추출
    """
1. elements_to_obj_polylines: Dict[str, MapObjectPolylines],
   - 키: 맵 요소 이름 문자열 "LANE", "LEFT_BOUNDARY", "RIGHT_BOUNDARY", "CROSSWALK", ...
   - 값: 해당 요소를 이루는 점들의 모음(MapObjectPolylines)
- 내부 구조: [num_elements, num_points_i, 2]
2. elements_to_traffic_light: Dict[str, LaneSegmentTrafficLightData],
   - 키: 맵 요소 이름 문자열(현재 "LANE"만 사용)
   - 값: 해당 요소에 대응되는 신호등 상태 정보 (LaneSegmentTrafficLightData)
        - 내부 구조: (num_lanes, 4) one-hot
3. speed_limit_dict: Dict[str, np.ndarray],
   - "lane_has_speed_limit": (num_lanes,), bool
   - "lane_speed_limit": (num_lanes,), float32
4. lanes_roadblock_id_list: List[str],
   - 각 차선이 속한 도로 묶음(roadblock) ID 리스트 (길이 = num_lanes)
    """
    (
        elements_to_obj_polylines,
        elements_to_traffic_light,
        speed_limit_dict,
        lanes_roadblock_id_list,
    ) = get_neighbor_vector_set_map(
        map_api,
        self._map_elements,
        ego_point2d,
        ego_heading,
        self._get_map_query_radius_m(),
        traffic_light_data,
    )

    return (
        route_roadblock_ids,
        elements_to_obj_polylines,
        elements_to_traffic_light,
        speed_limit_dict,
        lanes_roadblock_id_list,
    )

def _get_road_safety_features(
    self,
    scenario: NuPlanScenario,
    ego_cur_pose_np: np.ndarray,
) -> Dict[str, np.ndarray]:
    key_to_road_safety = {}
    stop_sign_points = extract_stop_sign_points(
        scenario,
        ego_cur_pose_np,
        self.config.safety_len,
        self._get_map_query_radius_m(),
    )
    crosswalk_points = extract_crosswalk_points(
        scenario,
        ego_cur_pose_np,
        self.config.safety_len,
        self._get_map_query_radius_m(),
    )
    key_to_road_safety["stop_sign_points"] = stop_sign_points
    key_to_road_safety["crosswalk_points"] = crosswalk_points
    return key_to_road_safety

# Use for data preprocess
def work(self, scenarios: List[NuPlanScenario]) -> None:
    for scenario in scenarios:
        map_name = scenario._map_name
        scenario_token = scenario.token
        map_api = scenario.map_api

        (ego_state, ego_point2d, ego_heading, ego_cur_pose_np,
         past_cur_ego_world_10,
         past_cur_time_np) = self._get_past_cur_ego_feature(
             scenario=scenario,
             set_coord_as_center=self.set_coord_as_center,
         )
        # ✅ 추가: ego 기준 좌표계 원점의 세계좌표 포즈 저장
        # origin_world_pose: shape (4,) = [x_world, y_world, cos(yaw), sin(yaw)]
        origin_world_pose: np.ndarray = self._build_origin_world_pose(
            ego_cur_pose_np)

        ego_agent_past = build_ego_past_feature(
            past_cur_ego_world_10=past_cur_ego_world_10,
            ego_cur_pose_np=ego_cur_pose_np,
        )

        # ─────────────────────────────────────────────
        # ✅ (요구조건 b) ego: past+future(101) 기반으로 규칙 적용
        #    그리고 ego_future_gt_3_dim은 “규칙 적용된 11dim”에서 다시 생성
        # ─────────────────────────────────────────────
        (_, ego_future_gt_11_dim_raw) = get_ego_future_array_from_scenario(
            scenario, ego_state, self.num_future_poses,
            self.future_time_horizon)

        ego_agent_past, ego_future_gt_11_dim = self._merge_and_interpolate_ego_11dim(
            ego_agent_past=ego_agent_past,
            ego_future_gt_11_dim=ego_future_gt_11_dim_raw,
        )

        # ✅ 3차원은 반드시 “정리된 11차원”에서 다시 만들기
        ego_future_gt_3_dim = self._traj11_to_traj3_yaw(
            ego_future_gt_11_dim)

        # ✅ center 기준 옵션이면 x,y 원점 보정
        ego_future_gt_3_dim, ego_future_gt_11_dim = self._adjust_ego_future_outputs_to_center_frame(
            ego_state=ego_state,
            ego_future_gt_3_dim=ego_future_gt_3_dim,
            ego_future_gt_11_dim=ego_future_gt_11_dim,
            set_coord_as_center=self.set_coord_as_center,
        )

        (
            past_cur_agents_world_8_list,
            past_cur_agents_types_list,
            present_static_feat_5,
            static_types_list,
            token_to_id,
            present_tracked_objects,
            past_cur_tracked_objects,
        ) = self._get_past_cur_agents_feature(scenario=scenario)

        (neighbor_agents_past, agents_cur_frame_indices, neighbors_id,
         neighbor_track_token) = build_neighbor_past_feature(
             past_cur_agents_world_8_list=past_cur_agents_world_8_list,
             past_cur_agents_types_list=past_cur_agents_types_list,
             max_agent_num=self.caching_max_agent_num,
             ego_cur_pose_np=ego_cur_pose_np,
             max_pedestrians=self.max_pedestrians,
             max_bicycles=self.max_bicycles,
             token_to_id=token_to_id,
             filter_radius=self._get_effective_filter_radius_m(),
         )

        ego_time_len = ego_agent_past.shape[0]
        neighbor_time_len = neighbor_agents_past.shape[1]
        assert ego_time_len == neighbor_time_len == self.num_past_poses + 1, \
            f"Expected time length {self.num_past_poses + 1}, got ego {ego_time_len}, neighbor {neighbor_time_len}"

        cur_fut_agents_world_8_list = self._get_cur_fut_agents_world_8_list(
            scenario, token_to_id, do_inference=False)

        neighbor_cur_fut_gt_11_dim = agent_future_all_process(
            ego_cur_pose_np=ego_cur_pose_np,
            cur_fut_agents_world_8_list=cur_fut_agents_world_8_list,
            neighbor_token_id=neighbors_id,
            neighbor_agents_past=neighbor_agents_past,
        )

        # ─────────────────────────────────────────────
        # ✅ (요구조건 a) neighbor: past+future(101) 기반으로 규칙 적용 후
        #    neighbor_future_gt_3_dim / neighbor_future_gt_11_dim 최종 생성
        # ─────────────────────────────────────────────
        neighbor_agents_past, neighbor_future_gt_11_dim, neighbor_future_gt_3_dim = \
            self._build_neighbor_future_gt_from_past_and_cur_fut_11dim(
                neighbor_agents_past=neighbor_agents_past,
                neighbor_cur_fut_gt_11_dim=neighbor_cur_fut_gt_11_dim,
            )

        static_objects = build_static_feature(
            present_static_feat_5=present_static_feat_5,
            static_types_list=static_types_list,
            max_static_num=self.caching_max_static_num,
            ego_cur_pose_np=ego_cur_pose_np,
            filter_radius=self._get_effective_filter_radius_m(),
        )

        key_to_array = {
            "origin_world_pose": origin_world_pose,  # (4,)
            "ego_agent_past": ego_agent_past,  # (time_len, 11)
            "ego_future_gt_3_dim": ego_future_gt_3_dim,  # (future_len, 3)
            "ego_future_gt_11_dim": ego_future_gt_11_dim,
            # (future_len, 11)
            "neighbor_agents_past": neighbor_agents_past,
            # (chosen_agent_num, time_len, 11)

            # ✅ (요구조건 a) 최종 출력
            "neighbor_future_gt_3_dim": neighbor_future_gt_3_dim,
            # (chosen_agent_num, future_len, 3)
            "neighbor_future_gt_11_dim": neighbor_future_gt_11_dim,
            # (chosen_agent_num, future_len, 11)
            "static_objects": static_objects,  # (chosen_static_num, 10)
        }

        key_to_road_safety = self._get_road_safety_features(
            scenario=scenario,
            ego_cur_pose_np=ego_cur_pose_np,
        )
        key_to_array.update(key_to_road_safety)

        (
            route_roadblock_ids,
            elements_to_obj_polylines,
            elements_to_traffic_light,
            speed_limit_dict,
            lanes_roadblock_id_list,
        ) = self._prepare_map(
            scenario=scenario,
            ego_state=ego_state,
            ego_point2d=ego_point2d,
            ego_heading=ego_heading,
            map_api=map_api,
        )

        car_token_to_rr_ids: Dict[str,
                                  List[str]] = get_npc_route_roadblock_ids(
                                      scenario, past_cur_tracked_objects,
                                      neighbor_track_token)

        neighbor_agents_current = neighbor_agents_past[:, -1, :]

        map_key_to_array = map_process(
            route_roadblock_ids, car_token_to_rr_ids, neighbor_track_token,
            neighbor_agents_current, ego_cur_pose_np,
            elements_to_obj_polylines, elements_to_traffic_light,
            speed_limit_dict, lanes_roadblock_id_list, self._map_elements,
            self._caching_max_map_elements, self._map_points_num)
        key_to_array.update(map_key_to_array)

        chore_data = {
            "map_name": map_name,
            "token": scenario_token,
        }
        key_to_array.update(chore_data)

        if self.config.make_statistics_when_caching:
            veh_cnt, ped_cnt, bic_cnt = self._count_valid_neighbors_by_type(
                neighbor_agents_past=neighbor_agents_past)
            ratio_percent, mean_speed_kmh = self._compute_lane_speed_stats(
                map_key_to_array)
            stats_payload = {
                "vehicle_count":
                    int(veh_cnt),
                "pedestrian_count":
                    int(ped_cnt),
                "bicycle_count":
                    int(bic_cnt),
                "lane_speed_limit_ratio_percent":
                    float(ratio_percent),
                "mean_speed_limit_kmh": (None if mean_speed_kmh is None else
                                         float(mean_speed_kmh)),
            }
            self._save_sample_stats_json(map_name, scenario_token,
                                         stats_payload)

        final_file_name = f"{key_to_array['map_name']}_{key_to_array['token']}"
        ego_agent_past = key_to_array["ego_agent_past"]  # (time_len, 11)
        ego_agent_type = ego_agent_past[:, 8:11]  # (time_len, 3)
        print("ego_agent_type sum all: ",
              ego_agent_type.sum(axis=0))  # (3, )
        ego_future_gt_11_dim = key_to_array[
            "ego_future_gt_11_dim"]  # (future_len, 11)
        ego_agent_type_future = ego_future_gt_11_dim[:, 8:
                                                     11]  # (future_len, 3)
        print("ego_agent_type_future sum all: ",
              ego_agent_type_future.sum(axis=0))  # (3, )

        self.save_to_disk(self._save_dir, final_file_name, key_to_array)

        key_to_array["neighbor_track_token"] = neighbor_track_token

        if self.config.save_image:
            save_dir = os.path.join(self._save_dir, "debug_vis")
            save_path = os.path.join(save_dir, f"{final_file_name}.png")
            os.makedirs(save_dir, exist_ok=True)
            key_to_array["token_to_future_traj_wrt_ego"] = None
            draw_machine.draw_world_model_to_png(key_to_array,
                                                 output_data={},
                                                 save_path=save_path)

@staticmethod
def _estimate_stable_neighbor_sizes(
    full_traj_11: np.ndarray,  # shape: (N, T_full, 11)
    past_len: int,
    *,
    use_future: bool = False,
    eps: float = 1e-3,
    width_max: float = 20.0,
    length_max: float = 60.0,
) -> np.ndarray:  # shape: (N, 2)
    """이웃 에이전트별 width/length 대표값(하나)을 만든다.

    배경
    ----
    nuPlan의 박스 width/length는 프레임마다 조금씩 흔들릴 수 있다.
    그런데 실제 물체의 크기는 시간에 따라 바뀌지 않는 값이므로,
    여러 프레임을 보고 "대표 크기" 하나를 만든 뒤 시간축 전체에 쓰는 편이 안정적이다.

    이 함수가 하는 일
    -----------------
    - 각 에이전트(i)에 대해, 여러 프레임에서 관측된 width/length를 모은다.
    - 그 중에서 "쓸 만한 값"만 남긴 뒤,
      정렬했을 때 가운데 값(중간값)을 대표값으로 선택한다.
      (한두 번 튀는 값이 있어도 평균보다 덜 흔들리기 때문)

    "쓸 만한 값" 조건
    ----------------
    1) 해당 프레임이 패딩이 아님:
       - [x, y, cos, sin, vx, vy] 중 하나라도 0이 아니면 패딩이 아니라고 본다.
    2) width > eps, length > eps
    3) 너무 큰 값은 버린다:
       - width <= width_max, length <= length_max

    시간 구간 선택
    ------------
    - use_future=False:
        과거~현재(past_len 프레임)만 보고 대표값을 만든다.
        (실제로 미래가 없는 환경과 맞추려면 이게 더 안전하다.)
    - use_future=True:
        과거~현재~미래 전체(full_traj_11 전체 프레임)를 보고 대표값을 만든다.
        (완전 오프라인에서 더 많이 평균내고 싶을 때 선택)

    값이 하나도 없을 때(예외 처리)
    ----------------------------
    - 위 조건을 통과한 width/length가 하나도 없다면,
      과거~현재의 마지막 프레임(현재 프레임)의 width/length를 fallback으로 쓴다.
      그것마저 0이면 결과도 0으로 남는다.

    Args:
        full_traj_11 (np.ndarray):
            shape = (N, T_full, 11)
            [x, y, cos, sin, vx, vy, width, length, onehot(3)]
        past_len (int):
            shape 관점에서 과거~현재 길이.
            full_traj_11[:, :past_len, :] 구간이 과거~현재라고 본다.
        use_future (bool):
            True면 미래까지 포함해서 대표 크기를 만든다.
        eps (float):
            0에 매우 가까운 값들을 "없는 값"으로 보기 위한 기준.
        width_max (float):
            말도 안 되게 큰 width를 버리기 위한 상한.
        length_max (float):
            말도 안 되게 큰 length를 버리기 위한 상한.

    Returns:
        np.ndarray:
            shape = (N, 2)
            각 에이전트의 [width_rep, length_rep] (float32).
    """
    if full_traj_11.ndim != 3 or full_traj_11.shape[-1] != 11:
        raise ValueError(
            f"`full_traj_11` shape는 (N, T, 11)이어야 합니다. got {full_traj_11.shape}"
        )
    if past_len <= 0 or past_len > full_traj_11.shape[1]:
        raise ValueError(
            f"`past_len`은 1 이상이며 T_full 이하이어야 합니다. got past_len={past_len}, T_full={full_traj_11.shape[1]}"
        )

    N: int = int(full_traj_11.shape[0])
    T_full: int = int(full_traj_11.shape[1])

    # 대표값 계산에 사용할 구간 길이
    T_src: int = T_full if use_future else int(past_len)

    # src_traj: (N, T_src, 11)
    src_traj: np.ndarray = full_traj_11[:, :T_src, :]

    # 패딩이 아닌 프레임 마스크: (N, T_src)
    #  - [x, y, cos, sin, vx, vy] 중 하나라도 0이 아니면 True
    dynamic_valid: np.ndarray = (np.abs(src_traj[:, :, :6])
                                 > eps).any(axis=-1)

    # size: (N, T_src, 2) = [width, length]
    size: np.ndarray = src_traj[:, :, 6:8]

    # size 값이 "쓸 만한지" 마스크: (N, T_src)
    size_valid: np.ndarray = ((size[:, :, 0] > eps) &
                              (size[:, :, 1] > eps) &
                              (size[:, :, 0] <= float(width_max)) &
                              (size[:, :, 1] <= float(length_max)) &
                              dynamic_valid)

    # fallback: 현재 프레임(과거~현재의 마지막) size
    # fallback_size: (N, 2)
    fallback_size: np.ndarray = full_traj_11[:, past_len - 1,
                                             6:8].astype(np.float32,
                                                         copy=False)

    # out: (N, 2)
    out: np.ndarray = fallback_size.copy()

    # 에이전트별로 대표값(중간값) 계산
    for i in range(N):
        # valid_vals: (K, 2)  K는 유효 샘플 개수(가변)
        valid_vals: np.ndarray = size[i][size_valid[i]]
        if valid_vals.shape[0] == 0:
            continue
        # 중간값(정렬했을 때 가운데 값): (2,)
        out[i] = np.median(valid_vals, axis=0).astype(np.float32,
                                                      copy=False)

    return out.astype(np.float32, copy=False)

@staticmethod
def _fill_width_length_with_representative_size(
        traj_11: np.ndarray,  # shape: (N, T, 11)
        valid_mask: np.ndarray,  # shape: (N, T)
        rep_size: np.ndarray,  # shape: (N, 2)
) -> np.ndarray:
    """width/length 채널(6:8)을 대표값으로 통일해서 넣는다.

    이 함수가 하는 일
    -----------------
    - traj_11의 width/length 채널을 먼저 대표값으로 채운다.
    - 그 다음 valid_mask가 False인 프레임은 width/length를 0으로 만든다.
      (즉, "유효한 프레임에서만" size가 존재하도록 맞춘다.)

    Args:
        traj_11 (np.ndarray):
            shape = (N, T, 11)
            [x, y, cos, sin, vx, vy, width, length, onehot(3)]
        valid_mask (np.ndarray):
            shape = (N, T)
            True면 유효 프레임, False면 패딩/무효 프레임이라고 본다.
        rep_size (np.ndarray):
            shape = (N, 2)
            각 에이전트의 [width_rep, length_rep].

    Returns:
        np.ndarray:
            shape = (N, T, 11)
            width/length가 대표값으로 통일된 traj_11 (입력을 직접 수정하고 그대로 반환).
    """
    if traj_11.ndim != 3 or traj_11.shape[-1] != 11:
        raise ValueError(
            f"`traj_11` shape는 (N, T, 11)이어야 합니다. got {traj_11.shape}")
    if valid_mask.shape != traj_11.shape[:2]:
        raise ValueError(
            f"`valid_mask` shape는 (N, T)이어야 합니다. got {valid_mask.shape}, expected {traj_11.shape[:2]}"
        )
    if rep_size.shape != (traj_11.shape[0], 2):
        raise ValueError(
            f"`rep_size` shape는 (N, 2)이어야 합니다. got {rep_size.shape}, expected {(traj_11.shape[0], 2)}"
        )

    # 대표 size를 모든 프레임에 채우고, 유효 마스크로 무효 프레임은 0 처리
    # traj_11[:, :, 6:8]: (N, T, 2)
    traj_11[:, :, 6:8] = rep_size[:, None, :].astype(traj_11.dtype,
                                                     copy=False)
    traj_11[:, :, 6:8] *= valid_mask[:, :, None].astype(traj_11.dtype,
                                                        copy=False)
    return traj_11

def _get_future_tracked_objects_array_list(
    self,
    scenario: NuPlanScenario,
    token_to_id: Dict[str, int],
    iteration: int = 0,
    future_time_horizon: Optional[float] = None,
    num_samples: Optional[int] = None,
) -> Tuple[List[np.ndarray], Dict[str, int]]:
    """현재 시점부터 일정 시간 동안의 모든 에이전트 상태를
    프레임별 배열 리스트로 뽑아낸다.

    하는 일 요약
    -------------
    1) 주어진 iteration 에서
       - 현재 프레임의 TrackedObjects
       - 그 이후 future_time_horizon 동안, num_samples 개의 미래 TrackedObjects
       를 가져온다.

    2) `sampled_tracked_objects_to_array_list` 를 통해,
       각 프레임을 (frame_agents_num, 8) 형태의 배열로 바꾼다.
       - 각 행: [track_id, vx, vy, heading, width, length, x, y]
       - 프레임마다 에이전트 수(frame_agents_num)는 달라질 수 있다.
       - 리스트 순서는 [현재, t+1, t+2, ...] 시간 순서.

    3) 동시에, track_token(문자열)을 일관된 정수 ID 로 바꿔주는
       token_to_id 매핑 사전도 함께 만든다.

    Args:
        iteration (int, optional):
            기준이 되는 현재 step 인덱스(0 기반).
        future_time_horizon (Optional[float], optional):
            현재 이후로 몇 초까지 볼 것인지. None 이면 self.future_time_horizon 사용.
        num_samples (Optional[int], optional):
            몇 개의 미래 프레임을 뽑을지. None 이면 self.num_future_poses 사용.

    Returns:
        Tuple[List[np.ndarray], Dict[str, int]]:
            - cur_fut_agents_world_8_list: List[np.ndarray]
                · 길이: 1 + num_samples
                · 각 원소 shape: (frame_agents_num_t, 8)
                  [track_id, vx, vy, heading, width, length, x, y]
                · 리스트 순서: [현재, t+1, t+2, ...]
            - token_to_id: Dict[str, int]
                · 전체 프레임에서 등장한 track_token → 정수 ID 매핑 사전.
    """
    present_tracked_objects: TrackedObjects = scenario.get_tracked_objects_at_iteration(
        iteration).tracked_objects

    if future_time_horizon is None:
        future_time_horizon = self.future_time_horizon
    if num_samples is None:
        num_samples = self.num_future_poses

    # 미래 프레임들의 TrackedObjects 리스트
    future_tracked_objects: List[TrackedObjects] = [
        tracked_objects.tracked_objects
        for tracked_objects in scenario.get_future_tracked_objects(
            iteration=iteration,
            time_horizon=future_time_horizon,
            num_samples=num_samples,
        )
    ]

    # [현재] + [미래들] 을 하나의 시퀀스로 합친다.
    sampled_future_observations: List[TrackedObjects] = [
        present_tracked_objects
    ] + future_tracked_objects

    # cur_fut_agents_world_8_list: List[np.ndarray]
    #   - 각 원소: (frame_agents_num, 8)
    # token_to_id: Dict[str, int]
    (cur_fut_agents_world_8_list, _,
     token_to_id) = sampled_tracked_objects_to_array_list(
         sampled_future_observations, token_to_id)

    return cur_fut_agents_world_8_list, token_to_id

def save_to_disk(self, dir: str, final_file_name: str,
                 data: Dict[str, np.ndarray]) -> None:
    final_path = f"{dir}/{final_file_name}.npz"
    tmp_path = final_path + ".tmp"

    os.makedirs(dir, exist_ok=True)

    try:
        with open(tmp_path, "wb") as f:
            np.savez_compressed(f, **data)
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, final_path)

    except BaseException:  # ✅ Ctrl+C(KeyboardInterrupt)도 여기로 들어옴
        with contextlib.suppress(Exception):
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        raise

==============
<diffusion_planner/data_process/agent_process.py>
"""
Module: Agent Data Preprocessing Functions
Description: This module contains functions for agents related data processing.

Categories:
1. Get list of agent array from raw data
2. Get agents array for model input
"""
import numpy as np
from typing import Dict, Deque, List, Tuple, Optional, Union
from nuplan.common.actor_state.tracked_objects import TrackedObjects, TrackedObject
from nuplan.planning.training.preprocessing.utils.agents_preprocessing import AgentInternalIndex
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.planning.simulation.observation.observation_type import DetectionsTracks, Observation
from nuplan.common.actor_state.ego_state import EgoState
from diffusion_planner.data_process.utils import convert_absolute_quantities_to_relative
from nuplan.common.geometry.convert import numpy_array_to_absolute_velocity

=====================

1. Get list of agent array from raw data

=====================

def _extract_agent_array(
tracked_objects: TrackedObjects,
token_to_id: Dict[str, int],
) -> Tuple[np.ndarray, Dict[str, int], List[TrackedObjectType]]:
"""단일 시점의 감지 결과에서 에이전트 정보를 배열로 뽑아내는 함수.

이 함수는 한 프레임 안에 존재하는 여러 객체들 중,
지정된 종류(차량, 보행자, 자전거 등)에 해당하는 것만 골라서
**숫자 배열**로 정리해 줍니다. 각 에이전트는 다음과 같은 값들을 가집니다.

- 고유 ID(정수, track_token 을 숫자로 매핑한 값)
- 속도(vx, vy)
- 진행 방향(heading, radian)
- 너비/길이(width, length)
- 위치(x, y)

또한 문자열 기반의 track_token 을 재사용 가능한 정수 ID로 바꾸기 위해
`token_to_id` 딕셔너리를 갱신합니다.

Args:
    tracked_objects (TrackedObjects):
        - 현재 프레임에서 감지된 모든 객체들의 모음입니다.
    token_to_id (Dict[str, int]):
        - track_token(문자열)을 정수 ID로 매핑하는 사전입니다.
        - 새로 등장한 track_token 은 여기서 새로운 ID를 부여받습니다.

Returns:
    Tuple[np.ndarray, Dict[str, int], List[TrackedObjectType]]:
        - a_frame_agents_feat_8 (np.ndarray):
            - 모양: (frame_agents_num, AgentInternalIndex.dim() = 8 )
            - int id , vx, vy, heading, width, length, x, y
            - dtype: float64
            - 각 행은 한 에이전트에 해당하며,
              `AgentInternalIndex` 정의 순서대로 값이 들어 있습니다.
        - token_to_id (Dict[str, int]):
            - 갱신된 track_token → 정수 ID 매핑 사전입니다.
        - agent_types (List[TrackedObjectType]):
            - 각 행(에이전트)에 해당하는 객체 타입 목록입니다.

"""
object_types: List[TrackedObjectType] = [
    TrackedObjectType.VEHICLE,
    TrackedObjectType.PEDESTRIAN,
    TrackedObjectType.BICYCLE,
]
# 선택한 타입(object_types)에 해당하는 에이전트만 모음
agents: List[TrackedObject] = tracked_objects.get_tracked_objects_of_types(
    object_types)
agent_types: List[TrackedObjectType] = []

frame_agents_num: int = len(agents)
# a_frame_agents_feat_8: (frame_agents_num, dim=8)
a_frame_agents_feat_8: np.ndarray = np.zeros(
    (frame_agents_num, AgentInternalIndex.dim()), dtype=np.float64)

max_id_number: int = len(token_to_id)

# 프레임 내 index 순회
for idx, agent in enumerate(agents):
    # 문자열 track_token → 정수 ID로 매핑
    if agent.track_token not in token_to_id:
        token_to_id[agent.track_token] = max_id_number
        max_id_number += 1
    int_id: int = token_to_id[agent.track_token]

    # 각 에이전트의 특성 값을 배열에 채움
    a_frame_agents_feat_8[idx,
                          AgentInternalIndex.track_token()] = float(int_id)
    a_frame_agents_feat_8[idx, AgentInternalIndex.vx()] = agent.velocity.x
    a_frame_agents_feat_8[idx, AgentInternalIndex.vy()] = agent.velocity.y
    a_frame_agents_feat_8[
        idx, AgentInternalIndex.heading()] = agent.center.heading
    a_frame_agents_feat_8[idx, AgentInternalIndex.width()] = agent.box.width
    a_frame_agents_feat_8[idx,
                          AgentInternalIndex.length()] = agent.box.length
    a_frame_agents_feat_8[idx, AgentInternalIndex.x()] = agent.center.x
    a_frame_agents_feat_8[idx, AgentInternalIndex.y()] = agent.center.y

    agent_types.append(agent.tracked_object_type)

return a_frame_agents_feat_8, token_to_id, agent_types

def sampled_tracked_objects_to_array_list(
tracked_objects_list: List[Union[TrackedObjects, DetectionsTracks]],
token_to_id: Optional[Dict[str, int]] = None,
) -> Tuple[List[np.ndarray], List[List[TrackedObjectType]], Dict[str, int]]:
"""여러 시점의 감지 결과를 프레임별 에이전트 배열 리스트로 바꾸는 함수.

이 함수는 연속된 여러 시점(프레임)의 감지 결과를 입력으로 받아,
각 시점마다 에이전트들을 숫자 배열로 바꾸어 리스트로 모아 줍니다.

간단히 말해,
- "시간에 따라 에이전트들이 어떻게 움직였는지"를
  프레임별 2차원 배열 목록으로 만드는 역할을 합니다.
시간 흐름:
    - 입력 리스트의 0번 인덱스: 가장 과거 시점
    - 마지막 인덱스: 가장 최근 시점

Args:
    tracked_objects_list (List[Union[TrackedObjects, DetectionsTracks]]):
        - 시간 순서대로 정렬된 감지 결과 리스트입니다.

Returns:
    Tuple[List[np.ndarray], List[List[TrackedObjectType]], Dict[str, int]]:
        - agents_seq_world_8_list (List[np.ndarray]):
            - 길이: num_frames
            - 각 원소 모양: (frame_agents_num, AgentInternalIndex.dim() = 8)
            - int id , vx, vy, heading, width, length, x, y
            - 각 시점의 에이전트 배열입니다. 행이 에이전트, 열이 특성입니다.
        - past_cur_agents_types_list (List[List[TrackedObjectType]]):
            - 길이: num_frames
            - 각 시점에서, 에이전트별 타입(차량/보행자/자전거 등)을 담은 리스트입니다.
        - token_to_id (Dict[str, int]):
            - 전체 시퀀스 동안 등장한 track_token을
              정수 ID로 매핑하는 사전입니다.

"""

# agents_seq_world_8_list:
#   - 길이: num_frames
#   - 각 원소: np.ndarray, shape (frame_agents_num, 8)
agents_seq_world_8_list: List[np.ndarray] = []

# past_cur_agents_types_list:
#   - 길이: num_frames
#   - 각 원소: List[TrackedObjectType], 길이 = frame_agents_num
past_cur_agents_types_list: List[List[TrackedObjectType]] = []

# track_token(문자열) → int ID
if token_to_id is None:
    token_to_id: Dict[str, int] = {}
for timestep_idx in range(len(tracked_objects_list)):
    # 현재 시점의 원시 감지 결과
    if type(tracked_objects_list[timestep_idx]) == DetectionsTracks:
        tracked_objects: TrackedObjects = tracked_objects_list[
            timestep_idx].tracked_objects
    else:
        tracked_objects = tracked_objects_list[
            timestep_idx]  # type: ignore[assignment]

    # a_frame_agents_feat_8: (frame_agents_num, 8)
    #   int id , vx, vy, heading, width, length, x, y
    # token_to_id: Dict[str, int]
    # agent_types: List[TrackedObjectType], 길이 = frame_agents_num
    a_frame_agents_feat_8, token_to_id, agent_types = _extract_agent_array(
        tracked_objects, token_to_id)

    agents_seq_world_8_list.append(a_frame_agents_feat_8)
    past_cur_agents_types_list.append(agent_types)
"""
agents_seq_world_8_list: List[np.ndarray]
    - 각 원소: (frame_agents_num, 8)  # frame_agents_num 은 프레임마다 다름
past_cur_agents_types_list: List[List[TrackedObjectType]]
    - 각 원소 길이: frame_agents_num
token_to_id: Dict[str, int]
    - track_token 문자열 → int ID 매핑
"""
return agents_seq_world_8_list, past_cur_agents_types_list, token_to_id

def _extract_ego_array(track_ego: EgoState) -> np.ndarray:

# (10)
frame_ego_feature = np.zeros((10,), dtype=np.float64)
# x, y, heading, vx, vy, width, length, (car, pedestrian, cyclist)

frame_ego_feature[0] = track_ego.center.x
frame_ego_feature[1] = track_ego.center.y
frame_ego_feature[2] = track_ego.center.heading
# EgoState의 속도는 자차량 좌표계 기준 벡터이므로, 세계 좌표계로 변환이 필요하다.
v_local = track_ego.dynamic_car_state.center_velocity_2d
he = float(track_ego.center.heading)
c, s = np.cos(he), np.sin(he)
vx_w = c * float(v_local.x) - s * float(v_local.y)
vy_w = s * float(v_local.x) + c * float(v_local.y)
frame_ego_feature[3] = vx_w
frame_ego_feature[4] = vy_w
frame_ego_feature[5] = track_ego.car_footprint.width
frame_ego_feature[6] = track_ego.car_footprint.length
frame_ego_feature[7:10] = [1, 0, 0]  # Mark as VEHICLE

return frame_ego_feature

def sampled_ego_objects_to_array_list(
ego_state_buffer: Deque[EgoState]) -> np.ndarray: # (num_frames, 10)

# x, y, heading, vx, vy, width, length, (car, pedestrian, cyclist)
all_frame_ego_feature = []

for past_idx in range(len(ego_state_buffer)):
    # 가장 과거 -> 가장 최근 순서
    track_ego: EgoState = ego_state_buffer[past_idx]
    frame_agents_feature = _extract_ego_array(track_ego)
    all_frame_ego_feature.append(frame_agents_feature)
all_frame_ego_feature = np.stack(all_frame_ego_feature)  # (num_frames, 10)

return all_frame_ego_feature

def sampled_static_objects_to_array_list(
present_tracked_objects: TrackedObjects):

static_object_types = [
    TrackedObjectType.CZONE_SIGN, TrackedObjectType.BARRIER,
    TrackedObjectType.TRAFFIC_CONE, TrackedObjectType.GENERIC_OBJECT
]

if type(present_tracked_objects) == DetectionsTracks:
    (present_tracked_objects
    ): TrackedObjects = present_tracked_objects.tracked_objects

static_obj: List[
    TrackedObject] = present_tracked_objects.get_tracked_objects_of_types(
        static_object_types)
static_types_list = []
present_static_feat_5 = np.zeros((len(static_obj), 5), dtype=np.float64)

for idx, agent in enumerate(static_obj):
    present_static_feat_5[idx, 0] = agent.center.x
    present_static_feat_5[idx, 1] = agent.center.y
    present_static_feat_5[idx, 2] = agent.center.heading
    present_static_feat_5[idx, 3] = agent.box.width
    present_static_feat_5[idx, 4] = agent.box.length
    static_types_list.append(agent.tracked_object_type)

return present_static_feat_5, static_types_list

=====================

2. Get agents array for model input

=====================

def _filter_agents_array(
all_frame_agents_world_8: List[
np.ndarray], # len = num_frames, 각 원소 shape: (frame_agents_num, 8)
reverse: bool = False,
) -> List[
np.ndarray]: # len = num_frames, 각 원소 shape: (frame_save_agents_num, 8)
"""프레임 전체에서 같은 에이전트들만 남기도록 걸러내는 함수.

한 시퀀스 안에 여러 시점(프레임)의 에이전트 목록이 있을 때,
첫 프레임(또는 `reverse=True`이면 마지막 프레임)에 등장한 에이전트만
나머지 모든 프레임에서도 유지하고, 그렇지 않은 에이전트는 제거합니다.

쉽게 말해서,
- “기준 시점에 존재하던 에이전트들만 끝까지 추적하겠다”
라는 필터링을 수행하는 함수입니다.

각 프레임의 배열은
`AgentInternalIndex` 순서( track_id, vx, vy, heading, width, length, x, y )
를 따릅니다.

**행 순서에 대한 설명**

- 기준 프레임(첫 프레임 또는 마지막 프레임)에서:
  · 기준 프레임 배열의 행을 위에서부터 순차적으로 보면서,
    해당 에이전트가 “기준 프레임에 존재하는지”를 검사하고,
    통과하는 행만 그대로 순서대로 쌓습니다.
  · 따라서 기준 프레임의 출력 배열 `(frame_save_agents_num, 8)` 에서
    행의 순서는 **입력 기준 프레임에서의 원래 순서를 그대로 유지**합니다.
- 다른 프레임들에서도:
  · 각 프레임의 입력 배열 `(frame_agents_num, 8)` 을 위에서부터 순회하면서,
    그 행의 track_id 가 기준 프레임 id 집합에 속하면 그 행을 그대로 추가합니다.
  · 이때도 “해당 프레임에서의 등장 순서”를 유지한 채 필터링만 할 뿐,
    별도의 정렬이나 재배열은 하지 않습니다.

즉, 출력 리스트 각 원소의 행 순서는
- **그 프레임의 입력 배열에서의 상대적 순서를 그대로 유지**하면서,
  기준 프레임에 없는 에이전트 행만 제거된 형태입니다.

Args:
    all_frame_agents_world_8 (List[np.ndarray]):
        - 길이: num_frames
        - 각 원소 shape: (frame_agents_num, 8)
        - 각 행은 한 에이전트에 해당하고, 열은 ID/속도/방향/크기/위치 정보를 담습니다.
    reverse (bool, optional):
        - False:
            · 첫 번째 프레임(가장 과거 시점)을 기준 프레임으로 사용합니다.
        - True:
            · 마지막 프레임(가장 최근 시점)을 기준 프레임으로 사용합니다.

Returns:
    List[np.ndarray]:
        - 길이: num_frames
        - 각 원소 shape: (frame_save_agents_num, 8)
        - 기준 프레임에 등장했던 에이전트만 남기고, 각 프레임마다 배열을 다시 구성한 결과입니다.
          에이전트 수(frame_save_agents_num)는 프레임마다 같을 수도, 다를 수도 있습니다.
        - 각 프레임의 행 순서는, 해당 프레임 입력 배열에서의 순서를 유지한 채
          필터링만 적용된 것입니다(추가적인 정렬 없음).
"""
# target_frame_agents_feature: (target_frame_agents_num, 8)
target_frame_agents_feature = all_frame_agents_world_8[
    -1] if reverse else all_frame_agents_world_8[0]
# target_frame_agents_id: (target_frame_agents_num,)
target_frame_agents_id = target_frame_agents_feature[:,
                                                     AgentInternalIndex.
                                                     track_token()]
for time_idx in range(len(all_frame_agents_world_8)):
    frame_exist_agents = []  # len: frame_save_agents_num # 길이 가변적
    # frame_agents_feature: (frame_agents_num, 8)
    frame_agents_feature: np.ndarray = all_frame_agents_world_8[
        time_idx]  # (_, 8)
    for agent_idx in range(frame_agents_feature.shape[0]):
        if target_frame_agents_feature.shape[0] > 0:
            agent_id = float(
                frame_agents_feature[agent_idx,
                                     int(AgentInternalIndex.track_token())])
            is_in_target_frame = bool(
                (agent_id == target_frame_agents_id).max())
            if is_in_target_frame:
                frame_exist_agents.append(
                    frame_agents_feature[agent_idx, :].squeeze())

    if len(frame_exist_agents) > 0:
        # (frame_save_agents_num, 8)
        all_frame_agents_world_8[time_idx] = np.stack(frame_exist_agents)
    else:
        # 기준 프레임에 존재하지 않는 에이전트만 있었던 경우 → 빈 배열 유지
        all_frame_agents_world_8[time_idx] = np.empty(
            (0, frame_agents_feature.shape[1]), dtype=np.float32)  # (0, 8)

return all_frame_agents_world_8

def _filter_agents_array_w_id(
cur_fut_agents_world_8_list: List[
np.ndarray], # len = num_frames_all, 각 원소: (frame_agents_num_t, 8)
neighbor_token_id: np.ndarray, # shape: (chosen_agent_num,)
) -> List[np.ndarray]:
"""각 프레임의 에이전트 배열에서, 원하는 track_id(neighbor_token_id)에
해당하는 에이전트만 남긴다.

하는 일
--------
- 입력으로 "현재 + 여러 미래 프레임"에 대한 에이전트 상태가 들어온다.
  · cur_fut_agents_world_8_list : List[np.ndarray]
    - 길이: num_frames_all
    shape: (frame_agents_num_t, 8)
    [track_id, vx, vy, heading, width, length, x, y]

- neighbor_token_id 에는 "우리가 계속 추적하고 싶은 에이전트 ID" 들이 들어 있다.
  예: [10, 25, 31] 같은 1차원 배열.

- 각 프레임에 대해:
    1) 해당 프레임의 track_id 열을 보고,
       neighbor_token_id 중 어떤 것들이 있는지 찾는다.
    2) 그 ID 를 가진 행만 남겨서 새 배열(frame_save_agents_num_t, 8)을 만든다.
    3) 한 프레임에 하나도 없다면 (0, 8) 빈 배열을 넣는다.

결과적으로,
- 원래 프레임 수는 그대로 유지하고
- 각 프레임마다 "선택된 에이전트들만 남은 배열" 리스트를 돌려준다.

Args:
    cur_fut_agents_world_8_list:
        - 길이: num_frames_all
        - 각 원소 shape: (frame_agents_num_t, 8)
          [track_id, vx, vy, heading, width, length, x, y] (월드 좌표).
    neighbor_token_id:
        - shape: (chosen_agent_num,)
        - 선택된 에이전트들의 track_id 배열.

Returns:
    List[np.ndarray]:
        - cur_fut_chosen_agents_world_8_list
        - 길이: num_frames_all
        - 각 원소 shape: (frame_save_agents_num_t, 8)
          · frame_save_agents_num_t 는 프레임마다 달라질 수 있다.
          · 선택된 에이전트가 하나도 없으면 (0, 8) 배열.
"""
cur_fut_chosen_agents_world_8_list: List[np.ndarray] = []

# 선택된 토큰이 하나도 없으면, 모든 프레임에 대해 (0, 8) 빈 배열 반환
if neighbor_token_id.size == 0:
    for frame_agents_world_8 in cur_fut_agents_world_8_list:
        cur_fut_chosen_agents_world_8_list.append(
            np.empty((0, frame_agents_world_8.shape[1]),
                     dtype=frame_agents_world_8.dtype))
    return cur_fut_chosen_agents_world_8_list

# 비교를 편하게 하기 위해 float32 로 맞춘다.
neighbor_token_id_f32 = neighbor_token_id.astype(np.float32, copy=False)

for frame_agents_world_8 in cur_fut_agents_world_8_list:
    # frame_agents_world_8: (frame_agents_num_t, 8) 또는 (0, 8)
    if frame_agents_world_8.size == 0:
        cur_fut_chosen_agents_world_8_list.append(
            np.empty((0, frame_agents_world_8.shape[1]),
                     dtype=frame_agents_world_8.dtype))
        continue

    # 현재 프레임의 track_id 열: (frame_agents_num_t,)
    frame_ids = frame_agents_world_8[:, AgentInternalIndex.track_token()]

    # 이 프레임에서 neighbor_token_id 에 속하는 행만 True
    mask = np.isin(
        frame_ids.astype(np.float32, copy=False),
        neighbor_token_id_f32,  # (chosen_agent_num,)
    )  # shape: (frame_agents_num_t,)

    if np.any(mask):
        # 선택된 행만 남긴 배열: (frame_save_agents_num_t, 8)
        cur_fut_chosen_agents_world_8_list.append(
            frame_agents_world_8[mask])
    else:
        cur_fut_chosen_agents_world_8_list.append(
            np.empty((0, frame_agents_world_8.shape[1]),
                     dtype=frame_agents_world_8.dtype))

return cur_fut_chosen_agents_world_8_list

def _pad_agent_states(
all_frame_cur_exists_agents: List[
np.ndarray], # len = num_frames, 각 원소 shape: (current_agents_num_t, 8)
reverse: bool,
) -> List[np.ndarray]: # len = num_frames, 각 원소 shape: (current_agents_num, 8)
"""프레임마다 빠지는 에이전트를 0으로 채우면서, 에이전트 순서를 기준 프레임에 맞춘다.

간단한 규칙:
    - 기준 프레임(현재 프레임 기준)에서 살아 있는 에이전트 집합을 고정한다.
    - 각 시점마다:
        · 해당 시점에 관측된 에이전트는 그 값으로 채우고
        · 관측되지 않은 에이전트는 [0, 0, ..., 0] 으로 둔다.
    - 모든 프레임의 행 개수/순서는 기준 프레임과 같다.

Args:
    all_frame_cur_exists_agents (List[np.ndarray]):
        - 길이: num_frames
        - 각 원소 shape: (current_agents_num_t, 8)
          [track_id, vx, vy, heading, width, length, x, y]
    reverse (bool):
        - True 이면 리스트를 뒤집어서 마지막 프레임을 기준으로 사용한다.

Returns:
    List[np.ndarray]:
        - 길이: num_frames
        - 각 원소 shape: (current_agents_num, 8)
          기준 프레임 에이전트 순서로 정렬되고, 없는 시점은 0으로 채워진 배열.
"""
track_id_idx = AgentInternalIndex.track_token()

# 시간 방향 뒤집기 (현재 프레임을 기준 프레임으로 쓰기 위해)
if reverse:
    all_frame_cur_exists_agents = all_frame_cur_exists_agents[::-1]

if len(all_frame_cur_exists_agents) == 0:
    return all_frame_cur_exists_agents

# 기준 프레임: (current_agents_num, 8)
key_frame_agents: np.ndarray = all_frame_cur_exists_agents[0]
current_agents_num: int = int(key_frame_agents.shape[0])
feature_dim: int = int(key_frame_agents.shape[1])

if current_agents_num == 0:
    # 에이전트가 아예 없으면 그대로 반환
    return all_frame_cur_exists_agents

# 기준 프레임에서 track_id → 행 인덱스 매핑
key_frame_id_to_row: Dict[int, int] = {}
for cur_agent_idx, agent_id in enumerate(key_frame_agents[:, track_id_idx]):
    key_frame_id_to_row[int(agent_id)] = cur_agent_idx

# 기준 프레임의 id 벡터 (shape: (current_agents_num,))
key_frame_ids: np.ndarray = key_frame_agents[:, track_id_idx].copy()

new_all_frame_cur_exists_agents: List[np.ndarray] = []

for time_idx in range(len(all_frame_cur_exists_agents)):
    # frame_cur_exists_agents: (current_agents_num_t, 8)
    frame_cur_exists_agents: np.ndarray = all_frame_cur_exists_agents[
        time_idx]

    # frame_state: (current_agents_num, 8), 처음엔 전부 0
    frame_state: np.ndarray = np.zeros(
        (current_agents_num, feature_dim),
        dtype=np.float64,
    )

    # track_id 칸은 기준 프레임 id 로 채워 둔다.
    frame_state[:, track_id_idx] = key_frame_ids

    # 현재 프레임에서 실제로 관측된 에이전트만 해당 행 위치에 복사
    for agent_row_idx in range(frame_cur_exists_agents.shape[0]):
        agent_id = int(frame_cur_exists_agents[agent_row_idx, track_id_idx])
        if agent_id not in key_frame_id_to_row:
            # 기준 프레임에 없는 id 는 이웃 후보가 아니므로 무시
            continue
        key_frame_row: int = key_frame_id_to_row[agent_id]
        frame_state[key_frame_row, :] = frame_cur_exists_agents[
            agent_row_idx, :]

    new_all_frame_cur_exists_agents.append(frame_state)

# 시간 방향을 다시 원래대로 복원
if reverse:
    new_all_frame_cur_exists_agents = new_all_frame_cur_exists_agents[::-1]

return new_all_frame_cur_exists_agents

def _pad_agent_states_with_zeros_w_id(
cur_fut_chosen_agents_local_8_list:
List[
np.
ndarray], # 길이: num_frames_all, 각 원소 shape: (frame_save_agents_num_t, 8)
neighbor_token_id: np.ndarray, # shape: (chosen_agent_num,)
neighbor_types_one_hot: Optional[
np.ndarray] = None, # shape: (chosen_agent_num, 3) 또는 None
) -> np.ndarray:
"""선택된 이웃 토큰에 대해, 시간 전체 구간의 궤적을 고정 크기 텐서로 채운다.

각 시점에서 관측된 에이전트의 상태는 해당 슬롯에 그대로 넣고,
관측되지 않은 시점은 0으로 둔다. 타입 one-hot은 시간축 전체에 복사한다.

Args:
    cur_fut_chosen_agents_local_8_list (List[np.ndarray]):
        - 길이: num_frames_all
        - 각 원소 shape: (frame_save_agents_num_t, 8)
          [track_id, vx, vy, heading, width, length, x, y] (ego 기준)
    neighbor_token_id (np.ndarray):
        - shape: (chosen_agent_num,)
        - 이웃 에이전트의 track_id 배열.
    neighbor_types_one_hot (Optional[np.ndarray]):
        - shape: (chosen_agent_num, 3)
        - [onehot_vehicle, onehot_ped, onehot_bike]
        - None 이면 타입 one-hot은 모두 0으로 둔다.

Returns:
     np.ndarray:
        - cur_fut_chosen_agents_full_11:
            · shape: (chosen_agent_num, num_frames_all, 11)
            · [x, y, cos(yaw), sin(yaw), v_x, v_y, width, length, one_hot(3)]
"""
track_id_idx = AgentInternalIndex.track_token()
vx_idx = AgentInternalIndex.vx()
vy_idx = AgentInternalIndex.vy()
heading_idx = AgentInternalIndex.heading()
width_idx = AgentInternalIndex.width()
length_idx = AgentInternalIndex.length()
x_idx = AgentInternalIndex.x()
y_idx = AgentInternalIndex.y()

# neighbor_token_id: (chosen_agent_num,)
chosen_agent_num: int = int(neighbor_token_id.shape[0])
# cur_fut_chosen_agents_local_8_list 길이: num_frames_all
cur_future_all_len: int = len(cur_fut_chosen_agents_local_8_list)

# 출력 버퍼 (모두 0으로 초기화)

# cur_fut_chosen_agents_full_11: (chosen_agent_num, num_frames_all, 11)
cur_fut_chosen_agents_full_11: np.ndarray = np.zeros(
    (chosen_agent_num, cur_future_all_len, 11),
    dtype=np.float32,
)

if chosen_agent_num == 0:
    return cur_fut_chosen_agents_full_11

# 타입 one-hot: (chosen_agent_num, 3) → (chosen_agent_num, 1, 3) → 시간축 브로드캐스트
if neighbor_types_one_hot is not None:
    # neighbor_types_one_hot: (chosen_agent_num, 3)
    neighbor_types_one_hot = neighbor_types_one_hot.astype(np.float32,
                                                           copy=False)
    # cur_fut_chosen_agents_full_11[:, :, 8:11]: (chosen_agent_num, num_frames_all, 3)
    cur_fut_chosen_agents_full_11[:, :,
                                  8:] = neighbor_types_one_hot[:, None, :]

# 각 시점 프레임 처리
for timestep_idx, frame_chosen_agents_local_8 in enumerate(
        cur_fut_chosen_agents_local_8_list):
    # frame_chosen_agents_local_8: (frame_chosen_agents_num_t, 8) 또는 (0, 8)
    if frame_chosen_agents_local_8.shape[0] == 0:
        continue

    # 현재 프레임의 track_id 들: (frame_chosen_agents_num_t,)
    frame_ids: np.ndarray = frame_chosen_agents_local_8[:,
                                                        track_id_idx].astype(
                                                            np.float32,
                                                            copy=False)

    # frame_ids 와 neighbor_token_id 의 공통 원소 및 위치
    # inter: 공통 track_id 값들 (길이 M)
    # idx_frame: frame_ids 에서의 인덱스 (shape: (M,))
    # idx_neighbors: neighbor_token_id 에서의 인덱스 (shape: (M,))
    inter, idx_frame, idx_neighbors = np.intersect1d(
        frame_ids,
        neighbor_token_id.astype(np.float32, copy=False),
        assume_unique=False,
        return_indices=True,
    )
    if inter.size == 0:
        continue

    # 선택된 행만 모으기
    # frame_selected: (M, 8)
    frame_selected: np.ndarray = frame_chosen_agents_local_8[idx_frame]

    # 위치/각도: (M,)
    x_local: np.ndarray = frame_selected[:, x_idx].astype(np.float32,
                                                          copy=False)
    y_local: np.ndarray = frame_selected[:, y_idx].astype(np.float32,
                                                          copy=False)
    heading_local: np.ndarray = frame_selected[:, heading_idx].astype(
        np.float32, copy=False)

    # 속도/크기: (M,)
    vx_local: np.ndarray = frame_selected[:, vx_idx].astype(np.float32,
                                                            copy=False)
    vy_local: np.ndarray = frame_selected[:, vy_idx].astype(np.float32,
                                                            copy=False)
    width_local: np.ndarray = frame_selected[:,
                                             width_idx].astype(np.float32,
                                                               copy=False)
    length_local: np.ndarray = frame_selected[:,
                                              length_idx].astype(np.float32,
                                                                 copy=False)

    # ---- (2) full 11차원 채우기: [x, y, cos, sin, vx, vy, w, h, one_hot(3)] ----
    # cur_fut_chosen_agents_full_11[idx_neighbors, timestep_idx, :8]: (M, 8)
    cur_fut_chosen_agents_full_11[idx_neighbors, timestep_idx, 0] = x_local
    cur_fut_chosen_agents_full_11[idx_neighbors, timestep_idx, 1] = y_local
    cur_fut_chosen_agents_full_11[idx_neighbors, timestep_idx,
                                  2] = np.cos(heading_local)
    cur_fut_chosen_agents_full_11[idx_neighbors, timestep_idx,
                                  3] = np.sin(heading_local)
    cur_fut_chosen_agents_full_11[idx_neighbors, timestep_idx, 4] = vx_local
    cur_fut_chosen_agents_full_11[idx_neighbors, timestep_idx, 5] = vy_local
    cur_fut_chosen_agents_full_11[idx_neighbors, timestep_idx,
                                  6] = width_local
    cur_fut_chosen_agents_full_11[idx_neighbors, timestep_idx,
                                  7] = length_local
    # 8:11 (one_hot)은 위에서 시간축 전체에 이미 채워져 있음

return cur_fut_chosen_agents_full_11

def build_ego_past_feature(
past_cur_ego_world_10: np.ndarray, # (num_frames, 10)
ego_cur_pose_np: np.ndarray, # (3,)
) -> np.ndarray: # (num_frames, 11)
"""이고 차량의 과거+현재 궤적을 이고 기준 상대 좌표계로 변환한다.

- 입력은 월드 좌표계 기준 이고 궤적(여러 시점의 상태 값)이다.
- 기준이 되는 현재 이고 상태(`ego_cur_pose_np`)를 중심으로
  모든 시점의 좌표/속도 등을 상대 좌표계로 바꾼다.
- 모델에서 바로 쓸 수 있도록 float32 형으로 정리한다.

Args:
    past_cur_ego_world_10 (np.ndarray):
        - x, y, heading, vx, vy, width, length, (car, pedestrian, cyclist)
        - shape: (num_frames, 10)
        - 월드 좌표계 이고 궤적.
    ego_cur_pose_np (np.ndarray):
        - shape: (3,)
        - [x_ego, y_ego, yaw_ego], 기준 이고 상태.

Returns:
    [np.ndarray]:
        - shape: (num_frames, 11)
        - 이고 기준 상대 좌표계 궤적.
"""
# past_cur_ego_world_10: (num_frames, 10)
# ego_agent_past: (num_frames, 11)
ego_agent_past = convert_absolute_quantities_to_relative(
    past_cur_ego_world_10, ego_cur_pose_np)
assert ego_agent_past.shape[1] == 11
ego_agent_past = ego_agent_past.astype(np.float32)
return ego_agent_past

def _convert_all_frames_agents_to_ego_local(
all_frame_cur_exists_agents: List[
np.ndarray], # len = num_frames, 각 원소: (current_agents_num_t, 8)
ego_cur_pose_np: np.ndarray, # (3,)
) -> List[np.ndarray]:
"""각 프레임의 에이전트 상태를 먼저 ego 기준으로 바꾸고,
이후에 프레임별 크기를 맞춰준다.

처리 단계:
    1) 각 시점의 (world) 에이전트 상태를 개별적으로 ego 기준으로 변환.
       - 입력: (current_agents_num_t, 8)
       - 출력: (current_agents_num_t, 8)  (좌표계만 변경)
    2) `_pad_agent_states` 를 이용해
       - 기준 프레임 에이전트 집합/순서에 맞춰
       - 없는 시점은 0으로 채운다.

Args:
    all_frame_cur_exists_agents (List[np.ndarray]):
        - 길이: num_frames
        - 각 원소 shape: (current_agents_num_t, 8)
          [track_id, vx, vy, heading, width, length, x, y]
          (월드 좌표계 기준)
    ego_cur_pose_np (np.ndarray):
        - shape: (3,)
        - [x_ego, y_ego, yaw_ego]

Returns:
    List[np.ndarray]:
        - 길이: num_frames
        - 각 원소 shape: (current_agents_num, 8)
          ego 기준 좌표계이며, 기준 프레임 에이전트 순서를 따른다.
          관측이 없는 시점은 0으로 채워진다.
"""
# 에이전트가 없으면 그대로 반환
if len(all_frame_cur_exists_agents) == 0:
    return all_frame_cur_exists_agents

all_frame_cur_exists_agents_local_world: List[np.ndarray] = []

for frame_agents_world_8 in all_frame_cur_exists_agents:
    # frame_agents_world_8: (current_agents_num_t, 8)
    if frame_agents_world_8.shape[0] == 0:
        # 관측이 아예 없는 프레임은 0 배열 유지
        all_frame_cur_exists_agents_local_world.append(
            np.zeros_like(frame_agents_world_8, dtype=np.float64))
        continue

    # 복사본을 만들어 좌표계를 변환 (shape: (current_agents_num_t, 8))
    frame_agents_world_8_copy = frame_agents_world_8.astype(np.float64,
                                                            copy=True)
    frame_agents_local_8 = convert_absolute_quantities_to_relative(
        frame_agents_world_8_copy,
        ego_cur_pose_np,
        'agent',
    )
    all_frame_cur_exists_agents_local_world.append(frame_agents_local_8)

# 기준 프레임에 맞춰 에이전트 개수/순서를 고정하고, 없는 시점은 0 유지
all_frame_cur_exists_agents_local: List[np.ndarray] = _pad_agent_states(
    all_frame_cur_exists_agents=all_frame_cur_exists_agents_local_world,
    reverse=True,
)

return all_frame_cur_exists_agents_local

def _pack_ego_local_agents(
all_frame_cur_exists_agents_local: List[
np.ndarray], # len = num_frames, 각: (current_agents_num, 8)
agents_states_dim: int,
) -> np.ndarray:
"""ego 기준 에이전트 상태 리스트를 3D 텐서로 모아 쌓는다.

각 프레임별 에이전트 상태(ego 기준)를 받아서,
- [x, y, cos(heading), sin(heading), vx, vy, width, length, id]
  형식의 9차원 특성으로 재구성한다.

Args:
    all_frame_cur_exists_agents_local (List[np.ndarray]):
        - 길이: num_frames
        - 각 원소 shape: (current_agents_num, 8)
            - [track_id, vx, vy, heading, width, length, x, y]
    agents_states_dim (int):
        - x, y, cos h, sin h, vx, vy, length, width 의 차원 수(=8).

Returns:
    np.ndarray:
        - shape: (num_frames, current_agents_num, agents_states_dim + 1)
        - 마지막 채널(+1)은 track_id 를 저장한다.
            - [x, y, cos(heading), sin(heading), vx, vy, width, length, id]
"""
# all_frame_np_agents_local: (num_frames, current_agents_num, agents_states_dim + 1)
#   마지막 채널(+1)은 track_id 저장용
num_frames: int = len(all_frame_cur_exists_agents_local)
if num_frames == 0:
    return np.zeros((0, 0, agents_states_dim + 1), dtype=np.float64)

current_agents_num: int = int(all_frame_cur_exists_agents_local[0].shape[0])
# all_frame_np_agents_local: (T, N, 9)
all_frame_np_agents_local: np.ndarray = np.zeros(
    (num_frames, current_agents_num, agents_states_dim + 1),
    dtype=np.float64,
)

track_id_idx = AgentInternalIndex.track_token()
vx_idx = AgentInternalIndex.vx()
vy_idx = AgentInternalIndex.vy()
heading_idx = AgentInternalIndex.heading()
width_idx = AgentInternalIndex.width()
length_idx = AgentInternalIndex.length()
x_idx = AgentInternalIndex.x()
y_idx = AgentInternalIndex.y()

for t in range(num_frames):
    # frame_cur_exists_agents_local: (N, 8)
    frame_cur_exists_agents_local: np.ndarray = all_frame_cur_exists_agents_local[
        t]

    # empty_mask: (N,)
    # - track_id 를 제외한 나머지가 전부 0이면 "관측 없는 패딩 row"로 본다.
    frame_non_id = frame_cur_exists_agents_local.astype(np.float64,
                                                        copy=False).copy()
    frame_non_id[:, track_id_idx] = 0.0
    empty_mask: np.ndarray = (np.abs(frame_non_id).sum(axis=-1) == 0.0)

    # heading: (N,)
    heading = frame_cur_exists_agents_local[:, heading_idx].astype(
        np.float64, copy=False).reshape(-1)
    cos_heading = np.cos(heading)
    sin_heading = np.sin(heading)

    # (N,) 동적 값들
    x_local = frame_cur_exists_agents_local[:, x_idx].astype(
        np.float64, copy=False).reshape(-1)
    y_local = frame_cur_exists_agents_local[:, y_idx].astype(
        np.float64, copy=False).reshape(-1)
    vx_local = frame_cur_exists_agents_local[:, vx_idx].astype(
        np.float64, copy=False).reshape(-1)
    vy_local = frame_cur_exists_agents_local[:, vy_idx].astype(
        np.float64, copy=False).reshape(-1)
    width_local = frame_cur_exists_agents_local[:, width_idx].astype(
        np.float64, copy=False).reshape(-1)
    length_local = frame_cur_exists_agents_local[:, length_idx].astype(
        np.float64, copy=False).reshape(-1)

    # ✅ 패딩 row는 8개 채널 전부 0으로 고정
    cos_heading[empty_mask] = 0.0
    sin_heading[empty_mask] = 0.0
    x_local[empty_mask] = 0.0
    y_local[empty_mask] = 0.0
    vx_local[empty_mask] = 0.0
    vy_local[empty_mask] = 0.0
    width_local[empty_mask] = 0.0
    length_local[empty_mask] = 0.0

    # pack: (T, N, 9)
    all_frame_np_agents_local[t, :, 0] = x_local
    all_frame_np_agents_local[t, :, 1] = y_local
    all_frame_np_agents_local[t, :, 2] = cos_heading
    all_frame_np_agents_local[t, :, 3] = sin_heading
    all_frame_np_agents_local[t, :, 4] = vx_local
    all_frame_np_agents_local[t, :, 5] = vy_local
    all_frame_np_agents_local[t, :, 6] = width_local
    all_frame_np_agents_local[t, :, 7] = length_local
    all_frame_np_agents_local[
        t, :, 8] = frame_cur_exists_agents_local[:, track_id_idx].astype(
            np.float64, copy=False).reshape(-1)

return all_frame_np_agents_local

def build_agents_past_ego_frame_array(
past_cur_agents_world_8_list: List[
np.ndarray], # len = num_frames, 각 원소: (frame_agents_num, 8)
ego_cur_pose_np: np.ndarray, # (3,)
agents_states_dim: int,
) -> np.ndarray:
"""과거+현재 에이전트 상태를 이고 기준 상대 좌표계 3차원 텐서로 만드는 함수.

처리 단계:
    1) `_filter_agents_array` 로 현재 프레임에 존재하는 에이전트만 남긴다.
    2) `_pad_agent_states` 로 프레임마다 사라지는 에이전트를 이전 상태로 채운다.
    3) 각 프레임을 이고 기준 상대 좌표계로 변환한다.
    4) [x, y, cos(yaw), sin(yaw), vx, vy, width, length, id] 를 모아
       (num_frames, current_agents_num, 9) 텐서를 만든다.

Args:
    past_cur_agents_world_8_list (List[np.ndarray]):
        - 길이: num_frames
        - 각 원소 shape: (frame_agents_num, 8)
        - [track_id, vx, vy, heading, width, length, x, y]
    ego_cur_pose_np (np.ndarray):
        - shape: (3,), [x_ego, y_ego, yaw_ego]
    agents_states_dim (int):
        - x, y, cos h, sin h, vx, vy, length, width 의 차원 수(=8).

Returns:
    np.ndarray
        - all_frame_np_agents_local (np.ndarray):
            · shape: (num_frames, current_agents_num, agents_states_dim + 1 = 9)
            -  [x, y, cos(heading), sin(heading), vx, vy, width, length, id]
"""
# 1) 현재 프레임 기준 에이전트 필터링 및 타입 수집
# all_frame_cur_exists_agents: len = num_frames,
#   각 원소 shape: (frame_save_agents_num, 8)
all_frame_cur_exists_agents: List[np.ndarray] = _filter_agents_array(
    past_cur_agents_world_8_list, reverse=True)

if all_frame_cur_exists_agents[-1].shape[0] == 0:
    # Return zero array when there are no agents in the scene
    # all_frame_np_agents_local: (num_frames, 0, agents_states_dim)
    all_frame_np_agents_local = np.zeros(
        (len(all_frame_cur_exists_agents), 0, agents_states_dim))
else:
    # 2) (world→ego) 프레임별 에이전트 상태 변환
    """ all_frame_cur_exists_agents_local
    List[np.ndarray]:
        - 길이: num_frames
        - 각 원소 shape: (current_agents_num, 8) # current_agents_num 고정
            = [track_id, vx, vy, heading, width, length, x, y]
    """
    all_frame_cur_exists_agents_local = _convert_all_frames_agents_to_ego_local(
        all_frame_cur_exists_agents=all_frame_cur_exists_agents,
        ego_cur_pose_np=ego_cur_pose_np,
    )

    # 3) (프레임 × 에이전트 × 특성) 3D 텐서로 패킹
    # (num_frames, current_agents_num, agents_states_dim + 1 = 9)
    #  [x, y, cos(heading), sin(heading), vx, vy, width, length, id]
    all_frame_np_agents_local = _pack_ego_local_agents(
        all_frame_cur_exists_agents_local=all_frame_cur_exists_agents_local,
        agents_states_dim=agents_states_dim,
    )

return all_frame_np_agents_local

def _build_present_static_feature_6(
present_static_feat_5: np.ndarray, # (cur_static_num, 5)
ego_cur_pose_np: np.ndarray, # (3,)
) -> np.ndarray:
"""현재 프레임의 정적 객체 정보를 -> 이고 기준 6차원 표현으로 확장한다.

- 입력: [x, y, heading, width, length] (월드 좌표계)
- 출력: [x, y, cos(heading), sin(heading), width, length] (ego-relative)

Args:
    present_static_feat_5 (np.ndarray):
        - shape: (cur_static_num, 5)
    ego_cur_pose_np (np.ndarray):
        - shape: (3,), [x_ego, y_ego, yaw_ego]

Returns:
    np.ndarray:
        - shape: (cur_static_num, 6)
        - [x, y, cos(heading), sin(heading), width, length]
"""
# present_static_feature_6: (cur_static_num, 6)
present_static_feature_6 = np.zeros((present_static_feat_5.shape[0], 6))
if present_static_feat_5.shape[0] != 0:
    # present_static_feature_local: (cur_static_num, 5)
    present_static_feature_local = convert_absolute_quantities_to_relative(
        present_static_feat_5, ego_cur_pose_np, 'static')

    present_static_feature_6[:, 0] = present_static_feature_local[:, 0]
    present_static_feature_6[:, 1] = present_static_feature_local[:, 1]
    present_static_feature_6[:, 2] = np.cos(present_static_feature_local[:,
                                                                         2])
    present_static_feature_6[:, 3] = np.sin(present_static_feature_local[:,
                                                                         2])
    present_static_feature_6[:, 4] = present_static_feature_local[:, 3]
    present_static_feature_6[:, 5] = present_static_feature_local[:, 4]

return present_static_feature_6

from typing import Optional, List, Tuple # 이미 있으면 중복 import는 제거해도 OK

def _filter_out_neighbors_not_present_at_current(
neighbor_agents_past: np.ndarray, # shape: (K, T, 11)
agents_cur_frame_indices: np.ndarray, # shape: (K,)
neighbors_id: np.ndarray, # shape: (K,)
*,
eps: float = 1e-8,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""현재 프레임에 '실제로 존재하는' 이웃 에이전트만 남긴다.

여기서 "현재 프레임"은 neighbor_agents_past의 마지막 시간 인덱스(T-1)입니다.

어떤 이웃이 현재에 존재한다고 볼지(판정 기준)
-------------------------------------------
- neighbor_agents_past[k, -1, :8] = [x, y, cos, sin, vx, vy, width, length]
  이 8개 값이 전부 0(또는 0에 매우 가까움)이면,
  그 이웃은 "현재 프레임에 존재하지 않는다"고 판단합니다.
- (주의) 타입 one-hot(마지막 3차원)은 시간 전체에 복사되는 구조라서,
  타입 값이 0이 아니더라도 동적 상태가 전부 0이면 "없는 에이전트"로 봐야 합니다.
  그래서 판정은 앞 8차원만 사용합니다.

이 함수가 필요한 이유
--------------------
아주 드문 엣지 케이스(업스트림에서 섞인 0-padding 등)로 인해
현재 프레임이 전부 0인 이웃이 선택 결과에 들어오면,
이후 단계에서 그 이웃이 "전 시간 0"으로 남아버릴 수 있습니다.
당신의 의도는 그런 이웃은 결과 텐서에 아예 포함되지 않는 것이므로,
여기서 agent 축에서 제거합니다.

Args:
    neighbor_agents_past (np.ndarray):
        shape: (K, T, 11)
        - K: 선택된 이웃 수
        - T: 과거+현재 프레임 수
    agents_cur_frame_indices (np.ndarray):
        shape: (K,)
        - 현재 프레임 기준으로 선택된 이웃의 원본 인덱스들
    neighbors_id (np.ndarray):
        shape: (K,)
        - 선택된 이웃들의 track_id
    eps (float):
        0과 "거의 0"을 구분하기 위한 작은 값

Returns:
    Tuple[np.ndarray, np.ndarray, np.ndarray]:
        - filtered_neighbor_agents_past: shape (K_valid, T, 11)
        - filtered_agents_cur_frame_indices: shape (K_valid,)
        - filtered_neighbors_id: shape (K_valid,)
"""
if neighbor_agents_past.ndim != 3 or neighbor_agents_past.shape[-1] != 11:
    raise ValueError(
        f"`neighbor_agents_past`는 (K, T, 11) shape 이어야 합니다. got {neighbor_agents_past.shape}"
    )
if agents_cur_frame_indices.ndim != 1 or neighbors_id.ndim != 1:
    raise ValueError(
        "`agents_cur_frame_indices`와 `neighbors_id`는 1차원 배열이어야 합니다.")
if neighbor_agents_past.shape[0] != agents_cur_frame_indices.shape[0] or \
   neighbor_agents_past.shape[0] != neighbors_id.shape[0]:
    raise ValueError(
        "세 입력의 agent 축 길이가 서로 달라서 정렬/필터링이 불가능합니다. "
        f"K={neighbor_agents_past.shape[0]}, "
        f"indices={agents_cur_frame_indices.shape[0]}, ids={neighbors_id.shape[0]}"
    )

K: int = int(neighbor_agents_past.shape[0])
if K == 0:
    return neighbor_agents_past, agents_cur_frame_indices, neighbors_id

# current_state_8: (K, 8)
current_state_8: np.ndarray = neighbor_agents_past[:, -1, :8]

# current_present_mask: (K,)
# - 현재 프레임에서 8개 값 중 하나라도 0이 아니면 "존재"
current_present_mask: np.ndarray = (np.abs(current_state_8)
                                    > eps).any(axis=1)

return (
    neighbor_agents_past[current_present_mask],
    agents_cur_frame_indices[current_present_mask],
    neighbors_id[current_present_mask],
)

def _compute_valid_sorted_indices(
all_frame_np_agents_local: np.
ndarray, # shape: (num_frames, current_agents_num, 9)
filter_radius: Optional[float],
) -> Tuple[np.ndarray, np.ndarray]:
"""filter_radius 를 적용한 뒤, ego와의 거리 기준으로 에이전트 인덱스를 정렬한다.

추가로, "현재 프레임에 존재하지 않는 에이전트(현재 상태가 전부 0)"는
후보에서 아예 제외한다.

Args:
    all_frame_np_agents_local:
        - shape: (num_frames, current_agents_num, 9)
        - 마지막 채널(8)은 track_id
        - 앞 8채널은 [x, y, cos, sin, vx, vy, width, length]
    filter_radius:
        - None 이 아니면, ego 기준 거리 <= filter_radius 인 에이전트만 후보.

Returns:
    sorted_cur_agent_indices:
        - shape: (M,)  # M = 유효 후보 수
        - ego 로부터 가까운 순서대로 정렬된 현재 프레임 인덱스.
    dist_from_cur_agent_to_ego:
        - shape: (current_agents_num,)
        - 각 에이전트의 ego 기준 2D 거리.
"""
if all_frame_np_agents_local.ndim != 3 or all_frame_np_agents_local.shape[
        -1] < 9:
    raise ValueError(
        f"`all_frame_np_agents_local`는 (T, N, 9) shape 이어야 합니다. got {all_frame_np_agents_local.shape}"
    )

# 현재 프레임(마지막 프레임 기준)에서 ego까지의 거리: (current_agents_num,)
dist_from_cur_agent_to_ego: np.ndarray = np.linalg.norm(
    all_frame_np_agents_local[-1, :, :2], axis=-1)

current_agents_num: int = int(all_frame_np_agents_local.shape[1])

# (핵심) 현재 프레임 존재 여부 판단: (current_agents_num,)
# - [x, y, cos, sin, vx, vy, width, length] 이 전부 0이면 "현재에 없음"
eps: float = 1e-8
current_state_8: np.ndarray = all_frame_np_agents_local[-1, :, :8]  # (N, 8)
present_mask: np.ndarray = (np.abs(current_state_8)
                            > eps).any(axis=1)  # (N,)

# filter_radius 내의 에이전트만 후보로 사용
if filter_radius is not None:
    within_radius = dist_from_cur_agent_to_ego <= float(
        filter_radius)  # (N,)
    valid_mask = present_mask & within_radius
else:
    valid_mask = present_mask

valid_indices: np.ndarray = np.nonzero(valid_mask)[0].astype(int)  # (M,)

if valid_indices.size == 0:
    # 유효한 에이전트가 하나도 없는 경우
    return np.zeros((0,), dtype=int), dist_from_cur_agent_to_ego

# 유효한 에이전트들만 뽑아서 거리 기준 정렬
dist_valid: np.ndarray = dist_from_cur_agent_to_ego[valid_indices]  # (M,)
order_local: np.ndarray = np.argsort(dist_valid)  # (M,)
sorted_cur_agent_indices: np.ndarray = valid_indices[order_local]  # (M,)

return sorted_cur_agent_indices, dist_from_cur_agent_to_ego

def _select_indices_with_type_cap(
sorted_cur_agent_indices: np.ndarray, # (M,)
current_agent_types_list: List[TrackedObjectType],
max_agent_num: int,
max_pedestrians: int,
max_bicycles: int,
dist_from_cur_agent_to_ego: np.ndarray, # (current_agents_num,)
) -> np.ndarray:
"""보행자/자전거 상한을 적용하여 에이전트를 선택한다.

절차:
    1) sorted_cur_agent_indices 를 타입별로 세 그룹으로 나눈다.
       - 보행자 / 자전거 / 차량
    2) 보행자 → 자전거 → 차량 순으로 슬롯을 채우되,
       각각 max_pedestrians / max_bicycles / 나머지 로 상한을 둔다.
    3) 최종 선택 집합을 ego-거리 기준으로 다시 정렬하고,
       max_agent_num 개까지 사용한다.

Returns:
    agents_cur_frame_indices:
        - shape: (K,), K ≤ max_agent_num
"""
if max_agent_num <= 0:
    return np.zeros((0,), dtype=int)

# 타입별로 거리 오름차순 리스트 분리
ped_sorted_indices = [
    idx for idx in sorted_cur_agent_indices
    if current_agent_types_list[idx] == TrackedObjectType.PEDESTRIAN
]
bike_sorted_indices = [
    idx for idx in sorted_cur_agent_indices
    if current_agent_types_list[idx] == TrackedObjectType.BICYCLE
]
vehicle_sorted_indices = [
    idx for idx in sorted_cur_agent_indices
    if current_agent_types_list[idx] == TrackedObjectType.VEHICLE
]

# 보행자/자전거 상한 적용 (max_agent_num을 넘지 않도록)
ped_cap = min(max_pedestrians, max_agent_num)
sel_peds = ped_sorted_indices[:ped_cap]

remain = max_agent_num - len(sel_peds)
bike_cap = min(max_bicycles, remain)
sel_bikes = bike_sorted_indices[:bike_cap]

remain -= len(sel_bikes)
sel_vehs = vehicle_sorted_indices[:max(0, remain)]

# 1차 선택 결과_select_neighbor_agents_and_build_past
selected_indices = sel_peds + sel_bikes + sel_vehs

if not selected_indices:
    return np.zeros((0,), dtype=int)

# 최종 후보를 ego-거리 기준으로 다시 정렬 후 max_agent_num 까지만 사용
agents_cur_frame_indices = np.array(
    sorted(
        selected_indices,
        key=lambda idx: dist_from_cur_agent_to_ego[idx],
    )[:max_agent_num],
    dtype=int,
)
return agents_cur_frame_indices

def _build_neighbor_vectors(
all_frame_np_agents_local: np.
ndarray, # (num_frames, current_agents_num, 9)
current_agent_types_list:
List[
TrackedObjectType], # List[TrackedObjectType], # 길이 = current_agents_num
agents_states_dim: int,
agents_cur_frame_indices: np.ndarray, # (chosen_agent_num,)
) -> Tuple[np.ndarray, np.ndarray]:
"""선택된 에이전트 인덱스들에 대해, 한 번에 이웃 궤적 벡터와 track_id 벡터를 만든다.

입력 텐서 구조
--------------
- all_frame_np_agents_local: (T, N, 9)
    · T: num_frames (과거 + 현재 시점 수)
    · N: 현재 프레임에서 살아남은 에이전트 수
    · 채널: x, y, cos(heading), sin(heading), vx, vy, width, length, id

- current_agent_types_list: 길이 N 의 리스트.
    · 각 인덱스 i 에 대해 TrackedObjectType.VEHICLE / PEDESTRIAN / BICYCLE 중 하나.

- agents_cur_frame_indices: (K,)
    · 현재 프레임(마지막 시점 T-1)의 에이전트 배열에서
      “이웃으로 선택된 행 인덱스” 들.
    · 이 순서가 곧 출력 텐서의 에이전트 축 순서가 된다.

알고리즘 개요
-------------
1) 동적 상태 8차원 복사 (완전 벡터화)
    - 후보 텐서에서 선택된 인덱스만 골라 (T, K, 8) 로 뽑은 뒤,
      축을 바꿔 (K, T, 8) 로 만든다.
    - 이걸 neighbor_agents_past[:, :, :8] 에 그대로 넣는다.

2) track_id 추출 (벡터화)
    - 현재 프레임 T-1 에서 선택된 인덱스의 id 채널(8)을
      한 번에 뽑아서 neighbors_id (K,) 로 만든다.

3) 타입 one-hot 생성 (부분 벡터화 + 브로드캐스트)
    - current_agent_types_list 를 numpy 배열로 만든 후,
      agents_cur_frame_indices 로 인덱싱해 길이 K 의 neighbors_types 배열을 만든다.
    - neighbors_types == VEHICLE / PEDESTRIAN / BICYCLE 로
      boolean mask 세 개를 만든 다음,
      type_one_hot[mask, col] = 1.0 방식으로 한 번에 채운다.
    - 마지막으로 (K, 3) → (K, 1, 3) 로 늘리고
      시간축(T) 방향으로 브로드캐스트 해서 neighbor_agents_past[:, :, 8:] 에 넣는다.


Args:
    all_frame_np_agents_local:
        이웃 후보 에이전트들의 ego 기준 과거+현재 시퀀스.
        - shape: (num_frames, current_agents_num, 9)
    current_agent_types_list:
        - 길이: current_agents_num
        - 각 에이전트 인덱스에 대응하는 TrackedObjectType 값.
    agents_states_dim:
        - 동적 상태 차원 수 (=8).
    agents_cur_frame_indices:
        - shape: (chosen_agent_num,)
        - 현재 프레임에서 선택된 에이전트 행 인덱스들.

Returns:
    neighbor_agents_past:
        - shape: (chosen_agent_num, num_frames, 11)
        - [x, y, cos, sin, vx, vy, width, length,
           track_id, onehot_vehicle, onehot_ped, onehot_bike]
    neighbors_id:
        - shape: (chosen_agent_num,)
        - 각 이웃 에이전트의 track_id (현재 프레임 기준).
"""
num_frames: int = all_frame_np_agents_local.shape[0]
agents_cur_frame_indices = np.asarray(agents_cur_frame_indices, dtype=int)
chosen_agent_num: int = int(agents_cur_frame_indices.shape[0])

if chosen_agent_num == 0:
    neighbor_agents_past = np.zeros((0, num_frames, agents_states_dim + 3),
                                    dtype=np.float32)
    neighbors_id = np.zeros((0,), dtype=np.float32)
    return neighbor_agents_past, neighbors_id

eight_ = agents_states_dim

# 1) 동적 상태 8차원 배치 추출 및 축 재배치
# (T, N, 9) → (T, chosen_agent_num, 8) → (chosen_agent_num, T, 8)
dynamic_states: np.ndarray = all_frame_np_agents_local[:,
                                                       agents_cur_frame_indices, :
                                                       eight_].transpose(
                                                           1, 0, 2)

# 출력 텐서 초기화
neighbor_agents_past = np.zeros(
    (chosen_agent_num, num_frames, agents_states_dim + 3),
    dtype=np.float32,
)
neighbor_agents_past[:, :, :eight_] = dynamic_states

# 2) track_id 벡터 배치 추출 (현재 프레임 = 마지막 프레임 기준)
# shape: (chosen_agent_num,)
neighbors_id = all_frame_np_agents_local[-1, agents_cur_frame_indices,
                                         eight_].astype(np.float32)

# 3) 타입 one-hot (chosen_agent_num, 3) 생성 → (chosen_agent_num, T, 3) 브로드캐스트
type_one_hot = np.zeros((chosen_agent_num, 3), dtype=np.float32)

# current_agent_types_list 를 배열로 바꾸고, 선택된 인덱스만 추출
# neighbors_types: shape (chosen_agent_num,), dtype=object (TrackedObjectType 인스턴스들)
neighbors_types = np.asarray(current_agent_types_list,
                             dtype=object)[agents_cur_frame_indices]

veh_mask = neighbors_types == TrackedObjectType.VEHICLE
ped_mask = neighbors_types == TrackedObjectType.PEDESTRIAN
bike_mask = neighbors_types == TrackedObjectType.BICYCLE

# 각 타입별 column에 1 할당 (행 단위로 일괄 처리)
type_one_hot[veh_mask, 0] = 1.0
type_one_hot[ped_mask, 1] = 1.0
type_one_hot[bike_mask, 2] = 1.0

# (chosen_agent_num, 3) → (chosen_agent_num, 1, 3) 로 늘려서 시간축(T)에 브로드캐스트
# → (chosen_agent_num, T, 3)
neighbor_agents_past[:, :, eight_:] = type_one_hot[:, None, :]

return neighbor_agents_past, neighbors_id

def _select_neighbor_agents_and_build_past(
all_frame_np_agents_local: np.
ndarray, # (num_frames, current_agents_num, 9)
current_agent_types_list: List[
TrackedObjectType], # 길이 = current_agents_num
agents_states_dim: int,
max_agent_num: int,
max_pedestrians: Optional[int],
max_bicycles: Optional[int],
filter_radius: Optional[float] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""이고 기준 에이전트 텐서에서, 가까운 에이전트들을 선택하여
이웃 궤적 텐서를 구성한다.

특징:
    - `max_agent_num` 은 상한(최대 개수)만 의미한다. 실제 선택 수 chosen_agent_num ≤ max_agent_num.
    - `max_pedestrians` 또는 `max_bicycles` 가 None 이면
      타입 상한을 모두 끄고, 거리 기준으로만 가까운 순서대로 선택한다.
    - `filter_radius` 가 주어지면, ego 로부터 그 거리 안에 있는 에이전트만
      후보로 사용한다(그 밖은 완전히 무시).

Returns:
    neighbor_agents_past: (chosen_agent_num, num_frames, 11)
    agents_cur_frame_indices: shape chosen_agent_num (현재 프레임 기준 인덱스)
    neighbors_id:         (chosen_agent_num,)
"""
num_frames: int = all_frame_np_agents_local.shape[0]

# 0) radius 필터 + 거리 기준 정렬된 인덱스 계산
"""
    sorted_cur_agent_indices: shape: (M,)  # M = 유효 에이전트 수
        - ego 로부터 가까운 순서대로 정렬된 현재 프레임 인덱스.
    dist_from_cur_agent_to_ego: shape: (current_agents_num,)
        - 각 에이전트의 ego 기준 2D 거리.
"""
sorted_cur_agent_indices, dist_from_cur_agent_to_ego = \
    _compute_valid_sorted_indices(
        all_frame_np_agents_local=all_frame_np_agents_local, # (num_frames, current_agents_num, 9)
        filter_radius=filter_radius,
    )

# 유효한 후보가 없거나, max_agent_num 이 0 이하인 경우
if sorted_cur_agent_indices.size == 0 or max_agent_num <= 0:
    neighbor_agents_past = np.zeros(
        (0, num_frames, agents_states_dim + 3),
        dtype=np.float32,
    )
    agents_cur_frame_indices = np.zeros((0,), dtype=int)
    neighbors_id = np.zeros((0,), dtype=np.float32)
    return neighbor_agents_past, agents_cur_frame_indices, neighbors_id

# 1) 타입 상한을 끄는 경우: 거리 기준으로만 선택
if (max_pedestrians is None) or (max_bicycles is None):
    # agents_cur_frame_indices: shape: (chosen_agent_num,), chosen_agent_num ≤ max_agent_num
    agents_cur_frame_indices = sorted_cur_agent_indices[:max_agent_num]
else:
    # 2) 타입 상한을 적용하는 경우
    # agents_cur_frame_indices: shape: (chosen_agent_num,), chosen_agent_num ≤ max_agent_num
    agents_cur_frame_indices = _select_indices_with_type_cap(
        sorted_cur_agent_indices=sorted_cur_agent_indices,
        current_agent_types_list=
        current_agent_types_list,  # List[TrackedObjectType],  # 길이 = current_agents_num
        max_agent_num=max_agent_num,
        max_pedestrians=max_pedestrians,
        max_bicycles=max_bicycles,
        dist_from_cur_agent_to_ego=dist_from_cur_agent_to_ego,
    )

# 실제 선택된 에이전트 수 chosen_agent_num
chosen_agent_num = int(agents_cur_frame_indices.shape[0])
if chosen_agent_num == 0:
    neighbor_agents_past = np.zeros((0, num_frames, agents_states_dim + 3),
                                    dtype=np.float32)
    neighbors_id = np.zeros((0,), dtype=np.float32)
    return neighbor_agents_past, np.array([], dtype=int), neighbors_id

# 3) 선택된 인덱스로부터 최종 텐서 구성
"""
    neighbor_agents_past: shape: (chosen_agent_num, num_frames, 11)
        - [x, y, cos, sin, vx, vy, width, length,
           track_id, onehot_vehicle, onehot_ped, onehot_bike]
    neighbors_id: shape: (chosen_agent_num,)
        - 각 이웃 에이전트의 track_id (현재 프레임 기준).
"""
neighbor_agents_past, neighbors_id = _build_neighbor_vectors(
    all_frame_np_agents_local=
    all_frame_np_agents_local,  # (num_frames, current_agents_num, 9)
    current_agent_types_list=
    current_agent_types_list,  # List[TrackedObjectType],  # 길이 = current_agents_num
    agents_states_dim=agents_states_dim,
    agents_cur_frame_indices=agents_cur_frame_indices,  # (chosen_agent_num,)
)

# numpy → List[int]
agents_cur_frame_indices_list: List[int] = list(
    map(int,
        np.asarray(agents_cur_frame_indices).tolist()))
agents_cur_frame_indices = np.asarray(agents_cur_frame_indices_list,
                                      dtype=int)  # (K,)
return neighbor_agents_past, agents_cur_frame_indices, neighbors_id

from typing import Optional # 이미 있을 가능성 높음

def _build_static_objects(
present_static_feature_6: np.ndarray, # (cur_static_num, 6)
static_types_list: List[TrackedObjectType], # 길이 = cur_static_num
max_static_num: int,
filter_radius: Optional[float] = None,
) -> np.ndarray:
"""정적 객체 정보를 거리 기준으로 정렬하고, 타입 one-hot 을 붙여 배열로 만든다.

- 이고와의 거리를 기준으로 가까운 순서로 최대 `max_static_num` 개 선택.
- `filter_radius` 가 주어지면, ego 기준 거리 <= filter_radius 인 것만 후보.
- 실제 개수가 max_static_num 이하이면 zero-padding 없이 그 개수만 반환.

Returns:
    static_objects: (K, 10), K = min(유효 정적 객체 수, max_static_num)
"""
cur_static_num = int(present_static_feature_6.shape[0])

if cur_static_num == 0 or max_static_num <= 0:
    return np.zeros((0, present_static_feature_6.shape[-1] + 4),
                    dtype=np.float32)

# ego 기준 거리
static_distance_to_ego = np.linalg.norm(
    present_static_feature_6[:, :2],
    axis=-1,
)  # (cur_static_num,)

# filter_radius 적용
if filter_radius is not None:
    valid_mask = static_distance_to_ego <= float(filter_radius)
    valid_indices = np.nonzero(valid_mask)[0]
else:
    valid_indices = np.arange(cur_static_num, dtype=int)

if valid_indices.size == 0:
    return np.zeros((0, present_static_feature_6.shape[-1] + 4),
                    dtype=np.float32)

# 유효 객체들 안에서 거리 기준 오름차순
dist_valid = static_distance_to_ego[valid_indices]
order_local = np.argsort(dist_valid)
sorted_indices = valid_indices[order_local]

# 실제 사용할 개수 K
K = min(len(sorted_indices), max_static_num)

static_objects = np.zeros(
    (K, present_static_feature_6.shape[-1] + 4),
    dtype=np.float32,
)
six_ = present_static_feature_6.shape[-1]

for i, j in enumerate(sorted_indices[:K]):
    static_objects[i, :six_] = present_static_feature_6[j, :six_]
    if static_types_list[j] == TrackedObjectType.CZONE_SIGN:
        static_objects[i, six_:] = [1, 0, 0, 0]
    elif static_types_list[j] == TrackedObjectType.BARRIER:
        static_objects[i, six_:] = [0, 1, 0, 0]
    elif static_types_list[j] == TrackedObjectType.TRAFFIC_CONE:
        static_objects[i, six_:] = [0, 0, 1, 0]
    else:
        static_objects[i, six_:] = [0, 0, 0, 1]

return static_objects

def build_static_feature(
present_static_feat_5: np.ndarray, # (cur_static_num, 5)
static_types_list: List[TrackedObjectType], # 길이 = cur_static_num
max_static_num: int,
ego_cur_pose_np: np.ndarray, # (3,)
filter_radius: Optional[float] = None,
) -> np.ndarray:
"""현재 프레임의 정적 객체들을 ego 기준 좌표계로 변환하고,
가까운 순으로 최대 max_static_num 개까지만 선택한다.

처리 순서
----------
1) `present_static_feat_5` (월드 좌표계)를
   `_build_present_static_feature_6` 를 이용해 ego 기준으로 변환:
   - 입력: [x, y, heading, width, length]
   - 출력: [x, y, cos(heading), sin(heading), width, length]
   → shape: (cur_static_num, 6)

2) `_build_static_objects` 를 호출해,
   - ego와의 2D 거리,
   - `filter_radius`,
   - `max_static_num`
   을 기준으로 정적 객체를 선택하고 타입 one-hot 을 붙인다.
   - 최종 출력: (K, 10)
     · [x, y, cos, sin, width, length,
        onehot_CZONE, onehot_BARRIER, onehot_CONE, onehot_GENERIC]
     · K = min(유효 정적 객체 수, max_static_num)

Args:
    present_static_feat_5:
        - shape: (cur_static_num, 5)
        - [x, y, heading, width, length] (월드 좌표계)
    static_types_list:
        - 길이: cur_static_num
        - 각 정적 객체의 타입 (TrackedObjectType).
    max_static_num:
        - 선택할 정적 객체 수의 상한값.
        - 실제 K는 K ≤ max_static_num.
    ego_cur_pose_np:
        - shape: (3,), [x_ego, y_ego, yaw_ego]
    filter_radius:
        - None 이 아니면 ego 기준 거리 <= filter_radius 인 정적 객체만 후보.
        - None 이면 거리 제한 없음.

Returns:
    np.ndarray:
        - static_objects
        - shape: (K, 10)
        - [x, y, cos, sin, width, length,
           onehot_CZONE, onehot_BARRIER, onehot_CONE, onehot_GENERIC]
"""
# (cur_static_num, 6)
present_static_feature_6 = _build_present_static_feature_6(
    present_static_feat_5=present_static_feat_5,
    ego_cur_pose_np=ego_cur_pose_np,
)

static_objects = _build_static_objects(
    present_static_feature_6=present_static_feature_6,
    static_types_list=static_types_list,
    max_static_num=max_static_num,
    filter_radius=filter_radius,
)
return static_objects

def build_neighbor_past_feature(
past_cur_agents_world_8_list: List[np.ndarray],
past_cur_agents_types_list: List[List[TrackedObjectType]],
max_agent_num: int,
ego_cur_pose_np: np.ndarray, # (3,)
max_pedestrians: Optional[int],
max_bicycles: Optional[int],
token_to_id: Dict[str, int],
filter_radius: Optional[float] = None,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, List[str]]:
"""이웃 에이전트의 과거+현재 궤적을 ego 기준으로 변환하고,
가까운 에이전트들만 골라 (chosen_agent_num, T, 11) 텐서로 만든다.

추가 보장(중요)
-------------
- 최종 출력에는 "현재 프레임이 전부 0인 이웃"이 절대 포함되지 않도록,
  선택 이후 한 번 더 걸러낸다.
  (아주 드문 데이터 이상/0-padding 혼입 케이스를 확실히 막기 위함)

Args:
    past_cur_agents_world_8_list:
        - 길이: num_frames
        - 각 원소 shape: (frame_agents_num, 8)
        - [track_id, vx, vy, heading, width, length, x, y] (월드 좌표계)
    past_cur_agents_types_list:
        - 길이: num_frames
        - 현재 프레임(마지막 원소)의 타입 정보를 사용.
    max_agent_num:
        - 선택할 이웃 에이전트 수의 상한값.
    ego_cur_pose_np:
        - shape: (3,), [x_ego, y_ego, yaw_ego]
    max_pedestrians / max_bicycles:
        - None 이면 타입 상한 미사용, 정수면 타입 상한 적용.
    token_to_id:
        - track_token → int ID 매핑
    filter_radius:
        - None 이 아니면 ego 기준 거리 <= filter_radius 인 에이전트만 후보.

Returns:
    Tuple[np.ndarray, np.ndarray, np.ndarray, List[str]]:
        - neighbor_agents_past:
            · shape: (chosen_agent_num, num_frames, 11)
            · [x, y, cos, sin, vx, vy, width, length, onehot_vehicle, onehot_ped, onehot_bike]
        - agents_cur_frame_indices:
            · shape: (chosen_agent_num,)
        - neighbors_id:
            · shape: (chosen_agent_num,)
        - neighbor_track_token:
            · 길이: chosen_agent_num
"""
agents_states_dim = 8  # x, y, cos h, sin h, vx, vy, width, length

# 현재 프레임의 타입 정보 (길이 = current_agents_num)
current_agent_types_list = past_cur_agents_types_list[-1]

# (num_frames, current_agents_num, 9)
all_frame_np_agents_local = build_agents_past_ego_frame_array(
    past_cur_agents_world_8_list=past_cur_agents_world_8_list,
    ego_cur_pose_np=ego_cur_pose_np,
    agents_states_dim=agents_states_dim,
)

# 이웃 선택 + (chosen_agent_num, T, 11) 텐서 구성
neighbor_agents_past, agents_cur_frame_indices, neighbors_id = \
    _select_neighbor_agents_and_build_past(
        all_frame_np_agents_local=all_frame_np_agents_local,
        current_agent_types_list=current_agent_types_list,
        agents_states_dim=agents_states_dim,
        max_agent_num=max_agent_num,
        max_pedestrians=max_pedestrians,
        max_bicycles=max_bicycles,
        filter_radius=filter_radius,
    )

# ✅ (핵심) "현재 프레임이 전부 0"인 이웃이 섞여 들어오면 여기서 제거
neighbor_agents_past, agents_cur_frame_indices, neighbors_id = \
    _filter_out_neighbors_not_present_at_current(
        neighbor_agents_past=neighbor_agents_past,              # (K, T, 11)
        agents_cur_frame_indices=agents_cur_frame_indices,      # (K,)
        neighbors_id=neighbors_id,                              # (K,)
    )

# neighbors_id -> track_token 변환
id_to_token: Dict[int, str] = {int(v): k for k, v in token_to_id.items()}

neighbor_track_token: List[str] = []
for track_id in neighbors_id:
    track_id_int: int = int(track_id)
    if track_id_int == -1:
        raise ValueError("Neighbor agent has invalid track_id -1.")
    if track_id_int not in id_to_token:
        raise KeyError(
            f"neighbors_id={track_id_int} 가 token_to_id에 존재하지 않습니다. "
            "ID 정밀도(특히 float 변환) 또는 token_to_id 갱신 흐름을 점검해 주세요.")
    neighbor_track_token.append(id_to_token[track_id_int])

return neighbor_agents_past, agents_cur_frame_indices, neighbors_id, neighbor_track_token

def agent_future_all_process(
ego_cur_pose_np: np.ndarray, # shape: (3,) = [x_ego, y_ego, yaw_ego]
cur_fut_agents_world_8_list: List[
np.ndarray], # 길이: 1+Tf_all, 각 원소 shape: (frame_agents_num_t, 8)
neighbor_token_id: np.ndarray, # shape: (chosen_agent_num,)
neighbor_agents_past: Optional[
np.ndarray] = None, # (chosen_agent_num, Tp, 11) 또는 None
) -> np.ndarray:
"""선택된 이웃 에이전트에 대해, 현재+미래 전체 구간의 궤적을 3차원/11차원 둘 다 만든다.

흐름:
    1) track_id 로 원하는 에이전트만 골라서 프레임별 리스트로 만든다.
    2) 각 프레임을 ego 기준 좌표계로 변환한다.
    3) 시간-에이전트 고정 크기 텐서로 채우되, 관측 없는 시점은 0으로 둔다.
    4) 타입 one-hot 은 neighbor_agents_past 에서 가져와 시간축 전체에 복사한다.

Args:
    ego_cur_pose_np (np.ndarray):
        - shape: (3,)
        - [x_ego, y_ego, yaw_ego]
    cur_fut_agents_world_8_list (List[np.ndarray]):
        - 길이: num_frames_all = 1 + Tf_all
        - 각 원소 shape: (frame_agents_num_t, 8)
          [track_id, vx, vy, heading, width, length, x, y] (월드 좌표계)
    neighbor_token_id (np.ndarray):
        - shape: (chosen_agent_num,)
        - 선택된 이웃 에이전트의 track_id 배열.
    neighbor_agents_past (Optional[np.ndarray]):
        - shape: (chosen_agent_num, Tp, 11)
          과거 이웃 궤적 텐서. 마지막 3차원에 타입 one-hot 이 들어있다.
          None 이면 미래 쪽 타입 one-hot 은 0으로 둔다.

Returns:
    np.ndarray
        - cur_fut_chosen_agents_full_11:
            · shape: (chosen_agent_num, num_frames_all, 11)
            · [x, y, cos(yaw), sin(yaw), v_x, v_y, width, length, one_hot(3)]
"""
# 1) track_id 기준으로 원하는 에이전트만 남기기
# cur_fut_chosen_agents_world_8_list:
#   길이: num_frames_all, 각 원소 shape: (frame_save_agents_num_t, 8)
cur_fut_chosen_agents_world_8_list: List[
    np.ndarray] = _filter_agents_array_w_id(
        cur_fut_agents_world_8_list,
        neighbor_token_id,
    )

# 2) 각 프레임을 ego 기준 좌표계로 변환
cur_fut_chosen_agents_local_8_list: List[np.ndarray] = []
for frame_chosen_agents_world_8 in cur_fut_chosen_agents_world_8_list:
    # frame_chosen_agents_world_8: (frame_save_agents_num_t, 8)
    # → [track_id, vx, vy, heading, width, length, x, y] (ego 기준) 으로 변환
    frame_local_8: np.ndarray = convert_absolute_quantities_to_relative(
        frame_chosen_agents_world_8,
        ego_cur_pose_np,
        'agent',
    )
    cur_fut_chosen_agents_local_8_list.append(frame_local_8)

# 3) 타입 one-hot 준비 (neighbor_agents_past 가 있을 때만)
neighbor_types_one_hot: Optional[np.ndarray] = None
if neighbor_agents_past is not None and neighbor_agents_past.size > 0:
    # neighbor_agents_past: (chosen_agent_num, Tp, 11)
    # 타입 one-hot 은 마지막 차원 8:11
    # neighbor_types_one_hot: (chosen_agent_num, 3)
    neighbor_types_one_hot = neighbor_agents_past[:, -1,
                                                  8:11].astype(np.float32,
                                                               copy=False)

# 4) 시간축 패딩 + xyh / full_11 텐서 만들기
"""
cur_fut_chosen_agents_full_11: (chosen_agent_num, num_frames_all, 11)
"""
cur_fut_chosen_agents_full_11 = _pad_agent_states_with_zeros_w_id(
    cur_fut_chosen_agents_local_8_list=cur_fut_chosen_agents_local_8_list,
    neighbor_token_id=neighbor_token_id,
    neighbor_types_one_hot=neighbor_types_one_hot,
)

return cur_fut_chosen_agents_full_11

=================
<diffusion_planner/data_process/utils.py>
"""
Module: Coordination Transformation Functions and Numpy-Tensor Transformation
Description: This module contains functions for transforming the coordination to ego-centric coordination and Numpy-Tensor transformation.

Categories:
1. Ego, agent, static coordination transformation
2. Map coordination transformation
3. Numpy-Tensor transformation
"""
from nuplan.common.maps.nuplan_map.utils import get_roadblock_ids_from_trajectory
from nuplan.common.actor_state.tracked_objects import TrackedObjects
from nuplan.planning.scenario_builder.nuplan_db.nuplan_scenario import NuPlanScenario
from types import SimpleNamespace
from nuplan.database.nuplan_db.nuplan_scenario_queries import \
get_end_sensor_time_from_db
from nuplan.database.nuplan_db.nuplan_db_utils import get_lidarpc_sensor_data
import torch
from nuplan.common.actor_state.tracked_objects import TrackedObjects, TrackedObject
from nuplan.planning.training.preprocessing.utils.agents_preprocessing import EgoInternalIndex, AgentInternalIndex
from nuplan.common.maps.abstract_map_objects import RoadBlockGraphEdgeMapObject
from shapely.geometry import Point
from nuplan.planning.scenario_builder.abstract_scenario import AbstractScenario
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.common.actor_state.state_representation import StateSE2
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.planning.simulation.observation.observation_type import DetectionsTracks
from diffusion_planner.data_process.roadblock_utils import route_roadblock_correction
from typing import List, Optional, Union, Sequence
import numpy as np
from nuplan.common.actor_state.tracked_objects import TrackedObject
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
import math
import shapely.geometry as geom
from shapely import affinity
from nuplan.common.maps.abstract_map import AbstractMap, MapObject
from nuplan.common.maps.abstract_map import SemanticMapLayer
from nuplan.common.actor_state.state_representation import Point2D
from typing import Set
import math
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.planning.simulation.observation.observation_type import \
DetectionsTracks
from nuplan.common.actor_state.tracked_objects_types import \
TrackedObjectType

utils.py (적절한 위치에 추가)

from typing import List, Optional, Sequence
import warnings
from types import SimpleNamespace
from nuplan.planning.scenario_builder.nuplan_db.nuplan_scenario import NuPlanScenario
from nuplan.common.actor_state.state_representation import StateSE2
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType
from nuplan.common.maps.nuplan_map.utils import get_roadblock_ids_from_trajectory

from typing import Dict
import numpy as np
import numpy.typing as npt

def ego_local_traj3_to_global(
local_traj_xyh: npt.NDArray[
np.floating], # shape: (T, 3) = [x_e, y_e, yaw_e]
cur_ego_global_xyyaw: npt.NDArray[
np.floating], # shape: (3,) = [x_g, y_g, yaw_g]
*,
invalid_eps: float = 0.0,
) -> npt.NDArray[np.float64]:
"""ego 좌표계 (x, y, heading) 시퀀스를 세계 절대 좌표계로 변환하되,
(0., 0., 0.)인 무효 행은 제거하고 유효 행만 반환합니다.

Args:
    local_traj_xyh (np.ndarray): shape (T, 3). 각 행은 [x_e, y_e, yaw_e].
    cur_ego_global_xyyaw (np.ndarray): shape (3,). [x_g, y_g, yaw_g].
    invalid_eps (float, optional): 무효 판정 허용 오차.
        - 0.0: 정확히 (0., 0., 0.)만 무효
        - >0.0: |x_e|, |y_e|, |yaw_e| 모두 eps 이하이면 무효

Returns:
    np.ndarray: shape (T_valid, 3). 각 행은 [x_g, y_g, yaw_g].
                유효 행이 하나도 없으면 (0, 3) 배열을 반환.
"""
if local_traj_xyh.ndim != 2 or local_traj_xyh.shape[1] != 3:
    raise ValueError(
        f"`local_traj_xyh` shape must be (T,3), got {local_traj_xyh.shape}")
if cur_ego_global_xyyaw.shape != (3,):
    raise ValueError(
        f"`cur_ego_global_xyyaw` shape must be (3,), got {cur_ego_global_xyyaw.shape}"
    )

# ── 1) 무효 행 필터링: (x, y, yaw) 모두 0(또는 eps 이내)이면 제거 ─────────────────
if invalid_eps <= 0.0:
    invalid_mask = (local_traj_xyh[:, 0] == 0.0) & (
        local_traj_xyh[:, 1] == 0.0) & (local_traj_xyh[:, 2] == 0.0)
else:
    invalid_mask = (
        np.isclose(local_traj_xyh[:, 0], 0.0, atol=invalid_eps) &
        np.isclose(local_traj_xyh[:, 1], 0.0, atol=invalid_eps) &
        np.isclose(local_traj_xyh[:, 2], 0.0, atol=invalid_eps))
valid_mask = ~invalid_mask
if not np.any(valid_mask):
    return np.empty((0, 3), dtype=np.float64)

local_valid = local_traj_xyh[valid_mask]  # (T_valid, 3)

# ── 2) ego→global 변환 ────────────────────────────────────────────────────────
x_e = local_valid[:, 0]
y_e = local_valid[:, 1]
yaw_e = local_valid[:, 2]

x_g0, y_g0, yaw_g0 = map(float,
                         cur_ego_global_xyyaw.tolist())  # 글로벌 기준(ego 현재 포즈)
c, s = np.cos(yaw_g0), np.sin(yaw_g0)
x_g = x_e * c - y_e * s + x_g0
y_g = x_e * s + y_e * c + y_g0
yaw_g = yaw_e + yaw_g0
# 필요하면 yaw_g = (yaw_g + np.pi) % (2 * np.pi) - np.pi  # [-pi, pi] 정규화

return np.stack([x_g, y_g, yaw_g], axis=1).astype(np.float64)

def get_npc_route_roadblock_ids(
scenario: NuPlanScenario,
past_cur_tracked_objects: List[TrackedObjects],
neighbor_track_token: Optional[List[str]], # 길이 = chosen_agent_num
horizon=20.,
) -> Dict[str, List[str]]:
"""
get_future_tracked_objects 를 이용해 한 번에 궤적을 수집하고,
get_roadblock_ids_from_trajectory 로 연결성 기반 ID 시퀀스를 추출합니다.
"""

# iteration=0 시점부터 시나리오 끝까지 future 트랙 객체를 한줄로 가져옴
# 전체 horizon은 시나리오 총 길이(초)로 지정
# horizon = max(30.0,  _scenario_total_horizon_s(scenario))
num_samples = int(horizon / 0.1)
# 1) 에이전트별 StateSE2 리스트 수집
if neighbor_track_token is None:
    allow_all_token = True
    neighbor_track_token_set = {}
else:
    allow_all_token = False
    neighbor_track_token_set = set([str(t) for t in neighbor_track_token])
    if not neighbor_track_token_set:
        return {}

future_observations: List[TrackedObjects] = []
for dets in scenario.get_future_tracked_objects(0, horizon, num_samples):
    future_observations.append(dets.tracked_objects)
past_future_observations: List[TrackedObjects] = []
past_future_observations.extend(past_cur_tracked_objects)
past_future_observations.extend(future_observations)

token_to_state_list: Dict[str, List[SimpleNamespace]] = defaultdict(list)
first_pose: Dict[str, SimpleNamespace] = {}
for tracked_objects in past_future_observations:
    # tracked_objects: TrackedObjects
    for obj in tracked_objects:
        # obj: TrackedObject
        if obj.tracked_object_type != TrackedObjectType.VEHICLE:
            continue
        token = str(obj.track_token)
        if (not allow_all_token) and (token
                                      not in neighbor_track_token_set):
            continue
        # heading은 실제로 사용하지 않지만, 넣어도 무방(여기서는 0.0 또는 obj.center.heading 가능)
        rear_axle_state = StateSE2(obj.center.x, obj.center.y,
                                   obj.center.heading)
        pseudo_ego = SimpleNamespace(rear_axle=rear_axle_state)
        if first_pose.get(token, None) is None:
            first_pose[token] = pseudo_ego

        token_to_state_list[token].append(pseudo_ego)

# 2) 연결성 기반 roadblock ID 추출
car_token_to_rb_ids_list: Dict[str, List[str]] = {}
for token, states_list in token_to_state_list.items():
    if not states_list:
        raise RuntimeError(
            f"Internal error: token_to_state_list[{token}] is empty.")
    # 덕 타이핑: states_list[*].rear_axle.point 만 참조됨
    rb_ids_list: List[str] = get_roadblock_ids_from_trajectory(
        scenario.map_api, states_list)
    if len(rb_ids_list) == 0:
        car_token_to_rb_ids_list[token] = []
        continue
    corrected_ids = route_roadblock_correction(
        first_pose[token],
        scenario.map_api,
        rb_ids_list,
    )

    car_token_to_rb_ids_list[token] = corrected_ids
return car_token_to_rb_ids_list

def _prefer_rr_on_conflict(
rb_ids: Sequence[str],
rbc_ids: Sequence[str],
route_rr_ids: Optional[Sequence[str]] = None,
verbose: bool = False,
) -> List[str]:
"""RB vs RBC 동시 검출 시 우선순위 규칙으로 선택한다.

규칙:
  1) route_rr_ids 와의 교집합이 존재하면 그쪽을 우선
     - RBC ∩ route  → 우선 반환
     - RB  ∩ route  → 차선책
  2) 그렇지 않다면 RBC 우선 (교차로/연결부 가정)
  3) 그래도 비어 있으면 RB
  4) 그래도 없으면 []

Args:
    rb_ids: RoadBlock id 리스트
    rbc_ids: RoadBlock-Connector id 리스트
    route_rr_ids: 시나리오의 글로벌 경로 roadblock ids
    verbose: 경고 메시지 출력 여부

Returns:
    우선순위 규칙으로 정한 id 리스트(비어 있을 수 있음)
"""
rb_ids = list(rb_ids) if rb_ids else []
rbc_ids = list(rbc_ids) if rbc_ids else []

if route_rr_ids:
    route_set = set(route_rr_ids)
    rbc_on_route = [rid for rid in rbc_ids if rid in route_set]
    if rbc_on_route:
        if verbose:
            warnings.warn(f"[RR-Resolve] RBC∩Route 선택: {rbc_on_route}")
        return rbc_on_route

    rb_on_route = [rid for rid in rb_ids if rid in route_set]
    if rb_on_route:
        if verbose:
            warnings.warn(f"[RR-Resolve] RB∩Route 선택: {rb_on_route}")
        return rb_on_route

if rbc_ids:
    if verbose:
        warnings.warn(f"[RR-Resolve] Route 교집합 없음 → RBC 우선: {rbc_ids}")
    return rbc_ids

if rb_ids:
    if verbose:
        warnings.warn(f"[RR-Resolve] RBC 없음 → RB 사용: {rb_ids}")
    return rb_ids

if verbose:
    warnings.warn(f"[RR-Resolve] 비어 있음 → []")
return []

def _map_object_to_geometry(obj: MapObject) -> Optional[geom.base.BaseGeometry]:
"""맵 객체(MapObject)에서 도형(점/선/면) 정보를 꺼내는 작은 도우미 함수.

이 함수는 나중에
`get_directional_proximal_map_objects` 같은 곳에서
“이 물체가 내가 만든 영역과 겹치는지”를 확인하기 위해,
맵 객체를 Shapely에서 이해할 수 있는 도형으로 바꿔주는 역할을 한다.

동작 규칙
--------
1) 먼저 `obj.polygon` 이 있는지 확인한다.
   - 예: 도로 묶음(roadblock), 교차로, 횡단보도 등은 보통 폴리곤(면)으로 제공된다.
   - 있으면 그대로 돌려준다.
     · 반환 도형 예시: Polygon

2) 폴리곤이 없다면, 차선처럼 “중심선 polyline” 형태를 갖고 있는지 본다.
   - `obj.baseline_path.discrete_path` 가 있는 경우:
     · 이 안에는 (x, y, heading) 형태의 점들이 순서대로 들어있다고 보면 된다.
       - 길이: N
       - 좌표 배열로 바꾸면 개념적으로 (N, 2) 모양
     · 이 점들로 Shapely LineString 을 만들어 반환한다.
       - N >= 2 인 경우에만 선(LineString)으로 만들고,
       - N == 1 이면 점(Point)으로 반환한다.

3) 위 두 가지 경우 모두 아니면, 공개된 속성만으로는 모양을 알 수 없으므로
   `None` 을 돌려준다.

Args:
    obj (MapObject):
        - NuPlan 맵에서 가져온 아무 종류의 맵 객체.
        - 예: 차선, 차선연결, 도로묶음, 교차로 등.

Returns:
    Optional[geom.base.BaseGeometry]:
        - Polygon / LineString / Point 같은 Shapely 도형 객체.
        - 도형 정보를 만들 수 없을 때는 `None`.
"""
# 1) 다각형이 있는 타입(예: Lane, RoadBlock, Connector 등)
polygon = getattr(obj, "polygon", None)
if polygon is not None:
    return polygon

# 2) 차선류 등: baseline_path → LineString
baseline_path = getattr(obj, "baseline_path", None)
if baseline_path is not None and hasattr(baseline_path, "discrete_path"):
    # pts: 길이 = N, 각 원소 = (x, y)  → 개념적 shape: (N, 2)
    pts = [(n.x, n.y) for n in baseline_path.discrete_path]
    if len(pts) >= 2:
        return geom.LineString(pts)
    elif len(pts) == 1:
        return geom.Point(pts[0])

# 3) 기타(공개 속성으로는 기하 획득 불가)
return None

def get_directional_proximal_map_objects(
map_api: AbstractMap,
point: Point2D,
heading: float,
radius: float,
layers: List[SemanticMapLayer],
) -> Dict[SemanticMapLayer, List[MapObject]]:
"""ego 진행 방향을 기준으로 회전된 정사각형 안에 걸치는 맵 객체를 조회한다.

이 함수는 NuPlanMap 의 :meth:`get_proximal_map_objects` 와 비슷하지만,
패치 모양이 다르다.

- 기존: point 를 중심으로 한 **가로·세로 방향 정사각형**
    · [x - radius, x + radius] × [y - radius, y + radius]
- 이 함수: point 를 중심으로 한 **ego heading 방향에 맞춰 회전된 정사각형**
    · 한 변 길이: 2 * radius
    · 한 변이 ego heading 과 평행, 다른 변은 그에 수직

포함 기준
----------
각 레이어의 모든 객체에 대해, 객체의 도형(geometry)이
회전된 정사각형과 `intersects` 인지 검사한다.

Shapely 의 `intersects` 는
“도형이 서로 **한 점이라도 겹치면** True” 이므로, 아래 경우 모두 포함된다.

* 정사각형 안에 완전히 들어온 경우
* 정사각형 모서리에 살짝 걸치는 경우
* 거의 밖에 있지만, 일부 꼭짓점이나 변이 정사각형에 닿는 경우

내부 동작 흐름
--------------
1) 지원 레이어 확인
    - `map_api.get_available_map_objects()` 로 실제 지원 레이어 목록을 가져온다.
    - 요청한 `layers` 중 지원되지 않는 레이어가 있으면 assert 로 바로 실패시킨다.

2) 회전 전 정사각형 패치 생성 (축에 정렬된 네모)
    - x 방향 범위: [point.x - radius, point.x + radius]
    - y 방향 범위: [point.y - radius, point.y + radius]
    - 이 범위로 shapely 의 `geom.box(...)` 를 사용해 네모(Polygon)를 만든다.
      · `patch`: Polygon, 모양 = 축에 정렬된 정사각형
      · 개념적 shape: 직사각형 이지만, 여기서는 항상 정사각형
        - 한 변 길이 = 2 * radius

3) ego heading 기준으로 정사각형 회전
    - heading(라디안) → degree 로 변환: `angle_deg = heading * 180 / π`
    - `affinity.rotate(patch, angle_deg, origin=(point.x, point.y))` 호출
        · origin 을 ego 위치로 지정해서,
          정사각형이 ego 위치를 중심으로 회전하게 만든다.
    - 결과:
        · `rotated_patch`: Polygon
        · 한 변은 ego 진행 방향과 평행,
          나머지 한 변은 그에 정확히 수직

4) 레이어별로 geometry ∩ rotated_patch 검사
    - 여기서부터는 NuPlanMap 구체 구현에 의존하므로
      `map_api` 가 `NuPlanMap` 인지 확인 후 캐스팅한다.
    - 각 레이어에 대해:
        a) `layer_df = map_api._get_vector_map_layer(layer)` 로
           해당 레이어의 벡터 데이터를 가져온다.
           · `layer_df["geometry"]`: 각 행의 도형(Polygon 등), shape ≈ (num_objects,)
        b) `mask = layer_df["geometry"].intersects(rotated_patch)` 로
           각 도형이 회전된 정사각형과 겹치는지 계산한다.
           · `mask`: pandas Series(bool), shape: (num_objects,)
               - True  → 정사각형과 최소 한 점이라도 겹침
               - False → 전혀 안 겹침
        c) `map_object_ids = layer_df.loc[mask]["fid"]` 로
           겹치는 행들의 id 를 뽑는다.
        d) `map_api.get_map_object(fid, layer)` 를 사용해
           실제 `MapObject` 인스턴스를 얻는다.
           이렇게 얻은 객체들을 리스트로 모아 `object_map[layer]` 에 저장한다.

자료 구조 / shape 정리
----------------------
입력
  - map_api (AbstractMap):
      · 실제로는 NuPlanMap 인스턴스여야 한다.
      · 그렇지 않으면 TypeError 를 일으킨다.

  - point (Point2D):
      · ego 위치 (x, y), 단위 m

  - heading (float):
      · ego 진행 방향(라디안)

  - radius (float):
      · 회전된 정사각형 한 변의 절반 길이 [m]
      · 정사각형 전체 크기 = (2 * radius) × (2 * radius)

  - layers (List[SemanticMapLayer]):
      · 예: [SemanticMapLayer.LANE, SemanticMapLayer.ROADBLOCK]

중간 변수
  - patch: geom.Polygon
      · 축에 정렬된 정사각형
      · 좌표 범위: x ∈ [x-radius, x+radius], y ∈ [y-radius, y+radius]

  - rotated_patch: geom.Polygon
      · ego heading 에 따라 회전된 정사각형
      · patch 와 꼭짓점 좌표는 같지만, 회전된 상태

  - layer_df: VectorLayer (실제로는 GeoDataFrame)
      · 각 레이어의 벡터 데이터
      · `layer_df["geometry"]`: 각 객체의 도형, 길이 ≈ num_objects

  - mask: pandas.Series[bool]
      · shape: (num_objects,)
      · True 인 인덱스는 rotated_patch 와 교차하는 객체

출력
  - object_map: Dict[SemanticMapLayer, List[MapObject]]
      · key: 입력으로 넘긴 각 레이어
      · value: 해당 레이어에서 “회전된 정사각형과 조금이라도 겹치는” MapObject 리스트
      · 각 리스트 길이 = 해당 레이어에서 조건을 만족하는 객체 수

Args:
    map_api (AbstractMap):
        NuPlanMap 인스턴스여야 한다(내부 벡터 레이어 접근 필요).
    point (Point2D):
        정사각형 중심이 될 ego 위치 (x, y).
    heading (float):
        ego 진행 방향 (라디안).
    radius (float):
        정사각형 한 변의 절반 길이 [m].
    layers (List[SemanticMapLayer]):
        조회할 레이어 목록.

Returns:
    Dict[SemanticMapLayer, List[MapObject]]:
        레이어별로, 회전된 정사각형과 조금이라도 겹치는 MapObject 들을 모은 딕셔너리.

Raises:
    TypeError:
        - map_api 가 NuPlanMap 타입이 아닐 때.
    AssertionError:
        - 요청한 레이어 중 현재 맵에서 지원하지 않는 레이어가 있을 때.
"""

# 1) 지원 레이어 확인
supported_layers: List[
    SemanticMapLayer] = map_api.get_available_map_objects()
unsupported_layers: List[SemanticMapLayer] = [
    layer for layer in layers if layer not in supported_layers
]
assert len(unsupported_layers) == 0, (
    f"Object representation for layer(s): {unsupported_layers} is unavailable"
)

# 2) 회전 전 축정렬 정사각형 생성
x_min, x_max = point.x - radius, point.x + radius
y_min, y_max = point.y - radius, point.y + radius
patch: geom.Polygon = geom.box(x_min, y_min, x_max, y_max)

# 3) ego heading 기준으로 정사각형 회전 (deg 단위 필요)
angle_deg: float = float(heading) * 180.0 / np.pi
rotated_patch: geom.Polygon = affinity.rotate(
    patch,
    angle_deg,
    origin=(point.x, point.y),
)

object_map: Dict[SemanticMapLayer, List[MapObject]] = defaultdict(list)

# 4) 각 레이어에서 rotated_patch 와 intersects 인 객체만 선택
for layer in layers:
    # VectorLayer 는 GeoDataFrame 과 비슷한 구조라고 보면 된다.
    layer_df = map_api._get_vector_map_layer(layer)

    # geometry 컬럼과 회전된 정사각형의 "겹침 여부"를 벡터화해서 계산
    # mask: (num_objects,) bool
    mask = layer_df["geometry"].intersects(rotated_patch)

    # mask 가 True 인 행들의 fid 를 가져온다.
    map_object_ids = layer_df.loc[mask]["fid"]

    # fid 로 실제 MapObject 인스턴스를 생성
    object_map[layer] = [
        map_api.get_map_object(map_object_id, layer)
        for map_object_id in map_object_ids
    ]

return object_map

def get_circular_proximal_map_objects(
map_api: AbstractMap,
point: Point2D,
radius: float,
layers: List[SemanticMapLayer],
) -> Dict[SemanticMapLayer, List[MapObject]]:
"""원(동그라미) 반경 안에 조금이라도 걸치는 맵 객체들을 레이어별로 모아준다.

이 함수는 NuPlanMap 의 :meth:`get_proximal_map_objects` 와 비슷하지만,
**축에 정렬된 네모** 대신 **원 모양 영역**을 기준으로 객체를 찾는다.

- 기존: point 를 중심으로 한 네모 영역
    · x ∈ [point.x - radius, point.x + radius]
    · y ∈ [point.y - radius, point.y + radius]
- 이 함수: point 를 중심으로 한 **원(반지름 radius)**

포함 기준
----------
각 레이어의 모든 객체에 대해, 그 객체의 도형(geometry)이
이 원과 shapely 의 `intersects` 여부를 체크한다.

- `intersects(...)` 가 True 인 경우:
    · 원 안에 완전히 들어온 경우
    · 원 경계에 살짝 걸친 경우
    · 객체 대부분은 밖에 있지만 일부 모서리/변만 원에 닿는 경우
  모두 **포함**된다.

내부 동작 순서
--------------
1) 입력 맵 타입 확인
    - `map_api` 가 실제로 NuPlanMap 인스턴스인지 확인한다.
      (내부 벡터 레이어에 접근해야 하므로 필수)

2) 지원 레이어 검증
    - `map_api.get_available_map_objects()` 로 현재 맵에서 지원하는 레이어 목록을 얻는다.
    - 요청한 `layers` 중 지원하지 않는 레이어가 있으면 `assert` 로 바로 실패시킨다.

3) 원(원판) 도형 생성
    - 중심점: `center = geom.Point(point.x, point.y)`
    - 원 도형: `patch = center.buffer(radius)`
      · `patch` 는 Shapely Polygon 이고, 원을 다각형으로 근사한 결과.
      · 개념적으로는 “반지름이 radius 인 원”이라고 보면 된다.

4) 레이어별 geometry ∩ 원 검사
    - 각 레이어에 대해:
        a) `layer_df = map_api._get_vector_map_layer(layer)`
           · NuPlan 내부의 벡터 레이어(GeoDataFrame 유사 구조) 조회
           · `layer_df["geometry"]` 컬럼에는 각 행의 도형(Polygon 등)이 들어 있음
             - shape: (num_objects,)
        b) `mask = layer_df["geometry"].intersects(patch)`
           · `mask`: 길이 (num_objects,) 의 bool Series
           · True  → 해당 geometry 가 원과 한 점이라도 겹친다
           · False → 전혀 겹치지 않는다
        c) `map_object_ids = layer_df.loc[mask]["fid"]`
           · 원과 겹치는 객체들의 id(fId)만 추출
        d) 각 id 에 대해 `map_api.get_map_object(fid, layer)` 를 호출하여
           실제 `MapObject` 인스턴스를 만들고 리스트에 담는다.
    - 이렇게 만들어진 리스트를 `object_map[layer]` 에 저장한다.

자료 구조 / shape 정리
----------------------
입력
  - map_api (AbstractMap):
      · 실제 타입: NuPlanMap (아니면 TypeError 발생)
  - point (Point2D):
      · ego 위치 (x, y), 단위 m
  - radius (float):
      · 원의 반지름 [m]
  - layers (List[SemanticMapLayer]):
      · 예: [SemanticMapLayer.LANE, SemanticMapLayer.ROADBLOCK]

중간 변수
  - center: shapely.geometry.Point
      · (point.x, point.y)
  - patch: shapely.geometry.Polygon
      · `center.buffer(radius)` 로 만든 원 모양 영역
  - layer_df: VectorLayer (GeoDataFrame 비슷)
      · `layer_df["geometry"]`: 길이 = num_objects
  - mask: pandas.Series(bool)
      · shape: (num_objects,)
      · True 인 인덱스만 원과 겹치는 객체

출력
  - object_map: Dict[SemanticMapLayer, List[MapObject]]
      · key: 입력으로 받은 각 레이어
      · value: 해당 레이어에서 원과 조금이라도 겹치는 맵 객체 리스트

Args:
    map_api (AbstractMap):
        NuPlanMap 인스턴스여야 한다. (내부 벡터 레이어 접근 필요)
    point (Point2D):
        원의 중심이 될 포인트 (x, y).
    radius (float):
        원의 반경 [m].
    layers (List[SemanticMapLayer]):
        조회할 레이어 목록.

Returns:
    Dict[SemanticMapLayer, List[MapObject]]:
        레이어별로, 중심 원과 한 점이라도 겹치는 MapObject 들을 모은 딕셔너리.

Raises:
    TypeError:
        - map_api 가 NuPlanMap 타입이 아닐 때.
    AssertionError:
        - 요청한 레이어 중 지원되지 않는 레이어가 있을 때.
"""
# 1) 요청 레이어가 실제로 지원되는지 확인
supported_layers: List[
    SemanticMapLayer] = map_api.get_available_map_objects()
unsupported_layers: List[SemanticMapLayer] = [
    layer for layer in layers if layer not in supported_layers
]
assert len(unsupported_layers) == 0, (
    f"Object representation for layer(s): {unsupported_layers} is unavailable"
)

# 2) 원(패치) 생성: 중심은 point, 반경은 radius
center: geom.Point = geom.Point(point.x, point.y)
patch: geom.Polygon = center.buffer(radius)

object_map: Dict[SemanticMapLayer, List[MapObject]] = defaultdict(list)

# 3) 각 레이어에서, 원과 intersects 인 geometry 만 선택
for layer in layers:
    layer_df = map_api._get_vector_map_layer(layer)

    # geometry 가 원(patch)와 한 점이라도 겹치는 행만 선택
    # mask: (num_objects,) bool
    mask = layer_df["geometry"].intersects(patch)
    map_object_ids = layer_df.loc[mask]["fid"]

    object_map[layer] = [
        map_api.get_map_object(map_object_id, layer)
        for map_object_id in map_object_ids
    ]

return object_map

def build_agent_route_lane_order(
npc_route_on_chosen_lane_idx_list: List[
List[int]], # 길이: chosen_agent_num, 각 원소 길이 가변
chosen_lane_num: int,
dtype: np.dtype = np.int32,
) -> np.ndarray: # shape: (chosen_agent_num, chosen_lane_num)
"""에이전트별 “경로 위 차선 인덱스 리스트”를
정수 랭크 행렬로 바꾼다.

개념
----
- npc_route_on_chosen_lane_idx_list[i] 가 [5, 2, 7] 이라면,
  i번째 에이전트 입장에서
    · lane 5 → 0번째로 가까운 route 차선
    · lane 2 → 1번째
    · lane 7 → 2번째
  로 해석한다.
- 따라서 반환 행렬의 [i, j] 원소는
  “에이전트 i 입장에서 j번 차선이 route 위에서 몇 번째인지”를 뜻한다.
  · 0, 1, 2, ... : 해당 순서
  · -1           : 해당 에이전트의 route 에 없는 차선

동작 순서
--------
1) 결과 행렬을 -1 로 채운다. shape = (chosen_agent_num, chosen_lane_num).
2) 각 에이전트 i 에 대해:
   - npc_route_on_chosen_lane_idx_list[i] 를 배열로 만든다.
   - 음수 인덱스 또는 범위를 벗어나는 인덱스가 있으면 예외를 발생시킨다.
   - 중복된 lane 인덱스는 “첫 등장 순서”만 인정한다.
     (np.unique + first_indices + argsort 로 구현)
   - 유효한 lane 인덱스를 등장 순서대로 정렬하고,
     0,1,2,... 랭크를 한 번에 할당한다.

Args:
    npc_route_on_chosen_lane_idx_list:
        - 길이: chosen_agent_num
        - 각 원소는 해당 에이전트의 “경로 위 차선 인덱스 리스트”.
    chosen_lane_num:
        - 전체 차선 개수 (행렬의 두 번째 차원 크기).
    dtype:
        - 반환 행렬의 정수 dtype. 기본값 np.int32.

Returns:
    np.ndarray:
        - agent_route_lane_order
        - shape = (chosen_agent_num, chosen_lane_num)
        - 각 [i, j] 원소는 에이전트 i 에서 lane j 의 순서(0,1,2,...) 또는 -1.
"""
chosen_agent_num: int = len(npc_route_on_chosen_lane_idx_list)

# 결과 행렬 초기화: (chosen_agent_num, chosen_lane_num)
agent_route_lane_order: np.ndarray = np.full(
    (chosen_agent_num, chosen_lane_num),
    -1,
    dtype=dtype,
)

# 에이전트가 없거나 차선이 없으면 바로 반환
if chosen_agent_num == 0 or chosen_lane_num == 0:
    return agent_route_lane_order

for agent_i, route_on_chosen_lane_idx in enumerate(
        npc_route_on_chosen_lane_idx_list):
    # 이 에이전트는 route 상에 차선이 없는 경우
    if not route_on_chosen_lane_idx:
        continue

    # lane_idx_list: (L,)
    lane_idx_list: np.ndarray = np.asarray(route_on_chosen_lane_idx,
                                           dtype=int)

    # 범위 체크 (벡터화)
    if np.any(lane_idx_list < 0):
        bad_idx: int = int(lane_idx_list[lane_idx_list < 0][0])
        raise ValueError(f"음수 인덱스가 발견되었습니다: {bad_idx}")
    if np.any(lane_idx_list >= chosen_lane_num):
        bad_idx = int(lane_idx_list[lane_idx_list >= chosen_lane_num][0])
        raise ValueError(
            f"lane_idx {bad_idx} 가 chosen_lane_num={chosen_lane_num} 범위를 벗어났습니다."
        )

    # 중복 제거 (첫 등장 순서 유지)
    # unique_vals: (M,), first_indices: (M,)
    unique_vals, first_indices = np.unique(lane_idx_list, return_index=True)
    # 원래 등장 순서대로 정렬
    order: np.ndarray = np.argsort(first_indices)  # shape: (M,)
    unique_lane_idx: np.ndarray = unique_vals[order]  # shape: (M,)

    # 랭크 벡터: [0, 1, 2, ...], shape: (M,)
    ranks: np.ndarray = np.arange(unique_lane_idx.shape[0], dtype=dtype)

    # 한 번에 대입
    agent_route_lane_order[agent_i, unique_lane_idx] = ranks

return agent_route_lane_order

def _lane_min_dist_order(
lanes_xy: np.ndarray, # shape: (chosen_lane_num, lane_len, 2)
neighbor_current_xy: np.ndarray, # shape: (2,)
) -> np.ndarray: # shape: (chosen_lane_num,)
"""에이전트 위치에서 각 차선까지의 최소 거리를 계산해, 가까운 순으로 정렬된 인덱스를 만든다.

Args:
    lanes_xy:
        차선 폴리라인 좌표.
        shape = (chosen_lane_num, lane_len, 2).
    neighbor_current_xy:
        에이전트 현재 위치 [x, y].
        shape = (2,).

Returns:
    np.ndarray:
        lane 인덱스가 "에이전트와 가까운 순"으로 정렬된 배열.
        shape = (chosen_lane_num,).
"""
# diff: (chosen_lane_num, lane_len, 2)
diff: np.ndarray = lanes_xy - neighbor_current_xy[None, None, :]
# dists: (chosen_lane_num, lane_len)
dists: np.ndarray = np.linalg.norm(diff, axis=-1)
# min_dists: (chosen_lane_num,)
min_dists: np.ndarray = np.min(dists, axis=1)
# 가까운 순 정렬 인덱스 반환
return np.argsort(min_dists)

def _select_lanes_by_order(
chosen_lane_dist_order: np.ndarray, # shape: (chosen_lane_num,)
chosen_lanes_route_mask_arr: np.ndarray, # shape: (chosen_lane_num,)
) -> List[int]:
"""거리 정렬 결과와 True/False 마스크를 이용해
“경로 위에 있는 차선 인덱스”만 가까운 순으로 고른다.

동작 방식
--------
- `chosen_lane_dist_order` 는 “가까운 차선부터 먼 차선까지” 정렬된 인덱스 배열이다.
- 같은 길이의 `chosen_lanes_route_mask_arr` 에서 True 인 위치만 골라,
  그 인덱스를 리스트에 담아 반환한다.
- 별도의 개수 제한은 없으며, True 인 차선은 전부 사용한다.

Args:
    chosen_lane_dist_order:
        에이전트와의 최소 거리 기준으로 정렬된 lane 인덱스 배열.
        shape = (chosen_lane_num,).
    chosen_lanes_route_mask_arr:
        해당 lane 이 “경로 위(True)”에 있는지 여부 마스크.
        shape = (chosen_lane_num,).

Returns:
    List[int]:
        경로 위에 있는 lane 인덱스 리스트.
        · 가까운 순서대로 정렬되어 있음.
"""
route_on_chosen_lane_idx: List[int] = []
for lane_idx in chosen_lane_dist_order:
    if chosen_lanes_route_mask_arr[lane_idx]:
        route_on_chosen_lane_idx.append(int(lane_idx))

return route_on_chosen_lane_idx

def _select_token_and_ordered_npc_route_indices(
car_token_to_chosen_lanes_route_mask: Dict[
str, List[bool]], # 길이: chosen_car_num / 값: 길이 = chosen_lane_num
neighbor_track_token: List[str], # 길이: chosen_agent_num
neighbor_agents_current: np.ndarray, # shape: (chosen_agent_num, 11)
vector_map_lanes: np.ndarray, # shape: (chosen_lane_num, lane_len, D)
) -> np.ndarray: # shape: (chosen_agent_num, chosen_lane_num)
"""차량 토큰별 route 차선 마스크를 이용해,
에이전트별 “경로 위 차선 순서 행렬”을 만든다.

개념
----
- 입력으로, 각 차량 토큰에 대해
  `[차선이 그 차량 경로 위에 있으면 True, 아니면 False]` 리스트가 주어진다.
- 각 에이전트 슬롯에는 track_token 이 있으므로,
  해당 토큰이 가진 True/False 마스크를 꺼내 쓸 수 있다.
- 에이전트별로:
    1) 에이전트 위치와 각 차선 폴리라인(lanes_xy) 사이의 최소 거리를 구해,
       가까운 순으로 lane 인덱스를 정렬한다. (`_lane_min_dist_order`)
    2) 그 순서대로, True 인 차선만 골라 route 위 차선 인덱스 리스트를 만든다.
       (`_select_lanes_by_order`)
    3) 전체 에이전트에 대해 위 리스트들을 모아
       `build_agent_route_lane_order` 로 랭크 행렬을 만든다.

주의 사항
--------
- `car_token_to_chosen_lanes_route_mask` 에 해당 토큰이 없으면
  “경로 위 차선이 하나도 없다”고 보고, 그 에이전트는 빈 리스트(모든 lane=-1)를 가지게 된다.

Args:
    car_token_to_chosen_lanes_route_mask:
        - 차량 토큰 → 길이 chosen_lane_num 의 True/False 리스트.
    neighbor_track_token:
        - 길이 chosen_agent_num.
        - 각 슬롯에 대응하는 에이전트의 track_token.
    neighbor_agents_current:
        - 현재 프레임 이웃 에이전트 상태.
        - shape = (chosen_agent_num, 11).
        - 여기서는 위치 x,y 만 사용 ([:, 0], [:, 1]).
    vector_map_lanes:
        - 차선 벡터(좌표+기타 정보).
        - shape = (chosen_lane_num, lane_len, D).
        - 여기서는 좌표 부분 [:, :, :2] 만 사용.

Returns:
    np.ndarray:
        - agent_route_lane_order:
            에이전트별 route 차선 순서 행렬.
            shape = (chosen_agent_num, chosen_lane_num), dtype = int64.
"""
chosen_agent_num: int = int(neighbor_agents_current.shape[0])
chosen_lane_num: int = int(vector_map_lanes.shape[0])

# lanes_xy: (chosen_lane_num, lane_len, 2)
lanes_xy: np.ndarray = vector_map_lanes[:, :, :2]

# npc_route_on_chosen_lane_idx_list:
#   길이 = chosen_agent_num, 각 원소: 선택된 lane 인덱스 리스트
npc_route_on_chosen_lane_idx_list: List[List[int]] = []

for agent_idx in range(chosen_agent_num):
    token: str = neighbor_track_token[agent_idx]
    chosen_lanes_route_mask: List[
        bool] = car_token_to_chosen_lanes_route_mask.get(
            token,
            [False] * chosen_lane_num,
        )

    # chosen_lanes_route_mask_arr: (chosen_lane_num,)
    chosen_lanes_route_mask_arr: np.ndarray = np.asarray(
        chosen_lanes_route_mask,
        dtype=bool,
    )

    # 이 에이전트 경로 위에 있는 차선이 하나도 없으면 빈 리스트
    if chosen_lanes_route_mask_arr.sum() == 0:
        npc_route_on_chosen_lane_idx_list.append([])
        continue

    # neighbor_current_xy: (2,)
    neighbor_current_xy: np.ndarray = neighbor_agents_current[agent_idx, :2]

    # chosen_lane_dist_order: (chosen_lane_num,)
    chosen_lane_dist_order: np.ndarray = _lane_min_dist_order(
        lanes_xy=lanes_xy,
        neighbor_current_xy=neighbor_current_xy,
    )

    # 경로 위(True)인 lane 전부 선택 (가까운 순으로)
    route_on_chosen_lane_idx: List[int] = _select_lanes_by_order(
        chosen_lane_dist_order=chosen_lane_dist_order,  # (chosen_lane_num,)
        chosen_lanes_route_mask_arr=
        chosen_lanes_route_mask_arr,  # (chosen_lane_num,)
    )
    npc_route_on_chosen_lane_idx_list.append(route_on_chosen_lane_idx)

# 에이전트 수와 길이 정합성 체크
assert len(npc_route_on_chosen_lane_idx_list) == chosen_agent_num, (
    f"npc_route_on_chosen_lane_idx_list 길이({len(npc_route_on_chosen_lane_idx_list)}) "
    f"!= chosen_agent_num({chosen_agent_num})")

# (chosen_agent_num, chosen_lane_num)
agent_route_lane_order: np.ndarray = build_agent_route_lane_order(
    npc_route_on_chosen_lane_idx_list=npc_route_on_chosen_lane_idx_list,
    chosen_lane_num=chosen_lane_num,
)
return agent_route_lane_order.astype(np.int64)

def get_neighbor_track_tokens(
present_tracked_objects: TrackedObjects,
agents_cur_frame_indices: Union[Sequence[int], np.ndarray],
agents_num: int,
object_types: Optional[Sequence[TrackedObjectType]] = None,
) -> List[Optional[str]]:
"""현재 프레임에서 선택된 이웃 에이전트들의 track_token 리스트를 만든다.

개요
----
이 함수는 다음 두 정보를 합쳐서,
**“이웃 에이전트 슬롯 순서에 맞는 track_token 리스트”**를 만들어 줍니다.

1) `present_tracked_objects`
    - 현재 프레임에서 감지된 모든 객체 묶음입니다.
    - 여기서 차량/보행자/자전거 등 관심 있는 타입만 추려,
      내부적으로 “에이전트 배열”을 만들었다고 가정합니다.
    - 이때의 순서는 `_extract_agent_array` 에서 사용한 것과 동일합니다
      (즉, 같은 타입 필터 순서로 정렬됨).

2) `agents_cur_frame_indices`
    - `agent_past_process` 가 선택한 이웃 에이전트의
      “현재 프레임 기준 행 인덱스” 목록입니다.
    - 길이 K(≤ agents_num) 인 정수 시퀀스이며,
      이 순서가 곧 이웃 에이전트 슬롯 순서가 됩니다.

이 함수는,
- 현재 프레임에서 관심 타입 에이전트들을 순서대로 나열한 뒤
- `agents_cur_frame_indices[k]` 를 이용해 해당 행의 `track_token` 을 꺼내
  `neighbor_track_token[k]` 에 채워 넣습니다.
- 슬롯 개수 `agents_num` 만큼의 리스트를 항상 반환하며,
  인덱스 범위를 벗어나거나 매핑할 수 없는 경우에는 `None` 으로 채웁니다.

Args:
    present_tracked_objects (TrackedObjects):
        현재 프레임의 감지 결과.
        여러 타입의 객체를 포함할 수 있으며,
        내부에서 `get_tracked_objects_of_types(object_types)` 로
        관심 타입만 추려 사용합니다.
    agents_cur_frame_indices (Union[Sequence[int], np.ndarray]):
        - shape: (K,)
        - 이웃 에이전트들이 현재 프레임 에이전트 배열에서 차지하는 행 인덱스들.
        - `agent_past_process` 의 `agents_cur_frame_indices` 를 그대로 넘겨 사용합니다.
    agents_num (int):
        - 출력할 이웃 슬롯의 개수입니다.
        - 반환되는 리스트 길이가 됩니다.
    object_types (Optional[Sequence[TrackedObjectType]]):
        - 필터링할 객체 타입 목록입니다.
        - 기본값은 `(VEHICLE, PEDESTRIAN, BICYCLE)` 이며,
          `_extract_agent_array` 에서 사용한 타입 순서와 동일해야
          인덱스 매핑이 올바르게 유지됩니다.

Returns:
    List[Optional[str]]:
        - 길이: `agents_num`
        - 각 원소는 해당 이웃 슬롯에 대응하는 `track_token` (문자열) 이거나,
          매핑할 수 없을 때는 `None` 입니다.
        - `agents_cur_frame_indices` 가 `None` 이면
          길이 `agents_num` 의 `[None, None, ...]` 리스트를 반환합니다.

Raises:
    ValueError:
        - `agents_num` 이 음수인 경우.

Notes:
    - `agents_cur_frame_indices` 의 길이가 `agents_num` 보다 길면,
      앞에서부터 `agents_num` 개까지만 사용합니다.
    - 현재 프레임에서 관심 타입으로 필터링한 에이전트 개수를 `M` 이라 할 때,
      인덱스가 `0 <= idx < M` 범위를 벗어나면 해당 슬롯은 `None` 으로 남습니다.
"""
if agents_num < 0:
    raise ValueError(f"`agents_num`은 음수가 될 수 없습니다. got {agents_num}")

# `_extract_agent_array`와 동일한 타입 필터 순서 유지
if object_types is None:
    object_types = (
        TrackedObjectType.VEHICLE,
        TrackedObjectType.PEDESTRIAN,
        TrackedObjectType.BICYCLE,
    )

# 현재 프레임에서 관심 타입만 '그 순서 그대로' 나열
# current_agents: List[TrackedObject], 길이 = M
current_agents: List[
    TrackedObject] = present_tracked_objects.get_tracked_objects_of_types(
        object_types)  # type: ignore[assignment]
# tokens_in_present_order: (M,) — 관심 타입 에이전트들의 track_token 문자열
tokens_in_present_order: List[str] = [
    str(agent.track_token) for agent in current_agents
]

# 반환 버퍼 준비: 길이 = agents_num
neighbor_track_token: List[Optional[str]] = [None] * int(agents_num)

# agents_cur_frame_indices 정규화(int list)
if agents_cur_frame_indices is None:
    return neighbor_track_token
# numpy, list, tuple 등 모두 int 리스트로 캐스팅
# idx_list: List[int], 길이 = K
idx_list: List[int] = list(
    map(int,
        np.asarray(agents_cur_frame_indices).reshape(-1).tolist()))

# 앞에서부터 agents_num개만 매핑
max_fill = min(len(idx_list), agents_num)
for slot_idx in range(max_fill):
    src_idx = idx_list[slot_idx]
    if 0 <= src_idx < len(tokens_in_present_order):
        neighbor_track_token[slot_idx] = tokens_in_present_order[src_idx]
    else:
        # 범위를 벗어나면 안전하게 None 유지
        neighbor_track_token[slot_idx] = None

return neighbor_track_token

시나리오 전체 horizon(초) 계산: 시작~끝 타임스탬프 차이

def _scenario_total_horizon_s(scn: AbstractScenario) -> float:
"""
시나리오 시작 시각(초)부터 로그 파일의 끝 시각(초) 까지의 horizon을 계산한다.
- 시나리오 토큰이 1개뿐이라 duration이 0이어도, DB의 end time을 사용해 올바르게 계산한다.

Args:
    scn: nuPlan Scenario 객체

Returns:
    float: horizon [s]
"""
# 1) 시작 시각(초): 공개 API 사용
start_s = float(scn.get_time_point(0).time_s)

# 3) fallback: DB의 실제 끝 시각(마이크로초)으로 계산
# 내부 모듈: nuPlan devkit 표준

# NuPlanScenario는 _log_file을 보유 (public은 아니지만 일반적으로 접근 가능)
log_file_path: str = getattr(scn, "_log_file")
end_us: int = get_end_sensor_time_from_db(log_file_path,
                                          get_lidarpc_sensor_data())
end_s = float(end_us) * 1e-6
return max(0.0, end_s - start_s)

def get_npc_route_roadblock_ids2(
scenario: NuPlanScenario,
neighbor_track_token: List[Optional[str]],
) -> Dict[str, Optional[List[str]]]:

from collections import defaultdict
from typing import Dict, List, Optional, Set

def select_nearest_connectors_by_mean_distance(
    connector_candidates: List[RoadBlockGraphEdgeMapObject],
    sampled_trajectory_points: List["Point2D"],
    *,
    tolerance: float = 1e-6,
) -> List[RoadBlockGraphEdgeMapObject]:
    """평균 수선거리로 가장 가까운 Connector 후보(들)를 선택한다.

    Args:
        connector_candidates (List[RoadBlockGraphEdgeMapObject]):
            후보 Connector 객체 리스트. 길이 K(가변).
        sampled_trajectory_points (List[Point2D]):
            Connector 영역에서 샘플링한 궤적 점들. 길이 T(가변).
        tolerance (float):
            부동소수 오차 허용치. 최솟값과의 차이가 `≤ tolerance`면 동률로 간주.

    Returns:
        List[RoadBlockGraphEdgeMapObject]:
            평균 수선거리가 최솟값인 Connector 객체(들).
    """
    mean_distance_by_connector: Dict[RoadBlockGraphEdgeMapObject,
                                     float] = {}
    for connector in connector_candidates:
        mean_dist: float = _mean_perpendicular_distance(
            connector, sampled_trajectory_points)
        mean_distance_by_connector[connector] = mean_dist

    minimum_distance: float = min(mean_distance_by_connector.values())
    return [
        conn for conn, dist in mean_distance_by_connector.items()
        if abs(dist - minimum_distance) <= tolerance
    ]

def _mean_perpendicular_distance(
    roadblock_connector: RoadBlockGraphEdgeMapObject,
    trajectory_points: List["Point2D"],
) -> float:
    """궤적 점들과 Connector 폴리곤 간 평균 수선거리를 계산한다.

    Args:
        roadblock_connector (RoadBlockGraphEdgeMapObject):
            거리 계산 대상 Connector.
        trajectory_points (List[Point2D]):
            궤적 포인트 리스트. 길이 T.

    Returns:
        float: 평균 수선거리 값.
    """
    polygon = roadblock_connector.polygon
    return float(
        np.mean([
            Point(pt.x, pt.y).distance(polygon) for pt in trajectory_points
        ]))

def _decide_roadblock_ids_at_connector(
    connector_candidate_objects: Set['RoadBlockGraphEdgeMapObject'],
    sampled_points_inside_connector: List['Point2D'],
    roadblock_sequence: List[str],
    previous_roadblocks_set: Set['RoadBlockGraphEdgeMapObject'],
    current_roadblocks: Set['RoadBlockGraphEdgeMapObject'],
) -> None:
    """Connector 구간 종료 시, 후보 중 연결성/거리 기준으로 선택하여 시퀀스에 확정한다.

    우선 연결성(이전/다음 RoadBlock 연결) 조건을 만족하는 후보를 우선 선택하고,
    그렇지 않으면 연결성 중 어느 하나라도 만족하는 후보들 중에서 평균 수선거리
    최소 후보(들)를 선택한다. 해당 경우가 없으면 모든 후보 중 평균 수선거리
    최소 후보(들)를 선택한다.

    Args:
        connector_candidate_objects (Set[RoadBlockGraphEdgeMapObject]):
            구간 동안 누적된 Connector 후보 집합.
        sampled_points_inside_connector (List[Point2D]):
            해당 Connector 구간에서 수집한 궤적 점들. 길이 T.
        roadblock_sequence (List[str]):
            결과를 축적할 RoadBlock/Connector id 시퀀스(가변).
        previous_roadblocks_set (Set[RoadBlockGraphEdgeMapObject]):
            직전 프레임의 RoadBlock 집합.
        current_roadblocks (Set[RoadBlockGraphEdgeMapObject]):
            현재 프레임의 RoadBlock 집합(다음 구간 연결 확인용).
    """
    graph_linkable_connectors: List[RoadBlockGraphEdgeMapObject] = []
    graph_linkable_connectors_candidates: List[
        RoadBlockGraphEdgeMapObject] = []

    incoming_and_outcoming_condition = len(
        previous_roadblocks_set) > 0 and len(current_roadblocks) > 0
    incoming_or_outgoing_condition = len(
        previous_roadblocks_set) > 0 or len(current_roadblocks) > 0

    # (1) 이전/다음 RoadBlock 모두와 연결되는 Connector 우선
    if incoming_and_outcoming_condition:
        for conn in connector_candidate_objects:
            incoming_ids = {rb.id for rb in conn.incoming_edges}
            previous_ids = {rb.id for rb in previous_roadblocks_set}
            outgoing_ids = {rb.id for rb in conn.outgoing_edges}
            current_ids = {rb.id for rb in current_roadblocks}

            if bool(previous_ids & incoming_ids) and bool(current_ids &
                                                          outgoing_ids):
                graph_linkable_connectors.append(conn)

        if graph_linkable_connectors:
            roadblock_sequence.extend(
                [conn.id for conn in graph_linkable_connectors])
            return

    # (2) 이전 또는 다음 RoadBlock 중 하나와 연결되면 후보로 인정 후 거리 최소
    if (incoming_and_outcoming_condition or incoming_or_outgoing_condition):
        for conn in connector_candidate_objects:
            incoming_ids = {rb.id for rb in conn.incoming_edges}
            previous_ids = {rb.id for rb in previous_roadblocks_set}
            outgoing_ids = {rb.id for rb in conn.outgoing_edges}
            current_ids = {rb.id for rb in current_roadblocks}

            if bool(previous_ids & incoming_ids) or bool(current_ids &
                                                         outgoing_ids):
                graph_linkable_connectors_candidates.append(conn)

        if graph_linkable_connectors_candidates:
            closest_connectors = select_nearest_connectors_by_mean_distance(
                graph_linkable_connectors_candidates,
                sampled_points_inside_connector,
                tolerance=1e-6,
            )
            roadblock_sequence.extend(
                conn.id for conn in closest_connectors)
            return

    # (3) 연결성 조건 없으면 전체 후보 중 거리 최소
    closest_connectors = select_nearest_connectors_by_mean_distance(
        connector_candidate_objects,
        sampled_points_inside_connector,
        tolerance=1e-6,
    )
    roadblock_sequence.extend(conn.id for conn in closest_connectors)

# ───────────────────────── 보조 함수들(가독성) ─────────────────────────

def _collect_candidate_tokens(
        neighbor_track_token: List[Optional[str]]) -> Set[str]:
    """None을 제외한 후보 토큰 집합을 만든다.

    Args:
        neighbor_track_token (List[Optional[str]]):
            길이 = max_agent_num. 토큰 또는 None.

    Returns:
        Set[str]: 후보 토큰 집합.
    """
    return {t for t in neighbor_track_token if t is not None}

def _collect_future_vehicle_trajectories(
    scenario: NuPlanScenario,
    candidate_tokens: Set[str],
    total_horizon_s: float = 20.,
) -> Dict[str, List[TrackedObject]]:
    """미래 기간 동안의 차량 궤적을 토큰별로 수집한다.

    Args:
        scenario (NuPlanScenario): 시나리오.
        candidate_tokens (Set[str]): 후보 토큰 집합.
        total_horizon_s (float): 수집할 미래 수평선(초).

    Returns:
        Dict[str, List[SceneObject]]:
            키=토큰, 값=해당 차량의 시간 순 궤적 리스트(길이 가변).
    """
    car_token_to_object_list: Dict[str,
                                   List[TrackedObject]] = defaultdict(list)
    num_samples = int(total_horizon_s * 10)  # 0.1 s 간격
    for det_batch in scenario.get_future_tracked_objects(
            0, total_horizon_s, num_samples):
        for det in det_batch.tracked_objects:
            if det.tracked_object_type == TrackedObjectType.VEHICLE and (
                    det.track_token in candidate_tokens):
                car_token_to_object_list[det.track_token].append(det)
    return car_token_to_object_list

def _finalize_connector_segment_if_open(
    inside_connector_flag: bool,
    connector_candidate_objects: Set['RoadBlockGraphEdgeMapObject'],
    sampled_points_inside_connector: List['Point2D'],
    roadblock_sequence: List[str],
    previous_roadblocks_set: Set['RoadBlockGraphEdgeMapObject'],
    current_roadblocks: Set['RoadBlockGraphEdgeMapObject'],
) -> bool:
    """열린 Connector 구간이 있으면 후보 결정 후 버퍼를 리셋한다.

    Returns:
        bool: 정리 후 inside_connector_flag(False).
    """
    if inside_connector_flag:
        _decide_roadblock_ids_at_connector(
            connector_candidate_objects,
            sampled_points_inside_connector,
            roadblock_sequence,
            previous_roadblocks_set,
            current_roadblocks,
        )
        inside_connector_flag = False
        connector_candidate_objects.clear()
        sampled_points_inside_connector.clear()
    return inside_connector_flag

"""NPC 차량들의 경로(RoadBlock/RoadBlock-Connector 시퀀스)를 토큰별로 구성한다.

이 함수는 `neighbor_track_token`에 제시된 에이전트 토큰들(차량만)에 대해,
시나리오의 **미래 궤적(약 20 s)**을 주행 순서로 훑어보며 RoadBlock·RoadBlock‑Connector
교차 여부를 추출한다. Connector 구간에서는 평균 수선거리 기반으로 **가장 가까운**
Connector(동률 허용)를 선택하며, 구간 전환 시점에만 최종 확정한다.
구축된 시퀀스는 `route_roadblock_correction`로 보정한 뒤 토큰별로 반환한다.

Args:
    scenario (NuPlanScenario):
        NuPlan 시나리오 객체.
    neighbor_track_token (List[Optional[str]]):
        길이 = `max_agent_num`. 각 슬롯에 NPC의 `track_token`(없으면 `None`).

Returns:
    car_token_to_rr_ids
    Dict[str, Optional[List[str]]]:
        키 = 토큰(str).
        값 = 해당 NPC의 **보정된** RoadBlock id 시퀀스(List[str]) 또는 `None`
        (미추출 시).
        길이 : max_agent_num 중, 자동차 토큰 개수.

Raises:
    ValueError: 동일 프레임에서 RoadBlock과 RoadBlock‑Connector가 동시에
        관측될 경우(데이터 불일치).

Notes:
    - 궤적 수집은 `TrackedObjectType.VEHICLE` 에 한정.
    - Connector 구간이 여러 후보를 만들면 평균 수선거리가 **최소**인 후보들을 모두 선택.
    - 내부 보조 함수들로 단계별 처리를 분리(가독성 향상).
"""

# ───────────────────────── 메인 로직(동작 동일) ─────────────────────────

candidate_tokens = _collect_candidate_tokens(neighbor_track_token)

car_token_to_object_list: Dict[
    str, List[TrackedObject]] = _collect_future_vehicle_trajectories(
        scenario,
        candidate_tokens,  # 기존 필터는 유지 (neighbor 토큰 기반)
        total_horizon_s=20.,
    )
car_token_to_rr_ids: Dict[str, Optional[List[str]]] = {}

for car_token, car_list in car_token_to_object_list.items():
    if not car_list:
        car_token_to_rr_ids[car_token] = None
        continue

    roadblock_sequence: List[str] = []
    previous_roadblocks_set: Set['RoadBlockGraphEdgeMapObject'] = set()
    inside_connector_flag = False
    connector_candidate_objects: Set['RoadBlockGraphEdgeMapObject'] = set()
    sampled_points_inside_connector: List['Point2D'] = []

    for time_idx, car_ in enumerate(car_list):
        npc_point = car_.center.point
        current_roadblocks = set(
            scenario.map_api.get_all_map_objects(
                npc_point, SemanticMapLayer.ROADBLOCK))
        current_connectors = set(
            scenario.map_api.get_all_map_objects(
                npc_point, SemanticMapLayer.ROADBLOCK_CONNECTOR))

        # 동시 검출 예외
        # 둘 다 잡힌 경우: tie-breaker 로 하나만 사용하도록 current_*를 덮어쓴다.
        if current_roadblocks and current_connectors:
            # 시나리오의 글로벌 경로 (빈 리스트 형태 [''] 는 None 으로 처리)
            try:
                route_rr_ids = scenario.get_route_roadblock_ids()
                if isinstance(route_rr_ids, list) and route_rr_ids == ['']:
                    route_rr_ids = None
            except Exception:
                route_rr_ids = None

            rb_ids_list = [rb.id for rb in current_roadblocks]
            rbc_ids_list = [rc.id for rc in current_connectors]

            chosen_ids = _prefer_rr_on_conflict(
                rb_ids=rb_ids_list,
                rbc_ids=rbc_ids_list,
                route_rr_ids=route_rr_ids,
                verbose=False,
            )

            if chosen_ids:
                # 선택 결과와 교집합 되는 쪽을 남기고, 반대편은 비운다.
                chosen_rbc = {
                    rc for rc in current_connectors if rc.id in chosen_ids
                }
                chosen_rb = {
                    rb for rb in current_roadblocks if rb.id in chosen_ids
                }

                if chosen_rbc and not chosen_rb:
                    current_connectors = chosen_rbc
                    current_roadblocks = set()
                elif chosen_rb and not chosen_rbc:
                    current_roadblocks = chosen_rb
                    current_connectors = set()
                else:
                    # 혹시 양쪽과도 교집합이 없거나 둘 다 생기는 예외 상황이면 RBC 우선
                    if chosen_rbc:
                        current_connectors = chosen_rbc
                        current_roadblocks = set()
                    elif chosen_rb:
                        current_roadblocks = chosen_rb
                        current_connectors = set()
                    else:
                        current_roadblocks = set()  # RBC 우선 fallback
            else:
                # tie-breaker가 비었으면 RBC 우선
                current_roadblocks = set()

        # ── (A) Connector 영역 ──
        if current_connectors:
            if not inside_connector_flag:
                connector_candidate_objects.clear()
                sampled_points_inside_connector.clear()
                inside_connector_flag = True
            connector_candidate_objects.update(current_connectors)
            sampled_points_inside_connector.append(npc_point)

            # 마지막 프레임이면 곧바로 결정
            if time_idx == len(car_list) - 1:
                inside_connector_flag = _finalize_connector_segment_if_open(
                    inside_connector_flag,
                    connector_candidate_objects,
                    sampled_points_inside_connector,
                    roadblock_sequence,
                    previous_roadblocks_set,
                    current_roadblocks,
                )
            continue

        # ── (B) RoadBlock 영역 ──
        if current_roadblocks:
            # 직전이 Connector 구간이면 우선 결정
            inside_connector_flag = _finalize_connector_segment_if_open(
                inside_connector_flag,
                connector_candidate_objects,
                sampled_points_inside_connector,
                roadblock_sequence,
                previous_roadblocks_set,
                current_roadblocks,
            )
            # RoadBlock id 중복 없이 추가
            for roadblock in current_roadblocks:
                if not roadblock_sequence or roadblock_sequence[
                        -1] != roadblock.id:
                    roadblock_sequence.append(roadblock.id)
            previous_roadblocks_set = current_roadblocks

    # 결과 보정
    if roadblock_sequence:
        start = car_list[0]
        npc_state = SimpleNamespace(rear_axle=StateSE2(
            start.center.x, start.center.y, start.center.heading))
        corrected_ids = route_roadblock_correction(
            npc_state,
            scenario.map_api,
            roadblock_sequence,
            remove_route_loops_flag=False,
        )
        car_token_to_rr_ids[car_token] = roadblock_sequence
    else:
        car_token_to_rr_ids[car_token] = None

return car_token_to_rr_ids

=====================

1. Ego, agent, static coordination transformation

=====================

def _local_to_local_transforms(
global_states1: np.ndarray, # (N, 3) = [x1, y1, heading1] ...
global_states2: np.ndarray, # (3,) = [x_ref, y_ref, heading_ref]
) -> np.ndarray:
"""한 좌표계 기준의 포즈 집합을, 다른 좌표계 기준으로 한 번에 변환하는 함수.

이 함수는 다음과 같이 동작합니다.

- `global_states2`:
  · 새로운 기준 좌표계(로컬 프레임)의 포즈 [x, y, heading] 입니다.
- `global_states1`:
  · 예전 기준(세계 좌표계라고 가정)에서 표현된 포즈들의 집합입니다.
  · 각 행이 하나의 포즈 [x, y, heading] 입니다.

절차:
    1. `global_states2`로부터 3x3 변환행렬(포즈 → 동차변환)을 만든다.
    2. 이를 역행렬로 뒤집어, "세계 → 새로운 로컬 프레임" 변환행렬을 얻는다.
    3. `global_states1`의 각 포즈에 대해서도 3x3 변환행렬을 만든다.
    4. (2)의 행렬을 (3)에 왼쪽에서 곱해, 모두 새로운 로컬 좌표계 기준으로 변환한다.

결과적으로,
- 입력으로 주어진 여러 포즈의 변환행렬 묶음이
  "새 기준 좌표계에서 본 포즈"로 바뀐 형태로 반환됩니다.

Args:
    global_states1 (np.ndarray):
        - shape: (N, 3)
        - 각 행: [x, y, heading] (예: 세계 좌표계 기준 포즈들).
    global_states2 (np.ndarray):
        - shape: (3,)
        - 기준이 될 포즈 [x_ref, y_ref, heading_ref].

Returns:
    np.ndarray:
        - shape: (N, 3, 3)
        - 각 원소는 `global_states1`의 각 포즈를
          `global_states2` 기준 로컬 프레임으로 본 3x3 변환행렬입니다.
"""
# local_xform: (3, 3) — 기준 포즈(global_states2)에 대한 동차 변환행렬
local_xform = _state_se2_array_to_transform_matrix(global_states2)
# local_xform_inv: (3, 3) — 기준 포즈의 역변환(세계→로컬)
local_xform_inv = np.linalg.inv(local_xform)

# transforms: (N, 3, 3) — global_states1 의 각 포즈에 대한 변환행렬
transforms = _state_se2_array_to_transform_matrix_batch(global_states1)

# (N, 3, 3) — 새 로컬 프레임 기준으로 재표현된 변환행렬들
transforms = np.matmul(local_xform_inv, transforms)
return transforms

def _state_se2_array_to_transform_matrix(
input_data: np.ndarray, # (3,) = [x, y, heading]
) -> np.ndarray: # (3, 3)
"""단일 SE(2) 상태 [x, y, heading] 을 3x3 동차 변환행렬로 바꾸는 함수.

행렬 구조:
    [[ cos(h), -sin(h), x ],
     [ sin(h),  cos(h), y ],
     [   0   ,    0   , 1 ]]

이 행렬을 점 [x', y', 1]^T 에 곱하면, 회전+평행이동이 한 번에 적용됩니다.

Args:
    input_data (np.ndarray):
        - shape: (3,)
        - [x, y, heading] (라디안).

Returns:
    np.ndarray:
        - shape: (3, 3)
        - 주어진 포즈를 표현하는 2D SE(2) 동차 변환행렬.
"""
x: float = float(input_data[0])
y: float = float(input_data[1])
h: float = float(input_data[2])

cosine = np.cos(h)
sine = np.sin(h)

# (3, 3)
return np.array([[cosine, -sine, x], [sine, cosine, y], [0.0, 0.0, 1.0]])

def _state_se2_array_to_transform_matrix_batch(
input_data: np.ndarray, # (N, 3) = [[x1, y1, h1], ..., [xN, yN, hN]]
) -> np.ndarray: # (N, 3, 3)
"""여러 개의 [x, y, heading] 포즈를 한 번에 3x3 변환행렬 묶음으로 바꾸는 함수.

이 함수는 각 행이 [x, y, heading] 인 2D 포즈 배열을 입력받아,
각 포즈마다 SE(2) 동차 변환행렬을 만들어 (N, 3, 3) 형태로 반환합니다.

내부 아이디어:
    1. 각 포즈를 [x, y, cos(h), sin(h), 1] 형태로 확장한다.
    2. 미리 준비된 `reshaping_array` (5x9) 를 곱해,
       [c, -s, x, s, c, y, 0, 0, 1] 형태의 행(길이 9)을 만든다.
    3. 이를 (3, 3) 으로 reshape 하면, 개별 변환행렬이 완성된다.
    4. 이런 행을 N개 쌓아 (N, 3, 3) 배열을 얻는다.

Args:
    input_data (np.ndarray):
        - shape: (N, 3)
        - 각 행: [x, y, heading] (라디안).

Returns:
    np.ndarray:
        - shape: (N, 3, 3)
        - 각 원소는 해당 행 포즈에 대한 동차 변환행렬입니다.
"""
# input_data: (N, 3) = [x, y, heading]
# processed_input: (N, 5) = [x, y, cos(h), sin(h), 1]
processed_input = np.column_stack((
    input_data[:, 0],
    input_data[:, 1],
    np.cos(input_data[:, 2]),
    np.sin(input_data[:, 2]),
    np.ones_like(input_data[:, 0]),
))

# reshaping_array: (5, 9)
reshaping_array = np.array([
    [0, 0, 1, 0, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 1, 0, 0, 0],
    [1, 0, 0, 0, 1, 0, 0, 0, 0],
    [0, -1, 0, 1, 0, 0, 0, 0, 0],
    [0, 0, 0, 0, 0, 0, 0, 0, 1],
])
# processed_input @ reshaping_array: (N, 9)
# → 각 행이 [c, -s, x, s, c, y, 0, 0, 1] 꼴이 되고,
#   이를 (N, 3, 3) 으로 reshape 하면 변환행렬 세트가 됨.
return (processed_input @ reshaping_array).reshape(-1, 3, 3)

def _transform_matrix_to_state_se2_array_batch(
input_data: np.ndarray, # (N, 3, 3)
) -> np.ndarray: # (N, 3)
"""여러 개의 3x3 변환 행렬을 [x, y, heading] 형태의 포즈 배열로 되돌리는 함수.

이 함수는 SE(2) 동차 변환행렬 묶음(회전+이동 정보를 가진 3x3 행렬들)을 입력으로 받아,
각 행렬에 대해 다음 정보를 추출합니다.

- x: 3번째 열의 x 성분 (translation x)
- y: 3번째 열의 y 성분 (translation y)
- heading: 회전 행렬의 첫 번째 열로부터 atan2를 사용해 추출한 각도

즉, 다음과 같은 과정을 거칩니다.

1. 각 3x3 행렬의 첫 번째 열을 모아서
   [cos(heading), sin(heading), _] 꼴의 벡터들을 만든다.
2. 이로부터 `atan2(sin, cos)` 계산으로 heading(라디안)을 구한다.
3. 원래 변환행렬의 3번째 열(translation [x, y, 1])에 대해,
   마지막 요소를 heading 값으로 덮어써 [x, y, heading] 형태로 만든다.

Args:
    input_data (np.ndarray):
        - shape: (N, 3, 3)
        - 각 [i, :, :] 는 하나의 SE(2) 동차 변환행렬입니다.

Returns:
    np.ndarray:
        - shape: (N, 3)
        - 각 행은 [x, y, heading] 형태의 포즈를 나타냅니다.
"""
# first_columns: (N, 3) — 각 변환행렬의 첫 번째 열 [cos, sin, 0]
first_columns = input_data[:, :, 0].reshape(-1, 3)
# angles: (N,) — atan2(sin, cos) 로부터 구한 heading
angles = np.arctan2(first_columns[:, 1], first_columns[:, 0])

# result: (N, 3) — 원래는 3번째 열 [x, y, 1] 이었음
result = input_data[:, :, 2]
# 마지막 성분을 heading 으로 덮어써 [x, y, heading] 으로 만듦
result[:, 2] = angles

return result

def _global_state_se2_array_to_local(
global_states: np.ndarray, # (N, 3) = [x_world, y_world, heading_world]
local_state: np.ndarray, # (3,) = [x_ref, y_ref, heading_ref]
) -> np.ndarray: # (N, 3) = [x_local, y_local, heading_local]
"""여러 점의 [x, y, heading]을 기준 포즈(local_state) 기준 로컬 좌표계로 변환한다.

개념적으로 이 함수는
- `global_states` : 세계(월드) 좌표계 기준의 포즈들 집합
- `local_state`   : “새 기준 좌표계”가 될 포즈(예: ego 차량의 현재 포즈)
를 받아서, 각 포즈를 `local_state` 기준으로 보았을 때의
로컬 좌표 [x_local, y_local, heading_local] 로 바꿔 줍니다.

처리 순서:
    1. `local_state`로부터 3x3 변환 행렬(세계 → 로컬 프레임)을 만든다.
    2. `global_states`의 각 [x, y, heading]을 3x3 동차 변환행렬로 바꾼다.
    3. (1)의 역행렬을 (2)에 곱해, 모든 포즈를 로컬 좌표계 기준으로 재표현한다.
    4. 변환된 3x3 행렬 묶음을 다시 [x, y, heading] 형식의 배열로 되돌린다.

Args:
    global_states (np.ndarray):
        - shape: (N, 3)
        - 각 행: [x_world, y_world, heading_world]
          (세계 좌표계 기준 포즈들).
    local_state (np.ndarray):
        - shape: (3,)
        - [x_ref, y_ref, heading_ref]
          로컬 좌표계의 기준이 되는 포즈(예: ego 포즈).

Returns:
    np.ndarray:
        - shape: (N, 3)
        - 각 행: [x_local, y_local, heading_local]
          · `local_state`를 원점/기준으로 하는 좌표계 기준 포즈입니다.
"""
# local_xform: (3, 3) — 기준 포즈(local_state)에 대한 동차 변환행렬 (world→기준)
local_xform = _state_se2_array_to_transform_matrix(local_state)
# local_xform_inv: (3, 3) — 기준 포즈의 역변환 (기준→world) 의 역 → (world→local)
local_xform_inv = np.linalg.inv(local_xform)

# transforms: (N, 3, 3) — 각 global_state 를 world 기준 변환행렬로 표현
transforms = _state_se2_array_to_transform_matrix_batch(global_states)

# transforms: (N, 3, 3) — world 기준 포즈들을 local_state 기준 로컬 프레임으로 변환
transforms = np.matmul(local_xform_inv, transforms)

# output: (N, 3) — [x_local, y_local, heading_local]
output = _transform_matrix_to_state_se2_array_batch(transforms)

return output

def _global_velocity_to_local(
velocity: np.ndarray, # (N, 2) = [vx_world, vy_world]
anchor_heading: float, # 스칼라 heading(rad) 또는 브로드캐스트 가능한 값
) -> np.ndarray: # (N, 2) = [vx_local, vy_local]
"""월드 좌표계 기준 속도 벡터를 ego(또는 기준 heading) 좌표계 기준 속도로 회전 변환한다.

이 함수는 2D 속도 벡터 [vx, vy] (세계 좌표계 기준)를,
기준 차량(ego)의 heading(방향각)을 기준으로 회전시켜
ego 좌표계 기준 속도 [vx_local, vy_local] 로 바꾸어 줍니다.

변환 방식:
    - 기준 heading = θ 라 할 때,
      · vx_local = vx * cos(θ) + vy * sin(θ)
      · vy_local = vy * cos(θ) - vx * sin(θ)

직관적으로,
- 세계 기준으로 측정된 속도를,
- ego 차량이 바라보는 방향을 x축으로 하는 좌표계로 "돌려서" 표현한다고 보면 됩니다.

Args:
    velocity (np.ndarray):
        - shape: (N, 2)
        - 각 행: [vx_world, vy_world] (세계 좌표계 기준 속도).
    anchor_heading (float):
        - 기준이 되는 heading 값(rad).
        - 보통 ego 차량의 heading 을 넣어 사용합니다.
        - 스칼라이지만, 넘파이 브로드캐스팅 덕분에 벡터화 연산이 가능합니다.

Returns:
    np.ndarray:
        - shape: (N, 2)
        - 각 행: [vx_local, vy_local]
        - 기준 heading 좌표계(ego 기준)로 회전된 속도 벡터입니다.
"""
# velocity_x: (N,) — ego 기준 x 방향 속도
velocity_x = velocity[:, 0] * np.cos(
    anchor_heading) + velocity[:, 1] * np.sin(anchor_heading)
# velocity_y: (N,) — ego 기준 y 방향 속도
velocity_y = velocity[:, 1] * np.cos(
    anchor_heading) - velocity[:, 0] * np.sin(anchor_heading)

# (N, 2) 로 스택
return np.stack([velocity_x, velocity_y], axis=-1)

def _build_ego_pose_from_state(
ego_cur_pose_np: np.ndarray, # (3,)
) -> np.ndarray: # (3,)
"""EgoState 배열에서 ego 기준 좌표 변환에 사용할 [x, y, heading] 벡터를 만든다.

이 함수는 ego_cur_pose_np 배열에서
- x 좌표
- y 좌표
- heading(방향, rad 단위)
세 값을 뽑아서, 부동소수 형태의 1차원 벡터로 만들어준다.

Args:
    ego_cur_pose_np (np.ndarray):
        - shape: (3,)
        - [x_ego, y_ego, heading_ego] 를 담고 있는 배열.

Returns:
    np.ndarray:
        - shape: (3,)
        - [x_ego, y_ego, heading_ego] 를 float64 타입으로 담은 벡터.
"""
# ego_pose: (3,) = [x_ego, y_ego, heading_ego]
ego_pose = np.array(
    [
        float(ego_cur_pose_np[EgoInternalIndex.x()]),
        float(ego_cur_pose_np[EgoInternalIndex.y()]),
        float(ego_cur_pose_np[EgoInternalIndex.heading()]),
    ],
    dtype=np.float64,
)
return ego_pose

def _convert_ego_history_to_relative(
agent_state: np.ndarray, # (time_num, state_dim_ego=10)
ego_pose: np.ndarray, # (3,)
) -> np.ndarray: # (time_num, state_dim_ego+1=11)
"""ego(자차) 궤적을 월드 좌표계에서 ego 기준 상대 좌표계로 변환한다.

이고의 과거~현재 상태 시퀀스를 받아서,
- 위치/방향: ego 기준 좌표계로 변환
- heading: cos, sin 두 값으로 나누어 저장
- 속도: 월드 기준 속도를 ego 기준 속도로 회전 변환
- 차량 크기/타입(one-hot) 등 뒤쪽 값은 그대로 복사

최종적으로 원래보다 차원이 1 늘어난 (N, state_dim+1) 형태의 배열을 만든다.

Args:
    agent_state (np.ndarray):
        - shape: (time_num, state_dim_ego=10)
        - ego seq 궤적 (월드 좌표계).
    ego_pose (np.ndarray):
        - shape: (3,)
        - [x_ego, y_ego, heading_ego] (월드 좌표계 기준 현재 ego 상태).

Returns:
    np.ndarray:
        - shape: (time_num, state_dim_ego+1=11)
        - ego 기준 상대 좌표계로 변환된 ego 궤적.
"""
# agent_state: (time_num, state_dim_ego=10)
time_num, state_dim = agent_state.shape

# new_agent_state: (time_num, state_dim_ego+1=11)
new_agent_state = np.zeros((agent_state.shape[0], state_dim + 1),
                           dtype=np.float64)

# 크기/타입 등 뒤쪽 항목 복사
new_agent_state[:, 6:] = agent_state[:, 5:]

# agent_global_poses: (time_num, 3) = [x, y, heading]
agent_global_poses = agent_state[:, [
    EgoInternalIndex.x(),
    EgoInternalIndex.y(),
    EgoInternalIndex.heading()
]]  # (N, 3)

# transforms: (time_num, 3, 3)  — 월드→ego 변환 행렬
# agent_global_poses: (time_num, 3), 절대 좌표계 기준 값
# ego_pose # (3,) : 절대 좌표계 기준 값
transforms = _local_to_local_transforms(agent_global_poses, ego_pose)

# transformed_poses: (time_num, 3) — ego 좌표계 기준 [x, y, heading]
transformed_poses = _transform_matrix_to_state_se2_array_batch(
    transforms)  # transformed_poses: ego 좌표계 기준 값

# 위치/방향(→cos,sin) 갱신
new_agent_state[:, EgoInternalIndex.x()] = transformed_poses[:, 0]
new_agent_state[:, EgoInternalIndex.y()] = transformed_poses[:, 1]
new_agent_state[:, 2] = np.cos(transformed_poses[:, 2])
new_agent_state[:, 3] = np.sin(transformed_poses[:, 2])

# --- velocity (world -> anchor ego frame) ---
# agent_global_velocities: (time_num, 2) = [vx_world, vy_world]
agent_global_velocities = agent_state[:, [
    EgoInternalIndex.vx(), EgoInternalIndex.vy()
]]

# transformed_velocities: (time_num, 2) = [vx_ego, vy_ego]
transformed_velocities = _global_velocity_to_local(agent_global_velocities,
                                                   ego_pose[-1])

new_agent_state[:, 4] = transformed_velocities[:, 0]
new_agent_state[:, 5] = transformed_velocities[:, 1]

return new_agent_state

def _convert_agent_states_to_relative(
agent_state: np.ndarray, # (N, state_dim_agent)
ego_pose: np.ndarray, # (3,)
) -> np.ndarray: # (N, state_dim_agent)
"""주변 에이전트(차량/보행자/자전거)의 상태를 ego 기준 상대 좌표계로 변환한다.

월드 좌표계 기준으로 기록된 주변 에이전트들의 상태에서
- 위치 (x, y)
- 방향 (heading)
- 속도 (vx, vy)
만 이고 기준 좌표계로 바꿔준다.

나머지 값들(차량 크기, 기타 특성)은 그대로 유지하며,
입력 배열을 in-place 로 수정한 뒤 반환한다.

Args:
    agent_state (np.ndarray):
        - shape: (N, state_dim_agent)
            [track_id, vx, vy, heading, width, length, x, y]
        - 주변 에이전트 상태 배열.
          스키마는 AgentInternalIndex 를 따른다.
    ego_pose (np.ndarray):
        - shape: (3,)
        - [x_ego, y_ego, heading_ego] (월드 좌표계 기준 현재 ego 상태).

Returns:
    np.ndarray:
        - shape: (N, state_dim_agent)
            - [track_id, vx, vy, heading, width, length, x, y]
        - 위치/방향/속도가 ego 기준으로 바뀐 에이전트 상태 배열.
"""
# agent_global_poses: (N, 3) = [x, y, heading]
agent_global_poses = agent_state[:, [
    AgentInternalIndex.x(),
    AgentInternalIndex.y(),
    AgentInternalIndex.heading()
]]

# agent_global_velocities: (N, 2) = [vx_world, vy_world]
agent_global_velocities = agent_state[:, [
    AgentInternalIndex.vx(
    ), AgentInternalIndex.vy()
]]

# transformed_poses: (N, 3) = [x_ego, y_ego, heading_ego]
transformed_poses = _global_state_se2_array_to_local(
    agent_global_poses, ego_pose)

# transformed_velocities: (N, 2) = [vx_ego, vy_ego]
transformed_velocities = _global_velocity_to_local(agent_global_velocities,
                                                   ego_pose[-1])

# 위치/방향/속도 갱신 (in-place)
agent_state[:, AgentInternalIndex.x()] = transformed_poses[:, 0]
agent_state[:, AgentInternalIndex.y()] = transformed_poses[:, 1]
agent_state[:, AgentInternalIndex.heading()] = transformed_poses[:, 2]
agent_state[:, AgentInternalIndex.vx()] = transformed_velocities[:, 0]
agent_state[:, AgentInternalIndex.vy()] = transformed_velocities[:, 1]

return agent_state

def _convert_static_states_to_relative(
agent_state: np.ndarray, # (N, state_dim_static)
ego_pose: np.ndarray, # (3,)
) -> np.ndarray: # (N, state_dim_static)
"""정적 객체(표지판, 배리어 등)의 위치/방향을 ego 기준 상대 좌표계로 변환한다.

정적 객체의 상태 배열에서 앞의 세 값
- x 좌표
- y 좌표
- heading(방향)
만 ego 기준 좌표계로 변환하고, 나머지 값(크기 등)은 그대로 둔다.

입력 배열을 in-place 로 수정한 뒤 반환한다.

Args:
    agent_state (np.ndarray):
        - shape: (N, state_dim_static)
        - 정적 객체 상태 배열. 앞 3차원이 [x, y, heading].
    ego_pose (np.ndarray):
        - shape: (3,)
        - [x_ego, y_ego, heading_ego] (월드 좌표계 기준 현재 ego 상태).

Returns:
    np.ndarray:
        - shape: (N, state_dim_static)
        - 위치/방향이 ego 기준으로 바뀐 정적 객체 상태 배열.
"""
# agent_global_poses: (N, 3) = [x, y, heading]
agent_global_poses = agent_state[:, [0, 1, 2]]

# transformed_poses: (N, 3) = [x_ego, y_ego, heading_ego]
transformed_poses = _global_state_se2_array_to_local(
    agent_global_poses, ego_pose)

# 위치/방향 갱신 (in-place)
agent_state[:, 0] = transformed_poses[:, 0]
agent_state[:, 1] = transformed_poses[:, 1]
agent_state[:, 2] = transformed_poses[:, 2]

return agent_state

def convert_absolute_quantities_to_relative(
agent_state: np.ndarray, # (N, state_dim)
ego_cur_pose_np: np.ndarray, # (3,)
agent_type: str = 'ego',
) -> np.ndarray:
"""월드 좌표계 기준 상태들을 ego(자차) 기준 상대 좌표계로 변환하는 함수.

이 함수는 세 가지 경우를 처리합니다.

1) agent_type == 'ego'
    - 입력: 이고(자차)의 과거/현재 궤적 (월드 좌표계) # (num_frames, 10)
        - x, y, heading, vx, vy, width, length, (car, pedestrian, cyclist)
    - 출력: 이고 기준으로 다시 표현된 궤적 # (num_frames, 11)
      (위치/방향/속도는 ego 기준, 차체 크기와 타입(one-hot)은 그대로 유지)
    - 결과 shape: (N, original_dim + 1)
      · heading → cos, sin 두 차원으로 나뉘면서 1차원 증가

2) agent_type == 'agent'
    - 입력: 주변 에이전트(차량/보행자/자전거 등)의 상태 (월드 좌표계)
        - [track_id, vx, vy, heading, width, length, x, y]
    - 출력: 이고 기준 상대 좌표계로 변환된 에이전트 상태
      · 위치/방향/속도만 ego 기준으로 바뀌고, 나머지는 그대로 유지
    - in-place 방식으로 `agent_state`를 수정 후 반환

3) agent_type == 'static'
    - 입력: 정적 객체(표지판, 배리어 등)의 상태 (월드 좌표계)
      · [x, y, heading] + 크기 등
    - 출력: 이고 기준 상대 좌표계로 변환된 정적 객체 상태
      · 위치/방향만 ego 기준으로 바뀜

Args:
    agent_state (np.ndarray):
        - shape: (N, state_dim)
        - 변환 대상 상태 배열.
          · ego 모드: 이고 궤적
          · agent 모드: 주변 동적 객체(에이전트)
          · static 모드: 정적 객체
    ego_cur_pose_np (np.ndarray):
        - shape: (3,)
        - [x_ego, y_ego, heading_ego] (월드 좌표계 기준 이고 현재 상태)
    agent_type (str, optional):
        - 'ego'   : 이고 궤적 변환 모드
        - 'agent' : 동적 에이전트 변환 모드
        - 'static': 정적 객체 변환 모드

Returns:
    np.ndarray:
        - 변환된 상태 배열.
        - 'ego' 모드: shape (N, state_dim + 1)
            - x, y, cos, sin, vx, vy, width, length, (car, pedestrian, cyclist)
        - 'agent' 모드: shape (N, state_dim) (in-place 수정)
            - [track_id, vx, vy, heading, width, length, x, y]
        - 'static' 모드: shape (N, state_dim) (in-place 수정)
"""
# ego_pose: (3,) = [x_ego, y_ego, heading_ego]
ego_pose = _build_ego_pose_from_state(ego_cur_pose_np)

if agent_type == 'ego':
    # (time_num, state_dim_ego+1=11)
    agent_state = _convert_ego_history_to_relative(agent_state, ego_pose)

elif agent_type == 'agent':
    # (N, state_dim_agent)
    agent_state = _convert_agent_states_to_relative(agent_state, ego_pose)

elif agent_type == 'static':
    # (N, state_dim_static)
    agent_state = _convert_static_states_to_relative(agent_state, ego_pose)

return agent_state

=====================

2. Map coordination transformation

=====================

def coordinates_to_local_frame(coords, anchor_state, precision=None):
"""
Transform a set of [x, y] coordinates without heading to the the given frame.
:param coords: <np.array: num_coords, 2> Coordinates to be transformed, in the form [x, y].
:param anchor_state: The coordinate frame to transform to, in the form [x, y, heading].
:param precision: The precision with which to allocate the intermediate array. If None, then it will be inferred from the input precisions.
:return: <np.array: num_coords, 2> Transformed coordinates.
"""
if len(coords.shape) != 2 or coords.shape[1] != 2:
raise ValueError(f"Unexpected coords shape: {coords.shape}")

if precision is None:
    if coords.dtype != anchor_state.dtype:
        raise ValueError(
            "Mixed datatypes provided to coordinates_to_local_frame without precision specifier."
        )
    precision = coords.dtype

# torch.nn.functional.pad will crash with 0-length inputs.
# In that case, there are no coordinates to transform.
if coords.shape[0] == 0:
    return coords

# Extract transform
transform = _state_se2_array_to_transform_matrix(anchor_state)
transform = np.linalg.inv(transform)

# Transform the incoming coordinates to homogeneous coordinates
#  So translation can be done with a simple matrix multiply.
#
# [x1, y1]  => [x1, y1, 1]
# [x2, y2]     [x2, y2, 1]
# ...          ...
# [xn, yn]     [xn, yn, 1]
coords = np.pad(coords,
                pad_width=((0, 0), (0, 1)),
                mode='constant',
                constant_values=1.0)

# Perform the transformation, transposing so the shapes match
coords = np.matmul(transform, coords.T)

# Transform back from homogeneous coordinates to standard coordinates.
#   Get rid of the scaling dimension and transpose so output shape matches input shape.
result = coords.T
result = result[:, :2]

return result

def vector_set_coordinates_to_local_frame(
coords,
avails,
anchor_state,
output_precision=np.float32,
):
"""
Transform the vector set map element coordinates from global frame to ego vehicle frame, as specified by
anchor_state.
:param coords: Coordinates to transform. <np.array: num_elements, num_points, 2>.
:param avails: Availabilities mask identifying real vs zero-padded data in coords.
<np.array: num_elements, num_points>.
:param anchor_state: The coordinate frame to transform to, in the form [x, y, heading].
:param output_precision: The precision with which to allocate output array.
:return: Transformed coordinates.
:raise ValueError: If coordinates dimensions are not valid or don't match availabilities.
"""

# Flatten coords from (num_map_elements, num_points_per_element, 2) to
#   (num_map_elements * num_points_per_element, 2) for easier processing.
num_map_elements, num_points_per_element, _ = coords.shape
coords = coords.reshape(num_map_elements * num_points_per_element, 2)

# Apply transformation using adequate precision
coords = coordinates_to_local_frame(coords,
                                    anchor_state,
                                    precision=np.float64)

# Reshape to original dimensionality
coords = coords.reshape(num_map_elements, num_points_per_element, 2)

# Output with specified precision
coords = coords.astype(output_precision)

# ignore zero-padded data
coords[~avails] = 0.0

return coords

=====================

3. Numpy-Tensor transformation

=====================

from typing import Any, Dict, Mapping, Union
import numpy as np
from typing import Any, Dict, Mapping, Optional, Sequence
import torch

def convert_data_dict_to_device_tensors(
data: Mapping[str, Any],
device: Union[torch.device, str],
squeeze: bool,
) -> Dict[str, torch.Tensor]:
"""
파이썬/NumPy 기반 딕셔너리를 모델 입력용 torch.Tensor 딕셔너리로 변환한다.

기능 요약
--------
- 값이 이미 torch.Tensor 인 경우: 다시 만들지 않고 .to(device, dtype) 만 호출
- NumPy 배열: from_numpy 로 래핑 후 float32 또는 bool 로 캐스팅
- 파이썬 스칼라/리스트: torch.as_tensor(...) 로 생성
- squeeze=False 인 경우: 앞쪽에 배치 차원 1개를 추가(unsqueeze(0))
- 키가 "agent_route_lane_order" 인 텐서는 항상 torch.int64 로 맞춤

Args:
    data:
        키-값 딕셔너리.
        값은 torch.Tensor / np.ndarray / 파이썬 수치/리스트 등을 허용한다.
    device:
        텐서를 올릴 디바이스. (예: "cuda:0", torch.device("cpu"))
    squeeze:
        False 이면, 모든 텐서 앞에 배치 차원 1을 추가한다.

Returns:
    Dict[str, torch.Tensor]:
        각 키에 대응하는 torch.Tensor 로 구성된 딕셔너리.
"""
out: Dict[str, torch.Tensor] = {}

for k, v in data.items():
    # 1) 이미 Tensor인 경우: 재생성 하지 말고 .to(...) 만
    if isinstance(v, torch.Tensor):
        target_dtype = torch.bool if v.dtype == torch.bool else torch.float32
        t = v.to(device=device, dtype=target_dtype, non_blocking=True)

    # 2) Numpy 배열인 경우: 복사 최소화를 위해 from_numpy/as_tensor 사용
    elif isinstance(v, np.ndarray):
        if v.dtype == np.bool_:
            # bool은 dtype 보존 -> 이후 device로만 이동
            t = torch.from_numpy(v).to(device=device, non_blocking=True)
            if t.dtype != torch.bool:
                t = t.to(dtype=torch.bool)
        else:
            # 수치형은 float32로
            t = torch.from_numpy(v).to(
                device=device,
                dtype=torch.float32,
                non_blocking=True,
            )

    # 3) 파이썬 bool 스칼라
    elif isinstance(v, (bool, np.bool_)):
        t = torch.tensor(v, dtype=torch.bool, device=device)
    # 4) none type 처리
    elif v is None:
        # ego_agent_next_11_dim # planner_future_11_dim
        continue
    # 5) 나머지(리스트/스칼라 등): as_tensor로 한 번에
    else:
        t = torch.as_tensor(v, dtype=torch.float32, device=device)

    if not squeeze:
        t = t.unsqueeze(0)

    if k == "agent_route_lane_order":
        t = t.to(torch.int64)

    out[k] = t

return out

================
<diffusion_planner/data_process/ego_process.py>
import numpy as np
import numpy.typing as npt
from typing import List, Tuple, Generator

from nuplan.common.actor_state.state_representation import TimePoint
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.planning.training.preprocessing.utils.agents_preprocessing import EgoInternalIndex
from nuplan.planning.training.preprocessing.features.trajectory_utils import convert_absolute_to_relative_poses
from nuplan.common.actor_state.vehicle_parameters import get_pacifica_parameters
from nuplan.planning.scenario_builder.nuplan_db.nuplan_scenario import NuPlanScenario

from diffusion_planner.data_process.utils import convert_absolute_quantities_to_relative
from nuplan.common.geometry.convert import numpy_array_to_absolute_velocity
from typing import List, Tuple
import numpy as np
import numpy.typing as npt
from nuplan.common.actor_state.ego_state import EgoState
from nuplan.planning.training.preprocessing.utils.agents_preprocessing import EgoInternalIndex
from nuplan.planning.scenario_builder.nuplan_db.nuplan_scenario import NuPlanScenario

from diffusion_planner.data_process.utils import convert_absolute_quantities_to_relative
from nuplan.common.geometry.convert import numpy_array_to_absolute_velocity

def get_ego_past_array_from_scenario(
scenario: NuPlanScenario, num_past_poses: int,
past_time_horizon: float) -> Tuple[np.ndarray, np.ndarray]:

current_ego_state: EgoState = scenario.initial_ego_state

past_ego_states: Generator[EgoState, None,
                           None] = scenario.get_ego_past_trajectory(
                               iteration=0,
                               num_samples=num_past_poses,
                               time_horizon=past_time_horizon)
# list(past_ego_states): List[EgoState]
sampled_past_ego_states: List[EgoState] = list(past_ego_states) + [
    current_ego_state
]
# past_cur_ego_array: np (21, 10)
#  x, y, theta, vx, vy, width, length, (car, pedestrian, cyclist)
past_cur_ego_array = sampled_past_ego_states_to_array(
    sampled_past_ego_states)

past_time_stamps: List[TimePoint] = list(
    scenario.get_past_timestamps(
        iteration=0,
        num_samples=num_past_poses,
        time_horizon=past_time_horizon)) + [scenario.start_time]

def sampled_past_timestamps_to_array(
        past_time_stamps: List[TimePoint]) -> npt.NDArray[np.float32]:
    flat: List[int] = [t.time_us for t in past_time_stamps]
    return np.array(flat, dtype=np.int64)  # shape: (21)

# past_time_stamps_array: np (21,)
past_time_stamps_array = sampled_past_timestamps_to_array(past_time_stamps)

return past_cur_ego_array, past_time_stamps_array

def sampled_past_ego_states_to_array(
past_ego_states: List[EgoState]) -> npt.NDArray[np.float32]: # (21, 10)

# 원래 있던 함수임
past_cur_num = len(past_ego_states)
past_cur_ego_array = np.zeros((past_cur_num, 10), dtype=np.float64)
for time_i in range(0, past_cur_num, 1):
    past_cur_ego_array[
        time_i, EgoInternalIndex.x()] = past_ego_states[time_i].center.x
    past_cur_ego_array[
        time_i, EgoInternalIndex.y()] = past_ego_states[time_i].center.y
    heading_ = past_ego_states[time_i].center.heading
    past_cur_ego_array[time_i, EgoInternalIndex.heading()] = heading_
    # --- 자차좌표계 → 세계좌표계 속도 변환: 회전만 적용 ---
    v_local = past_ego_states[
        time_i].dynamic_car_state.center_velocity_2d  # body-frame velocity
    he = float(heading_)
    c, s = np.cos(he), np.sin(he)
    vx_w = c * float(v_local.x) - s * float(v_local.y)
    vy_w = s * float(v_local.x) + c * float(v_local.y)
    past_cur_ego_array[time_i, EgoInternalIndex.vx()] = vx_w
    past_cur_ego_array[time_i, EgoInternalIndex.vy()] = vy_w
    # past_cur_ego_array[time_i, EgoInternalIndex.ax(
    # )] = past_ego_states[time_i].dynamic_car_state.rear_axle_acceleration_2d.x
    # past_cur_ego_array[time_i, EgoInternalIndex.ay(
    # )] = past_ego_states[time_i].dynamic_car_state.rear_axle_acceleration_2d.y

    past_cur_ego_array[
        time_i,
        EgoInternalIndex.ax()] = past_ego_states[time_i].car_footprint.width
    past_cur_ego_array[time_i, EgoInternalIndex.ay(
    )] = past_ego_states[time_i].car_footprint.length
    past_cur_ego_array[time_i, 7:10] = [
        1, 0, 0
    ]  # one-hot encoding for agent type (car, pedestrian, cyclist)

return past_cur_ego_array

def sampled_future_ego_states_to_array(
future_ego_states: List[EgoState]) -> npt.NDArray[np.float64]:
"""미래 ego 상태 리스트를 “월드 좌표계 기준 10차원 배열”로 바꾼다.

이 함수는 시나리오에서 가져온 여러 개의 미래 ego 상태(EgoState)를
한 줄짜리 숫자 배열로 정리해준다. 나중에 다른 함수에서
ego 기준 좌표계로 바꾸기 전에, “월드 기준 원본 값”을 담는 역할이다.

각 시점마다 다음과 같은 값들을 담는다.

- 위치 : x, y                   (월드 좌표)
- 방향 : heading                (라디안, 월드 기준)
- 속도 : vx, vy                 (월드 좌표 기준 속도, ego 바디속도를 회전해서 구함)
- 차체 크기 : width, length
- 타입 one-hot : [1, 0, 0]      (항상 차량이라고 가정: car=1, ped=0, bike=0)

Args:
    future_ego_states (List[EgoState]):
        - 길이: future_len
        - 각 원소는 한 시점의 ego 상태(EgoState).

Returns:
    np.ndarray:
        - fut_ego_world_10
        - shape: (future_len, 10)
        - 열 순서:
            [x, y, heading, vx, vy, width, length, car, pedestrian, cyclist]
        - dtype: float64
"""
future_len: int = len(future_ego_states)
# fut_ego_world_10: (future_len, 10)
fut_ego_world_10 = np.zeros((future_len, 10), dtype=np.float64)

for time_i in range(future_len):
    # 위치 (x, y)
    fut_ego_world_10[
        time_i, EgoInternalIndex.x()] = future_ego_states[time_i].center.x
    fut_ego_world_10[
        time_i, EgoInternalIndex.y()] = future_ego_states[time_i].center.y

    # 방향 heading (월드 좌표 기준)
    fut_ego_world_10[time_i, EgoInternalIndex.heading(
    )] = future_ego_states[time_i].center.heading

    # --- 자차좌표계 → 세계좌표계 속도 변환: 회전만 적용 ---
    # v_local: Ego body-frame 속도 (vx_body, vy_body)
    v_local = future_ego_states[
        time_i].dynamic_car_state.center_velocity_2d  # body-frame velocity
    he = float(future_ego_states[time_i].center.heading)
    c, s = np.cos(he), np.sin(he)
    vx_w = c * float(v_local.x) - s * float(v_local.y)
    vy_w = s * float(v_local.x) + c * float(v_local.y)
    fut_ego_world_10[time_i, EgoInternalIndex.vx()] = vx_w
    fut_ego_world_10[time_i, EgoInternalIndex.vy()] = vy_w

    # 차체 크기 (width, length)
    fut_ego_world_10[time_i, EgoInternalIndex.ax(
    )] = future_ego_states[time_i].car_footprint.width
    fut_ego_world_10[time_i, EgoInternalIndex.ay(
    )] = future_ego_states[time_i].car_footprint.length

    # 타입 one-hot (car, pedestrian, cyclist) = (1, 0, 0)
    fut_ego_world_10[time_i, 7:10] = [1, 0, 0]

return fut_ego_world_10

def get_ego_future_array_from_scenario(
scenario: NuPlanScenario,
current_ego_state: EgoState,
num_future_poses: int,
future_time_horizon: float,
) -> Tuple[npt.NDArray[np.float32], npt.NDArray[np.float32]]:
"""시나리오에서 ego의 미래 궤적을 가져와,
ego 기준 좌표계로 변환한 결과를 (T,3) / (T,11) 두 가지 형태로 돌려준다.

전체 흐름
----------
1) nuPlan 시나리오에서, 현재 ego 상태 기준
   `num_future_poses`, `future_time_horizon` 조건에 맞게
   미래 ego 상태들을 가져온다.
   - future_ego_states: List[EgoState], 길이 T

2) `sampled_future_ego_states_to_array` 로
   각 시점을 10차원 월드 좌표 배열로 바꾼다.
   - fut_ego_world_10: shape (T, 10)
     · [x, y, heading, vx, vy, width, length, one-hot(3)]

3) 현재 ego 포즈(current_ego_state.rear_axle)를
   [x_ego, y_ego, yaw_ego] 형태의 벡터로 만든다.
   - ego_cur_pose_np: shape (3,)

4) `convert_absolute_quantities_to_relative(..., 'ego')` 를 호출해
   월드 좌표 기반의 10차원 배열을 ego 기준 좌표계로 바꾸면서
   heading 을 cos, sin 두 값으로 풀어 1차원을 늘린다.
   - fut_ego_local_11: shape (T, 11), dtype float32
     · [x, y, cos(yaw), sin(yaw), vx, vy, width, length, one-hot(3)]

5) x, y 값으로부터 heading 을 다시 뽑아 (단순 arctan2 사용)
   (T, 3) = [x, y, heading] 형태의 간단한 궤적도 만들어서 함께 반환한다.
   - fut_ego_local_xyh: shape (T, 3)

Args:
    scenario (NuPlanScenario):
        nuPlan 시나리오 객체.
    current_ego_state (EgoState):
        현재 ego 상태. (보통 initial_ego_state 또는 시뮬레이터의 현재 상태)
    num_future_poses (int):
        몇 개의 미래 시점을 샘플링할지 (T 값).
    future_time_horizon (float):
        현재부터 몇 초 뒤까지를 커버할지 [초].

Returns:
    Tuple[np.ndarray, np.ndarray]:
        - fut_ego_local_xyh:
            · shape: (T, 3)
            · 각 행: [x_ego, y_ego, heading_ego] (ego 기준 좌표계)
            · dtype: float32
        - fut_ego_local_11:
            · shape: (T, 11)
            · 각 행:
                [x, y, cos(yaw), sin(yaw), vx, vy,
                 width, length, onehot_car, onehot_ped, onehot_bike]
            · dtype: float32
"""
# future_ego_states: List[EgoState], 길이 T
future_ego_states = scenario.get_ego_future_trajectory(
    iteration=0,
    num_samples=num_future_poses,
    time_horizon=future_time_horizon)

# fut_ego_world_10: (T, 10)
fut_ego_world_10 = sampled_future_ego_states_to_array(
    list(future_ego_states))
# ego_cur_pose_np: (3,) = [x_ego, y_ego, yaw_ego] (월드 좌표계)
ego_cur_pose_np = np.array(
    [
        current_ego_state.rear_axle.x,
        current_ego_state.rear_axle.y,
        current_ego_state.rear_axle.heading,
    ],
    dtype=np.float64,
)

# fut_ego_local_11: (T, 11)  ← 'ego' 모드로 상대 좌표 변환 후 float32
fut_ego_local_11 = convert_absolute_quantities_to_relative(
    fut_ego_world_10, ego_cur_pose_np, 'ego').astype(np.float32)
# fut_ego_local_xy: (T, 2)  ← x,y 만 분리
fut_ego_local_xy = fut_ego_local_11[:, :2]
fut_ego_local_cos_yaw = fut_ego_local_11[:, 2]
fut_ego_local_sin_yaw = fut_ego_local_11[:, 3]

# fut_ego_local_heading: (T,)  ← x,y 에서 heading 추출 (현재 구현 그대로 유지)
fut_ego_local_heading = np.arctan2(fut_ego_local_sin_yaw,
                                   fut_ego_local_cos_yaw)

# fut_ego_local_xyh: (T, 3) = [x, y, heading]
fut_ego_local_xyh = np.concatenate(
    [fut_ego_local_xy, fut_ego_local_heading[:, None]], axis=-1)
return fut_ego_local_xyh, fut_ego_local_11

def calculate_additional_ego_states(ego_agent_past, time_stamp):

# ego_agent_past: (N, 7) where N is the number of past states.
# 7: x, y, heading, vx, vy, width, length
# transform haeding to cos h, sin h and calculate the steering_angle and yaw_rate for current state

current_state = ego_agent_past[-1]
prev_state = ego_agent_past[-2]

dt = (time_stamp[-1] - time_stamp[-2]) * 1e-6

cur_velocity = current_state[3]
angle_diff = current_state[2] - prev_state[2]
angle_diff = (angle_diff + np.pi) % (2 * np.pi) - np.pi
yaw_rate = angle_diff / dt

if abs(cur_velocity) < 0.2:
    steering_angle = 0.0
    yaw_rate = 0.0  # if the car is almost stopped, the yaw rate is unreliable
else:
    steering_angle = np.arctan(
        yaw_rate * get_pacifica_parameters().wheel_base / abs(cur_velocity))
    steering_angle = np.clip(steering_angle, -2 / 3 * np.pi, 2 / 3 * np.pi)
    yaw_rate = np.clip(yaw_rate, -0.95, 0.95)
# ego_agent_past: (T, 7)
# past: (T, 8) # +3 for one-hot encoding of the agent type (car, pedestrian, cyclist) and ego is always car.

current = np.zeros((ego_agent_past.shape[1] + 3), dtype=np.float32)
current[:2] = current_state[:2]
current[2] = np.cos(current_state[2])
current[3] = np.sin(current_state[2])
current[4:8] = current_state[3:7]
current[8] = steering_angle
current[9] = yaw_rate

return current
profile
ad_official

0개의 댓글