Video Intelligence API를 사용한 스트리밍 영상 분석 파이프라인 구축

김민형·2022년 6월 19일
2

GCP - AI,ML

목록 보기
3/14

아키텍처

API 사용 설정

gcloud services enable dataflow.googleapis.com \
containerregistry.googleapis.com \
videointelligence.googleapis.com

사용할 변수 설정

Cloud Shell에서 작업하는 경우 Shell 을 껐다 켜거나 시간이 지날 때마다 변수를 다시 선언해줘야 한다. 이를 해결하고 싶으면 nano ./bashrc에서 아래의 변수들을 다 등록해주고 source ./bashrc로 저장

export REGION=REGION
export PROJECT=$(gcloud config get-value project)
export GCS_NOTIFICATION_TOPIC="gcs-notification-topic"
export GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription"
export VIDEO_CLIPS_BUCKET=${PROJECT}_videos
export BIGQUERY_DATASET="video_analytics"
export OBJECT_DETECTION_TOPIC="object-detection-topic"
export OBJECT_DETECTION_SUBSCRIPTION="object-detection-subscription"
export ERROR_TOPIC="error-topic"
export ERROR_SUBSCRIPTION="error-subscription"
export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}_dataflow_template_config

사용할 깃허브 저장소 clone

git clone https://github.com/GoogleCloudPlatform/dataflow-video-analytics.git
cd dataflow-video-analytics

Cloud Storage용 Pub/Sub생성

pub/sub 주제 생성

gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}

주제에 대한 pub/sub 구독 생성

gcloud pubsub subscriptions create ${GCS_NOTIFICATION_SUBSCRIPTION} \
--topic=${GCS_NOTIFICATION_TOPIC}

동영상 클립을 저장할 버킷 생성

gsutil mb -c standard -l ${REGION} gs://${VIDEO_CLIPS_BUCKET}

버킷용 pub/sub알림 생성

※이 작업은 콘솔에서 되지 않는다. 오로지 Shell 작업을 통해서만 가능※

gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \
-f json gs://${VIDEO_CLIPS_BUCKET}

BigQuery 데이터 세트 및 테이블 생성

데이터 세트 생성

bq mk -d --location=US ${BIGQUERY_DATASET}

테이블 생성

bq mk -t \
--schema src/main/resources/table_schema.json \
--description "object_tracking_data" \
${PROJECT}:${BIGQUERY_DATASET}.object_tracking_analysis

객체 식별에 관한 pub/sub주제 및 구독 생성

주제 생성

gcloud pubsub topics create ${OBJECT_DETECTION_TOPIC}

구독 생성

gcloud pubsub subscriptions create ${OBJECT_DETECTION_SUBSCRIPTION} \
--topic=${OBJECT_DETECTION_TOPIC}

이 객체 식별에 관해선 나중에 Cloud Function을 활용해서 Google Chat이나 Slack으로 탐지 알람을 보낼 수 있다.

index.js

const request = require(`request`);

// Translate JSON message into Text
exports.notifyHangouts = async (pubsubEvent, context) => {
    const pubsubData = Buffer.from(pubsubEvent.data, 'base64').toString();
    const alert = JSON.parse(pubsubData);
    console.log(alert);
    const now = new Date();
   
    // Post message to the room
    request({
      uri: "<Google Chat Webhook url 입력>",
      method: "POST",
      json: {
  "text": "GS One BigData Project Object Detection Alert",
  "cards": [
    {
      "header": {
        "title": "GCP Object Detection Alert",
        "subtitle": "Automated Object Detection Notificaiton",
        "imageUrl": "https://goo.gl/aeDtrS"
      },
      "sections": [
        {
          "widgets": [
            {
                "keyValue": {
                  "topLabel": "Entity",
                  "content": alert.entity.toString()
                }
              },
              {
                "keyValue": {
                  "topLabel": "File",
                  "content": alert.file_name.toString()
                }
              },
              {
                "keyValue": {
                  "topLabel": "Time",
                  "content": alert.frame_data[0].processing_timestamp.toString()
                }
              }
          ]
        },
        {
          "widgets": [
              {
                  "buttons": [
                    {
                      "textButton": {
                        "text": "OPEN THE GCP CONSOLE",
                        "onClick": {
                          "openLink": {
                            "url": "https://console.cloud.google.com/bigquery?project=gsn-rampup-bigdata-prj&ws=!1m5!1m4!4m3!1sgsn-rampup-bigdata-prj!2svideo_analytics!3sobject_tracking_analysis"
                          }
                        }
                      }
                    }
                  ]
              }
          ]
        }
      ]
    }
  ]
}               
    }, function(error, response, body){
    if(error) {
        console.log(error);
    } else {
        console.log(response.statusCode, body);
    }
});

};

package.json

{
    "name": "cloud-functions-object-tracking-data",
    "version": "0.0.1",
    "dependencies": {
      "request": "2.88.2"
  }
}

Dataflow 코드에서 데이터를 Pub/Sub으로 보내는 걸 entity, file_name의 이름으로, 그리고 나머지 항목은 묶어서 데이터 프레임 형식으로 넣어줬다.

오류에 관한 pub/sub주제 및 구독 생성

주제 생성

gcloud pubsub topics create ${ERROR_TOPIC}

구독 생성

gcloud pubsub subscriptions create ${ERROR_SUBSCRIPTION} \
--topic=${ERROR_TOPIC}

Dataflow 파이프라인 실행

Apache Beam 자바 SDK를 사용하여 Dataflow 파이프라인을 빌드한 후 Dataflow Flex 템플릿을 사용하여 실행
Dataflow 파이프라인은 Video Intelligence API를 사용하여 동영상을 프레임 별로 분석하고 결과를 BigQuery와 Pub/Sub으로 보낸다.

cd gradle/wrapper

gradle이 deprecated된 버전이라 gradle-wrapper.properties에서 gradle 버전을 수정해줬다.
(현재 gradle-7.5-rc-2-all.zip 사용)

build.gradle

/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
buildscript {
    ext {
        dataflowBeamVersion = '2.23.0'
    }
    repositories {
        mavenCentral()
        jcenter()
        maven {
            url "https://plugins.gradle.org/m2/"
       }
       dependencies {
            classpath "net.ltgt.gradle:gradle-apt-plugin:0.19"
            classpath "gradle.plugin.com.google.cloud.tools:jib-gradle-plugin:3.2.1"
            classpath "com.diffplug.spotless:spotless-plugin-gradle:6.7.2"
        }
    }
}

group 'com.google.solutions'
version '0.1.0'

repositories {
   mavenCentral()
}

apply plugin: 'java'
apply plugin: 'application'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'com.google.cloud.tools.jib'
apply plugin: "com.diffplug.spotless"


// Licence header enforced by spotless
def javaLicenseHeader = """/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
"""

sourceCompatibility = 1.8
targetCompatibility = 1.8
applicationDefaultJvmArgs = ['-Xms4g', '-Xmx16g']
mainClassName = System.getProperty('mainClass', 'com.google.solutions.df.video.analytics.VideoAnalyticsPipeline')


dependencies {
    implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: dataflowBeamVersion
    implementation group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version:dataflowBeamVersion
    implementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: dataflowBeamVersion
    implementation group: 'org.apache.beam', name: 'beam-sdks-java-extensions-ml', version: dataflowBeamVersion
    implementation group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.30'
    implementation "com.google.auto.value:auto-value-annotations:1.6.2"
   implementation 'com.google.cloud:google-cloud-video-intelligence:1.2.1'
    annotationProcessor "com.google.auto.value:auto-value:1.6.2"
    testImplementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: dataflowBeamVersion
    testImplementation group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.30'
}


jar {
    manifest {
        attributes ('Main-Class': mainClassName)
    }
}

compileJava {
    options.compilerArgs.addAll(['-Xlint:all', '-Xlint:-serial'])
    options.encoding = 'UTF-8'
}

// default tasks when you run ./gradlew
defaultTasks 'clean', 'assemble'

// Google Cloud Dataflow requires the resource/main directory to exist
task resources {
    def resourcesDir = new File('build/resources/main')
    resourcesDir.mkdirs()
}

run {
    if (project.hasProperty('args')) {
        args project.args.split('\\s')
    }
    mustRunAfter 'resources'
}

// Spotless configuration

def enableSpotlessCheck = project.hasProperty('enableSpotlessCheck') && project.enableSpotlessCheck == 'true'
spotless {
  enforceCheck enableSpotlessCheck
  java {
    licenseHeader javaLicenseHeader
    googleJavaFormat()
  }
}


jib {
    from {
        image = 'gcr.io/dataflow-templates-base/java8-template-launcher-base:latest'
    }
    to {
        credHelper = 'gcloud'
    }
    container {
        appRoot = '/template/df-video-analytics-template'
        entrypoint = 'INHERIT'
        environment = [DATAFLOW_JAVA_COMMAND_SPEC:'/template/df-video-analytics-template/resources/java_command_spec.json']
    }
}

Dataflow Flex 템플릿용 Docker 이미지 생성

gcloud auth configure-docker

gradle jib \
    --image=gcr.io/${PROJECT}/dataflow-video-analytics:latest

만약 아래 사진과 같은 오류가 난다면 build.gradle에서 jib-gradle-plugin 버전을 최신 버전으로 바꿔주자(현재 위의 코드는 3.2.1버전 사용)

Dataflow Flex 템플릿을 저장할 Cloud Storage 버킷 생성

gsutil mb -c standard -l ${REGION} \
gs://${DATAFLOW_TEMPLATE_BUCKET}

Dataflow 템플릿의 json 구성파일을 버킷에 업로드

cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_video_analytics.json
{
  "image":
  "gcr.io/${PROJECT}/dataflow-video-analytics:latest",
  "sdk_info": {"language": "JAVA"}
}
EOF

파이프라인 실행

gcloud beta dataflow flex-template run "video-analysis-pipeline" \
--project=${PROJECT} \
--region=${REGION} \
--template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_video_analytics.json \
--parameters=^~^autoscalingAlgorithm="THROUGHPUT_BASED"~numWorkers=5~maxNumWorkers=5~workerMachineType=n1-highmem-4\
~inputNotificationSubscription=projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}\
~outputTopic=projects/${PROJECT}/topics/${OBJECT_DETECTION_TOPIC}\
~errorTopic=projects/${PROJECT}/topics/${ERROR_TOPIC}\
~features=OBJECT_TRACKING~entities=window,person~confidenceThreshold=0.9~windowInterval=1\
~tableReference=${PROJECT}:${BIGQUERY_DATASET}.object_tracking_analysis\
~streaming=true


Cloud Logging에서 로그를 전부 확인할 수 있다.

샘플 동영상 파일 업로드

동영상 파일을 분할하려면 ffmpeg 프로그램을 사용해야 한다.

sudo apt update
sudo apt install -y ffmpeg

원본 동영상 파일 다운로드

gsutil -m cp gs://dataflow-video-analytics-test-clips/*.mp4 .

꼭 위의 샘플데이터로 안하고 직접 찍어 올려도 된다.

만약 mkv파일을 올리고 싶다?

다운 받은 깃허브 소스에서 FilterInputNotificationsTransform을 찾는다.

여기서 확인 후 mkv를 넣어주면 된다.

원본 파일을 5초 세그먼트로 분할

for file in *.mp4
do
ffmpeg -i "$file" -codec:a aac  -ac 2  -ar 48k -c copy \
-movflags faststart -f segment -segment_format mpegts \
-segment_time 5 "${file%.*}~"%1d.mp4
done

세그먼트로 분할된 파일은 이름마다 다른 숫자로 끝난다
(ex: myfile~1.mp4 및 myfile~2.mp4)

동영상 세그먼트를 Cloud Storage에 업로드

gsutil -m cp *~*.mp4 gs://${VIDEO_CLIPS_BUCKET}/

각 동영상 세그먼트를 업로드하면 Cloud Storage는 자동으로 알림을 Pub/Sub로 보낸다.
Pub/Sub는 알림을 Dataflow 작업으로 전달.
Dataflow는 PubSub을 통해 bucketid, 동영상 id, 정상적으로 업로드된 파일인지 등등을 확인해서 Video Intelligence API를 활용해 동영상 파일을 처리해 BigQuery 테이블에 데이터를 쌓은 것

BigQuery에서 동영상 데이터 분석

사람, 자전거 등과 같은 객체가 탐지된 비디오 클립를 보여준다.

SELECT file_name, entity
FROM `video_analytics.object_tracking_analysis`
WHERE entity like 'bicycle%'
OR entity like 'person%'
OR entity like 'cat%'
GROUP by file_name, entity

비디오 클립에서 탐지된 모든 객체 및 탐지와 관련된 신뢰도 수준을 표시

SELECT file_name, entity, max(fd.confidence) AS max_confidence
FROM `video_analytics.object_tracking_analysis` ,
UNNEST(frame_data) AS fd
GROUP by file_name, entity
ORDER by file_name, entity, max_confidence DESC

비디오 파일의 시간 간격, bounding box coordinates 및 탐지된 객체를 가져온다.

SELECT entity, fd.*
FROM `video_analytics.object_tracking_analysis`,
UNNEST(frame_data) AS fd
WHERE file_name LIKE 'gbike%.mp4'
ORDER BY timeOffset

Dataflow 파이프라인에서 Pub/Sub로 보낸 객체 탐지에 대한 결과를 표시하는 간단한 Python 애플리케이션 실행

virtualenv venv
source venv/bin/activate

pub/sub pip패키지 설치

pip install google-cloud-pubsub

python app 실행

python pull-detections.py \
--project=${PROJECT} \
--subscription=${OBJECT_DETECTION_SUBSCRIPTION}

애플리케이션은 Pub/Sub 주제를 사용하고
Dataflow 작업의 처리 기준과 일치하는 모든 결과를 표시

Python app이 아니라 Pub/Sub과 Cloud Function을 사용하여(위에 있는 index.js와 package.json코드 사용) Slack or Google Chat을 연동해서 알림을 받을 수도 있다.

profile
Solutions Architect (rlaalsgud97@gmail.com)

0개의 댓글