386 lines
16 KiB
Python
386 lines
16 KiB
Python
#!/bin/python
|
|
#coding: utf-8
|
|
# +-------------------------------------------------------------------
|
|
# | system: django-vue-lyadmin
|
|
# +-------------------------------------------------------------------
|
|
# | Author: lybbn
|
|
# +-------------------------------------------------------------------
|
|
# | QQ: 1042594286
|
|
# +-------------------------------------------------------------------
|
|
|
|
# ------------------------------
|
|
# django_celery_beat PeriodicTask view
|
|
# https://docs.celeryq.dev/en/stable/reference/celery.schedules.html#celery.schedules.crontab
|
|
# ------------------------------
|
|
|
|
import json
|
|
from django_celery_beat.models import PeriodicTask,CrontabSchedule, cronexp,PeriodicTasks,IntervalSchedule
|
|
from django_celery_results.models import TaskResult
|
|
from rest_framework import serializers
|
|
|
|
from utils.jsonResponse import SuccessResponse, ErrorResponse,DetailResponse
|
|
from utils.serializers import CustomModelSerializer
|
|
from utils.viewset import CustomModelViewSet
|
|
from apps.lycrontab.views.celery_interval_schedule import IntervalScheduleSerializer
|
|
from utils.common import get_parameter_dic,ast_convert
|
|
from rest_framework.exceptions import APIException
|
|
from apps.lycrontab.filters import CeleryPeriodicTaskFilterSet
|
|
from application import settings
|
|
from django.db import transaction
|
|
|
|
CrontabSchedule.__str__ = lambda self : '{0} {1} {2} {3} {4}'.format(
|
|
cronexp(self.minute), cronexp(self.hour),
|
|
cronexp(self.day_of_month), cronexp(self.month_of_year),
|
|
cronexp(self.day_of_week)
|
|
)
|
|
|
|
def get_task_list():
|
|
"""
|
|
获取本地所有app目录中所有的tasks任务文件中“lytask_”开头的任务方法
|
|
:return:
|
|
"""
|
|
task_list = []
|
|
task_dict_list = []
|
|
for app in settings.INSTALLED_APPS:
|
|
try:
|
|
exec(f"""
|
|
from {app} import tasks
|
|
for t in [i for i in dir(tasks) if i.startswith('lytask_')]:
|
|
task_dict = dict()
|
|
task_dict['label'] = '{app}.tasks.' + t
|
|
task_dict['value'] = '{app}.tasks.' + t
|
|
task_list.append('{app}.tasks.' + t)
|
|
task_dict_list.append(task_dict)
|
|
""")
|
|
except ImportError :
|
|
pass
|
|
return {'task_list': task_list, 'task_dict_list': task_dict_list}
|
|
|
|
|
|
|
|
def cronConvert(cron):
|
|
"""
|
|
解析cron表达式为字典形式
|
|
:param cron: * * * * * 分、时、天、月、周
|
|
:return:
|
|
"""
|
|
cron = cron.split(" ")
|
|
result = {
|
|
"minute":cron[0],
|
|
"hour":cron[1],
|
|
"day":cron[2],
|
|
"month":cron[3],
|
|
"week":cron[4]
|
|
}
|
|
return result
|
|
|
|
class IntervalScheduleSerializer(CustomModelSerializer):
|
|
|
|
class Meta:
|
|
model = IntervalSchedule
|
|
read_only_fields = ["id"]
|
|
fields = '__all__'
|
|
|
|
class CrontabScheduleSerializer(CustomModelSerializer):
|
|
|
|
class Meta:
|
|
model = CrontabSchedule
|
|
read_only_fields = ["id"]
|
|
exclude = ('timezone',)
|
|
# fields = '__all__'
|
|
|
|
class PeriodicTaskSerializer(CustomModelSerializer):
|
|
|
|
crontab = serializers.StringRelatedField(read_only=True)
|
|
crontab_id = serializers.SerializerMethodField(read_only=True)
|
|
interval = serializers.SerializerMethodField(read_only=True)
|
|
interval_id = serializers.SerializerMethodField(read_only=True)
|
|
type = serializers.SerializerMethodField(read_only=True)
|
|
|
|
def get_crontab_id(self,obj):
|
|
return obj.crontab_id
|
|
|
|
def get_interval(self,obj):
|
|
if obj.interval_id:
|
|
return {
|
|
'every':obj.interval.every,
|
|
'period':obj.interval.period
|
|
}
|
|
else:
|
|
return None
|
|
|
|
def get_interval_id(self,obj):
|
|
return obj.interval_id
|
|
|
|
def get_type(self,obj):
|
|
type = 0
|
|
if obj.crontab_id:
|
|
type = 1
|
|
return type
|
|
|
|
class Meta:
|
|
model = PeriodicTask
|
|
read_only_fields = ["id"]
|
|
fields = '__all__'
|
|
|
|
class PeriodicTaskCreateUpdateSerializer(CustomModelSerializer):
|
|
|
|
class Meta:
|
|
model = PeriodicTask
|
|
read_only_fields = ["id","total_run_count","date_changed"]
|
|
fields = '__all__'
|
|
|
|
|
|
class PeriodicTaskModelViewSet(CustomModelViewSet):
|
|
"""
|
|
任务数据模型
|
|
"""
|
|
queryset = PeriodicTask.objects.exclude(name="celery.backend_cleanup").order_by("id")
|
|
serializer_class = PeriodicTaskSerializer
|
|
create_serializer_class = PeriodicTaskCreateUpdateSerializer
|
|
update_serializer_class = PeriodicTaskCreateUpdateSerializer
|
|
filterset_class = CeleryPeriodicTaskFilterSet
|
|
|
|
@transaction.atomic
|
|
def create(self, request, *args, **kwargs):
|
|
body_data = request.data.copy()
|
|
type = int(body_data.pop('type'))
|
|
args1 = body_data.get('args','')
|
|
if not isinstance(ast_convert(args1),list):
|
|
return ErrorResponse(msg="参数格式错误")
|
|
if type not in [0,1]:
|
|
return ErrorResponse(msg="type类型错误")
|
|
if type == 1:
|
|
body_data.pop('interval')
|
|
cron = body_data.get('crontab')
|
|
cron_dict = cronConvert(cron)
|
|
minute = cron_dict["minute"]
|
|
hour = cron_dict["hour"]
|
|
day = cron_dict["day"]
|
|
month = cron_dict["month"]
|
|
week = cron_dict["week"]
|
|
cron_data = {
|
|
'minute': minute,
|
|
'hour': hour,
|
|
'day_of_week': week,
|
|
'day_of_month': day,
|
|
'month_of_year': month
|
|
}
|
|
task = body_data.get('task')
|
|
result = None
|
|
task_list = get_task_list()
|
|
task_list = task_list.get('task_list')
|
|
if task in task_list:
|
|
# 添加crontab
|
|
serializer = CrontabScheduleSerializer(data=cron_data, request=request)
|
|
serializer.is_valid(raise_exception=True)
|
|
self.perform_create(serializer)
|
|
|
|
# 添加任务
|
|
body_data['crontab'] = serializer.data.get('id')
|
|
# body_data['enabled'] = False
|
|
# header = {}
|
|
# header['periodic_task_name'] = body_data['name']
|
|
# header['task_name'] = body_data['task']
|
|
# body_data['headers'] = json.dumps(header)
|
|
serializer = self.get_serializer(data=body_data, request=request)
|
|
res = serializer.is_valid()
|
|
if not res:
|
|
raise APIException({"msg": f"添加失败,已经有一个名为 {body_data['name']} 的任务了"}, code=4000)
|
|
self.perform_create(serializer)
|
|
result = serializer.data
|
|
task_obj = PeriodicTask.objects.get(id=result.get('id'))
|
|
PeriodicTasks.changed(task_obj)
|
|
return DetailResponse(msg="添加成功", data=result)
|
|
else:
|
|
return ErrorResponse(msg="没有该任务方法,请先添加", data=None)
|
|
else:
|
|
body_data.pop('crontab')
|
|
interval = body_data.get('interval')
|
|
every = interval['every']
|
|
period = interval['period']
|
|
if not all([every,period]):
|
|
return ErrorResponse(msg="间隔时间错误")
|
|
task = body_data.get('task')
|
|
result = None
|
|
task_list = get_task_list()
|
|
task_list = task_list.get('task_list')
|
|
if task in task_list:
|
|
# 添加crontab
|
|
serializer = IntervalScheduleSerializer(data=interval, request=request)
|
|
serializer.is_valid(raise_exception=True)
|
|
self.perform_create(serializer)
|
|
|
|
# 添加任务
|
|
body_data['interval'] = serializer.data.get('id')
|
|
# body_data['enabled'] = False
|
|
serializer = self.get_serializer(data=body_data, request=request)
|
|
res = serializer.is_valid()
|
|
if not res:
|
|
raise APIException({"msg": f"添加失败,已经有一个名为 {body_data['name']} 的任务了"}, code=4000)
|
|
self.perform_create(serializer)
|
|
result = serializer.data
|
|
task_obj = PeriodicTask.objects.get(id=result.get('id'))
|
|
PeriodicTasks.changed(task_obj)
|
|
return DetailResponse(msg="添加成功", data=result)
|
|
else:
|
|
return ErrorResponse(msg="没有该任务方法,请先添加", data=None)
|
|
|
|
@transaction.atomic
|
|
def update(self, request, *args, **kwargs):
|
|
body_data = request.data.copy()
|
|
type = int(body_data.pop('type'))
|
|
args1 = body_data.get('args','')
|
|
if not isinstance(ast_convert(args1),list):
|
|
return ErrorResponse(msg="参数格式错误")
|
|
if type not in [0, 1]:
|
|
return ErrorResponse(msg="type类型错误")
|
|
if type == 1:
|
|
body_data.pop('interval')
|
|
body_data.pop('interval_id')
|
|
cron = body_data.get('crontab')
|
|
cron_id = body_data.get('crontab_id')
|
|
cron_dict = cronConvert(cron)
|
|
minute = cron_dict["minute"]
|
|
hour = cron_dict["hour"]
|
|
day = cron_dict["day"]
|
|
month = cron_dict["month"]
|
|
week = cron_dict["week"]
|
|
cron_data = {
|
|
'minute': minute,
|
|
'hour': hour,
|
|
'day_of_week': week,
|
|
'day_of_month': day,
|
|
'month_of_year': month
|
|
}
|
|
task = body_data.get('task')
|
|
result = None
|
|
task_list = get_task_list()
|
|
task_list = task_list.get('task_list')
|
|
if task in task_list:
|
|
# 编辑crontab
|
|
cond_instance = CrontabSchedule.objects.filter(id=cron_id).first()
|
|
oldcron = '{0} {1} {2} {3} {4}'.format(
|
|
cronexp(cond_instance.minute), cronexp(cond_instance.hour),
|
|
cronexp(cond_instance.day_of_month), cronexp(cond_instance.month_of_year),
|
|
cronexp(cond_instance.day_of_week)
|
|
)
|
|
if cron.strip() == oldcron:
|
|
body_data['crontab'] = cron_id
|
|
else:
|
|
# cond_instance.minute = minute
|
|
# cond_instance.hour = hour
|
|
# cond_instance.day_of_month = day
|
|
# cond_instance.month_of_year = month
|
|
# cond_instance.day_of_week = week
|
|
# body_data['crontab'] = cond_instance.id
|
|
cron_data['id'] = cron_id
|
|
serializer = CrontabScheduleSerializer(cond_instance, data=cron_data, request=request)
|
|
serializer.is_valid(raise_exception=True)
|
|
self.perform_update(serializer)
|
|
body_data['crontab'] = cond_instance.id
|
|
header = {}
|
|
header['periodic_task_name'] = body_data['name']
|
|
header['task_name'] = body_data['task']
|
|
body_data['headers'] = json.dumps(header)
|
|
partial = kwargs.pop('partial', False)
|
|
instance = self.get_object()
|
|
serializer1 = self.get_serializer(instance, data=body_data, request=request, partial=partial)
|
|
serializer1.is_valid(raise_exception=True)
|
|
self.perform_update(serializer1)
|
|
|
|
if getattr(instance, '_prefetched_objects_cache', None):
|
|
# If 'prefetch_related' has been applied to a queryset, we need to
|
|
# forcibly invalidate the prefetch cache on the instance.
|
|
instance._prefetched_objects_cache = {}
|
|
task_obj = PeriodicTask.objects.get(id=instance.id)
|
|
PeriodicTasks.changed(task_obj)
|
|
return DetailResponse(data=serializer1.data, msg="更新成功")
|
|
else:
|
|
return ErrorResponse(msg="没有该任务方法,请先添加", data=None)
|
|
else:
|
|
body_data.pop('crontab')
|
|
body_data.pop('crontab_id')
|
|
interval = body_data.get('interval')
|
|
interval_id = body_data.get('interval_id')
|
|
every = interval['every']
|
|
period = interval['period']
|
|
if not all([every, period]):
|
|
return ErrorResponse(msg="间隔时间错误")
|
|
task = body_data.get('task')
|
|
result = None
|
|
task_list = get_task_list()
|
|
task_list = task_list.get('task_list')
|
|
if task in task_list:
|
|
# 编辑crontab
|
|
interval_instance = IntervalSchedule.objects.filter(id=interval_id).first()
|
|
if interval_instance.every == every and interval_instance.period == period:
|
|
body_data['interval'] = interval_id
|
|
else:
|
|
interval['id'] = interval_id
|
|
serializer = IntervalScheduleSerializer(interval_instance, data=interval, request=request)
|
|
serializer.is_valid(raise_exception=True)
|
|
self.perform_update(serializer)
|
|
body_data['interval'] = interval_instance.id
|
|
partial = kwargs.pop('partial', False)
|
|
instance = self.get_object()
|
|
serializer1 = self.get_serializer(instance, data=body_data, request=request, partial=partial)
|
|
serializer1.is_valid(raise_exception=True)
|
|
self.perform_update(serializer1)
|
|
|
|
if getattr(instance, '_prefetched_objects_cache', None):
|
|
# If 'prefetch_related' has been applied to a queryset, we need to
|
|
# forcibly invalidate the prefetch cache on the instance.
|
|
instance._prefetched_objects_cache = {}
|
|
task_obj = PeriodicTask.objects.get(id=instance.id)
|
|
PeriodicTasks.changed(task_obj)
|
|
return DetailResponse(data=serializer1.data, msg="更新成功")
|
|
else:
|
|
return ErrorResponse(msg="没有该任务方法,请先添加", data=None)
|
|
|
|
|
|
def destroy(self, request, *args, **kwargs):
|
|
instance = self.get_object_list()
|
|
for i in instance:
|
|
if i.crontab_id:
|
|
i.crontab.delete()
|
|
if i.interval_id:
|
|
i.interval.delete()
|
|
self.perform_destroy(instance)
|
|
return DetailResponse(data=[], msg="删除成功")
|
|
|
|
|
|
def tasklist(self, request, *args, **kwargs):
|
|
"""获取本地tasks任务所有方法"""
|
|
result = get_task_list()
|
|
task_list = result.get('task_dict_list')
|
|
return SuccessResponse(msg='获取成功', data=task_list, total=len(task_list))
|
|
|
|
def taskenabled(self, request, *args, **kwargs):
|
|
"""开始、暂停任务"""
|
|
instance = self.get_object()
|
|
instance.enabled = get_parameter_dic(request).get('enabled',False)
|
|
instance.save()
|
|
PeriodicTasks.changed(instance)
|
|
return DetailResponse(msg="修改成功")
|
|
|
|
def exec_task(self, request, *args, **kwargs):
|
|
task_name = get_parameter_dic(request).get('task_name', '')
|
|
id = get_parameter_dic(request).get('id', '')
|
|
queryset = self.filter_queryset(self.get_queryset())
|
|
instance = queryset.filter(id=id).first()
|
|
if not instance:
|
|
return ErrorResponse(msg="任务不存在")
|
|
newargs = instance.args
|
|
data = {
|
|
'task': None
|
|
}
|
|
test = f"""
|
|
from {'.'.join(task_name.split('.')[:-1])} import {task_name.split('.')[-1]}
|
|
task = {task_name.split('.')[-1]}.apply_async(args={newargs},periodic_task_name='{instance.name}')
|
|
"""
|
|
exec(test, data)
|
|
if not data["task"]:
|
|
return ErrorResponse(msg="执行失败")
|
|
task_id = data.get('task', ).id
|
|
return SuccessResponse(data={'task_id': task_id},msg="执行成功") |