오늘 할일
- 분석데이터 RDS 저장
- EMR 생성
- EMR에서 RDS데이터 가져오기
분석데이터 RDS 저장
데이터 추가
data_path = '/content/drive/MyDrive/Colab Notebooks/Custommer2.csv'
item_df = pd.read_csv(data_path, index_col='Unnamed: 0')
item_df.head()
allergy_list = ['계란', '우유', '땅콩', '견과류', '밀', '갑각류', '대두', '메밀', '육류', '생선', '과일']
import random
item_df['allergy_list'] = ''
item_df['allergy_list'] = item_df['allergy_list'].apply(lambda x: list(set([random.choice(allergy_list) for i in range(random.randrange(0, 4))])))
item_df.to_csv('/content/drive/MyDrive/Colab Notebooks/total_Custommer.csv')
item_df.head()
데이터 준비
- 어제 크롤링으로 생성한 데이터와 그를 번역한 최종 데이터이다.
- CSV파일로 저장해놓고 S3에 업로드 해놓는다.
- RDS도 미리 생성해 놓았다.
- MySQL 8.0.23버전으로 프라이빗 액세스만 허용하여 생성하였다.
Cloud9
- Cloud9으로 작업하기위해 S3에서 데이터를 가져온다.
dataDownload.py
import boto3
bucket = 's3.cocudeny'
key = 'Custommer2.csv'
image_name = key.split('/')[-1]
s3 = boto3.client('s3')
S3_response = s3.get_object(Bucket=bucket, Key=key)
print("CONTENT TYPE: " + S3_response['ContentType'])
image_path = './'+image_name
s3.download_file(bucket, key, image_path)
print('Download is done')
RDS에 데이터 올리기
import pymysql
import os
import logging
import sys
import pandas as pd
data_path = '/home/ec2-user/environment/BigData/total_Custommer.csv'
rds_endpoint = 'AWS RDS의 엔튿포인트'
name = 'RDS 유저 이름'
password = 'RDS 비밀번호'
db_name = 'RDS 사용할 DB이름'
table_name = '사용(생성)할 테이블이름'
logger = logging.getLogger()
logger.setLevel(logging.INFO)
try:
conn = pymysql.connect(host=rds_endpoint, user=name, passwd=password, db=db_name, connect_timeout=5)
print('connected')
except pymysql.MySQLError as e:
logger.error("ERROR: Unexpected error: Could not connect to MySQL instance.")
logger.error(e)
sys.exit(1)
table_items = {
'custommer_id' : 'int not null',
'custommer_name' : 'varchar(20) not null',
'items' : 'varchar(2000)',
'allergy_list' : 'varchar(50)'
}
try:
sql_create_option = []
for k, v in table_items.items():
sql_create_option.append(k+' '+v)
sql_create_option = (', ').join(sql_create_option)
print('init table :',sql_create_option )
with conn.cursor() as cur:
cur.execute("create table if not exists "+table_name+" ( "+sql_create_option+" )")
conn.commit()
print('init table successed')
except pymysql.MySQLError as e:
logger.error("ERROR: Init Table Error")
logger.error(e)
sys.exit(2)
def get_data_keys(keys):
return (', ').join(keys)
def get_data_vals(series):
text = []
for key in ['customer_id', 'customer_name', 'items', 'allergy_list']:
data = series[key]
if key == 'customer_id':
text.append(str(data))
else:
text.append("\'" + data + "\'")
return (', ').join(text)
item_df = pd.read_csv(data_path, index_col='Unnamed: 0')
item_df['items'] = item_df['items'].apply(lambda x: (', ').join([item[1:-1] for item in x[1:-1].split(', ')]))
item_df['allergy_list'] = item_df['allergy_list'].apply(lambda x: (', ').join([item[1:-1] for item in x[1:-1].split(', ')]))
print('CSV_read done')
error_list = []
data_keys = get_data_keys(list(table_items.keys()))
for i in range(len(item_df)):
data_vals = get_data_vals(item_df.iloc[i])
command = 'insert into '+table_name+' ('+data_keys+') values('+data_vals+')'
try:
with conn.cursor() as cur:
cur.execute(command)
conn.commit()
except pymysql.MySQLError as e:
logger.error("ERROR: Insert Fail.", e)
print(e)
error_list.append(i)
print(error_list)
- 에러 없이 모두 들어갔다.
- 보기가 조금 이상하지만 잘 들어갔다.
추천시스템에 대한 조사
실시간 인기 콘텐츠, 상품 분석 모니터링
추천 시스템
- 전통적인 개인화 추천 방식에는 크게 2가지가 있다.
- 유사한 사용자들이 선호한 다른 아이템을 추천하는
collaborative Filtering
방식과 과거에 선호한 아이템과 내용이 유사한 다른 아이템을 추천하는 content-based Filtering
방식이 있다.
EMR 따라하기
간략한설명
- SSL 인증서를 사용하여 Hive와 외부 메타스토어 간에 암호화된 연결을 설정합니다. 새 Amazon EMR 클러스터를 시작하거나 클러스터가 실행 된 후에 이 연결을 설정할 수 있습니다.
새 Amazon EMR 클러스터에서 SSL 연결 설정
- 다음과 유사한 명령을 실행하여 Amazon RDS MySQL DB 인스턴스를 만듭니다. $RDS_LEADER_USER_NAME과 $RDS_LEADER_PASSWORD를 사용자 이름과 암호로 바꿉니다. 자세한 내용은 create-db-instance를 참조하세요.
aws rds create-db-instance --db-name hive --db-instance-identifier mysql-hive-meta --db-instance-class db.t2.micro\ --engine mysql --engine-version 5.7.19 --master-username $RDS_LEADER_USER_NAME --master-user-password $RDS_LEADER_PASSWORD\ --allocated-storage 20 --storage-type gp2 --vpc-security-group-ids $RDS_VPC_SG --publicly-accessible
- 여기서 우리는 이미 존재하는 RDS를 사용할 것이다.
- 기본 사용자로 Amazon RDS MySQL DB 인스턴스에 연결합니다. 다음 예제와 같이 Hive 메타스토어에 대한 사용자를 만듭니다.
mysql -h mysql-hive-meta.########.us-east-1.rds.amazonaws.com -P 3306 -u $RDS_LEADER_USER_NAME -p
- Enter password: $RDS_LEADER_PASSWORD
CREATE USER 'hive_user'@'%' IDENTIFIED BY 'hive_user_password'; REVOKE ALL PRIVILEGES, GRANT OPTION FROM 'hive_user'@'%'; GRANT ALL PRIVILEGES ON hive.* TO 'hive_user'@'%' REQUIRE SSL; FLUSH PRIVILEGES;
- REQUIRE SSL 문구를 지웠다. SSL이 없기 떄문
- 다음과 유사한 JSON 구성 파일을 작성합니다. hive_user 및 hive_user_password를 2단계에서 JSON 스크립트에 사용한 값으로 바꿉니다. JDBC URL의 엔드포인트를 RDS DB 인스턴스의 엔드포인트로 바꿉니다.
[
{
"Classification": "hive-site",
"Properties": {
"javax.jdo.option.ConnectionURL": "jdbc:mysql:\/\/mysql-hive-meta.########.us-east-1.rds.amazonaws.com:3306\/hive?createDatabaseIfNotExist=true&useSSL=true&serverSslCert=\/home\/hadoop\/rds-combined-ca-bundle.pem",
"javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver",
"javax.jdo.option.ConnectionUserName": "hive_user",
"javax.jdo.option.ConnectionPassword": "hive_user_password"
}
}
]
- Amazon RDS MySQL 인스턴스와 연결된 보안 그룹에서 다음 파라미터를 사용하여 인바운드 규칙을 생성합니다.
[유형(Type)]에 대해 [MYSQL/Aurora(3306)]을 선택합니다.
[프로토콜(Protocol)]의 경우 [TCP(6)]가 기본적으로 선택됩니다.
[포트 범위(Port Range)]의 경우 [3306]이 기본적으로 선택됩니다.
[소스(Source)]에 리더 노드와 연결된 Amazon EMR에서 관리하는 보안 그룹의 [그룹 ID(Group ID)]를 입력합니다.
- 아직 EMR을 생성하지 않았기 때문에 모든 IPv4주소를 열었다.
- create-cluster 명령을 실행하여 3단계의 JSON 파일과 부트스트랩 작업을 함께 사용하여 Amazon EMR 클러스터를 시작합니다. 부트스트랩 작업은 리더 노드의 /home/hadoop/에 SSL 인증서를 다운로드합니다.
aws emr create-cluster --applications Name=Hadoop Name=Hive --tags Name="EMR Hive Metastore"\
--ec2-attributes KeyName=$EC2_KEY_PAIR,InstanceProfile=EMR_EC2_DefaultRole,SubnetId=$EMR_SUBNET,EmrManagedSlaveSecurityGroup=$EMR_CORE_AND_TASK_VPC_SG,EmrManagedMasterSecurityGroup=$EMR_MASTER_VPC_SG\
--service-role EMR_DefaultRole --release-label emr-5.18.0 --log-uri s3://awsdoc-example-bucket/emr-logs-path/ --name "Hive External Metastore RDS MySQL w/ SSL"\
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge,Name="Master
- 1"\
--configurations file://hive-ext-meta-mysql-ssl.json\
--bootstrap-actions Path=s3://elasticmapreduce/bootstrap-actions/run-if,Args=["instance.isMaster=true","cd /home/hadoop && wget -S -T
10 -t 5 https://s3.amazonaws.com/rds-downloads/rds-combined-ca-bundle.pem"]
- 잘 안되서 콘솔에서 직접 생성하였다.
- 부트스트랩은 안되서 일단 제외시킨다.
- SSH를 사용하여 리더 노드에 연결합니다.