django+celery最全使用教程

django+celery最全使用教程

码农世界 2024-05-16 后端 74 次浏览 0个评论

        在web项目中,我们常常需要处理许多异步任务和定时任务,例如发送邮件、调用第三方接口,批量处理文件、清除缓存、备份数据库等,使用celery可以帮助我们轻松做到这些。

        本文以django为例,详细介绍如何将celery集成到django中。

1、celery工作原理

        celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它的执行单元为任务(task),利用多线程,如Eventlet,gevent等,它们能被并发地执行在单个或多个职程服务器(worker servers)上。任务能异步执行(后台运行)或同步执行(等待任务完成)。

2、django项目目录结构

django+celery最全使用教程

3、配置

3.1、安装依赖

# 必选
pip install django
pip install celery
pip install redis
# 可选
# windows下运行celery4.0以后版本,还需额外安装eventlet
pip install eventlet
# 推荐安装,需要设置定时或周期任务时安装
pip install django-celery-beat
# 需要存储任务结果时安装
pip install django-celery-results
# 需要监控celery任务运行状态
pip install folower

3.2、新建celery配置文件libs/celery/config.py(常用配置)

# 1、配置异步任务
# 设置结果存储
RESULT_BACKEND = 'redis://127.0.0.1:6379/1'
# 设置代理人broker
broker_url = 'redis://127.0.0.1:6379/0'
# celery 的启动工作数量设置
CELERY_WORKER_CONCURRENCY = 20
# 任务预取功能,就是每个工作的进程/线程在获取任务的时候,会尽量多拿 n 个,以保证获取的通讯成本可以压缩。
WORKER_PREFETCH_MULTIPLIER = 20
# 非常重要,有些情况下可以防止死锁
CELERYD_FORCE_EXECV = True
# celery 的 worker 执行多少个任务后进行重启操作
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
# 禁用所有速度限制,如果网络资源有限,不建议开足马力。
WORKER_DISABLE_RATE_LIMITS = True
# 明确指示在启动时进行连接重试
# BROKER_CONNECTION_RETRY_ON_STARTUP = True
broker_connection_retry_on_startup = True
# 2、配置定时任务
timezone = 'Asia/Shanghai'
DJANGO_CELERY_BEAT_TZ_AWARE = False
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

3.3、在django项目的app的根目录,新建celery.py文件

import os
from django.utils import timezone
from celery import Celery
from celery.schedules import crontab, timedelta
from libs.celery import config
# 设置环境变量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.dev")
app = Celery("apps")
# 读取配置文件中的配置
app.config_from_object(config)
# 自动从django注册的app中发现所有任务
app.autodiscover_tasks()
# 解决时区问题,定时任务启动就循环输出
app.now = timezone.now

3.4、修改django项目的app的__init__.py

from libs.celery.config import app as celery_app
__all__ = ("celery_app")

3.5、设置定时任务或周期任务,编辑刚刚创建的celery文件(apps.celery.py)

import os
from django.utils import timezone
from celery import Celery
from celery.schedules import crontab, timedelta
from libs.celery import config
# 设置环境变量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.dev")
app = Celery("apps")
# 读取配置文件中的配置
app.config_from_object(config)
# 自动从django注册的app中发现所有任务
app.autodiscover_tasks()
# 配置定时任务
app.conf.beat_schedule = {
    "add": {
        "task": "apps.system.user.tasks.add",
        "schedule": crontab(minute="*/1"),
        "args": (1, 2)
    }
}
# 解决时区问题,定时任务启动就循环输出
app.now = timezone.now

3.6、配置celery日志(不同的任务单独存放到不同的日志文件中)

  1. 自定义日志句柄(libs/celery/handler.py)
import os
from logging import StreamHandler
from celery import current_task
from celery.signals import task_prerun, task_postrun
class CeleryTaskLoggerHandler(StreamHandler):
    terminator = "\r\n"
    def __init__(self, *args, **kwargs):
        self.task_id_fd_mapper = {}
        super().__init__(*args, **kwargs)
        # 使用 celery的task信号,设置任务开始和结束时的执行的东西
        # 主要是获取task_id 然后创建对应的独立任务日志文件
        task_prerun.connect(self.on_task_start)
        task_postrun.connect(self.on_start_end)
    @staticmethod
    def get_current_task_id():
        # celery 内置提供方法获取task_id
        if not current_task:
            return
        task_id = current_task.request.root_id
        return task_id
    def on_task_start(self, sender, task_id, **kwargs):
        # 这里是根据task_id 定义每个任务的日志文件存放
        log_path = os.path.join('logs/celery/', f"{task_id}.log")
        f = open(log_path, 'a')
        self.task_id_fd_mapper[task_id] = f
    def on_start_end(self, sender, task_id, **kwargs):
        f = self.task_id_fd_mapper.pop(task_id, None)
        if f and not f.closed:
            f.close()
        self.task_id_fd_mapper.pop(task_id, None)
    def emit(self, record):
        # 自定义Handler必须要重写的一个方法
        task_id = self.get_current_task_id()
        if not task_id:
            return
        try:
            f = self.task_id_fd_mapper.get(task_id)
            self.write_task_log(f, record)
            self.flush()
        except Exception:
            self.handleError(record)
    def write_task_log(self, f, record):
        # 日志的实际写入
        if not f:
            raise ValueError('Not found thread task file')
        msg = self.format(record)
        f.write(msg)
        f.write(self.terminator)
        f.flush()
    def flush(self):
        for f in self.task_id_fd_mapper.values():
            f.flush()
  1. 自定义信号处理(libs/celery/signal.py)
import logging
from celery.signals import after_setup_logger
from .handler import CeleryTaskLoggerHandler
@after_setup_logger.connect
def add_celery_logger_handler(sender=None, logger=None, loglevel=None, format=None, **kwargs):
    if not logger:
        return
    task_handler = CeleryTaskLoggerHandler()
    task_handler.setLevel(loglevel)
    formatter = logging.Formatter(format)
    task_handler.setFormatter(formatter)
    logger.addHandler(task_handler)
  1. 绑定django应用与自定义信号处理函数(apps.apps.py)
from django.apps import AppConfig
class AppsConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'apps'
    def ready(self) -> None:
        from libs.celery.signal import add_celery_logger_handler
        return super().ready()

3.7、启动

# 启动celery中间件(这里使用redis)
redis-server
# 启动celery(apps是django的应用名)
celery -A apps worker -l info
# 启动beat
celery -A apps beat -l info

转载请注明来自码农世界,本文标题:《django+celery最全使用教程》

百度分享代码,如果开启HTTPS请参考李洋个人博客
每一天,每一秒,你所做的决定都会改变你的人生!

发表评论

快捷回复:

评论列表 (暂无评论,74人围观)参与讨论

还没有评论,来说两句吧...

Top