time

Celery: добавляем параллелизм в Django

django_celery.png Введение

Основное предназначение Celery - синхронный или асинхронный запуск задач, как реакция на некое действие. Выполнение этих задач может происходить параллельно на нескольких машинах (worker-сервера). В пределах одной машины задачи выполняются в потоках основного процесса Celery. Действием, в результате которого запускается выполнение задачи, может быть как основной поток выполнения программы так и наступление некоторого времени (по аналогия с cron).

Пара примеров использования: после сохранения данных в модели может понадобится пересчитать некий массив данных, сам процесс обсчета может занять немалое время, потому лучше сохранить данные и вернуть управление обратно пользователю, а тем временем параллельно запустить обсчет данных. Когда расчет закончится можно известить пользователя об этом. Еще один пример использования - регулярное выполнение некоторой задачи в определенное время, например зачистка логов или обновление курса валют.

Теперь можно озвучить более строгое определение celery - очередь асинхронных задач, управляемая с помощью распределенных сообщений.

Наглядная схема работы приведена в посте RabbitMQ, Celery and Django

celery-diagram.png

Для тех кому интересно как это все работает под капотом, можно почитать про AMPQ и параллельное выполнение задач с использование асинхронного обмена сообщениями:

Установка и настройка

Нам понадобятся такие компоненты:

Установка RabbitMQ

Скачиваем последнюю весрию с официального сайта и устанавливаем

wget http://www.rabbitmq.com/releases/rabbitmq-server/v2.0.0/rabbitmq-server_2.0.0-1_all.deb
dpkg -i rabbitmq-server_2.0.0-1_all.deb
sudo apt-get -f install

Запуск сервера

sudo rabbitmq-server

Настройка пользователя и хоста для RabbitMQ

rabbitmqctl add_user myusername mypassword
rabbitmqctl add_vhost myvhost
rabbitmqctl set_permissions -p myvhost myusername ".*" ".*" ".*"

Установка celery и django-celery

sudo apt-get install gcc python-dev
pip install celery django-celery

Извещаем django о новом брокере и приложении. Добавлять в settings.py:

import djcelery
djcelery.setup_loader()

INSTALLED_APPS = (
    ...
    'djcelery',
)

BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myusername"
BROKER_PASSWORD = "mypassword"
BROKER_VHOST = "myvhost"

Обновляем базу

python manage.py syncdb

Запускаем демона celery, который будет выполнять наши задачи и следить за периодическими задачами

./manage.py celeryd -l INFO -B

Перезапуск worker выполняется командой

kill -HUP $pid

Использование

Представим что у нас есть модель с товарами - Ware. Товары могут отображаться на сайте, но не числится на складе. Клиент может просмотреть каталог и если товара нет в наличии оставить свой email для уведомления о поступлении товара. Когда менеджер меняет статус товара на 'Есть в наличии' - всем подписанным клиентам рассылается уведомление. Содержание файла models.py

# coding=utf-8
from django.db import models

class Ware(models.Model):
    title = models.CharField('Название', max_length=200)
    availability = models.BooleanField('Наличие', default=False)
    price = models.FloatField('Цена',default=0)

    def save(self, *args, **kwargs):
        from orders.tasks import send_notify
        super(Ware, self).save(*args, **kwargs)
        if self.availability:
            send_notify.delay(self)

class Request(models.Model):
    ware = models.ForeignKey(Ware, verbose_name="Товар")
    email = models.EmailField('EMail')
    is_notified = models.BooleanField('Уведомлен?', default=False)

В папке с моделью добавляем новый файл tasks.py, в котором будут лежать все задачи связанные с данным приложением.

# coding=utf-8
from celery.decorators import task, periodic_task
from celery.task.schedules import crontab
from orders.models import Ware, Request
from datetime import timedelta

@task
def send_notify(ware):
    # ...
    # рассылаем письма всем желающим, меняем поле is_notified на True для всех обработанных запросов
    # ...
    print 'Sended.'

#@periodic_task(run_every=crontab(hour=0, minute=0, day_of_week="fri"))
def clean_orders():
    # очищаем каждую пятницу уведомления
    Request.objects.filter(is_notified=True).delete()
    print 'Cleaned'

Еще вкусные возможности celery:

  • выполняемая задача может возвращать значение, которое хранится в объекте класса AsyncResult;
  • если выполняемая задача вылетела с ексепшином, например запрашиваемый ресурс временно не отвечает, то можно повторить выполнение задачи;
  • выполнение задачи можно задержать на определенное время;
  • роутинг задач к разным worker'ам;
  • удаленное управление worker'ом;
  • выполнение группы задач за раз;

Ложка дегтя:

  • на данный момент celery не поддерживает пул соединений с брокером, это значит что если начать слать сразу много задач на выполнение, то некоторые из них могут затеряться в пространственно-временном континууме :). Это связанно с тем, что apply_async/delay каждый раз устанавливают новое соединение. Способ решения.

Обратите внимание, что для работы django-celery с mod_wsgi, необходимо в файл app.wsgi добавить

import os
os.environ["CELERY_LOADER"] = "django"

Мониторинг работы

Для мониторинга работы worker'ов можно воспользоваться celerymon либо встроенными возможностями самого celery.

Полностью подружить celery 2.0.3 и celerymon 0.2.0 у меня не получилось, выкидывает ошибку 500: Internal Server Error. Отчет о ошибке оставил на github.com, подожду реакции разработчиков.

flower - web based tool for monitoring and administrating Celery clusters.

Ghetto queues

В случаи если вам не надо вся мощь брокера RabbitMQ, а просто хочется изредка выполнять отложенные задачи, либо воспользоваться возможностями celery для периодического выполнения задач - то вам вполне подойдет ghettoq. Все задачи хранятся в одной из поддерживаемых баз данных: Redis, MongoDB, Beanstalk либо те что поддерживает джанга.

Про настройку ghettoq читаем в блоге Романа Ворушина либо тут.

Дополнительное чтиво

blog comments powered by Disqus