[Python] Whisper JSON을 MongoDB에 저장

seunghyun·2023년 12월 15일
1

Yougle

목록 보기
7/15
post-thumbnail

✔ 요구사항리

이번 요구사항은 다음과 같습니다.

Whisper결과로 나온 JSON을 MongoDB에 저장하기

이를 더 구체적으로 정리해보자면 아래와 같습니다.

  1. 사용자가 웹페이지에서 특정 채널ID를 입력한다.
    • SQLite DB 에서 채널ID를 조회한다. 해당 채널의 모든 영상의 제목을 웹페이지에 출력한다.
    • 사용자가 영상 제목클릭하면, 영상 다운로드 & 음성인식 & JSON 저장하는데 아래의 규칙을 따른다.
  2. 사용자가 영상 제목을 클릭하면 whisper-cid-vid.mp4을 local에 다운로드하는데, 아래의 규칙을 따른다.
    • (다운로드할 때, 제목을 하나하나 직접 다 눌러서 다운로드한다)
    • 이미 local에 저장되어 있는 상태라면, 다운로드를 skip한다.
    • local에 저장되어 있지 않다면, 다운로드한다.
      • 이 때 채널ID별로 디렉토리를 만들어서 분류하도록 한다. (파일시스템)
  3. 다운로드한 whisper-cid-vid.mp4에서 whisper-cid-vid.mp3을 추출하여 채널ID 디렉토리에 저장한다.
  4. Whisper로 whisper-cid-vid.mp3를 음성인식한 whisper-cid-vid.JSON를 얻는데, 아래의 규칙을 따른다.
    • 이미 MongoDB에 JSON이 저장되어 있다면, 음성인식을 skip한다.
    • MongoDB에 저장되어 있지 않다면, JSON을 얻는다.
  5. 4단계 JSON을 MongoDB에 저장한다.

✔ 구현

mongodb에 자막 정보(transcription)가 없다면 upsert 하도록 했다. 아래 사진과 같이 upsert가 되는 것을 확인할 수 있다.


insert 되는 json 정보를 확인해보고 싶어서 아래와 같이 확인해보았다.

def transcribe_audio(mp3_path):
    model = whisper.load_model('base')
    result = model.transcribe(mp3_path)
    # 각 세그먼트의 텍스트를 줄바꿈하여 출력
    print("dic result", result)
    print(result['text'])  # 텍스트 출력해서 확인 가능!
    print(result['segments'])  # 타임스탬프와 함께 세그먼트 정보 출력해서 확인 가능!
    return result

json 정보를 ide에서 확인해보면 아래와 같다!

pytube를 임포트했다.

✔ 고민한 점

파일 디렉토리 분류; Windows 중복 문제

처음엔 이렇게 했다. 그때만 해도 몰랐지 이게 발목을 잡을 줄...

@app.route('/download/<channel_id>/<video_id>')
def download_video(channel_id, video_id):
    youtube = YouTube(f'https://www.youtube.com/watch?v={video_id}')
    video = youtube.streams.filter(res='360p', progressive=True, file_extension='mp4').order_by('resolution').desc().first()
    if not video:
        return "Video not found", 404
    # 현재 스크립트 파일이 위치한 디렉토리의 절대 경로
    current_dir = os.path.abspath(os.path.dirname(__file__))
    # 프로젝트 루트 디렉토리를 찾기 (현재 디렉토리의 상위 디렉토리)
    # project_root = os.path.dirname(current_dir)
    # Yougle 프로젝트 루트 디렉토리 설정
    project_root = 'C:\\Users\\redna\\PycharmProjects\\Yougle'
    # videos 디렉토리 및 채널 ID 디렉토리 경로 설정
    videos_path = os.path.join(project_root, 'videos', channel_id)
    if not os.path.exists(videos_path):
        os.makedirs(videos_path)  # 디렉토리가 없으면 생성
    # 파일명 형식: whisper-channelid-videoid
    filename = f"whisper-{channel_id}-{video_id}.mp4"
    download_path = video.download(output_path=videos_path, filename=filename)
    return send_file(download_path, as_attachment=True)

아래 코드처럼 작성해도, 중복 문제를 피할 수 없었다. 윈도우즈 운영체제라서 그런건가?

@app.route('/download/<channel_id>/<video_id>')
def download_video(channel_id, video_id):
    youtube = YouTube(f'https://www.youtube.com/watch?v={video_id}')
    video = youtube.streams.filter(res='360p', progressive=True, file_extension='mp4').order_by('resolution').desc().first()
    if not video:
        return "Video not found", 404

    # project_root = 'C:\\Users\\redna\\PycharmProjects\\Yougle'
    current_directory = os.getcwd()
    project_root = current_directory
    videos_path = os.path.join(project_root, 'videos', channel_id)
    if not os.path.exists(videos_path):
        os.makedirs(videos_path)
    filename = f"whisper-{channel_id}-{video_id}.mp4"
    file_path = os.path.join(videos_path, filename)

    downloads_path = 'C:\\Users\\redna\\Downloads'  # Downloads 경로 추가
    download_file_path = os.path.join(downloads_path, filename)  # Downloads 경로에 대한 파일 경로
    print(download_file_path)

    # 두 경로 모두에 파일이 존재하지 않는 경우에만 다운로드
    if not os.path.exists(file_path) and not os.path.exists(download_file_path):
        # 파일이 없으면 다운로드
        video.download(output_path=videos_path, filename=filename)
        print(f"Downloaded to: {file_path}")
    else:
        if os.path.exists(file_path):
            print(f"File already exists in videos path: {file_path}")
        if os.path.exists(download_file_path):
            # TODO: delete
            # unlink(delete_file)
            print(f"File already exists in downloads path: {download_file_path}")

    return send_file(file_path, as_attachment=True)

다운>위치변경>삭제라는 방법도 써봤다... 그리고 다운로드에서 삭제.

@app.route('/download/<channel_id>/<video_id>')
def download_video(channel_id, video_id):
    youtube = YouTube(f'https://www.youtube.com/watch?v={video_id}')
    video = youtube.streams.filter(res='360p', progressive=True, file_extension='mp4').order_by(
        'resolution').desc().first()
    if not video:
        return "Video not found", 404

    downloads_path = 'C:\\Users\\redna\\Downloads'
    filename = f"whisper-{channel_id}-{video_id}.mp4"
    download_file_path = os.path.join(downloads_path, filename)

    # Downloads 폴더에 파일이 없으면 다운로드
    if not os.path.exists(download_file_path):
        video.download(output_path=downloads_path, filename=filename)
        print(f"Downloaded to: {download_file_path}")
    else:
        print(f"File already exists in downloads path: {download_file_path}")

    current_directory = os.getcwd()
    project_root = current_directory
    videos_path = os.path.join(project_root, 'videos', channel_id)

    # videos/channel_id 폴더가 없으면 생성
    if not os.path.exists(videos_path):
        os.makedirs(videos_path)

    destination_file_path = os.path.join(videos_path, filename)

    # 파일을 videos/channel_id 폴더로 이동
    shutil.move(download_file_path, destination_file_path)
    print(f"Moved to: {destination_file_path}")

    return send_file(destination_file_path, as_attachment=True)

mp4->mp3 추출

from moviepy.editor import VideoFileClip

@app.route('/download/<channel_id>/<video_id>')
def download_video(channel_id, video_id):
    youtube = YouTube(f'https://www.youtube.com/watch?v={video_id}')
    video = youtube.streams.filter(res='360p', progressive=True, file_extension='mp4').order_by('resolution').desc().first()
    if not video:
        return "Video not found", 404
    # 현재 스크립트 파일이 위치한 디렉토리의 절대 경로
    current_dir = os.path.abspath(os.path.dirname(__file__))
    # 프로젝트 루트 디렉토리를 찾기 (현재 디렉토리의 상위 디렉토리)
    # project_root = os.path.dirname(current_dir)
    # Yougle 프로젝트 루트 디렉토리 설정
    project_root = 'C:\\Users\\redna\\PycharmProjects\\Yougle'
    # videos 디렉토리 및 채널 ID 디렉토리 경로 설정
    videos_path = os.path.join(project_root, 'videos', channel_id)
    if not os.path.exists(videos_path):
        os.makedirs(videos_path)  # 디렉토리가 없으면 생성
    # 파일명 형식: whisper-channelid-videoid
    filename = f"whisper-{channel_id}-{video_id}.mp4"
    mp4_path = os.path.join(videos_path, filename)
    mp3_path = os.path.join(videos_path, f"whisper-{channel_id}-{video_id}.mp3")

    # MP4 파일 다운로드
    video.download(output_path=videos_path, filename=filename)
    # MP4를 MP3로 변환
    youtube_data.convert_mp4_to_mp3(mp4_path, mp3_path)
    # MP3 파일 전송
    return send_file(mp3_path, as_attachment=True)
    
    
# youtube_data.py
# mp4->mp3 변환
def convert_mp4_to_mp3(mp4_file_path, mp3_file_path):
    video_clip = VideoFileClip(mp4_file_path)
    audio_clip = video_clip.audio
    audio_clip.write_audiofile(mp3_file_path)
    audio_clip.close()
    video_clip.close()

Fix: 채널 ID 입력 시 제목 로딩

DB에 없는 YouTube 채널 ID를 입력했을 때, 아래 사진처럼 링크들의 제목이 즉시 표시되지 않고 새로고침 후에야 올바르게 나타나는 문제가 있었다.

youtube_data.get_channel_videos_and_save(channel_id) 함수는 YouTube API를 호출하여 데이터를 가져오고, 그 결과를 DB에 저장한다. 이 과정이 완료되기 전에 페이지가 렌더링되면, 데이터가 아직 DB에 저장되지 않은 상태일 수 있다.

로직이 너무 꼬여있던 것이었다! Fix: 채널 ID 입력 시 제목 로딩


✔ 후기

팀원과 소통을 하면서 느낀 점은, 같은 개발자끼리 소통을 하더라도 문서화 등을 통해서 '제대로 확실히' 개발 과정을 공유해야한다는 점이다. 그러기 위해서는 제대로 된 사전 지식이 뒷받침되어야 하는 것이 필수적이고! 앞으로는 이렇게 요구사항을 읽기 편하게 문서화해서 공유하고 이에 대해서 충분히 이야기를 나눈 다음 개발을 시작해야겠다 :)


🔗 Reference

0개의 댓글