다운로드 후 압축만 해제 하면 됨
jdk 1.6 이상이 설치되어 있어야 한다.
특정 하둡버전에 대해서만 동작.
릴리즈 페이지에서 확인 (http://pig.apache.org/releases.html#25+April%2C+2012%3A+release+0.10.0+available)
export HADOOP_INSTALL=/home/gurubee/cloud/env/hadoop-1.0.3
PATH=$PATH:$HOME/bin:$HADOOP_INSTALL/bin
export PATH
export PIG_CLASSPATH=$HADOOP_INSTALL/conf
pig -x local
2012-06-15 19:08:33,516 [main] INFO org.apache.pig.Main - Apache Pig version 0.10.0 (r1328203) compiled Apr 19 2012, 22:54:12
2012-06-15 19:08:33,517 [main] INFO org.apache.pig.Main - Logging error messages to: /cloud/env/pig-0.10.0/bin/pig_1339754913505.log
2012-06-15 19:08:33,742 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
grunt>
쿼리 요청을 맵리듀스 작업으로 번역하여 하둡 클러스터에서 실행한다.
하둡과 피그의 버전이 상호 호환이 되어야 한다.
환경설정 수정
export PIG_CLASSPATH=$HADOOP_INSTALL/conf
맵리듀스 모드실행
/cloud/env/pig-0.10.0/bin/pig
2012-06-15 19:14:18,261 [main] INFO org.apache.pig.Main - Apache Pig version 0.10.0 (r1328203) compiled Apr 19 2012, 22:54:12
2012-06-15 19:14:18,262 [main] INFO org.apache.pig.Main - Logging error messages to: /cloud/env/pig-0.10.0/bin/pig_1339755258253.log
2012-06-15 19:14:18,620 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: file:///
2012-06-15 19:14:18,830 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to map-reduce job tracker at: localhost:9001
grunt>
2장의 연중 가장 높은 기온을 계산하는 프로그램 예제
sample.txt 내용
1999 10 1
1999 11 2
1999 12 3
2000 20 4
2000 11 5
피그라틴 예
-- 로컬 모드 시작
./bin/pig -x local
-- max_temp.pig: 피그라틴 작성 및 실행
-- 단순히 탭으로 구분된 텍스트이고, 각 라인은 연도, 기온, 특징 항목만으로 기록된 데이터로 간주
records = LOAD 'sample/sample.txt'
AS (year:chararray, temperature:int, quality:int);
-- 기온값이 누락되었거나 비적합한 값을 가지는 레코드 제거
filtered_records = FILTER records BY temperature != 9999 AND
(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
-- GROUP 함수를 이용하여 year 항목에 따라 관계를 정렬
grouped_records = GROUP filtered_records BY year;
-- FOREACH 모든행을 처리하여 파생된 행의 집합을 생성
-- GENERATE절을 사용하여 각각 파생된 행의 항목을 정의
max_temp = FOREACH grouped_records GENERATE group,
MAX(filtered_records.temperature);
DUMP max_temp;
-- 실행 결과
Input(s):
Successfully read records from: "file:///home/gurubee/cloud/env/pig-0.10.0/sample/sample.txt"
Output(s):
Successfully stored records in: "file:/tmp/temp-15271195/tmp1542620103"
-- 필터의 영향으로 1999년도는 10도가, 2000년도는 20도가 가장 큰 온도로 조회 되었다.
(1999,10)
(2000,20)
DUMP 연산자를 사용하여 별칭에 해당하는 내용을 확인 할 수 있다.
grunt>DUMP records;
...
어쩌구 저쩌구.. 블라블라..
...
(1999,10,1)
(1999,11,2)
(1999,12,3)
(2000,20,4)
(2000,11,5)
DESCRIBE 연산자를 사용하여 관계 구조(스키마)를 학인 할 수 있다.
grunt> DESCRIBE records
records: {year: chararray,temperature: int,quality: int}
ILLUSTRATE 연산자를 통해서 간결하면서도 완전한 데이터 셋을 생성 할 수도 있다.
grunt> ILLUSTRATE max_temp
...
어쩌구 저쩌구
...
----------------------------------------------------------------------------
| records | year:chararray | temperature:int | quality:int |
----------------------------------------------------------------------------
| | 1999 | 11 | 2 |
| | 1999 | 10 | 1 |
| | 2000 | 11 | 5 |
| | 2000 | 20 | 4 |
| | 1999 | 0 | 0 |
----------------------------------------------------------------------------
-------------------------------------------------------------------------------------
| filtered_records | year:chararray | temperature:int | quality:int |
-------------------------------------------------------------------------------------
| | 1999 | 10 | 1 |
| | 2000 | 11 | 5 |
| | 2000 | 20 | 4 |
| | 1999 | 0 | 0 |
-------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------
| grouped_records | group:chararray | filtered_records:bag{:tuple(year:chararray,temperature:int,quality:int)} |
--------------------------------------------------------------------------------------------------------------------------------------------
| | 1999 | {(1999, 10, 1), (1999, 0, 0)} |
| | 2000 | {(2000, 11, 5), (2000, 20, 4)} |
--------------------------------------------------------------------------------------------------------------------------------------------
-------------------------------------------------
| max_temp | group:chararray | :int |
-------------------------------------------------
| | 1999 | 10 |
| | 2000 | 20 |
-------------------------------------------------
| 피그 라틴 | SQL |
|---|---|
| 데이터 흐름 프로그래밍 언어 | 서술형 프로그래밍 언어 |
| 각 단계가 하나의 변환으로 구성된 입력관계에 대한 단계적 연산의 집합 | 함께 결합되어 출력이 정의되는 제약 조건의 집합 |
| 실행시 선택적으로 스키마를 정의 | 미리 정의된 스카마로 이루어진 테이블에 데이터 저장 |
| 피그는 없다 | 트랜잭션, 인덱스 제공 |
스키마에서 선언한 대로 값이 변환되지 않으면 null 갑으로 대체된다.
피그는 무료한 항목에 대해서 경고 메세지를 생성하지만 처리를 중단시키지 않는다.
is null 연산자 관련 예제들 (Page 444)
손상된 레코드가 몇 개인지 알 수 있다.
-- 손상된 레코드 조회
grunt> corrupt_records = FILTER records BY temperature is null;
grunt> DUMP corrupt_records;
-- 손상된 레코드가 몇 개인지 알 수 있다.
grunt> grouped = GROUP corrupt_records ALL;
grunt> all_grouped = FOREACH grouped GENERATE group, COUNT(corrupt_records);
grunt> DUMP all_grouped;
(all,1L)
SPLIT 연산자를 사용하여 'good'과 'bad'관계로 구분 할 수 있다.
grunt> SPLIT records INTO good_records IF temperature is not null,
>> bad_records IF temperature is null;
-- good
grunt> DUMP good_records;
(1950,0,1)
(1950,22,1)
(1949,111,1)
(1949,78,1)
-- bad
grunt> DUMP bad_records;
(1950,,1)
package com.gurubee.pig.udf.filter;
import java.io.IOException;
import org.apache.pig.FilterFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
/**
* 만족스럽지 못한 온도 특징을 나타내는 레코드를 삭제하는 FilterFunc UDF 함수
*
* @author : oramaster
*
*/
public class IsGoodQuality extends FilterFunc {
@Override
public Boolean exec(Tuple tuple) throws IOException {
if (tuple == null || tuple.size() == 0) {
return false;
}
try {
Object object = tuple.get(0);
if (object == null) {
return false;
}
int i = (Integer) object;
return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
} catch (ExecException e) {
throw new IOException(e);
}
}
}
새로운 함수를 사용하기 위해서는 jar 파일로 패키지를 생성해야 한다.
REGISTER 연산자를 사용하여 피그에게 jar 파일에 대해 알려준다.
grunt> REGISTER pig.jar;
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>> com.gurubee.pig.udf.filter.IsGoodQuality(quality);
사용 예제
-- udf-sample.txt 내용
2000 11 6
1999 12 7
2000 18 8
1999 8 9
2000 10 9
-- 로컬 모드 시작
./bin/pig -x local
-- cloud-project.jar 등록
REGISTER cloud-project.jar;
-- 피그라틴 작성
records = LOAD 'sample/udf-sample.txt'
AS (year:chararray, temperature:int, quality:int);
-- 사용자 함수 com.gurubee.pig.udf.filter.IsGoodQuality 적용
filtered_records = FILTER records BY temperature != 9999 AND
com.gurubee.pig.udf.filter.IsGoodQuality(quality);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
MAX(filtered_records.temperature);
DUMP max_temp;
-- 처리결과
-- 1999년도에는 8도가, 2000년도에는 10도가 가장 큰 온도로 조회 되었다.
(1999,8)
(2000,10)
DEFINE 연산자를 이용하여 함수 이름을 짧게 할 수 있다.
grunt> DEFINE isGood com.gurubee.pig.udf.filter.IsGoodQuality();
grunt> filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
UDF 사용 후 아래와 같이 변경되었다.
-- 기온값이 누락되었거나 비적합한 값을 가지는 레코드 제거
-- 변경 전
filtered_records = FILTER records BY temperature != 9999 AND
(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
-- 변경후
filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
package com.gurubee.pig.udf.eval;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
/**
* chararray 값들의 맨 앞과 맨 뒤의 불필요한 공백을 없애는 EvalFunc UDF
*
* @author : oramaster
*
*/
public class Trim extends EvalFunc<String> {
@Override
public String exec(Tuple input) throws IOException {
if (input == null || input.size() == 0) {
return null;
}
try {
Object object = input.get(0);
if (object == null) {
return null;
}
return ((String) object).trim();
} catch (ExecException e) {
throw new IOException(e);
}
}
/**
* Page 451 타입의 활용 참고
*/
@Override
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
List<FuncSpec> funcList = new ArrayList<FuncSpec>();
funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
new Schema.FieldSchema(null, DataType.CHARARRAY))));
return funcList;
}
}
PigStorage를 사용해서 튜플을 콜론 문자로 구분되는 일반 텍스트 값으로 저장할 수 있다.
grunt> records = LOAD 'sample/sample.txt'
>> AS (year:chararray, temperature:int, quality:int);
grunt> STORE records INTO 'out' USING PigStorage(':');
grunt> cat out
1999:10:1
1999:11:2
1999:12:3
2000:20:4
2000:11:5
관계의 모든 행에 작동하기 위해 사용한다.
Constant라는 chararray 값을 가지는 상수 항목으로 처리
grunt> DUMP records
...
어쩌구 저쩌구
...
(1999,10,1)
(1999,11,2)
(1999,12,3)
(2000,20,4)
(2000,11,5)
-- 첫번째 항목($0)은 그대로 출력,
-- 두번째 항목은 세번째항목($2)+1 값을 출력
-- 세번째 항목은 Constant로 고정출력
grunt> B = FOREACH records GENERATE $0, $2+1, 'Constant';
grunt> DUMP B;
(1999,2,Constant)
(1999,3,Constant)
(1999,4,Constant)
(2000,5,Constant)
(2000,6,Constant)
한 관계의 데이터를 외부 프로그램 또는 스크립트를 사용해서 변환 할 수 있도록 해준다.
피그에서 리눅스 명령어 및 shell 프로그램 사용을 가능하게 해준다.
리눅스의 cut 명력을 사용하여 두 번째 항목 온도만 출력하는 예이다.
grunt> C = STREAM records THROUGH `cut -f 2`;
grunt> DUMP C;
-- 처리결과
(10)
(11)
(12)
(20)
(11)
내부 조인 예
grunt> DUMP A;
(2,Tie)
(4,Coat)
(3,Hat)
(1,Scarf)
grunt> DUMP B;
(Joe,2)
(Hank,4)
(Ali,0)
(Eve,3)
(Hank,2)
-- 조인작업 (조인키를 지정한다. A BY $0, B BY $1)
-- A의 첫번째 열과 B의 두번째 열의 값이 같은 값만 조인한다.
grunt> C = JOIN A BY $0, B BY $1;
grunt> DUMP C;
(2,Tie,Joe,2)
(2,Tie,Hank,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)
grunt> D = COGROUP A BY $0, B BY $1;
grunt> DUMP D;
(0,{},{(Ali,0)})
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Joe,2),(Hank,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})grunt> I = CROSS A, B;
grunt> DUMP I;
(2,Tie,Joe,2)
(2,Tie,Hank,4)
(2,Tie,Ali,0)
(2,Tie,Eve,3)
(2,Tie,Hank,2)
(4,Coat,Joe,2)
(4,Coat,Hank,4)
(4,Coat,Ali,0)
(4,Coat,Eve,3)
(4,Coat,Hank,2)
(3,Hat,Joe,2)
(3,Hat,Hank,4)
(3,Hat,Ali,0)
(3,Hat,Eve,3)
(3,Hat,Hank,2)
(1,Scarf,Joe,2)
(1,Scarf,Hank,4)
(1,Scarf,Ali,0)
(1,Scarf,Eve,3)
(1,Scarf,Hank,2)grunt> C = GROUP A ALL;
grunt> DUMP C;
(all,{(Joe,cherry),(Ali,apple),(Joe,banana),(Eve,apple)})피그에서 관계들은 정렬되지 않은 상태로 있다.
ORDER 연산자를 사용하여 관계의 한 개 이상의 항목을 정렬할 수 있다.
LIMIT 연산자를 사용하여 개수를 제한 할 수 있다.
grunt> B = ORDER A BY $0, $1 DESC;
grunt> DUMP B;
(1,2)
(2,4)
(2,3)
grunt> D = LIMIT B 2;
grunt> DUMP D;
(1,2)
(2,4)
UNION을 사용하여 하나로 합칠 수 있다.
grunt> DUMP A;
(2,3)
(1,2)
(2,4)
grunt> DUMP B;
(z,x,8)
(w,y,1)
grunt> C = UNION A, B;
grunt> DUMP C;
(2,3)
(z,x,8)
(1,2)
(w,y,1)
(2,4)
grouped_records = GROUP records BY year PARALLEL 30;정기적으로 실행하는 피그 스크립트가 있다면 다른 파라미터를 줘서 같은 스크립트를 실행할 가능성이 매우 높다.
파라미터는 $문자를 접두어로 하여 식별자에 의해서 표시된다.
아래 예의 $input, $output는 입력 및 출력 경로를 지정하기 위해서 사용한다.
피그라틴 스크립트
-- max_temp_param.pig
records = LOAD '$input' AS (year:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND
(quality == 0 OR quality == 1 OR quality == 4 OR quality == 5 OR quality == 9);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
MAX(filtered_records.temperature);
STORE max_temp into '$output';
피그 실행시 -param옵션을 주어서 파라미터를 지정 할 수 있다.
% pig \
-param input=/user/tom/input/ncdc/micro-tab/sample.txt \
-param output=/tmp/out \
src/main/ch11/pig/max_temp_param.pig
파일에 파라미터를 지정하고 -param_file 옵션을 주어서 피그를 실행하여 파라미터를 넘기는 방법도 있다.
-- max_temp_param.param 파링ㄹ
# Input file
input=/user/tom/input/ncdc/micro-tab/sample.txt
# Output file
output=/tmp/out
-- 피그실행
% pig \
-param_file src/main/ch11/pig/max_temp_param.param \
src/main/ch11/pig/max_temp_param.pig
% pig \
-param input=/user/tom/input/ncdc/micro-tab/sample.txt \
-param output=/tmp/`date "+%Y-%m-%d"`/out \
src/main/ch11/pig/max_temp_param.pig