이번 글에서는 실시간 카메라에서 byte 데이터를 받아 사용자가 지정한 프레임수와 초수만큼 영상을 저장하는 Python 프로그램에 대해 설명할 것이다. 이 프로그램은 비동기 처리를 통해 효율적으로 영상을 저장하고 관리할 수 있다. 아래에 프로그램의 주요 기능과 코드를 분석해보자.
1. 비동기 전처리 및 후처리:
영상 저장 작업 전에 전처리를 수행하고, 작업 후 후처리를 수행할 수 있다.
2. 실시간 영상 저장:
카메라에서 실시간으로 받은 byte 데이터를 영상으로 변환하여 지정된 프레임수와 초수만큼 저장할 수 있다.
3. 이전 파일 삭제:
이전에 저장된 파일 중 끝 시간이 없는 파일을 삭제할 수 있다.
4. 파일명 변경:
저장된 영상 파일의 시작 시간과 끝 시간을 포함한 파일명으로 변경할 수 있다.
1. 비동기 전처리 및 후처리
preprocessing 메서드는 전처리를, postprocessing 메서드는 후처리를 수행한다. 각각 1초의 지연을 두어 로그를 남긴다.
async def preprocessing(self, *args, **kwargs):
self._logger.info("preprocessing...")
await asyncio.sleep(1)
self._logger.info("preprocessing is finished")
async def postprocessing(self, *args, **kwargs):
self._logger.info("postprocessing...")
await asyncio.sleep(1)
self._logger.info("postprocessing is finished")
2. 실시간 영상 저장
execute 메서드는 실시간으로 카메라 데이터를 받아 영상을 저장하는 메인 기능을 수행한다. input_data를 받아 각 데이터에 대해 처리하고, 영상을 저장한다.
async def execute(self, input_data: Dict[str, List[Any]]):
self._logger.info(f'Video.execute() starts ...')
if input_data is None:
self._logger.info("input_data none")
return
json_attribute: JsonAttribute = self.command_attribute.command.attribute
json_node = json_attribute.json_node
video_sec = json_node["sec"]
video_fps = json_node["fps"]
self.video_output_path = json_node["outputPath"]
timestamp = None
snapshot_array = None
for id, stream_data in input_data.items():
for snapshot_data in stream_data:
timestamp = snapshot_data[0]
snapshot_array = snapshot_data[1]
self._logger.info(f'id: {id}, timestamp: {timestamp}, snapshot_data:')
img = cv2.imdecode(np.frombuffer(snapshot_array, np.uint8), cv2.IMREAD_COLOR)
size = (img.shape[1], img.shape[0])
if self.size is not None and size != self.size:
img = cv2.resize(img, self.size, interpolation=cv2.INTER_AREA)
if self.out is None:
self.start_timestamp = int(timestamp)
dt = datetime.datetime.fromtimestamp(self.start_timestamp / 1e9)
format_folder = dt.strftime("%Y-%m-%d")
formatted_time = dt.strftime("%H-%M-%S-%f")[:-3]
self.folder_name = f"{self.video_output_path}/{format_folder}"
self.filename = f"{formatted_time}.mp4"
self.output_path = os.path.join(self.folder_name, self.filename)
self.size = (img.shape[1], img.shape[0])
if not os.path.exists(self.folder_name):
os.makedirs(self.folder_name)
self._logger.info(f'path : {self.output_path}')
self.out = cv2.VideoWriter(self.output_path, cv2.VideoWriter_fourcc(*'mp4v'), video_fps, self.size)
self.delete_previous_file_without_end_time()
self.end_timestamp = int(timestamp)
try:
self.count += 1
self._logger.debug(self.count)
self.out.write(img)
if self.count >= (video_fps * video_sec):
self.release()
except Exception as e:
self._logger.error(f"Exception: {e}")
self.release()
3. 이전 파일 삭제
delete_previous_file_without_end_time 메서드는 저장된 파일 중 끝 시간이 없는 파일을 찾아 삭제한다.
def delete_previous_file_without_end_time(self):
previous_file = None
previous_file_path = None
for root, _, files in os.walk(self.video_output_path):
for file in sorted(files):
if file.endswith('.mp4') and file < self.filename:
previous_file = file
previous_file_path = os.path.join(root, previous_file)
if file == self.filename:
break
if previous_file:
if len(previous_file.split('_')) == 1:
try:
os.remove(previous_file_path)
self._logger.info(f'Removed previous file without end time: {previous_file_path}')
except Exception as e:
self._logger.error(f'Error removing previous file: {previous_file_path}, Exception: {e}')
4. 파일명 변경
release 메서드는 영상 저장이 완료되면 파일명을 시작 시간과 끝 시간을 포함하도록 변경한다.
def release(self):
if self.out is not None:
self.out.release()
if self.start_timestamp is not None and self.end_timestamp is not None:
start_dt = datetime.datetime.fromtimestamp(self.start_timestamp / 1e9)
end_dt = datetime.datetime.fromtimestamp(self.end_timestamp / 1e9)
start_time = start_dt.strftime("%H-%M-%S-%f")[:-3]
end_time = end_dt.strftime("%H-%M-%S-%f")[:-3]
new_filename = f"{start_time}_{end_time}.mp4"
new_output_path = os.path.join(self.folder_name, new_filename)
os.rename(self.output_path, new_output_path)
self._logger.info(f'Renamed file to: {new_output_path}')
self.out = None
self.size = None
self.count = 0
self.start_timestamp = None
self.end_timestamp = None
self._logger.info(f'Video.execute() is finished')
이 프로그램은 실시간으로 카메라 데이터를 받아 영상을 저장하고, 사용자가 지정한 조건에 맞게 파일을 관리할 수 있게 해준다. 비동기 처리를 통해 효율성을 높이고, 파일 관리 기능을 통해 저장된 영상을 체계적으로 관리할 수 있다.