Flask: Setting Up Celery with Flask
Celery is a powerful distributed task queue that integrates seamlessly with Flask to handle asynchronous and background tasks, such as sending emails, processing data, or running scheduled jobs. Setting up Celery with Flask enables scalable, non-blocking task execution, improving application performance and user experience. This guide explores setting up Celery with Flask, covering key techniques, best practices, and practical applications for building efficient, production-ready web applications.
01. Why Use Celery with Flask?
Flask applications often need to perform time-consuming tasks (e.g., image processing, API calls) that block the request thread and slow down responses. Celery offloads these tasks to a distributed worker system, allowing Flask to remain responsive. Combined with a message broker such as Redis or RabbitMQ, and with libraries such as NumPy for data-intensive work, Celery enables scalable, asynchronous processing for Flask applications.
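To make the offloading idea concrete, here is a minimal standard-library sketch of the same producer/worker split. It uses a thread and an in-process queue where Celery would use worker processes and a message broker. The names jobs, results, worker_loop, and submit are illustrative and are not Celery APIs; Celery provides the same split across processes and machines.

```python
import queue
import threading
import uuid

jobs = queue.Queue()   # stands in for the message broker
results = {}           # stands in for the result backend

def worker_loop():
    """Pull jobs off the queue and store their results, like a worker process."""
    while True:
        job_id, func, args = jobs.get()
        try:
            results[job_id] = ("SUCCESS", func(*args))
        except Exception as exc:
            results[job_id] = ("FAILURE", str(exc))
        finally:
            jobs.task_done()

def submit(func, *args):
    """Queue a job and return its ID immediately, similar in spirit to task.delay()."""
    job_id = str(uuid.uuid4())
    results[job_id] = ("PENDING", None)
    jobs.put((job_id, func, args))
    return job_id

threading.Thread(target=worker_loop, daemon=True).start()

job_id = submit(pow, 2, 10)   # returns at once; the caller is not blocked
jobs.join()                   # demo only: wait until the worker has finished
print(results[job_id])        # ('SUCCESS', 1024)
```

The caller gets an ID back immediately and looks up the result later, which is the same contract the /compute and /status routes in the Celery examples expose over HTTP.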
Example: Basic Celery Setup with Flask
# app.py
from flask import Flask, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# Create the Celery app and copy the Flask config (including the result backend) into it
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def add_numbers(x, y):
    return x + y

@app.route('/compute')
def compute():
    # .delay() queues the task and returns immediately
    task = add_numbers.delay(4, 5)
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = add_numbers.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/compute):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Output (curl http://localhost:5000/status/123e4567-e89b-12d3-a456-426614174000):
{
"status": "SUCCESS",
"result": 9
}
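Explanation:
- Celery(app.name, broker=...): Creates the Celery application, named after the Flask app, and points it at the broker.
- celery.conf.update(app.config): Copies the Flask configuration, including the result backend, into Celery.
- .delay(): Queues the task asynchronously and returns immediately with a task ID.
- Redis acts as both the message broker and the result backend.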
02. Key Celery Setup Techniques
Setting up Celery with Flask involves configuring the task queue, integrating with Flask’s application context, and managing tasks. The table below summarizes key techniques and their applications:
Technique | Description | Use Case |
---|---|---|
Basic Configuration | Set up Celery with a broker and backend | Simple async tasks |
App Context Integration | Access Flask context in tasks | Database queries, config access |
Task Monitoring | Track task status and results | User feedback, debugging |
Scheduled Tasks | Run tasks periodically with Celery Beat | Cron jobs, recurring tasks |
Error Handling | Handle task failures gracefully | Reliable task execution |
2.1 Basic Configuration
Example: Minimal Celery Setup
# app.py
from flask import Flask, jsonify  # jsonify is needed by the route below
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def process_data(data):
    return f"Processed: {data}"

@app.route('/process/<data>')
def process(data):
    task = process_data.delay(data)
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/process/hello):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Explanation:
- Minimal setup with Redis as broker and backend.
- Task is queued and processed asynchronously.
2.2 App Context Integration
Example: Accessing Flask Context in Tasks
# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    db.init_app(app)

    # Celery's own setting names are broker_url and result_backend
    celery.conf.update(
        broker_url=app.config['CELERY_BROKER_URL'],
        result_backend=app.config['CELERY_RESULT_BACKEND'],
    )

    # Run every task inside this app's context so db and config work in the worker
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    celery.Task = ContextTask

    from myapp.routes import bp  # imported here to avoid a circular import
    app.register_blueprint(bp)
    with app.app_context():
        db.create_all()
    return app

# myapp/tasks.py
from myapp import db, celery
from myapp.models import User

@celery.task
def update_user(user_id, name):
    # ContextTask (see create_app) has already pushed the app context
    user = db.session.get(User, user_id)
    if user:
        user.name = name
        db.session.commit()
        return f"Updated user {user_id}"
    return f"User {user_id} not found"

# myapp/models.py
from myapp import db

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80))

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp.tasks import update_user

bp = Blueprint('main', __name__)

@bp.route('/update_user/<int:user_id>/<name>')
def update(user_id, name):
    task = update_user.delay(user_id, name)
    return jsonify({'task_id': task.id})

@bp.route('/status/<task_id>')
def status(task_id):
    task = update_user.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

# worker.py (added entry point next to the myapp package; the file name is an assumption)
from myapp import celery, create_app

app = create_app()  # configures the shared celery instance and registers the tasks

Run Celery Worker:
celery -A worker.celery worker --loglevel=info
Output (curl http://localhost:5000/update_user/1/Alice):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Explanation:
- The task body runs inside a Flask application context, which Flask-SQLAlchemy needs for database access.
- The task updates user data asynchronously.
2.3 Task Monitoring
Example: Monitoring Task Progress
# app.py
import time

from flask import Flask, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task(bind=True)  # bind=True passes the task instance as self
def long_task(self):
    for i in range(5):
        # Publish a custom state with progress metadata
        self.update_state(state='PROGRESS', meta={'current': i, 'total': 5})
        time.sleep(1)
    return 'Task completed'

@app.route('/start_task')
def start_task():
    task = long_task.delay()
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = long_task.AsyncResult(task_id)
    if task.state == 'PROGRESS':
        return jsonify({'status': task.state, 'progress': task.info.get('current', 0)})
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/status/<task_id>, while the task is running):
{
"status": "PROGRESS",
"progress": 3
}
Explanation:
- update_state: Records a custom PROGRESS state and its metadata in the result backend.
- The status endpoint reads that state, so clients can poll it to follow the task's progress.
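On the client side, a status endpoint like this is usually polled until the task finishes. The following is a minimal sketch of such a loop, not part of the original example. wait_for_task and fetch_status are hypothetical names, and the fetcher is injected so the loop can be shown without a running server; in practice it would perform the HTTP GET and decode the JSON body.

```python
import time

# States after which a Celery task will not change again
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}

def wait_for_task(fetch_status, interval=1.0, timeout=30.0, sleep=time.sleep):
    """Poll fetch_status() until the task reaches a terminal state.

    fetch_status is any callable returning the decoded JSON dict from the
    status endpoint, e.g. {'status': 'PROGRESS', 'progress': 3}.
    """
    waited = 0.0
    while True:
        payload = fetch_status()
        if payload["status"] in TERMINAL_STATES:
            return payload
        if waited >= timeout:
            raise TimeoutError(f"task still {payload['status']} after {timeout}s")
        sleep(interval)
        waited += interval

# Demo with canned responses instead of real HTTP calls
responses = iter([
    {"status": "PENDING", "result": None},
    {"status": "PROGRESS", "progress": 3},
    {"status": "SUCCESS", "result": "Task completed"},
])
final = wait_for_task(lambda: next(responses), sleep=lambda seconds: None)
print(final)  # {'status': 'SUCCESS', 'result': 'Task completed'}
```

Passing sleep as a parameter keeps the demo instant; real callers would leave the default.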
2.4 Scheduled Tasks
Example: Periodic Tasks with Celery Beat
# app.py
from flask import Flask
from celery import Celery
from celery.schedules import crontab

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'log-every-minute': {
        'task': 'app.log_message',        # task name: module name + function name
        'schedule': crontab(minute='*'),  # every minute
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def log_message():
    return "Periodic task executed"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Run Celery Beat:
celery -A app.celery beat --loglevel=info
Output (worker logs):
[2025-05-11 10:00:00] Task app.log_message succeeded: Periodic task executed
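Explanation:
- CELERYBEAT_SCHEDULE (beat_schedule in Celery's newer lowercase naming): Defines the periodic tasks.
- Celery Beat triggers tasks on a schedule (e.g., every minute) and runs as a separate process alongside the worker.
- Requires Celery with the Redis extra (pip install celery[redis]); Celery Beat is included with Celery.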
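Besides crontab expressions, a schedule entry can also be a plain interval. The fragment below is a small sketch of that form using only the standard library; the entry names are made up for illustration, and app.log_message is the task from the example above. In a project that uses Celery's lowercase setting names throughout, the dict would be assigned to celery.conf.beat_schedule.

```python
from datetime import timedelta

# Interval-based schedule entries (entry names are illustrative)
beat_schedule = {
    "log-every-30-seconds": {
        "task": "app.log_message",
        "schedule": 30.0,                  # a plain number is an interval in seconds
    },
    "log-every-5-minutes": {
        "task": "app.log_message",
        "schedule": timedelta(minutes=5),  # timedelta intervals are accepted as well
    },
}
```

Intervals are counted from when Beat starts, whereas crontab entries fire at fixed wall-clock times, so the choice depends on whether the job must align to the clock.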
2.5 Error Handling
Example: Handling Task Errors
# app.py
from flask import Flask, jsonify
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def divide_numbers(a, b):
    try:
        return a / b
    except ZeroDivisionError:
        # Re-raise with a clearer message; the task ends in the FAILURE state
        raise ValueError("Cannot divide by zero")

@app.route('/divide/<int:a>/<int:b>')
def divide(a, b):
    task = divide_numbers.delay(a, b)
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = divide_numbers.AsyncResult(task_id)
    if task.state == 'FAILURE':
        # propagate=False returns the exception instead of raising it here
        return jsonify({'status': task.state, 'error': str(task.get(propagate=False))})
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/divide/10/0):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Output (curl http://localhost:5000/status/<task_id>):
{
"status": "FAILURE",
"error": "Cannot divide by zero"
}
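Explanation:
- try/except: Converts the ZeroDivisionError into a ValueError with a clearer message; the task still ends in the FAILURE state.
- task.get(propagate=False): Returns the exception instance instead of re-raising it, so the route can report the error.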
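The branching in status routes like this one can be pulled into a small pure function, which makes it easy to test without a broker. This is a sketch rather than part of the original example; build_status_payload is a hypothetical helper whose arguments correspond to AsyncResult.state and AsyncResult.info.

```python
def build_status_payload(state, info):
    """Turn a task state and its result/meta into a JSON-serializable dict."""
    if state == "PENDING":
        # Celery also reports PENDING for task IDs it has never seen
        return {"status": state}
    if state == "PROGRESS":
        # info is the meta dict passed to update_state()
        return {"status": state,
                "progress": info.get("current", 0),
                "total": info.get("total")}
    if state == "FAILURE":
        # info is the raised exception; stringify it so it can be serialized
        return {"status": state, "error": str(info)}
    return {"status": state, "result": info}

print(build_status_payload("FAILURE", ValueError("Cannot divide by zero")))
# {'status': 'FAILURE', 'error': 'Cannot divide by zero'}
print(build_status_payload("SUCCESS", 5.0))
# {'status': 'SUCCESS', 'result': 5.0}
```

A route would then reduce to looking up the AsyncResult and returning jsonify(build_status_payload(task.state, task.info)).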
2.6 Incorrect Celery Setup
Example: Missing App Context
# app.py (Incorrect)
from flask import Flask, current_app, jsonify
from celery import Celery

app = Flask(__name__)
celery = Celery(app.name, broker='redis://localhost:6379/0')

@celery.task
def access_config():
    # Fails in the worker: no application context has been pushed
    return current_app.config['SECRET_KEY']

@app.route('/task')
def run_task():
    task = access_config.delay()
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Output (worker logs):
RuntimeError: Working outside of application context
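Explanation:
- Tasks run in the worker process, outside any request, so accessing current_app without an application context fails.
- Solution: push a context from the real app object (with app.app_context():), since current_app itself is unavailable until a context exists.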
03. Effective Usage
3.1 Recommended Practices
- Structure tasks with Flask’s application factory pattern.
Example: Scalable Celery Setup
# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    db.init_app(app)

    # Celery's own setting names are broker_url and result_backend
    celery.conf.update(
        broker_url=app.config['CELERY_BROKER_URL'],
        result_backend=app.config['CELERY_RESULT_BACKEND'],
    )

    # Run every task inside this app's context so db and config work in the worker
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    celery.Task = ContextTask

    from myapp.routes import bp  # imported here to avoid a circular import
    app.register_blueprint(bp)
    with app.app_context():
        db.create_all()
    return app

# myapp/models.py
from myapp import db

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80))

# myapp/tasks.py
import time

from myapp import db, celery
from myapp.models import User

@celery.task(bind=True)
def process_user(self, user_id):
    # ContextTask (see create_app) has already pushed the app context
    user = db.session.get(User, user_id)
    if not user:
        raise ValueError(f"User {user_id} not found")
    for i in range(3):
        self.update_state(state='PROGRESS', meta={'current': i, 'total': 3})
        time.sleep(1)
    user.name = f"Processed_{user.name}"
    db.session.commit()
    return f"Processed user {user_id}"

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp.tasks import process_user

bp = Blueprint('main', __name__)

@bp.route('/process/<int:user_id>')
def process(user_id):
    task = process_user.delay(user_id)
    return jsonify({'task_id': task.id})

@bp.route('/status/<task_id>')
def status(task_id):
    task = process_user.AsyncResult(task_id)
    if task.state == 'PROGRESS':
        return jsonify({'status': task.state, 'progress': task.info.get('current', 0)})
    elif task.state == 'FAILURE':
        return jsonify({'status': task.state, 'error': str(task.get(propagate=False))})
    return jsonify({'status': task.state, 'result': task.result})

# worker.py (added entry point next to the myapp package; the file name is an assumption)
from myapp import celery, create_app

app = create_app()  # configures the shared celery instance and registers the tasks

Run Celery Worker:
celery -A worker.celery worker --loglevel=info
Output (curl http://localhost:5000/process/1):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Output (curl http://localhost:5000/status/<task_id>, while the task is running):
{
"status": "PROGRESS",
"progress": 2
}
- Application factory organizes Flask and Celery setup.
- Tasks access database with proper context.
- Progress tracking and error handling enhance reliability.
3.2 Practices to Avoid
- Avoid running heavy tasks without Celery.
Example: Blocking Flask Route
# app.py (Incorrect)
import time

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/process')
def process():
    time.sleep(5)  # Simulate a heavy task inside the request
    return jsonify({'result': 'Done'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Output (curl http://localhost:5000/process):
The response arrives only after a 5-second delay, and the request ties up a server worker for the whole time.
- Blocking tasks degrade performance.
- Solution: Use Celery for asynchronous execution.
04. Common Use Cases
4.1 Sending Emails Asynchronously
Send emails without blocking the main application.
Example: Async Email Sending
# app.py
from flask import Flask, jsonify
from celery import Celery
from flask_mail import Mail, Message

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['MAIL_SERVER'] = 'smtp.example.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USERNAME'] = 'user@example.com'
app.config['MAIL_PASSWORD'] = 'password'
app.config['MAIL_USE_TLS'] = True
# Flask-Mail refuses to send a message that has no sender
app.config['MAIL_DEFAULT_SENDER'] = 'user@example.com'

mail = Mail(app)
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def send_email(recipient, subject, body):
    # Flask-Mail reads its settings from the app, so push an app context
    with app.app_context():
        msg = Message(subject, recipients=[recipient], body=body)
        mail.send(msg)
    return f"Email sent to {recipient}"

@app.route('/send_email/<recipient>')
def send(recipient):
    task = send_email.delay(recipient, "Test Email", "Hello from Flask!")
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/send_email/test@example.com):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Explanation:
- Celery sends emails asynchronously.
- Flask-Mail integrates with Celery for reliable delivery.
4.2 Data Processing
Process large datasets in the background.
Example: Background Data Processing
# app.py
from flask import Flask, jsonify
from celery import Celery
import numpy as np

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def process_array(size):
    data = np.random.rand(size)
    # Convert the NumPy scalar to a plain float so the result can be serialized
    return float(np.mean(data))

@app.route('/process/<int:size>')
def process(size):
    task = process_array.delay(size)
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = process_array.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/process/1000000):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Explanation:
- Celery processes large NumPy arrays without blocking Flask.
- Results are stored in Redis for later retrieval.
Conclusion
Setting up Celery with Flask enables asynchronous task processing, improving performance and scalability. Key takeaways:
- Configure Celery with a broker (e.g., Redis) and backend.
- Integrate with Flask’s app context for database and config access.
- Monitor tasks and handle errors for reliability.
- Use Celery Beat for scheduled tasks.
- Avoid blocking Flask routes with heavy tasks.
With Celery, Flask applications can handle complex, time-consuming tasks efficiently, making them ready for production-grade use cases!