사용자 요청이 있을 때마다 크롤링을 하는데 원하는 결과를 받아볼 때까지 소요되는 시간을 줄이고 싶음.
그래서 생각한 게 5~10분 간격으로 Celery가 크롤링 작업을 해서 이전 작업물과 비교해서 혹시 다르면 카카오톡 챗봇 event api으로 최신 보안 뉴스를 사용자에게 전송하고자 함.
Celery는 task queue로 실시간 프로세싱과 task 스케줄링에 초점을 맞췄다.
message를 통해 통신하고, client와 worker 사이에 broker를 통해 연결되어 있다.
Celery system은 많은 worker와 broker로 이루어져 있고 이로 인해 높은 가용성과 수평적 확장을 가능하게 한다.
Celery는 파이썬으로 작성되었지만, 어떤 언어로든 Celery를 이용할 수 있다.
Celery는 메시지를 보내고 받기 위해 message transport(메시지 전송)이 필요한데, RabbitMQ나 Redis를 Message Broker로 사용한다.
Broker가 Worker에게 메시지를 전달하면 Worker는 작업을 비동기로 수행한다.
Task queue란 스레드나 머신으로 일을 분배하는 방법으로 사용된다.
Task queue의 입력은 task라고 불리는 work의 단위
Worker는 새롭게 할 일이 없는 지 Task queue를 계속 모니터링하고 있다.
AMQP를 따라는 오픈소스 메시지 브로커로 기능이 완전하고 안정적이며 내구성이 뛰어납니다.
응용 프로그램에게 메시지를 주고 받을 수 있으며, 메시지가 수신될 때까지 안전하게 있을 수 있도록 공용 플랫폼을 제공함.
따라서 메시지를 많은 사용자에게 전달하거나, 요청에 대한 처리 시간이 길 때, 해당 요청을 다른 API에게 위임하고 빠른 응답을 할 때 사용한다.
메시지 우선 순위 기능을 가지고 있고 Redis와 비교했을 때 더 다양한 기능을 제공한다.
Redis는 기능이 완전하지만 갑작스ㅓ운 종료 또는 전원 장애 시 데이터 손실에 매우 취약하다.
인메모리 데이터베이스, 컴퓨터 메모리를 이용한(in-memory) Cache 서버
Key-Value를 이용해 Celery가 처리할 작업을 Celery에 보낸 후 Cache에서 해당 Key를 제거하는 방식으로 작동
- 메모리에서 Cache를 가져다 쓰기 때문에 속도가 매우 빠름
- 매우 빠른 서비스 및 메모리 내 기능을 제공하기 때문에 짧은 보존 메시지에 적합
- 큰 메시지를 처리할 때는 대기 시간이 오래 걸림
아래의 명령어들을 차례로 입력하게 되면 다음과 같은 작업을 수행하게 되는 것이다.
- wget http://download.redis.io/redis-stable.tar.gz: redis-stable.tar.gz를 다운로드받는다.
- tar xvzf redis-stable.tar.gz: redis-stable.tar.gz를 압축해제한다.
- cd redis-stable: 압축 해제한 redis-stable 디렉토리 경로로 이동한다.
- make: redis를 빌드한다.
# redis terminal > wget http://download.redis.io/redis-stable.tar.gz > tar xvzf redis-stable.tar.gz > cd redis-stable > make cd src && make all make[1]: Entering directory '/srv/boanbot/redis-stable/src' CC Makefile.dep rm -rf redis-server redis-sentinel redis-cli redis-benchmark redis-check-rdb redis-check-aof *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark rm -f adlist.d quicklist.d ae.d anet.d dict.d server.d sds.d zmalloc.d lzf_c.d lzf_d.d pqsort.d zipmap.d sha1.d ziplist.d release.d networking.d util.d object.d db.d replication.d rdb.d t_string.d t_list.d t_set.d t_zset.d t_hash.d config.d aof.d pubsub.d multi.d debug.d sort.d intset.d syncio.d cluster.d crc16.d endianconv.d slowlog.d scripting.d bio.d rio.d rand.d memtest.d crcspeed.d crc64.d bitops.d sentinel.d notify.d setproctitle.d blocked.d hyperloglog.d latency.d sparkline.d redis-check-rdb.d redis-check-aof.d geo.d lazyfree.d module.d evict.d expire.d geohash.d geohash_helper.d childinfo.d defrag.d siphash.d rax.d t_stream.d listpack.d localtime.d lolwut.d lolwut5.d lolwut6.d acl.d gopher.d tracking.d connection.d tls.d sha256.d timeout.d setcpuaffinity.d anet.d adlist.d dict.d redis-cli.d zmalloc.d release.d ae.d crcspeed.d crc64.d siphash.d crc16.d ae.d anet.d redis-benchmark.d adlist.d dict.d zmalloc.d siphash.d (cd ../deps && make distclean) make[2]: Entering directory '/srv/boanbot/redis-stable/deps' (cd hiredis && make clean) > /dev/null || true (cd linenoise && make clean) > /dev/null || true ...
- 다음 sudo make install을 명령어를 입력해서 redis를 설치한다.
# redis terminal > sudo make install cd src && make install make[1]: Entering directory '/srv/boanbot/redis-stable/src' CC Makefile.dep Hint: It's a good idea to run 'make test' ;) INSTALL install INSTALL install INSTALL install INSTALL install INSTALL install make[1]: Leaving directory '/srv/boanbot/redis-stable/src' ubuntu@ip-172-31-27-226 /srv/boanbot/redis-stable master ± ⬆ ✹ ✭ cd .. (1d3h30m) 07:50:59 ubuntu@ip-172-31-27-226 /srv/boanbot master ± ⬆ ✹ ✭ redis-server (1d3h30m) 07:51:05 131432:C 02 Feb 2021 07:51:07.217 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo 131432:C 02 Feb 2021 07:51:07.217 # Redis version=6.0.10, bits=64, commit=8af42c16, modified=1, pid=131432, just started 131432:C 02 Feb 2021 07:51:07.217 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf
- redis-server를 입력하면 아래와 같이 redis가 잘 실행되는 것을 확인할 수 있다.
redis-server # redis terminal 131432:C 02 Feb 2021 07:51:07.217 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo 131432:C 02 Feb 2021 07:51:07.217 # Redis version=6.0.10, bits=64, commit=8af42c16, modified=1, pid=131432, just started 131432:C 02 Feb 2021 07:51:07.217 # Warning: no config file specified, using the default config. In order to specify a config file use redis-server /path/to/redis.conf 131432:M 02 Feb 2021 07:51:07.218 * Increased maximum number of open files to 10032 (it was originally set to 1024). _._ _.-``__ ''-._ _.-`` `. `_. ''-._ Redis 6.0.10 (8af42c16/1) 64 bit .-`` .-```. ```\/ _.,_ ''-._ ( ' , .-` | `, ) Running in standalone mode |`-._`-...-` __...-.``-._|'` _.-'| Port: 6379 | `-._ `._ / _.-' | PID: 131432 `-._ `-._ `-./ _.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | http://redis.io `-._ `-._`-.__.-'_.-' _.-' |`-._`-._ `-.__.-' _.-'_.-'| | `-._`-._ _.-'_.-' | `-._ `-._`-.__.-'_.-' _.-' `-._ `-.__.-' _.-' `-._ _.-' `-.__.-' 131432:M 02 Feb 2021 07:51:07.221 # Server initialized 131432:M 02 Feb 2021 07:51:07.221 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect. 131432:M 02 Feb 2021 07:51:07.222 * Loading RDB produced by version 4.0.9 131432:M 02 Feb 2021 07:51:07.222 * RDB age 1555205 seconds 131432:M 02 Feb 2021 07:51:07.222 * RDB memory usage when created 0.45 Mb 131432:M 02 Feb 2021 07:51:07.223 * DB loaded from disk: 0.001 seconds 131432:M 02 Feb 2021 07:51:07.223 * Ready to accept connections
redis를 잘 설치해서 잘 돌아가는 것까지 확인했으니 celery를 설치한다.
pip install celery 명령어를 입력해서 celery를 설치한다.
# celery terminal > pip install celery
Celery Instance란 Celery application 혹은 app이라고 부르는데, task 생성 및 worker 관리 등 Celery에서 원하는 작업을 위한 entry-point로 사용되므로 꼭 생성해주어야 한다.
1. 먼저 tasks.py라는 python 파일을 하나 만들어서 아래와 같이 작성한다.
# celery terminal > vim tasks.py from celery import Celery app = Celery('task', broker='redis://guest@localhost//') @app.task def add(x, y): return x + y
Celery의 첫 번째 parameter는 현재 module의 이름이다.
Celery의 두 번째 parameter는 broker keyword argument로 사용하고자 하는 message broker의 URL을 명시한다.이렇게 까지 하면 간단한 더하기 연산을 하는 add라고 불리는 single task를 정의한 것이다.
2. 작성을 완료하였으면 아래 명령어를 입력해서 celery worker server를 실행해본다.
# celery terminal > celery -A tasks worker -l INFO -------------- celery@ip-172-31-27-226 v5.0.5 (singularity) --- ***** ----- -- ******* ---- Linux-5.4.0-1029-aws-x86_64-with-glibc2.29 2021-02-02 07:51:16 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7efc898a5cd0 - ** ---------- .> transport: redis://guest@localhost:6379// - ** ---------- .> results: disabled:// - *** --- * --- .> concurrency: 1 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.add [2021-02-02 07:51:17,246: INFO/MainProcess] Connected to redis://guest@localhost:6379// [2021-02-02 07:51:17,254: INFO/MainProcess] mingle: searching for neighbors [2021-02-02 07:51:18,274: INFO/MainProcess] mingle: all alone [2021-02-02 07:51:18,284: INFO/MainProcess] celery@ip-172-31-27-226 ready.
이렇게 되면 celery 준비가 완료되었다.
3. 이제 다른 터미널을 하나 더 켜서 task를 호출해서 worker에서 잘 처리되는지 확인해본다.
# other terminal > ipython Python 3.8.5 (default, Jan 27 2021, 05:45:55) Type 'copyright', 'credits' or 'license' for more information IPython 7.20.0 -- An enhanced Interactive Python. Type '?' for help. In [1]: from tasks import add In [2]: result = add.delay(4,4)
delay() 메소드를 사용해서 task를 호출한다.
그러면 이전에 시작된 worker에서 task가 처리되는 것을 아래의 화면에서 확인할 수 있다.
# celery terminal [2021-02-02 08:07:06,742: INFO/MainProcess] Received task: tasks.add[6159b957-939f-44ad-958f-a642ff8a3aab] > [2021-02-02 08:07:06,743: INFO/ForkPoolWorker-1] Task tasks.add[6159b957-939f-44ad-958f-a642ff8a3aab] succeeded in 0.00024276599287986755s: 8
다시 other terminal로 이동해서 다음과 같이 입력했더니 아래와 같은 결과가 나왔다.
# other terminal In [3]: result.ready() --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-3-d851513b5988> in <module> ----> 1 result.ready() ~/.pyenv/versions/3.8.5/envs/boanbot/lib/python3.8/site-packages/celery/result.py in ready(self) 315 for retry then :const:`False` is returned. 316 """ --> 317 return self.state in self.backend.READY_STATES 318 319 def successful(self): ~/.pyenv/versions/3.8.5/envs/boanbot/lib/python3.8/site-packages/celery/result.py in state(self) 475 then contains the tasks return value. 476 """ --> 477 return self._get_task_meta()['status'] 478 status = state # XXX compat 479 ~/.pyenv/versions/3.8.5/envs/boanbot/lib/python3.8/site-packages/celery/result.py in _get_task_meta(self) 414 def _get_task_meta(self): 415 if self._cache is None: --> 416 return self._maybe_set_cache(self.backend.get_task_meta(self.id)) 417 return self._cache 418 ~/.pyenv/versions/3.8.5/envs/boanbot/lib/python3.8/site-packages/celery/backends/base.py in get_task_meta(self, task_id, cache) 513 while True: 514 try: --> 515 meta = self._get_task_meta_for(task_id) 516 break 517 except Exception as exc: AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
기본적으로 celery는 result backend 설정이 안되어있다.
그러니까 celery가 task 결과를 확인하려면 celery를 따로 설정해줘야한다.
작업 결과를 저장하는데 사용합니다.
위에서도 설명했듯이 Celery는 기본적으로 작업 결과를 저장하지 않는다.
따라서 작업 결과를 저장하기 위해서는 Celery에 내장된 result backend를 골라서 설정하면 된다.
SQLAlchemy / Django ORM, MongoDB, Memcached, Redis, RPC(RabbitMQ/AMQP) 및 원하는 result backend 등 중 골라서 쓰면 된다.
내가 겪은 문제는 다음과 같다.
메시지 브로커로 redis를 사용했는데, result backend로도 redis가 쓰고 싶었다.
tasks.py를 다음과 같이 작성을 하고 celery를 실행했다.
#tasks.py from celery import Celery app = Celery('tasks', broker='redis://localhost', backend='redis://localhost') @app.task def add(x, y): return x + y
# celery terminal celery -A tasks worker -l INFO -------------- celery@ip-172-31-27-226 v5.0.5 (singularity) --- ***** ----- -- ******* ---- Linux-5.4.0-1029-aws-x86_64-with-glibc2.29 2021-02-03 03:09:59 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7fb08d6db7f0 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: redis://localhost/ ## 주목할 부분 - *** --- * --- .> concurrency: 1 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.add [2021-02-03 03:09:59,993: INFO/MainProcess] Connected to redis://localhost:6379// [2021-02-03 03:10:00,010: INFO/MainProcess] mingle: searching for neighbors [2021-02-03 03:10:01,068: INFO/MainProcess] mingle: all alone [2021-02-03 03:10:01,082: INFO/MainProcess] celery@ip-172-31-27-226 ready.
config 부분을 보면 app, transport, results 등등 여러 설정 요소들이 있다.
주목할 부분은 transport와 results인데, transport는 메시지 브로커를 의미하고 results는 result backend를 의미한다.
transport는 redis://localhost:6379//로 redis와 잘 연결된 것을 확인할 수 있는데,
results는 redis://localhost/로 뭔가 이상하다.
같은 redis로 메시지 브로커와 result backend를 동시에 처리할 수는 없는 것인가 싶어서 일단 celery config 부분을 바꿔보았다.
# tasks.py from celery import Celery app = Celery('tasks', broker='redis://localhost:6379', backend='redis://localhost:6379') @app.task def add(x, y): return x + y
# celery terminal > celery -A tasks worker -l INFO -------------- celery@ip-172-31-27-226 v5.0.5 (singularity) --- ***** ----- -- ******* ---- Linux-5.4.0-1029-aws-x86_64-with-glibc2.29 2021-02-03 04:44:35 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks:0x7f52d44c4910 - ** ---------- .> transport: redis://localhost:6379// - ** ---------- .> results: redis://localhost:6379/ - *** --- * --- .> concurrency: 1 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . tasks.add [2021-02-03 04:44:35,382: INFO/MainProcess] Connected to redis://localhost:6379// [2021-02-03 04:44:35,390: INFO/MainProcess] mingle: searching for neighbors [2021-02-03 04:44:36,409: INFO/MainProcess] mingle: all alone [2021-02-03 04:44:36,418: INFO/MainProcess] celery@ip-172-31-27-226 ready.
celery config 부분에서 results 부분에 redis://localhost:6379/로 잘 연결된 것을 확인할 수 있다.
다시 한번 result를 확인해보면 다음과 같다.
# other terminal > ipython Python 3.8.5 (default, Jan 27 2021, 05:45:55) Type 'copyright', 'credits' or 'license' for more information IPython 7.20.0 -- An enhanced Interactive Python. Type '?' for help. In [1]: from tasks import add In [2]: result = add.delay(4, 4) In [3]: result.ready() Out[3]: True In [4]: result.get() Out[4]: 8
- 분산 비동기 작업 처리를 위한 Celery 첫걸음: https://jonnung.dev/python/2018/12/22/celery-distributed-task-queue/
- redis 설치하기: https://dgkim5360.tistory.com/entry/install-redis-for-linux-or-windows
- celery 기본 문서: https://docs.celeryproject.org/en/stable/index.html
잘 보고 갑니다.