[WebSocket] Django channels, celery 연동

코딩은 돈이 된다·2024년 7월 10일

웹 소켓을 하려고 하면서, 우선 적당히 consumers를 짜두고 오류 안나는 것을 확인하고 push를 했었다.
다른 개발들을 진행했는데, 너무 금방 끝나서 js를 테스트로 만들어서 실시간 통신을 테스트해보려고 했다.

우선 프로젝트의 구조를 설명해보자면

구상은 이랬다.
웹소켓 연결 -> 클라이언트에서 api요청 -> 해당 View에서 celery 작업을 호출 ->
gpt api를 통해 값을 받아온 후 group_send를 이용해 consumer로 전송 -> receive가 호출되어 클라이언트에게 출력

분명 가능할거라고 생각하고 코드도 이대로 작성했는데, 전혀 작동이 되지 않았다.
찾아보면 channel_layer.group_send로 하면 된다는데,

channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        'chat_chat_room',
        {
            'type': 'gpt_talk_message',
            'message': "메세지 전송",
        }
    )

task.py에서 호출하는 코드를 이런식으로 두고

    def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json.get('message')
        print("message: ", message)
        message_type = text_data_json.get('type')
        if message and message_type:
            self.send(text_data=json.dumps(
                {
                    'message': message,
                    'message_type': message_type,
                }
            ))
        else:
            self.send(text_data=json.dumps({
                'error': 'Invalid message format. "message" and "type" are required fields.'
            }))

consumers.py의 receive 코드를 이런식으로 작성했다.
다른 부분은 모르겠지만, 이렇게 하면 print 부분은 무조건 출력이 될 것이라고 생각했기 때문에 진행했는데,
전혀 안뜨더라..
물론 celery의 return값은 뜬다...
아무 의미 없지만, celery 작업은 문제가 없다고 생각했고, consumers로 값이 넘어가지지 않는다는 것을 알 수 있었다.

그래서 이 방법이 틀렸다고 생각하고 다른 방법을 모색해 보았다.

클라이언트가 소켓으로 넘긴 값으로 celery 작업을 호출하면 어떨까?

인터넷을 찾아보며 나오는 것들은 전부, 채팅처럼 클라이언트가 서버에 값을 보내면 서버에서 그 값을 가지고 다시 클라이언트에게 전달하는 내용이었다.

이것에 착안하여,
"어차피 셀러리 작업들은 모두 등록이 되어있으니, api 요청 말고 소켓 요청을 통해 보내주면 어떨까?"
라는 생각을 하게 되었다.

결론적으로 되긴 한다.

api 설계 시에 request에 넣어서 보내던 값들을, socket을 통해 보내고
consumers에서는 해당 타입에 맞는 셀러리 작업을 호출하여 그 결과를 다시 클라이언트에게 전송해준다.

뭐 된다면 그만이긴 하지만, 큰 고민이 생겼다.

그럼 기존 api 들은..?

말 그대로이다. 소켓으로만 처리하면 기존 chat room에서의 api들은 아무 쓸모가 없어진다.
모든 사람이 이렇게 개발을 진행하는지도 모르겠고, 내 방식이 맞는지도 모르겠어서 이리저리 자문을 구하고자 한다..

대공사

아무래도 consumers.py에 대한 이해가 부족했던 것 같다.
message의 타입에 따라서 실행되는 함수가 존재했던 것 같아서, 다시 원래대로 코드를 돌리고 message 타입을 설정한 뒤에 실행을 해주었다.

    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        'chat_room',
        {
            'type': 'gpt_talk_message',
            'message': result,
        }
    )

해당 task에서의 웹소켓 호출을 받아주기 위해, 타입 이름과 일치하는 함수를 생성해주었다.

    def gpt_talk_message(self, event):
        message = event['message']
        message_type = event['type']
        print("gpt_talk_message", message, message_type)
        self.send(text_data=json.dumps({
            'message': message,
            'type': message_type
        }))

결과는?

안된다..
print 문 자체가 출력이 되지 않는다. 원래 기대한 상황은 print문은 무조건 출력이 되어야 하는데,
안되는 것을 보니 해당 함수 자체가 호출이 안되는 것 같다.

도대체 이유를 모르겠다..

문제 발견

다른 어떤 곳도 아닌, settings.py쪽 문제였다.
난 크게 대수롭지 않게 CHANNEL_LAYERS를 인메모리로 설정해두었는데, 이 부분이 문제였던 것 같다..

    CHANNEL_LAYERS = {
        'default': {
            'BACKEND': 'channels.layers.InMemoryChannelLayer',
        },
    }

소켓으로 호출하는 형식으로 할 때에는 채널 내에서만 동작하기 때문에, 문제 없이 동작을 했다.

하지만, 내가 구현한 환경은 celery -> django-channels로 값을 넘기는 채널 간 이동이 필요했기 때문에, 메시지 브로커가 꼭 필요했던 것이다..
나는 많이 사용한다는 redis를 적용하였다.

	CHANNEL_LAYERS = {
        'default': {
            'BACKEND': 'channels_redis.core.RedisChannelLayer',
            'CONFIG': {
                "hosts": [('redis', 6379)],
            },
        },
    }

결과는 성공...
이제 있던 코드들 전부 고쳐야겠다.....

지금까지 한 이틀간 사투를 벌이며 settings.py에는 전혀 문제가 없을 것이라고 생각했는데,
막상 settings.py 쪽 문제였고, 해결하니 허무하다.

이미 짠 세팅도 다시 보자

0개의 댓글