Python 3, Django 2.5, Celery 4.3, Redis: distributed asynchronous task scheduling

Environment:

python 3.7.3

pip install redis

pip install --upgrade https://github.com/celery/celery/tarball/master


Project directory structure:



Contents of celery.py:


from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'OpsMonitorAlert.settings')
from django.conf import settings

app = Celery('OpsMonitorAlert')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - Passing namespace='CELERY' would require every Celery-related
#   configuration key to carry a `CELERY_` prefix; no namespace is
#   passed here, so keys are read under their plain names.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
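
The celery.py above loads its configuration from the Django settings module, but the settings themselves are not shown in this post. A minimal sketch of what they might look like follows, assuming a Redis server on localhost at the default port. The host, port, database numbers and the REDIS_HOST / REDIS_PORT helper names are assumptions for illustration. Because config_from_object is called without a namespace, the old-style uppercase setting names are used here.

```python
# settings.py (excerpt) -- hypothetical values, adjust to your own Redis instance
REDIS_HOST = 'localhost'  # assumption: Redis runs on the same machine
REDIS_PORT = 6379         # assumption: default Redis port

# Broker: where Celery queues task messages (Redis database 0 here).
BROKER_URL = 'redis://{0}:{1}/0'.format(REDIS_HOST, REDIS_PORT)

# Result backend: where task return values are stored (database 1 here).
CELERY_RESULT_BACKEND = 'redis://{0}:{1}/1'.format(REDIS_HOST, REDIS_PORT)

# Exchange tasks and results as JSON only.
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
```

Keeping the broker and the result backend in separate Redis databases is a design choice, not a requirement; it just makes the two kinds of keys easier to tell apart.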


__init__.py in the project directory:


from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
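
With the app imported at Django start-up, autodiscover_tasks in celery.py looks inside each installed app for a module named tasks by default (the related_name argument). The sketch below is a simplified illustration of that naming rule, not Celery's actual implementation; candidate_task_modules is a hypothetical helper and the app names are examples.

```python
def candidate_task_modules(installed_apps, related_name='tasks'):
    """Return the dotted module names that task discovery would look for.

    Simplified illustration: one '<app>.<related_name>' entry per app.
    """
    return ['{0}.{1}'.format(app, related_name) for app in installed_apps]


# Example app names (assumptions for illustration only).
modules = candidate_task_modules(['ops', 'website'])
print(modules)
```

Under this rule a file named tasks.py is found by default, while any other file name needs a matching related_name.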



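task.py in the ops directory (define the tasks to suit your own needs). Note that autodiscover_tasks looks for a module named tasks by default, so either name the file tasks.py or pass related_name='task' to autodiscover_tasks.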

# -*- coding:utf-8 -*-
# Create your tasks here
from __future__ import absolute_import, unicode_literals
import os

from celery import shared_task
from django.conf import settings
from .task_api.run_rundeck import OpsTaskProcess
from website.models.rundeck import RDInfo, RDJobEvent
from common.utils import get_object_or_none, get_objects_filters


@shared_task
def rundeck_run_job(env, project_name, job_name, param=None):
    '''
    Look up the Rundeck instance registered under ``env``.

    :param env: name of the RDInfo record to look up
    :param project_name: Rundeck project name
    :param job_name: Rundeck job name
    :param param: optional dict of job parameters
    :return: True if a matching RDInfo record exists, otherwise False
    '''
    # Create a fresh dict per call instead of sharing one default dict.
    param = param or {}
    rd_obj = get_object_or_none(RDInfo, name=env)
    if rd_obj:
        print(rd_obj.name, rd_obj.url)
        return True
    return False
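
One detail worth noting for task signatures that take a dict parameter: a default written as `None or {}` evaluates to `{}` once, when the function is defined, so every call that omits the argument shares the same dict. The sketch below shows the difference in plain Python, with no Celery involved; run_job_shared and run_job_safe are hypothetical names used only for this illustration.

```python
def run_job_shared(job_name, param=None or {}):
    # `None or {}` is evaluated once at definition time,
    # so this dict is shared by every call that omits `param`.
    param.setdefault('runs', []).append(job_name)
    return param


def run_job_safe(job_name, param=None):
    # Use None as a sentinel and build a fresh dict on each call.
    if param is None:
        param = {}
    param.setdefault('runs', []).append(job_name)
    return param


first = run_job_shared('a')
second = run_job_shared('b')
print(first is second)    # True: both calls returned the same dict
print(second)             # {'runs': ['a', 'b']}
print(run_job_safe('a'))  # {'runs': ['a']}
print(run_job_safe('b'))  # {'runs': ['b']}
```

In a long-running worker process the shared dict would persist between task invocations, which is why the sentinel form is usually the safer choice.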


Starting Celery:

Note: run the command from the project root, i.e. the same directory as manage.py.


celery -A OpsMonitorAlert worker -l info

