실시간 카메라 영상 저장 프로그램 - Python

초이·2024년 5월 10일
0

Python

목록 보기
1/1

실시간 카메라 영상 저장 프로그램 - Python

이번 글에서는 실시간 카메라에서 byte 데이터를 받아 사용자가 지정한 프레임수와 초수만큼 영상을 저장하는 Python 프로그램에 대해 설명할 것이다. 이 프로그램은 비동기 처리를 통해 효율적으로 영상을 저장하고 관리할 수 있다. 아래에 프로그램의 주요 기능과 코드를 분석해보자.


주요 기능

1. 비동기 전처리 및 후처리:
영상 저장 작업 전에 전처리를 수행하고, 작업 후 후처리를 수행할 수 있다.
2. 실시간 영상 저장:
카메라에서 실시간으로 받은 byte 데이터를 영상으로 변환하여 지정된 프레임수와 초수만큼 저장할 수 있다.
3. 이전 파일 삭제:
이전에 저장된 파일 중 끝 시간이 없는 파일을 삭제할 수 있다.
4. 파일명 변경:
저장된 영상 파일의 시작 시간과 끝 시간을 포함한 파일명으로 변경할 수 있다.


코드 분석

1. 비동기 전처리 및 후처리

preprocessing 메서드는 전처리를, postprocessing 메서드는 후처리를 수행한다. 각각 1초의 지연을 두어 로그를 남긴다.

async def preprocessing(self, *args, **kwargs):
    self._logger.info("preprocessing...")
    await asyncio.sleep(1)
    self._logger.info("preprocessing is finished")

async def postprocessing(self, *args, **kwargs):
    self._logger.info("postprocessing...")
    await asyncio.sleep(1)
    self._logger.info("postprocessing is finished")

2. 실시간 영상 저장

execute 메서드는 실시간으로 카메라 데이터를 받아 영상을 저장하는 메인 기능을 수행한다. input_data를 받아 각 데이터에 대해 처리하고, 영상을 저장한다.

async def execute(self, input_data: Dict[str, List[Any]]):
    self._logger.info(f'Video.execute() starts ...')
    if input_data is None:
        self._logger.info("input_data none")
        return
    json_attribute: JsonAttribute = self.command_attribute.command.attribute
    json_node = json_attribute.json_node

    video_sec = json_node["sec"]
    video_fps = json_node["fps"]
    self.video_output_path = json_node["outputPath"]
    timestamp = None
    snapshot_array = None
    for id, stream_data in input_data.items():
        for snapshot_data in stream_data:
            timestamp = snapshot_data[0]
            snapshot_array = snapshot_data[1]
            self._logger.info(f'id: {id}, timestamp: {timestamp}, snapshot_data:')

    img = cv2.imdecode(np.frombuffer(snapshot_array, np.uint8), cv2.IMREAD_COLOR)
    size = (img.shape[1], img.shape[0])

    if self.size is not None and size != self.size:
        img = cv2.resize(img, self.size, interpolation=cv2.INTER_AREA)

    if self.out is None:
        self.start_timestamp = int(timestamp)
        dt = datetime.datetime.fromtimestamp(self.start_timestamp / 1e9)
        format_folder = dt.strftime("%Y-%m-%d")
        formatted_time = dt.strftime("%H-%M-%S-%f")[:-3]
        self.folder_name = f"{self.video_output_path}/{format_folder}"
        self.filename = f"{formatted_time}.mp4"
        self.output_path = os.path.join(self.folder_name, self.filename)
        self.size = (img.shape[1], img.shape[0])
        if not os.path.exists(self.folder_name):
            os.makedirs(self.folder_name)
        self._logger.info(f'path : {self.output_path}')
        self.out = cv2.VideoWriter(self.output_path, cv2.VideoWriter_fourcc(*'mp4v'), video_fps, self.size)
        self.delete_previous_file_without_end_time()

    self.end_timestamp = int(timestamp)

    try:
        self.count += 1
        self._logger.debug(self.count)
        self.out.write(img)
        if self.count >= (video_fps * video_sec):
            self.release()
    except Exception as e:
        self._logger.error(f"Exception: {e}")
        self.release()

3. 이전 파일 삭제

delete_previous_file_without_end_time 메서드는 저장된 파일 중 끝 시간이 없는 파일을 찾아 삭제한다.

def delete_previous_file_without_end_time(self):
    previous_file = None
    previous_file_path = None
    for root, _, files in os.walk(self.video_output_path):
        for file in sorted(files):
            if file.endswith('.mp4') and file < self.filename:
                previous_file = file
                previous_file_path = os.path.join(root, previous_file)
            if file == self.filename:
                break

    if previous_file:
        if len(previous_file.split('_')) == 1:
            try:
                os.remove(previous_file_path)
                self._logger.info(f'Removed previous file without end time: {previous_file_path}')
            except Exception as e:
                self._logger.error(f'Error removing previous file: {previous_file_path}, Exception: {e}')

4. 파일명 변경

release 메서드는 영상 저장이 완료되면 파일명을 시작 시간과 끝 시간을 포함하도록 변경한다.

def release(self):
    if self.out is not None:
        self.out.release()
        if self.start_timestamp is not None and self.end_timestamp is not None:
            start_dt = datetime.datetime.fromtimestamp(self.start_timestamp / 1e9)
            end_dt = datetime.datetime.fromtimestamp(self.end_timestamp / 1e9)
            start_time = start_dt.strftime("%H-%M-%S-%f")[:-3]
            end_time = end_dt.strftime("%H-%M-%S-%f")[:-3]
            new_filename = f"{start_time}_{end_time}.mp4"
            new_output_path = os.path.join(self.folder_name, new_filename)
            os.rename(self.output_path, new_output_path)
            self._logger.info(f'Renamed file to: {new_output_path}')
            
    self.out = None
    self.size = None
    self.count = 0
    self.start_timestamp = None
    self.end_timestamp = None
    self._logger.info(f'Video.execute() is finished')

이 프로그램은 실시간으로 카메라 데이터를 받아 영상을 저장하고, 사용자가 지정한 조건에 맞게 파일을 관리할 수 있게 해준다. 비동기 처리를 통해 효율성을 높이고, 파일 관리 기능을 통해 저장된 영상을 체계적으로 관리할 수 있다.

profile
MacBook이 갖고싶은 살암

0개의 댓글