MongoDB와 InfluxDB는 둘 다 NoSQL 데이터베이스 시스템 이지만
각 사용 용도가 명확하다.
| 특징\DB | MongoDB | InfluxDB |
|---|---|---|
| 데이터베이스 유형 | Document Store (NoSQL) | Time Series Database (TSDB) |
| 데이터 구조 | BSON (Binary JSON) | 시계열 데이터 (Time Series) |
| 사용 사례 | 비교적 복잡한 데이터 구조 및 검색 쿼리 | 모니터링, IoT 데이터 ... |
+ BSON
JSON의 이진 표현, MongoDB에서 효율적인 저장 및 데이터 탐색을 위해 내부적으로 사용
MongoDB 내부에서 어떤 동작을 하는 지, 공식 문서에서 가져왔다.
사내 웹서비스를 기획할 때 MongoDB와 InfluxDB를 둘 다 사용했던 적이 있는데, 다음과 같이 사용했다.

아래에 용어별 자세한 설명을 적어두었다.
_id 필드를 포함하며, MongoDB에서 자동으로 생성해줌insert_one, insert_manyfind_one, findupdate_onefrom fastapi import HTTPException
import motor.motor_asyncio
# MongoDB 클라이언트를 생성
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://id:pw@localhost:27017')
# DATABASE라는 이름의 데이터베이스에 접근
db = client.DATABASE
# 'users'라는 컬렉션에 접근 (접근시에 해당 컬렉션이 없으면 생성됨)
collection_users = db.users
# 사용자 정보 추가
async def add_user(user_entered):
# 인자로 받은 이메일과 일치하는 사용자가 이미 존재하는지 확인
user_info = await collection_users.find_one({"email":user_entered["email"]})
if not user_info:
await collection_users.insert_one(user_entered)
return user_info
# 이미 사용자가 존재하면 409 Conflict 에러 처리
raise HTTPException(status_code=409, detail="ACCOUNT_ALREADY_EXISTS")
# 사용자 정보 조회
async def get_user(email: str):
# 'users' 컬렉션에서 인자로 받은 email과 email 필드가 일치하는 문서를 하나 조회
user_info = await collection_users.find_one({"email":email})
if user_info:
return user_info
# 해당 email과 일치하는 필드가 없을 시에 404 에러 처리
raise HTTPException(status_code=404, detail="USER_NOT_FOUND")
# 사용자 목록 조회
async def get_user_list():
# 'users' 컬렉션에서 role 필드가 'user'인 모든 문서를 조회
if (user_info := collection_users.find({"role": "user"})) is not None:
# Cursor 객체를 Python 리스트로 변환 (to_list 비동기 메서드)
user_list = await user_info.to_list(length=100)
return user_list
# 사용자 정보 업데이트
async def update_user(user_entered):
# 인자로 받은 이메일과 일치하는 사용자가 존재하는지 확인
user_info = await get_user(user_entered["email"])
if user_info:
# "$set" 연산자를 사용하여 기존 정보에 새 데이터를 덮어씀
await collection_users.update_one({"email": user_entered["email"]}, {"$set": user_info})
return user_info
# 사용자가 없으면 404 Not Found 에러 처리
raise HTTPException(status_code=404, detail="USER_NOT_FOUND")
import motor.motor_asyncio
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://id:pw@localhost:27017')
db = client.DATABASE
collection_logs = db.logs
# logs 컬렉션에 "created_at"이라는 TTL 인덱스를 설정하여, 1년이 경과된 문서를 자동 삭제 되도록 함.
collection_logs.create_index([("created_at", 1)], expireAfterSeconds=365*86400)
...
async def add_log():
log = {
"content": "1년 뒤에 사라질 문서(Document)",
"created_at": datetime.utcnow() # TTL 인덱스 필드에 UTC 현재 시간을 할당
}
await collection_logs.insert_one(log)
return True
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
token = "token" # 인증 토큰
org = "my_org" # 조직 (Org) 이름
bucket= "my_bucket" # 버킷 (Bucket) 이름
# InfluxDB 클라이언트 생성
client = InfluxDBClient(url="http://localhost:8086", token=token, org=org)
# 데이터 쓰기 및 조회 API 설정
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
# IoT 측정값 기록
async def add_datain(device, timestamp):
# 동기식 쓰기 API 생성
write_api = client.write_api(write_options=SYNCHRONOUS)
# InfluxDB에 저장할 Point 객체 생성
point = Point("mem") \
.tag("data_type", "datain") \ # 태그 추가
.tag("mac", device["mac"]) \ # 태그 추가
.field("ip", device["ip"]) \ # 필드 추가
.field("offset", device["offset"] \ # 필드 추가
.time(datetime.fromtimestamp(timestamp), WritePrecision.NS) # 타임스탬프 할당
# 데이터 쓰기 (InfluxDB에 저장)
write_api.write(bucket, org, point)
return data
# IoT 측정값 조회
async def get_datain(query_info, device_list):
try:
res = []
for device_info in device_list:
res_per_device = {"mac": device_info['mac'], "offset_list": []}
# Flux 쿼리 작성 (InfluxDB 2.x)
query = f'''
from(bucket: "{bucket}")
|> range(start: {query_info["date_range"][0]}, stop: {query_info["date_range"][1]})
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r.data_type == "datain")
|> filter(fn: (r) => r.mac == "{device_info['mac']}")
|> filter(fn: (r) => r["_field"] == "offset")
'''
# 쿼리 실행 및 결과 조회
tables = query_api.query(query)
# 결과가 있는 경우, 각 테이블의 레코드를 순회
if len(tables) > 0:
for table in tables:
for row in table.records:
# JSON 형식으로 데이터 변환
device = jsonable_encoder(row.values)
if device["_value"] is not None:
# UTC to KST
time_str = row.values["_time"].strftime("%Y-%m-%d %H:%M:%S.%f")
time = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S.%f")
time += timedelta(hours=9)
res_per_device["offset_list"].append({
"time": time.strftime("%Y-%m-%d %H:%M:%S.%f"),
"offset": device["_value"]
})
res.append(res_per_device)
return res
except Exception as e:
print("e", e)
return []
async def add_datain(device, timestamp):
write_api = client.write_api(write_options=SYNCHRONOUS)
point = Point("mem") \
.tag("mac", device["mac"]) \
.field("offset", device["offset"] \
.time(int(time.time()), WritePrecision.NS) # 타임스탬프 할당 (UNIX timestamp)
write_api.write(bucket, org, point)
return data