API 사용 설정
gcloud services enable dataflow.googleapis.com \
containerregistry.googleapis.com \
videointelligence.googleapis.com
사용할 변수 설정
Cloud Shell에서 작업하는 경우 Shell 을 껐다 켜거나 시간이 지날 때마다 변수를 다시 선언해줘야 한다. 이를 해결하고 싶으면 nano ./bashrc에서 아래의 변수들을 다 등록해주고 source ./bashrc로 저장
export REGION=REGION
export PROJECT=$(gcloud config get-value project)
export GCS_NOTIFICATION_TOPIC="gcs-notification-topic"
export GCS_NOTIFICATION_SUBSCRIPTION="gcs-notification-subscription"
export VIDEO_CLIPS_BUCKET=${PROJECT}_videos
export BIGQUERY_DATASET="video_analytics"
export OBJECT_DETECTION_TOPIC="object-detection-topic"
export OBJECT_DETECTION_SUBSCRIPTION="object-detection-subscription"
export ERROR_TOPIC="error-topic"
export ERROR_SUBSCRIPTION="error-subscription"
export DATAFLOW_TEMPLATE_BUCKET=${PROJECT}_dataflow_template_config
사용할 깃허브 저장소 clone
git clone https://github.com/GoogleCloudPlatform/dataflow-video-analytics.git
cd dataflow-video-analytics
pub/sub 주제 생성
gcloud pubsub topics create ${GCS_NOTIFICATION_TOPIC}
주제에 대한 pub/sub 구독 생성
gcloud pubsub subscriptions create ${GCS_NOTIFICATION_SUBSCRIPTION} \
--topic=${GCS_NOTIFICATION_TOPIC}
동영상 클립을 저장할 버킷 생성
gsutil mb -c standard -l ${REGION} gs://${VIDEO_CLIPS_BUCKET}
버킷용 pub/sub알림 생성
※이 작업은 콘솔에서 되지 않는다. 오로지 Shell 작업을 통해서만 가능※
gsutil notification create -t ${GCS_NOTIFICATION_TOPIC} \
-f json gs://${VIDEO_CLIPS_BUCKET}
데이터 세트 생성
bq mk -d --location=US ${BIGQUERY_DATASET}
테이블 생성
bq mk -t \
--schema src/main/resources/table_schema.json \
--description "object_tracking_data" \
${PROJECT}:${BIGQUERY_DATASET}.object_tracking_analysis
주제 생성
gcloud pubsub topics create ${OBJECT_DETECTION_TOPIC}
구독 생성
gcloud pubsub subscriptions create ${OBJECT_DETECTION_SUBSCRIPTION} \
--topic=${OBJECT_DETECTION_TOPIC}
이 객체 식별에 관해선 나중에 Cloud Function을 활용해서 Google Chat이나 Slack으로 탐지 알람을 보낼 수 있다.
index.js
const request = require(`request`);
// Translate JSON message into Text
exports.notifyHangouts = async (pubsubEvent, context) => {
const pubsubData = Buffer.from(pubsubEvent.data, 'base64').toString();
const alert = JSON.parse(pubsubData);
console.log(alert);
const now = new Date();
// Post message to the room
request({
uri: "<Google Chat Webhook url 입력>",
method: "POST",
json: {
"text": "GS One BigData Project Object Detection Alert",
"cards": [
{
"header": {
"title": "GCP Object Detection Alert",
"subtitle": "Automated Object Detection Notificaiton",
"imageUrl": "https://goo.gl/aeDtrS"
},
"sections": [
{
"widgets": [
{
"keyValue": {
"topLabel": "Entity",
"content": alert.entity.toString()
}
},
{
"keyValue": {
"topLabel": "File",
"content": alert.file_name.toString()
}
},
{
"keyValue": {
"topLabel": "Time",
"content": alert.frame_data[0].processing_timestamp.toString()
}
}
]
},
{
"widgets": [
{
"buttons": [
{
"textButton": {
"text": "OPEN THE GCP CONSOLE",
"onClick": {
"openLink": {
"url": "https://console.cloud.google.com/bigquery?project=gsn-rampup-bigdata-prj&ws=!1m5!1m4!4m3!1sgsn-rampup-bigdata-prj!2svideo_analytics!3sobject_tracking_analysis"
}
}
}
}
]
}
]
}
]
}
]
}
}, function(error, response, body){
if(error) {
console.log(error);
} else {
console.log(response.statusCode, body);
}
});
};
package.json
{
"name": "cloud-functions-object-tracking-data",
"version": "0.0.1",
"dependencies": {
"request": "2.88.2"
}
}
Dataflow 코드에서 데이터를 Pub/Sub으로 보내는 걸 entity, file_name의 이름으로, 그리고 나머지 항목은 묶어서 데이터 프레임 형식으로 넣어줬다.
주제 생성
gcloud pubsub topics create ${ERROR_TOPIC}
구독 생성
gcloud pubsub subscriptions create ${ERROR_SUBSCRIPTION} \
--topic=${ERROR_TOPIC}
Apache Beam 자바 SDK를 사용하여 Dataflow 파이프라인을 빌드한 후 Dataflow Flex 템플릿을 사용하여 실행
Dataflow 파이프라인은 Video Intelligence API를 사용하여 동영상을 프레임 별로 분석하고 결과를 BigQuery와 Pub/Sub으로 보낸다.
cd gradle/wrapper
gradle이 deprecated된 버전이라 gradle-wrapper.properties에서 gradle 버전을 수정해줬다.
(현재 gradle-7.5-rc-2-all.zip 사용)
build.gradle
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
buildscript {
ext {
dataflowBeamVersion = '2.23.0'
}
repositories {
mavenCentral()
jcenter()
maven {
url "https://plugins.gradle.org/m2/"
}
dependencies {
classpath "net.ltgt.gradle:gradle-apt-plugin:0.19"
classpath "gradle.plugin.com.google.cloud.tools:jib-gradle-plugin:3.2.1"
classpath "com.diffplug.spotless:spotless-plugin-gradle:6.7.2"
}
}
}
group 'com.google.solutions'
version '0.1.0'
repositories {
mavenCentral()
}
apply plugin: 'java'
apply plugin: 'application'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'com.google.cloud.tools.jib'
apply plugin: "com.diffplug.spotless"
// Licence header enforced by spotless
def javaLicenseHeader = """/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
"""
sourceCompatibility = 1.8
targetCompatibility = 1.8
applicationDefaultJvmArgs = ['-Xms4g', '-Xmx16g']
mainClassName = System.getProperty('mainClass', 'com.google.solutions.df.video.analytics.VideoAnalyticsPipeline')
dependencies {
implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: dataflowBeamVersion
implementation group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version:dataflowBeamVersion
implementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: dataflowBeamVersion
implementation group: 'org.apache.beam', name: 'beam-sdks-java-extensions-ml', version: dataflowBeamVersion
implementation group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.30'
implementation "com.google.auto.value:auto-value-annotations:1.6.2"
implementation 'com.google.cloud:google-cloud-video-intelligence:1.2.1'
annotationProcessor "com.google.auto.value:auto-value:1.6.2"
testImplementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: dataflowBeamVersion
testImplementation group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.30'
}
jar {
manifest {
attributes ('Main-Class': mainClassName)
}
}
compileJava {
options.compilerArgs.addAll(['-Xlint:all', '-Xlint:-serial'])
options.encoding = 'UTF-8'
}
// default tasks when you run ./gradlew
defaultTasks 'clean', 'assemble'
// Google Cloud Dataflow requires the resource/main directory to exist
task resources {
def resourcesDir = new File('build/resources/main')
resourcesDir.mkdirs()
}
run {
if (project.hasProperty('args')) {
args project.args.split('\\s')
}
mustRunAfter 'resources'
}
// Spotless configuration
def enableSpotlessCheck = project.hasProperty('enableSpotlessCheck') && project.enableSpotlessCheck == 'true'
spotless {
enforceCheck enableSpotlessCheck
java {
licenseHeader javaLicenseHeader
googleJavaFormat()
}
}
jib {
from {
image = 'gcr.io/dataflow-templates-base/java8-template-launcher-base:latest'
}
to {
credHelper = 'gcloud'
}
container {
appRoot = '/template/df-video-analytics-template'
entrypoint = 'INHERIT'
environment = [DATAFLOW_JAVA_COMMAND_SPEC:'/template/df-video-analytics-template/resources/java_command_spec.json']
}
}
Dataflow Flex 템플릿용 Docker 이미지 생성
gcloud auth configure-docker
gradle jib \
--image=gcr.io/${PROJECT}/dataflow-video-analytics:latest
만약 아래 사진과 같은 오류가 난다면 build.gradle에서 jib-gradle-plugin 버전을 최신 버전으로 바꿔주자(현재 위의 코드는 3.2.1버전 사용)
Dataflow Flex 템플릿을 저장할 Cloud Storage 버킷 생성
gsutil mb -c standard -l ${REGION} \
gs://${DATAFLOW_TEMPLATE_BUCKET}
Dataflow 템플릿의 json 구성파일을 버킷에 업로드
cat << EOF | gsutil cp - gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_video_analytics.json
{
"image":
"gcr.io/${PROJECT}/dataflow-video-analytics:latest",
"sdk_info": {"language": "JAVA"}
}
EOF
파이프라인 실행
gcloud beta dataflow flex-template run "video-analysis-pipeline" \
--project=${PROJECT} \
--region=${REGION} \
--template-file-gcs-location=gs://${DATAFLOW_TEMPLATE_BUCKET}/dynamic_template_video_analytics.json \
--parameters=^~^autoscalingAlgorithm="THROUGHPUT_BASED"~numWorkers=5~maxNumWorkers=5~workerMachineType=n1-highmem-4\
~inputNotificationSubscription=projects/${PROJECT}/subscriptions/${GCS_NOTIFICATION_SUBSCRIPTION}\
~outputTopic=projects/${PROJECT}/topics/${OBJECT_DETECTION_TOPIC}\
~errorTopic=projects/${PROJECT}/topics/${ERROR_TOPIC}\
~features=OBJECT_TRACKING~entities=window,person~confidenceThreshold=0.9~windowInterval=1\
~tableReference=${PROJECT}:${BIGQUERY_DATASET}.object_tracking_analysis\
~streaming=true
Cloud Logging에서 로그를 전부 확인할 수 있다.
동영상 파일을 분할하려면 ffmpeg 프로그램을 사용해야 한다.
sudo apt update
sudo apt install -y ffmpeg
원본 동영상 파일 다운로드
gsutil -m cp gs://dataflow-video-analytics-test-clips/*.mp4 .
꼭 위의 샘플데이터로 안하고 직접 찍어 올려도 된다.
만약 mkv파일을 올리고 싶다?
다운 받은 깃허브 소스에서 FilterInputNotificationsTransform을 찾는다.
여기서 확인 후 mkv를 넣어주면 된다.
원본 파일을 5초 세그먼트로 분할
for file in *.mp4
do
ffmpeg -i "$file" -codec:a aac -ac 2 -ar 48k -c copy \
-movflags faststart -f segment -segment_format mpegts \
-segment_time 5 "${file%.*}~"%1d.mp4
done
세그먼트로 분할된 파일은 이름마다 다른 숫자로 끝난다
(ex: myfile~1.mp4 및 myfile~2.mp4)
동영상 세그먼트를 Cloud Storage에 업로드
gsutil -m cp *~*.mp4 gs://${VIDEO_CLIPS_BUCKET}/
각 동영상 세그먼트를 업로드하면 Cloud Storage는 자동으로 알림을 Pub/Sub로 보낸다.
Pub/Sub는 알림을 Dataflow 작업으로 전달.
Dataflow는 PubSub을 통해 bucketid, 동영상 id, 정상적으로 업로드된 파일인지 등등을 확인해서 Video Intelligence API를 활용해 동영상 파일을 처리해 BigQuery 테이블에 데이터를 쌓은 것
BigQuery에서 동영상 데이터 분석
사람, 자전거 등과 같은 객체가 탐지된 비디오 클립를 보여준다.
SELECT file_name, entity
FROM `video_analytics.object_tracking_analysis`
WHERE entity like 'bicycle%'
OR entity like 'person%'
OR entity like 'cat%'
GROUP by file_name, entity
비디오 클립에서 탐지된 모든 객체 및 탐지와 관련된 신뢰도 수준을 표시
SELECT file_name, entity, max(fd.confidence) AS max_confidence
FROM `video_analytics.object_tracking_analysis` ,
UNNEST(frame_data) AS fd
GROUP by file_name, entity
ORDER by file_name, entity, max_confidence DESC
비디오 파일의 시간 간격, bounding box coordinates 및 탐지된 객체를 가져온다.
SELECT entity, fd.*
FROM `video_analytics.object_tracking_analysis`,
UNNEST(frame_data) AS fd
WHERE file_name LIKE 'gbike%.mp4'
ORDER BY timeOffset
Dataflow 파이프라인에서 Pub/Sub로 보낸 객체 탐지에 대한 결과를 표시하는 간단한 Python 애플리케이션 실행
virtualenv venv
source venv/bin/activate
pub/sub pip패키지 설치
pip install google-cloud-pubsub
python app 실행
python pull-detections.py \
--project=${PROJECT} \
--subscription=${OBJECT_DETECTION_SUBSCRIPTION}
애플리케이션은 Pub/Sub 주제를 사용하고
Dataflow 작업의 처리 기준과 일치하는 모든 결과를 표시
Python app이 아니라 Pub/Sub과 Cloud Function을 사용하여(위에 있는 index.js와 package.json코드 사용) Slack or Google Chat을 연동해서 알림을 받을 수도 있다.