环境:
python 3.7.3
pip install redis
pip install --upgrade https://github.com/celery/celery/tarball/master
项目目录结构:
celery.py内容
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'OpsMonitorAlert.settings') from django.conf import settings app = Celery('OpsMonitorAlert ') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
项目目录下的__init__.py
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ('celery_app',)
ops目录下的task.py(这个根据自己的需求自己定义)
# -*- coding:utf-8 -*- # Create your tasks here from __future__ import absolute_import, unicode_literals from celery import shared_task from django.conf import settings from .task_api.run_rundeck import OpsTaskProcess from website.models.rundeck import RDInfo, RDJobEvent from common.utils import get_object_or_none,get_objects_filters import os @shared_task def rundeck_run_job(env, project_name, job_name, param=None or {}): ''' :param tid: Asset Rundeck RDPlatform id :param kwargs: :return: ''' rd_obj = get_object_or_none(RDInfo, name=env) if rd_obj: print (rd_obj.name, rd_obj.url) return True else: return False
celery启动:
注:在project下启动,即跟manage.py在同一个目录下执行启动。
celery -A OpsMonitorAlert worker -l info