2019년 겨울 프로젝트 정리 3 - from Kafka to Spark
TweetDeck Clone Project
= 데이터 파이프라인 및 tweet 실시간 피드 개발
Kafka로부터 tweet message들을 읽어, Spark로 실시간으로 분산 처리 후 Cassandra DB에 저장하는 부분에 대해 설명해보려 한다.
과거에 Notion에 정리했던 것을 간단하게 옮긴 것이라 중간 과정 공백이 많을 가능성이 높다
환경 설정 및 분산 처리 관련된 것을 구글링을 통해 배우는 것이 현명하다
코드의 전반적인 프로세스와 코드를 작성하면서 참고할만한 점들을 정리한다.
SparkConf
를 사용하여 conf
생성SparkContext
for Connection to a Spark Clusterconf
를 parameter로 하여 생성한다.StreamingContext
for using streaming applicationKafkaUtils
를 사용해 KafkaStream
생성.DStream
KafkaStream
에서 각 RDD map 함수 처리
map()
vs foreachRDD()
map()
→ Return a new DStream by applying a function to each element of DStream.foreachRDD()
→ Apply a function to each RDD in this DStream.
json.loads
함수를 두번 적용해야 하는 점
Raw 데이터를 분석하고, 이를 바탕으로
어떤 스키마를 바탕으로 데이터를 처리하고 분류할 지 프로세스를 정의하였다.
Retweet
**"retweeted"
으로 Retweet 확인** (프로필 렌더링에 있어서 중요)true
라면, "retweeted_status"
속 데이터로 아래 방식으로 Tweet 렌더링Tweet
**"in_reply_to_status_id"
로 Reply 확인**
"entities"
속 "user_mentions"
member 수, 리스트 데이터로 만들어서 전달Text
**"truncated"
로 Text 초과 데이터 확인**true
시 "extended_tweet"
로 Text 렌더링 (entities 및 extended_entities가 이 안으로 속해 있음)false
시 기본 "text"
로 렌더링을 해야 하는데, Quoted 시 "display_text_range"
데이터가 없기 때문에, "entities"
를 제외한 부분을 Text로 렌더링 해야 한다.주의
hashtag나 urls 같은 경우, 초과 범위 안은 entities에 초과된 것들은 extended_tweet 안에 따로 존재.Media
**"truncated"
확인**true
시 "extended_tweet"
로 media 정보가 모두 넘어가기 때문에, Text 및 media 관련 데이터를 모두 "extended_tweet"
에서 색인하여 렌더링false
시 기본 "entities"
& "extended_entities"
모두에 media 데이터가 있으므로 선택하여 렌더링Quoted
**"is_quote_status"
로 Quoted 확인**true
라면, "quoted_status"
속 데이터로 Tweet 렌더링Filtering
Raw 데이터 예시는 이전 포스팅 확인
{
"created_at":"Tue Jan 28 02:21:06 +0000 2020",
"id":1221981496820453376,
"id_str":"1221981496820453376",
"text":"smilegateserverdev #smilegateserverdev\nsmilegateserverdev #smilegateserverdev\nsmilegateserverdev\u2026 https:\/\/t.co\/tolbTYmZU5",
// **길이 제한 생략** (항상 존재)
"truncated":true,
// **User Data** (항상 존재)
"user":{...},
// **Retweet Data** (Retweet이 아니라면 생략)
"retweeted_status":{...},
// **Quoted Data** (아래 5개) (Quoted가 없다면 상위 4개 생략)
"quoted_status_id":1222013115358990336,
"quoted_status_id_str":"1222013115358990336",
"quoted_status":{...},
"quoted_status_permalink":{...},
"is_quote_status":false,
// **초과된 데이터** (Text 관련) (없다면 생략)
"extended_tweet":{...},
// **Entity** (해시태그, URL, User_mentions, Symbols 등) (항상 존재)
"entities":{...},
// **초과된 데이터** (영상 및 사진 관련) (없다면 생략)
"extended_entities":{...},
// **Retweet** (항상 존재)
"retweeted":false,
"lang":"it",
"timestamp_ms":"1580178066817"
}
Twitter API로 받는 데이터에서 예외 처리할 사항들
해당 데이터
'is_quote_status'
는 True가 되지만, 'quoted_status'
데이터는 없게 된다.
{'created_at': 'Mon Feb 10 00:18:27 +0000 2020', 'id': 1226661672573689862, 'id_str': '1226661672573689862', 'text': '‘what made you decide to have the film in korean’ she asks the korean director who lives in korea and speaks korean… https://t.co/gNSSJyZTBE', 'source': '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>', 'truncated': True, 'in_reply_to_status_id': None, 'in_reply_to_status_id_str': None, 'in_reply_to_user_id': None, 'in_reply_to_user_id_str': None, 'in_reply_to_screen_name': None, 'user': {'id': 1100818939007901696, 'id_str': '1100818939007901696', 'name': 'princess naya ⁷ 💕💘💖💗💓💞', 'screen_name': 'hoyarkive', 'location': '♡ ot7 + jk + chungha', 'url': 'http://www.curiouscat.me/princessnaya', 'description': '#YOONGI: no YOU live in a society, i live in hopeworld ☻', 'translator_type': 'none', 'protected': False, 'verified': False, 'followers_count': 1286, 'friends_count': 258, 'listed_count': 5, 'favourites_count': 4396, 'statuses_count': 3185, 'created_at': 'Wed Feb 27 18:04:22 +0000 2019', 'utc_offset': None, 'time_zone': None, 'geo_enabled': False, 'lang': None, 'contributors_enabled': False, 'is_translator': False, 'profile_background_color': '000000', 'profile_background_image_url': 'http://abs.twimg.com/images/themes/theme1/bg.png', 'profile_background_image_url_https': 'https://abs.twimg.com/images/themes/theme1/bg.png', 'profile_background_tile': False, 'profile_link_color': 'A00B26', 'profile_sidebar_border_color': '000000', 'profile_sidebar_fill_color': '000000', 'profile_text_color': '000000', 'profile_use_background_image': False, 'profile_image_url': 'http://pbs.twimg.com/profile_images/1226524646108995590/zNMyCTH4_normal.jpg', 'profile_image_url_https': 'https://pbs.twimg.com/profile_images/1226524646108995590/zNMyCTH4_normal.jpg', 'profile_banner_url': 'https://pbs.twimg.com/profile_banners/1100818939007901696/1581261239', 'default_profile': False, 'default_profile_image': False, 'following': None, 'follow_request_sent': None, 'notifications': None}, 'geo': None, 'coordinates': None, 'place': None, 'contributors': None, 'is_quote_status': False, 'extended_tweet': {'full_text': '‘what made you decide to have the film in korean’ she asks the korean director who lives in korea and speaks korean and hired a korean cast to play korean characters to make social criticism about class in korea\n\nhttps://t.co/UwOWd1ej2G', 'display_text_range': [0, 236], 'entities': {'hashtags': [], 'urls': [], 'user_mentions': [], 'symbols': [], 'media': [{'id': 1226655108286406656, 'indices': [213, 236], 'media_url': 'http://pbs.twimg.com/ext_tw_video_thumb/1226655108286406656/pu/img/u1UqOQ4joWS5jZwy.jpg', 'type': 'video', 'video_info': {'aspect_ratio': [16, 9], 'duration_millis': 34567, 'variants': [{'content_type': 'application/x-mpegURL', 'url': 'https://video.twimg.com/ext_tw_video/1226655108286406656/pu/pl/_y_7pzqoXgjbnHgT.m3u8?tag=10'}, {'bitrate': 832000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1226655108286406656/pu/vid/640x360/ROE626tsdcwiaHtb.mp4?tag=10'}, {'bitrate': 2176000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1226655108286406656/pu/vid/1280x720/pECEW8SuXumK5uox.mp4?tag=10'}, {'bitrate': 256000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1226655108286406656/pu/vid/480x270/2lGE_B2aop7mRUux.mp4?tag=10'}]}}]}, 'extended_entities': {'media': [{'id': 1226655108286406656, 'indices': [213, 236], 'media_url': 'http://pbs.twimg.com/ext_tw_video_thumb/1226655108286406656/pu/img/u1UqOQ4joWS5jZwy.jpg', 'type': 'video', 'video_info': {'aspect_ratio': [16, 9], 'duration_millis': 34567, 'variants': [{'content_type': 'application/x-mpegURL', 'url': 'https://video.twimg.com/ext_tw_video/1226655108286406656/pu/pl/_y_7pzqoXgjbnHgT.m3u8?tag=10'}, {'bitrate': 832000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1226655108286406656/pu/vid/640x360/ROE626tsdcwiaHtb.mp4?tag=10'}, {'bitrate': 2176000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1226655108286406656/pu/vid/1280x720/pECEW8SuXumK5uox.mp4?tag=10'}, {'bitrate': 256000, 'content_type': 'video/mp4', 'url': 'https://video.twimg.com/ext_tw_video/1226655108286406656/pu/vid/480x270/2lGE_B2aop7mRUux.mp4?tag=10'}]}}]}}, 'quote_count': 8905, 'reply_count': 1587, 'retweet_count': 133452, 'favorite_count': 484247, 'entities': {'hashtags': [], 'urls': [{'url': 'https://t.co/gNSSJyZTBE', 'expanded_url': 'https://twitter.com/i/web/status/1226661672573689862', 'display_url': 'twitter.com/i/web/status/1…', 'indices': [117, 140]}], 'user_mentions': [], 'symbols': []}, 'favorited': False, 'retweeted': False, 'possibly_sensitive': False, 'filter_level': 'low', 'lang': 'en'}
'timestamp_ms'
값이 누락된 데이터 존재. 수없이 많이 존재.print
한 결과인데, 또 중요한 것이 중복 데이터가 존재.
TweetDeck에서 프로필 클릭 시 모습
원래 배경 설정이 되어 있지 않더라도 기본 트위터 배경 URL이 **'profile_banner_url'
** 에 있었는데, 누락되는 경우가 있어서 제외
retweeted
가 false인 것을 확인하고, 다르게 처리하였다.
아래 링크들을 참고하는 것이 현명하다
전체적인 파이프라인 - mavoll/SparkPipeline
카산드라 관련 - How To Run a Multi-Node Cluster Database with Cassandra on Ubuntu 14.04 | DigitalOcean
"Inside Twitter's Realtime Delivery of Tweets for Enterprise Customers" by Lisa White
실제 Twitter API가 작동하는 원리에 대한 youtube 영상인데, 사실 데이터 파이프라인이 아닌,
Filtered API를 만들고 있는 것이 아닌가라는 고민을 했다.반년이 지난 지금 생각해보니 파이프라인이 아니었다...
끝🙋♂️
아무도 안보는 블로그지만 그 소수의 고생하고 있는 이들이 도움을 받길