在web项目中,我们常常需要处理许多异步任务和定时任务,例如发送邮件、调用第三方接口,批量处理文件、清除缓存、备份数据库等,使用celery可以帮助我们轻松做到这些。
本文以django为例,详细介绍如何将celery集成到django中。
1、celery工作原理
celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它的执行单元为任务(task),利用多线程,如Eventlet,gevent等,它们能被并发地执行在单个或多个职程服务器(worker servers)上。任务能异步执行(后台运行)或同步执行(等待任务完成)。
2、django项目目录结构
3、配置
3.1、安装依赖
# 必选 pip install django pip install celery pip install redis # 可选 # windows下运行celery4.0以后版本,还需额外安装eventlet pip install eventlet # 推荐安装,需要设置定时或周期任务时安装 pip install django-celery-beat # 需要存储任务结果时安装 pip install django-celery-results # 需要监控celery任务运行状态 pip install folower
3.2、新建celery配置文件libs/celery/config.py(常用配置)
# 1、配置异步任务 # 设置结果存储 RESULT_BACKEND = 'redis://127.0.0.1:6379/1' # 设置代理人broker broker_url = 'redis://127.0.0.1:6379/0' # celery 的启动工作数量设置 CELERY_WORKER_CONCURRENCY = 20 # 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。 WORKER_PREFETCH_MULTIPLIER = 20 # 非常重要,有些情况下可以防止死锁 CELERYD_FORCE_EXECV = True # celery 的 worker 执行多少个任务后进行重启操作 CELERY_WORKER_MAX_TASKS_PER_CHILD = 100 # 禁用所有速度限制,如果网络资源有限,不建议开足马力。 WORKER_DISABLE_RATE_LIMITS = True # 明确指示在启动时进行连接重试 # BROKER_CONNECTION_RETRY_ON_STARTUP = True broker_connection_retry_on_startup = True # 2、配置定时任务 timezone = 'Asia/Shanghai' DJANGO_CELERY_BEAT_TZ_AWARE = False CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
3.3、在django项目的app的根目录,新建celery.py文件
import os from django.utils import timezone from celery import Celery from celery.schedules import crontab, timedelta from libs.celery import config # 设置环境变量 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.dev") app = Celery("apps") # 读取配置文件中的配置 app.config_from_object(config) # 自动从django注册的app中发现所有任务 app.autodiscover_tasks() # 解决时区问题,定时任务启动就循环输出 app.now = timezone.now
3.4、修改django项目的app的__init__.py
from libs.celery.config import app as celery_app __all__ = ("celery_app")
3.5、设置定时任务或周期任务,编辑刚刚创建的celery文件(apps.celery.py)
import os from django.utils import timezone from celery import Celery from celery.schedules import crontab, timedelta from libs.celery import config # 设置环境变量 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.dev") app = Celery("apps") # 读取配置文件中的配置 app.config_from_object(config) # 自动从django注册的app中发现所有任务 app.autodiscover_tasks() # 配置定时任务 app.conf.beat_schedule = { "add": { "task": "apps.system.user.tasks.add", "schedule": crontab(minute="*/1"), "args": (1, 2) } } # 解决时区问题,定时任务启动就循环输出 app.now = timezone.now
3.6、配置celery日志(不同的任务单独存放到不同的日志文件中)
- 自定义日志句柄(libs/celery/handler.py)
import os from logging import StreamHandler from celery import current_task from celery.signals import task_prerun, task_postrun class CeleryTaskLoggerHandler(StreamHandler): terminator = "\r\n" def __init__(self, *args, **kwargs): self.task_id_fd_mapper = {} super().__init__(*args, **kwargs) # 使用 celery的task信号,设置任务开始和结束时的执行的东西 # 主要是获取task_id 然后创建对应的独立任务日志文件 task_prerun.connect(self.on_task_start) task_postrun.connect(self.on_start_end) @staticmethod def get_current_task_id(): # celery 内置提供方法获取task_id if not current_task: return task_id = current_task.request.root_id return task_id def on_task_start(self, sender, task_id, **kwargs): # 这里是根据task_id 定义每个任务的日志文件存放 log_path = os.path.join('logs/celery/', f"{task_id}.log") f = open(log_path, 'a') self.task_id_fd_mapper[task_id] = f def on_start_end(self, sender, task_id, **kwargs): f = self.task_id_fd_mapper.pop(task_id, None) if f and not f.closed: f.close() self.task_id_fd_mapper.pop(task_id, None) def emit(self, record): # 自定义Handler必须要重写的一个方法 task_id = self.get_current_task_id() if not task_id: return try: f = self.task_id_fd_mapper.get(task_id) self.write_task_log(f, record) self.flush() except Exception: self.handleError(record) def write_task_log(self, f, record): # 日志的实际写入 if not f: raise ValueError('Not found thread task file') msg = self.format(record) f.write(msg) f.write(self.terminator) f.flush() def flush(self): for f in self.task_id_fd_mapper.values(): f.flush()
- 自定义信号处理(libs/celery/signal.py)
import logging from celery.signals import after_setup_logger from .handler import CeleryTaskLoggerHandler @after_setup_logger.connect def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs): if not logger: return task_handler = CeleryTaskLoggerHandler() task_handler.setLevel(loglevel) formatter = logging.Formatter(format) task_handler.setFormatter(formatter) logger.addHandler(task_handler)
- 绑定django应用与自定义信号处理函数(apps.apps.py)
from django.apps import AppConfig class AppsConfig(AppConfig): default_auto_field = 'django.db.models.BigAutoField' name = 'apps' def ready(self) -> None: from libs.celery.signal import add_celery_logger_handler return super().ready()
3.7、启动
# 启动celery中间件(这里使用redis) redis-server # 启动celery(apps是django的应用名) celery -A apps worker -l info # 启动beat celery -A apps beat -l info
还没有评论,来说两句吧...