Setting Up Celery with Flask

Celery is a powerful distributed task queue that integrates seamlessly with Flask to handle asynchronous and background tasks, such as sending emails, processing data, or running scheduled jobs. Offloading this work to Celery enables scalable, non-blocking task execution, improving application performance and user experience. This guide walks through the setup, covering key techniques, best practices, and practical applications for building efficient, production-ready web applications.


01. Why Use Celery with Flask?

Flask applications often need to perform time-consuming tasks (e.g., image processing, API calls) that can block the main thread, leading to slow responses. Celery offloads these tasks to a distributed worker system, allowing Flask to remain responsive. Combined with a message broker like Redis or RabbitMQ (and libraries such as NumPy for data-intensive work), Celery enables scalable, asynchronous processing for Flask applications.

Example: Basic Celery Setup with Flask

# app.py
from flask import Flask, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)

@celery.task
def add_numbers(x, y):
    return x + y

@app.route('/compute')
def compute():
    task = add_numbers.delay(4, 5)
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = add_numbers.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker:

celery -A app.celery worker --loglevel=info

Output (curl http://localhost:5000/compute):

{
  "task_id": "123e4567-e89b-12d3-a456-426614174000"
}

Output (curl http://localhost:5000/status/123e4567-e89b-12d3-a456-426614174000):

{
  "status": "SUCCESS",
  "result": 9
}

Explanation:

  • Celery(app.name, ...) - Creates a Celery instance named after the Flask app, with Redis as broker and result backend; it does not by itself provide the Flask app context (see the factory sketch below).
  • delay() - Queues the task asynchronously.
  • Redis acts as the message broker and result backend.
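
The inline configuration above is enough for simple tasks, but anything that touches Flask extensions needs the application context. One widely used pattern (a sketch, not the only option) is a make_celery factory that subclasses celery.Task so every task runs inside app.app_context():

# extensions.py - a context-aware factory (sketch)
from celery import Celery

def make_celery(app):
    """Create a Celery instance whose tasks run inside the Flask app context."""
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND'],
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            # Push the app context for every task invocation
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

With this factory, tasks defined via @celery.task can query the database or read app.config without pushing a context themselves.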

02. Key Celery Setup Techniques

Setting up Celery with Flask involves configuring the task queue, integrating with Flask’s application context, and managing tasks. The table below summarizes key techniques and their applications:

Technique               | Description                              | Use Case
Basic Configuration     | Set up Celery with a broker and backend  | Simple async tasks
App Context Integration | Access Flask context in tasks            | Database queries, config access
Task Monitoring         | Track task status and results            | User feedback, debugging
Scheduled Tasks         | Run tasks periodically with Celery Beat  | Cron jobs, recurring tasks
Error Handling          | Handle task failures gracefully          | Reliable task execution


2.1 Basic Configuration

Example: Minimal Celery Setup

# app.py
from flask import Flask, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)

@celery.task
def process_data(data):
    return f"Processed: {data}"

@app.route('/process/<data>')
def process(data):
    task = process_data.delay(data)
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker:

celery -A app.celery worker --loglevel=info

Output (curl http://localhost:5000/process/hello):

{
  "task_id": "123e4567-e89b-12d3-a456-426614174000"
}

Explanation:

  • Minimal setup with Redis as broker and backend.
  • Task is queued and processed asynchronously.
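
Before running any of these examples, the packages and the broker must be in place. A minimal setup, assuming a local Redis on the default port 6379:

pip install flask "celery[redis]"
redis-server    # or: docker run -p 6379:6379 redis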

2.2 App Context Integration

Example: Accessing Flask Context in Tasks

# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    
    db.init_app(app)
    celery.conf.update(broker_url=app.config['CELERY_BROKER_URL'], result_backend=app.config['CELERY_RESULT_BACKEND'])
    
    with app.app_context():
        db.create_all()
    
    return app

# myapp/tasks.py
from myapp import db, celery, create_app
from myapp.models import User

@celery.task
def update_user(user_id, name):
    # Workers run outside Flask's request cycle, so build an app and push its context
    app = create_app()
    with app.app_context():
        user = db.session.get(User, user_id)
        if user:
            user.name = name
            db.session.commit()
            return f"Updated user {user_id}"
        return f"User {user_id} not found"

# myapp/models.py
from myapp import db

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80))

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp.tasks import update_user

bp = Blueprint('main', __name__)

@bp.route('/update_user/<int:user_id>/<name>')
def update(user_id, name):
    task = update_user.delay(user_id, name)
    return jsonify({'task_id': task.id})

@bp.route('/status/<task_id>')
def status(task_id):
    task = update_user.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

Run Celery Worker:

celery -A myapp.celery worker --loglevel=info

Output (curl http://localhost:5000/update_user/1/Alice):

{
  "task_id": "123e4567-e89b-12d3-a456-426614174000"
}

Explanation:

  • create_app() with app.app_context() - Gives the task a real Flask application context for database access (current_app is unbound inside a worker).
  • Task updates user data asynchronously.

2.3 Task Monitoring

Example: Monitoring Task Progress

# app.py
from flask import Flask, jsonify
from celery import Celery
import time

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)

@celery.task(bind=True)
def long_task(self):
    for i in range(5):
        self.update_state(state='PROGRESS', meta={'current': i, 'total': 5})
        time.sleep(1)
    return 'Task completed'

@app.route('/start_task')
def start_task():
    task = long_task.delay()
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = long_task.AsyncResult(task_id)
    if task.state == 'PROGRESS':
        return jsonify({'status': task.state, 'progress': task.info.get('current', 0)})
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker:

celery -A app.celery worker --loglevel=info

Output (curl http://localhost:5000/status/<task_id> while the task runs):

{
  "status": "PROGRESS",
  "progress": 3
}

Explanation:

  • update_state - Tracks task progress.
  • API endpoint provides real-time task status.
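
On the client side, the status endpoint can be polled until the task settles. A minimal sketch using the requests library (an assumption; any HTTP client works) against the endpoints above:

# poll.py - hypothetical client-side helper
import time
import requests

def wait_for_task(task_id, base_url='http://localhost:5000'):
    """Poll /status/<task_id> until the task leaves PENDING/PROGRESS."""
    while True:
        data = requests.get(f'{base_url}/status/{task_id}').json()
        if data['status'] not in ('PENDING', 'PROGRESS'):
            return data
        print(f"progress: {data.get('progress', 0)}/5")
        time.sleep(0.5)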

2.4 Scheduled Tasks

Example: Periodic Tasks with Celery Beat

# app.py
from flask import Flask
from celery import Celery
from celery.schedules import crontab

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'log-every-minute': {
        'task': 'app.log_message',
        'schedule': crontab(minute='*'),
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)

@celery.task
def log_message():
    return "Periodic task executed"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker:

celery -A app.celery worker --loglevel=info

Run Celery Beat:

celery -A app.celery beat --loglevel=info

Output (worker logs):

[2025-05-11 10:00:00] Task app.log_message succeeded: Periodic task executed

Explanation:

  • CELERYBEAT_SCHEDULE - Defines periodic tasks (the modern setting name is beat_schedule).
  • Celery Beat triggers tasks on a schedule (e.g., every minute).
  • Requires the Redis extra (pip install "celery[redis]"); Celery Beat ships with Celery itself.
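
crontab is only one schedule type; plain seconds and timedelta intervals also work. A short sketch of alternative entries for the same task:

# alternative schedule entries (sketch)
from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'every-30-seconds': {'task': 'app.log_message', 'schedule': 30.0},
    'every-5-minutes': {'task': 'app.log_message', 'schedule': timedelta(minutes=5)},
    'weekday-mornings': {
        'task': 'app.log_message',
        'schedule': crontab(hour=7, minute=30, day_of_week='mon-fri'),
    },
}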

2.5 Error Handling

Example: Handling Task Errors

# app.py
from flask import Flask, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)

@celery.task
def divide_numbers(a, b):
    try:
        return a / b
    except ZeroDivisionError:
        raise ValueError("Cannot divide by zero")

@app.route('/divide/<int:a>/<int:b>')
def divide(a, b):
    task = divide_numbers.delay(a, b)
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = divide_numbers.AsyncResult(task_id)
    if task.state == 'FAILURE':
        return jsonify({'status': task.state, 'error': str(task.get(propagate=False))})
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker:

celery -A app.celery worker --loglevel=info

Output (curl http://localhost:5000/divide/10/0):

{
  "task_id": "123e4567-e89b-12d3-a456-426614174000"
}

Output (curl http://localhost:5000/status/<task_id>):

{
  "status": "FAILURE",
  "error": "Cannot divide by zero"
}

Explanation:

  • try/except - Handles task-specific errors.
  • task.get(propagate=False) - Retrieves error details without raising.
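
For transient failures (timeouts, briefly unavailable services), retrying usually beats failing outright. A sketch added to the app.py above, using Celery's bind=True with self.retry; flaky_call is a hypothetical helper standing in for the real work:

@celery.task(bind=True, max_retries=3, default_retry_delay=5)
def fetch_remote(self, url):
    try:
        return flaky_call(url)  # hypothetical helper that may raise ConnectionError
    except ConnectionError as exc:
        # Re-queue this task; the worker retries up to max_retries times
        raise self.retry(exc=exc)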

2.6 Incorrect Celery Setup

Example: Missing App Context

# app.py (Incorrect)
from flask import Flask, jsonify, current_app
from celery import Celery

app = Flask(__name__)
celery = Celery(app.name, broker='redis://localhost:6379/0')

@celery.task
def access_config():
    return current_app.config['SECRET_KEY']  # No app context

@app.route('/task')
def run_task():
    task = access_config.delay()
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Output (worker logs):

RuntimeError: Working outside of application context

Explanation:

  • current_app is unbound inside a worker, so tasks touching Flask config or extensions fail.
  • Solution: push the context of the actual app object with app.app_context(), as in the sketch below.
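
A corrected sketch of the task above: since app is defined in the same module, the task can push that app's context directly:

@celery.task
def access_config():
    with app.app_context():  # push the real app object's context inside the worker
        return app.config['SECRET_KEY']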

03. Effective Usage

3.1 Recommended Practices

  • Structure tasks with Flask’s application factory pattern.

Example: Scalable Celery Setup

# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    
    db.init_app(app)
    celery.conf.update(broker_url=app.config['CELERY_BROKER_URL'], result_backend=app.config['CELERY_RESULT_BACKEND'])
    
    from myapp.routes import bp
    app.register_blueprint(bp)
    
    with app.app_context():
        db.create_all()
    
    return app

# myapp/models.py
from myapp import db

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80))

# myapp/tasks.py
from myapp import db, celery, create_app
from myapp.models import User
import time

@celery.task(bind=True)
def process_user(self, user_id):
    # Workers run outside Flask's request cycle, so build an app and push its context
    app = create_app()
    with app.app_context():
        user = db.session.get(User, user_id)
        if not user:
            raise ValueError(f"User {user_id} not found")
        for i in range(3):
            self.update_state(state='PROGRESS', meta={'current': i, 'total': 3})
            time.sleep(1)
        user.name = f"Processed_{user.name}"
        db.session.commit()
        return f"Processed user {user_id}"

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp.tasks import process_user

bp = Blueprint('main', __name__)

@bp.route('/process/<int:user_id>')
def process(user_id):
    task = process_user.delay(user_id)
    return jsonify({'task_id': task.id})

@bp.route('/status/<task_id>')
def status(task_id):
    task = process_user.AsyncResult(task_id)
    if task.state == 'PROGRESS':
        return jsonify({'status': task.state, 'progress': task.info.get('current', 0)})
    elif task.state == 'FAILURE':
        return jsonify({'status': task.state, 'error': str(task.get(propagate=False))})
    return jsonify({'status': task.state, 'result': task.result})

Run Celery Worker:

celery -A myapp.celery worker --loglevel=info

Output (curl http://localhost:5000/process/1):

{
  "task_id": "123e4567-e89b-12d3-a456-426614174000"
}

Output (curl http://localhost:5000/status/<task_id> while the task runs):

{
  "status": "PROGRESS",
  "progress": 2
}

Explanation:

  • The application factory organizes the Flask and Celery setup.
  • Tasks access database with proper context.
  • Progress tracking and error handling enhance reliability.

3.2 Practices to Avoid

  • Avoid running heavy tasks without Celery.

Example: Blocking Flask Route

# app.py (Incorrect)
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/process')
def process():
    import time
    time.sleep(5)  # Simulate heavy task
    return jsonify({'result': 'Done'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Output (curl http://localhost:5000/process):

The response arrives after a 5-second delay; on the single-threaded dev server, other requests are blocked in the meantime.

Explanation:

  • Blocking tasks degrade performance.
  • Solution: Use Celery for asynchronous execution.

04. Common Use Cases

4.1 Sending Emails Asynchronously

Send emails without blocking the main application.

Example: Async Email Sending

# app.py
from flask import Flask, jsonify
from celery import Celery
from flask_mail import Mail, Message

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['MAIL_SERVER'] = 'smtp.example.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USERNAME'] = 'user@example.com'
app.config['MAIL_PASSWORD'] = 'password'
app.config['MAIL_USE_TLS'] = True

mail = Mail(app)
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)

@celery.task
def send_email(recipient, subject, body):
    with app.app_context():
        msg = Message(subject, recipients=[recipient], body=body)
        mail.send(msg)
        return f"Email sent to {recipient}"

@app.route('/send_email/<recipient>')
def send(recipient):
    task = send_email.delay(recipient, "Test Email", "Hello from Flask!")
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker:

celery -A app.celery worker --loglevel=info

Output (curl http://localhost:5000/send_email/test@example.com):

{
  "task_id": "123e4567-e89b-12d3-a456-426614174000"
}

Explanation:

  • Celery sends emails asynchronously.
  • Flask-Mail integrates with Celery for reliable delivery.
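
SMTP servers fail transiently, so letting Celery retry automatically is common. A sketch of the same task with the declarative retry options available in Celery 4+ (the backoff values here are illustrative):

@celery.task(autoretry_for=(Exception,), retry_backoff=True,
             retry_kwargs={'max_retries': 5})
def send_email(recipient, subject, body):
    with app.app_context():
        msg = Message(subject, recipients=[recipient], body=body)
        mail.send(msg)
        return f"Email sent to {recipient}"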

4.2 Data Processing

Process large datasets in the background.

Example: Background Data Processing

# app.py
from flask import Flask, jsonify
from celery import Celery
import numpy as np

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)

@celery.task
def process_array(size):
    data = np.random.rand(size)
    return float(np.mean(data))

@app.route('/process/<int:size>')
def process(size):
    task = process_array.delay(size)
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = process_array.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker:

celery -A app.celery worker --loglevel=info

Output (curl http://localhost:5000/process/1000000):

{
  "task_id": "123e4567-e89b-12d3-a456-426614174000"
}

Explanation:

  • Celery processes large NumPy arrays without blocking Flask.
  • Results are stored in Redis for later retrieval.
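
For batches, Celery's canvas primitives fan work out across workers. A sketch using group to compute several arrays in parallel (result collection assumes the Redis backend configured above):

# fan out several process_array tasks and collect their results
from celery import group

job = group(process_array.s(size) for size in (10_000, 100_000, 1_000_000))
result = job.apply_async()
means = result.get(timeout=30)  # one mean per task, in order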

Conclusion

Setting up Celery with Flask enables asynchronous task processing, improving performance and scalability. Key takeaways:

  • Configure Celery with a broker (e.g., Redis) and backend.
  • Integrate with Flask’s app context for database and config access.
  • Monitor tasks and handle errors for reliability.
  • Use Celery Beat for scheduled tasks.
  • Avoid blocking Flask routes with heavy tasks.

With Celery, Flask applications can handle complex, time-consuming tasks efficiently, making them ready for production-grade use cases!
