[twitter API v2 with python] load stream data to Mongodb

Ja L·2022년 12월 29일
0

Twitter API & Mongodb

목록 보기
2/2

이번에는 Twitter API V2를 이용해서 stream 데이터를 받아볼겁니다.
아래의 코드의 경우, stream 데이터를 Mongodb에 db를 만들고 저장하는 과정이 포함되어있습니다.

환경변수에 대한 추가적인 설명이 필요하면 아래의 링크를 참고하세요.

https://velog.io/@dafld/python-%ED%99%98%EA%B2%BD%EB%B3%80%EC%88%98-%EC%84%A4%EC%A0%95-os.environ

create_url 함수 내의 url에 관한 정보가 필요하면 아래의 링크를 참고하세요.

https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/quick-start



import requests
import os
import json
from pymongo import MongoClient

# To set your environment variables in your terminal run the following line:
# export 'BEARER_TOKEN'='<your_bearer_token>'

# os.environ.get('환경변수명', '값') 으로 환경변수에 값을 설정할 수 있다.
bearer_token = os.environ.get('BEARER_TOKEN', "본인의 승인받은 BEARER_TOKEN 입력")

####################### 추가된 부분 ##########################
client = MongoClient('localhost', 27017) # Mongodb에 연결합니다.
db = client.twitter4 # twitter 라는 데이터베이스를 만듭니다.
###########################################################

def create_url():
    return "https://api.twitter.com/2/tweets/sample/stream?tweet.fields=created_at&expansions=author_id&user.fields=created_at"


def bearer_oauth(r):
    """
    Method required by bearer token authentication.
    """

    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2SampledStreamPython"
    return r


def connect_to_endpoint(url):
    response = requests.request("GET", url, auth=bearer_oauth, stream=True)
    print(response.status_code)
    for response_line in response.iter_lines():
        if response_line:
            json_response = json.loads(response_line)
            print(json.dumps(json_response, indent=4, sort_keys=True))
            ##################### 추가된 부분 #####################
            db.data.insert_one(json_response)
            ####################################################
    if response.status_code != 200:
        raise Exception(
            "Request returned an error: {} {}".format(
                response.status_code, response.text
            )
        )


def main():
    url = create_url()
    timeout = 0
    while True:
        connect_to_endpoint(url)
        timeout += 1


if __name__ == "__main__":
    main()
    

받아온 데이터를 확인해보겠습니다.
아래는 터미널 창에 리턴된 스트림 데이터의 일부를 캡쳐했습니다.


이번에는 Mongodb GUI인 Mongodb compass를 이용해서 저장된 데이터를 확인해보겠습니다.
몇 초 안켜놨는데 1935개의 트윗이 저장됐네요. 저장이 잘 되었음을 확인할 수 있습니다.

twitter, twitter2, twitter3, twitter4는 각각 제가 생성한 db이고 여러가지 조건을 이용해서 stream data를 filter 해보는 용도로 만들었습니다. (별 다른게 아니라는 뜻)

MongoDB compass 설치 링크는 다음과 같습니다.

https://www.mongodb.com/try/download/compass

아래는 실시간 네트워크 발생과 작업상태를 확인할 수 있는 창인데 신기해서 캡쳐해봤습니다.
성공적으로 비정형 데이터를 Mongodb에 수집했습니다.


Debuging feedback

Twitter API tutorial에서 python을 이용한 stream 데이터 받아오는 코드 샘플은 아래의 주소에 있습니다.

https://github.com/twitterdev/Twitter-API-v2-sample-code/blob/main/Sampled-Stream/sampled-stream.py

코드를 복사해서 승인받은 BEARER_TOKEN을 입력하고 실행시키면 아래와 같은 오류가 발생합니다.

29번째 줄을 참조하면 아래와 같은데 print의 결과가 401이 출력됩니다.

	response = requests.request("GET", url, auth=bearer_oauth, stream=True)
    print(response.status_code)

최초의 과정에서 export 'BEARER_TOKEN'='값' 으로 환경변수가 설정을 시도했지만 환경변수 설정이 안됐습니다. 따라서 환경변수 설정과 관련한 내용을 구글링한 결과 다른 방법으로 환경변수 설정하는 방법을 찾아 제대로 환경변수 설정을 해주었고, 그 결과 response.status_code 가 200 으로 리턴되고 정상적으로 실행됨을 확인했습니다.

os.environ.get('환경변수명', '값') 으로 환경변수에 값을 설정할 수 있다.

결과적으로 JSONDecoderError가 발생한 이유는 401 에러로 어떠한 데이터도 받아오지 못했기 떄문입니다. 데이터를 JSON포맷으로 받아올것이라고 예상한 JSONDecoder는 아무런 데이터를 받지 못했기때문에 형식에 문제가 있다고 설명을 한 것으로 보입니다.

profile
DB Engineer

0개의 댓글