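Celery 4.0 supports Django 1.8 and later; projects on a Django version older than 1.8 should use Celery 3.1.
A Django project is laid out as follows:
- proj/
  - manage.py
  - proj/
    - __init__.py
    - settings.py
    - urls.py
First, create the file proj/proj/celery.py: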
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
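To make the effect of namespace='CELERY' concrete, here is a minimal sketch of the mapping it implies: settings that carry the CELERY_ prefix are picked up, and everything else in settings.py is ignored. The function celery_keys and the django_settings dictionary are hypothetical, written only for illustration; this is not how Celery implements the lookup internally.

```python
def celery_keys(settings, namespace='CELERY'):
    """Return the settings that belong to the namespace, without the prefix.

    Hypothetical helper for illustration; Celery does its own lookup.
    """
    prefix = namespace + '_'
    return {
        name[len(prefix):].lower(): value
        for name, value in settings.items()
        if name.startswith(prefix)
    }


# A stand-in for the contents of settings.py.
django_settings = {
    'DEBUG': True,
    'CELERY_BROKER_URL': 'redis://localhost:6379/0',
    'CELERY_RESULT_BACKEND': 'django-db',
}

picked = celery_keys(django_settings)
print(picked)
```

In this sketch, CELERY_BROKER_URL ends up as broker_url and DEBUG is left out, which is why every Celery option in settings.py needs the CELERY_ prefix to be seen.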
Next, make sure the app defined above is loaded when the Django project starts by editing proj/proj/__init__.py:
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']
Now you can create a tasks.py file in each app listed in INSTALLED_APPS:
- app1/
  - tasks.py
  - models.py
- app2/
  - tasks.py
  - models.py
For example:
# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y


@shared_task
def xsum(numbers):
    return sum(numbers)
Calling these tasks from your views, for example with add.delay(3, 4), runs them asynchronously in a worker.
If you use Redis as the broker, add this to settings.py:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
You can use the Django ORM or the Django cache as the result backend.
Install the library:
$ pip install django-celery-results
Add it to INSTALLED_APPS in settings.py:
INSTALLED_APPS = (
    ...,
    'django_celery_results',
)
Create the database tables:
$ python manage.py migrate django_celery_results
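Add one of the following Celery settings to settings.py:
# Store results in the database through the Django ORM:
CELERY_RESULT_BACKEND = 'django-db'
# Or store them in the Django cache instead (set only one of the two):
# CELERY_RESULT_BACKEND = 'django-cache'
Start the worker: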
$ celery -A proj worker -l info
You can then call a task from python manage.py shell:
$ python manage.py shell
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from app1.tasks import add
>>> add.delay(3,4)
<AsyncResult: a9abab6d-b7a9-47e6-8c09-ec284948449f>
The Celery worker log:
[2017-09-14 00:09:41,432: INFO/ForkPoolWorker-1] Task urldata.tasks.add[38af760e-ed6c-48f8-b77c-d67bade8d6b8] succeeded in 0.00782653002534s: 7
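A complete official example: https://github.com/celery/celery/tree/master/examples/django/
The official documentation also has an example that asynchronously checks user-submitted comments for spam.
blog/models.py: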
from django.db import models
from django.utils.translation import ugettext_lazy as _


class Comment(models.Model):
    name = models.CharField(_('name'), max_length=64)
    email_address = models.EmailField(_('email address'))
    homepage = models.URLField(_('home page'), blank=True)
    comment = models.TextField(_('comment'))
    # Set once, when the comment is first saved.
    pub_date = models.DateTimeField(_('Published date'),
                                    editable=False, auto_now_add=True)
    # Filled in later by the asynchronous spam check.
    is_spam = models.BooleanField(_('spam?'),
                                  default=False, editable=False)

    class Meta:
        verbose_name = _('comment')
        verbose_name_plural = _('comments')
The view saves the comment first and then hands the spam check to Celery to run asynchronously.
blog/views.py:
from django import forms
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404, render

from blog import tasks
# Entry is assumed to be defined in blog/models.py; it is not shown here.
from blog.models import Comment, Entry


class CommentForm(forms.ModelForm):

    class Meta:
        model = Comment
        # Django 1.8+ requires a ModelForm to declare its fields.
        fields = '__all__'


def add_comment(request, slug, template_name='comments/create.html'):
    post = get_object_or_404(Entry, slug=slug)
    remote_addr = request.META.get('REMOTE_ADDR')

    if request.method == 'POST':
        form = CommentForm(request.POST, request.FILES)
        if form.is_valid():
            comment = form.save()
            # Check spam asynchronously.
            tasks.spam_filter.delay(comment_id=comment.id,
                                    remote_addr=remote_addr)
            return HttpResponseRedirect(post.get_absolute_url())
    else:
        form = CommentForm()

    return render(request, template_name, {'form': form})
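The view passes comment.id to the task rather than the comment object itself. One reason is that task arguments are serialized into a message, and Celery 4.0 uses JSON as its default serializer. The sketch below uses only the standard library to show the difference; FakeComment and the dictionary payload are stand-ins for illustration and are not Celery's actual message format.

```python
import json


class FakeComment:
    """Stand-in for a Django model instance (hypothetical)."""

    def __init__(self, pk, comment):
        self.pk = pk
        self.comment = comment


comment = FakeComment(pk=42, comment='Nice post!')

# Plain values such as an integer ID and a string serialize cleanly.
payload = json.dumps({'comment_id': comment.pk,
                      'remote_addr': '203.0.113.5'})

# An arbitrary object does not, so it cannot travel as a JSON argument.
try:
    json.dumps({'comment': comment})
    object_is_serializable = True
except TypeError:
    object_is_serializable = False

print(payload)
print(object_is_serializable)
```

Passing the ID also means the task loads the current row from the database when it runs, instead of working on a copy taken while the request was being handled.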
The task looks like this.
blog/tasks.py:
from akismet import Akismet
from celery import Celery
from celery.utils.log import get_task_logger
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.exceptions import ImproperlyConfigured

from blog.models import Comment

app = Celery(broker='amqp://')
logger = get_task_logger(__name__)


@app.task
def spam_filter(comment_id, remote_addr=None):
    logger.info('Running spam filter for comment %s', comment_id)

    # Load the comment by ID so the task sees the saved row.
    comment = Comment.objects.get(pk=comment_id)
    current_domain = Site.objects.get_current().domain
    akismet = Akismet(settings.AKISMET_KEY,
                      'http://{0}'.format(current_domain))
    if not akismet.verify_key():
        raise ImproperlyConfigured('Invalid AKISMET_KEY')

    is_spam = akismet.comment_check(user_ip=remote_addr,
                                    comment_content=comment.comment,
                                    comment_author=comment.name,
                                    comment_author_email=comment.email_address)
    # Only write to the database when the comment is flagged.
    if is_spam:
        comment.is_spam = True
        comment.save()

    return is_spam