Flask: Running Background Tasks
Running background tasks in Flask allows applications to handle time-consuming operations, such as data processing, sending emails, or API calls, without blocking the main request-response cycle. By offloading these tasks to background processes, Flask remains responsive, improving user experience and scalability. This guide explores running background tasks in Flask, focusing on using Celery, threading, and other techniques, along with best practices and practical applications for building efficient web applications.
01. Why Run Background Tasks in Flask?
Flask’s synchronous nature means long-running tasks can delay responses, degrading performance. Background tasks enable asynchronous execution, allowing Flask to keep serving requests while tasks run concurrently. Tools like Celery or Python’s threading module make Flask suitable for applications requiring high responsiveness, such as real-time dashboards or notification systems; for data-intensive work, they pair well with NumPy array operations.
Example: Basic Background Task with Celery
# app.py
from flask import Flask, jsonify
from celery import Celery
import time

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def long_task(data):
    time.sleep(5)  # Simulate long-running task
    return f"Processed: {data}"

@app.route('/process/<data>')
def process(data):
    task = long_task.delay(data)
    return jsonify({'task_id': task.id, 'status': 'Task queued'})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = long_task.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/process/test):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000",
"status": "Task queued"
}
Output (curl http://localhost:5000/status/&lt;task_id&gt;):
{
"status": "SUCCESS",
"result": "Processed: test"
}
Explanation:
- Celery queues tasks to a Redis broker for asynchronous execution.
- delay() triggers the task without blocking the Flask response.
- Redis stores task results for status checks.
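The broker/worker split that Celery provides can be illustrated with a stdlib-only sketch (illustrative, not how Celery is implemented): a queue.Queue stands in for the Redis broker, a worker thread for the Celery worker, and a results dict for the result backend.

```python
import queue
import threading
import uuid

task_queue = queue.Queue()   # Stands in for the Redis broker
results = {}                 # Stands in for the result backend

def worker():
    # Consume (task_id, func, args) tuples until a None sentinel arrives
    while True:
        item = task_queue.get()
        if item is None:
            break
        task_id, func, args = item
        results[task_id] = func(*args)
        task_queue.task_done()

def delay(func, *args):
    # Enqueue a task and return its id, like Celery's task.delay()
    task_id = str(uuid.uuid4())
    task_queue.put((task_id, func, args))
    return task_id

threading.Thread(target=worker, daemon=True).start()

task_id = delay(lambda data: f"Processed: {data}", "test")
task_queue.join()  # Wait for the worker to finish (for demonstration only)
print(results[task_id])  # Processed: test
```

Celery adds what this sketch lacks: persistence across restarts, multiple worker processes and machines, retries, and result expiry.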
02. Key Background Task Techniques
Flask supports multiple approaches for running background tasks, each suited to different use cases. The table below summarizes key techniques and their applications:
Technique | Description | Use Case
---|---|---
Celery Tasks | Distributed task queue with a broker | Scalable, production-grade tasks
Threading | Run tasks in separate threads | Lightweight, simple tasks
AsyncIO | Use async/await for concurrency | I/O-bound tasks, modern Python
Scheduled Tasks | Run tasks periodically | Cron jobs, periodic updates
Task Monitoring | Track task progress and results | User feedback, debugging
2.1 Celery Tasks
Example: Celery with Flask Context
# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    db.init_app(app)
    celery.conf.update(broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
    from myapp.routes import bp
    app.register_blueprint(bp)
    with app.app_context():
        db.create_all()
    return app

# myapp/models.py
from myapp import db

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80))

# myapp/tasks.py
from myapp import db, celery
from myapp.models import User

@celery.task
def update_user(user_id, name):
    # Celery workers run outside a request, so push an application
    # context explicitly before touching the database
    from myapp import create_app
    app = create_app()
    with app.app_context():
        user = db.session.get(User, user_id)
        if user:
            user.name = name
            db.session.commit()
            return f"Updated user {user_id}"
        return f"User {user_id} not found"

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp.tasks import update_user

bp = Blueprint('main', __name__)

@bp.route('/update/<int:user_id>/<name>')
def update(user_id, name):
    task = update_user.delay(user_id, name)
    return jsonify({'task_id': task.id})
Run Celery Worker:
celery -A myapp.celery worker --loglevel=info
Output (curl http://localhost:5000/update/1/Alice):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Explanation:
- The task pushes a Flask application context so database operations work outside a request.
- Celery handles database updates asynchronously.
2.2 Threading
Example: Background Task with Threading
# app.py
from flask import Flask, jsonify
import threading
import time

app = Flask(__name__)

def long_running_task(data):
    time.sleep(5)  # Simulate long task
    with open('result.txt', 'a') as f:
        f.write(f"Processed: {data}\n")

@app.route('/process/<data>')
def process(data):
    thread = threading.Thread(target=long_running_task, args=(data,))
    thread.start()
    return jsonify({'status': 'Task started'})

@app.route('/result')
def result():
    try:
        with open('result.txt', 'r') as f:
            return jsonify({'result': f.read()})
    except FileNotFoundError:
        return jsonify({'result': 'No results yet'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Output (curl http://localhost:5000/process/test):
{
"status": "Task started"
}
Output (curl http://localhost:5000/result after 5s):
{
"result": "Processed: test\n"
}
Explanation:
- threading.Thread runs the task in a separate thread.
- Suitable for lightweight tasks, but not scalable for production.
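If threads are enough for your workload, the stdlib's concurrent.futures gives you the result tracking that raw threading.Thread lacks. A minimal sketch (the in-memory futures dict and function names are illustrative):

```python
import uuid
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
futures = {}  # task_id -> Future; an in-memory stand-in for a result backend

def long_running_task(data):
    return f"Processed: {data}"

def start_task(data):
    # Submit the task and hand back an id the client can poll with
    task_id = str(uuid.uuid4())
    futures[task_id] = executor.submit(long_running_task, data)
    return task_id

def task_status(task_id):
    # Mirrors the shape of a Celery-style /status endpoint
    future = futures[task_id]
    if future.done():
        return {'status': 'SUCCESS', 'result': future.result()}
    return {'status': 'PENDING', 'result': None}

task_id = start_task("test")
```

Unlike the file-based example above, this avoids concurrent writes to a shared file, though results still vanish on restart and are invisible to other processes, which is why Celery remains the production choice.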
2.3 AsyncIO
Example: Background Task with AsyncIO
# app.py
from flask import Flask, jsonify
import asyncio
import threading
import aiofiles

app = Flask(__name__)
loop = asyncio.new_event_loop()

async def long_running_task(data):
    await asyncio.sleep(5)  # Simulate async task
    async with aiofiles.open('result.txt', 'a') as f:
        await f.write(f"Processed: {data}\n")

@app.route('/process/<data>')
def process(data):
    # Submit the coroutine to the background loop; returns immediately
    asyncio.run_coroutine_threadsafe(long_running_task(data), loop)
    return jsonify({'status': 'Task started'})

@app.route('/result')
def result():
    try:
        with open('result.txt', 'r') as f:
            return jsonify({'result': f.read()})
    except FileNotFoundError:
        return jsonify({'result': 'No results yet'})

if __name__ == '__main__':
    threading.Thread(target=loop.run_forever, daemon=True).start()
    app.run(host='0.0.0.0', port=5000)
Output (curl http://localhost:5000/process/test):
{
"status": "Task started"
}
Explanation:
- asyncio handles I/O-bound tasks efficiently.
- A daemon thread runs the AsyncIO event loop so it does not block Flask.
- Requires aiofiles (pip install aiofiles).
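Note that asyncio.run_coroutine_threadsafe returns a concurrent.futures.Future, so you can track an async task's completion without blocking the submitting thread. A self-contained sketch:

```python
import asyncio
import threading

# Run an event loop in a daemon thread, as in the Flask example above
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def long_running_task(data):
    await asyncio.sleep(0.1)  # Simulate async I/O
    return f"Processed: {data}"

# Submitting from another thread does not block; the returned Future
# can be polled with done() or awaited with result()
future = asyncio.run_coroutine_threadsafe(long_running_task("test"), loop)
print(future.done())            # Usually False right after submission
print(future.result(timeout=5))  # Blocks until the coroutine finishes
```

Storing these futures in a dict keyed by task id gives a lightweight status endpoint, much like the Celery AsyncResult pattern.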
2.4 Scheduled Tasks
Example: Periodic Tasks with Celery Beat
# app.py
from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'log-every-minute': {
        'task': 'app.log_message',
        'schedule': crontab(minute='*'),
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def log_message():
    with open('log.txt', 'a') as f:
        f.write("Periodic task executed\n")
    return "Logged"

@app.route('/log')
def get_log():
    try:
        with open('log.txt', 'r') as f:
            return jsonify({'log': f.read()})
    except FileNotFoundError:
        return jsonify({'log': 'No log yet'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker and Beat:
celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info
Output (curl http://localhost:5000/log after a minute):
{
"log": "Periodic task executed\n"
}
Explanation:
- CELERYBEAT_SCHEDULE defines the periodic tasks.
- Celery Beat triggers tasks on a cron-like schedule.
2.5 Task Monitoring
Example: Monitoring Celery Task Progress
# app.py
from flask import Flask, jsonify
from celery import Celery
import time

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task(bind=True)
def long_task(self, iterations):
    for i in range(iterations):
        self.update_state(state='PROGRESS', meta={'current': i + 1, 'total': iterations})
        time.sleep(1)
    return "Task completed"

@app.route('/start/<int:iterations>')
def start(iterations):
    task = long_task.delay(iterations)
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = long_task.AsyncResult(task_id)
    if task.state == 'PROGRESS':
        return jsonify({'status': task.state, 'progress': task.info.get('current', 0)})
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/status/&lt;task_id&gt; mid-task):
{
"status": "PROGRESS",
"progress": 3
}
Explanation:
- update_state tracks task progress in real time.
- The status endpoint provides feedback to users.
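On the client side, a status endpoint like this is typically consumed with a short polling loop. A generic helper (the status_fn callable is a stand-in for an HTTP GET against /status/&lt;task_id&gt;):

```python
import time

def poll_until_done(status_fn, interval=0.01, timeout=5.0):
    # Poll a status callable until it reports a terminal Celery state
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = status_fn()
        if status['status'] in ('SUCCESS', 'FAILURE'):
            return status
        time.sleep(interval)
    raise TimeoutError('Task did not finish in time')

# Simulated status sequence: two PROGRESS updates, then SUCCESS
states = iter([
    {'status': 'PROGRESS', 'progress': 1},
    {'status': 'PROGRESS', 'progress': 2},
    {'status': 'SUCCESS', 'result': 'Task completed'},
])
print(poll_until_done(lambda: next(states)))
```

For browser clients, the same loop lives in JavaScript (setInterval plus fetch), or can be replaced by server-sent events or WebSockets when polling is too chatty.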
2.6 Incorrect Background Task Usage
Example: Blocking Flask Route
# app.py (Incorrect)
from flask import Flask, jsonify
import time

app = Flask(__name__)

@app.route('/process/<data>')
def process(data):
    time.sleep(5)  # Blocking task
    return jsonify({'result': f"Processed: {data}"})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Output (curl http://localhost:5000/process/test):
5-second delay, blocking other requests.
Explanation:
- Blocking tasks in routes degrade performance.
- Solution: Use Celery, threading, or AsyncIO for background execution.
03. Effective Usage
3.1 Recommended Practices
- Use Celery for production-grade, scalable background tasks.
Example: Scalable Background Task System
# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    db.init_app(app)
    celery.conf.update(broker=app.config['CELERY_BROKER_URL'], backend=app.config['CELERY_RESULT_BACKEND'])
    from myapp.routes import bp
    app.register_blueprint(bp)
    with app.app_context():
        db.create_all()
    return app

# myapp/models.py
from myapp import db

class Report(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    data = db.Column(db.String(200))

# myapp/tasks.py
import time
import numpy as np
from myapp import db, celery
from myapp.models import Report

@celery.task(bind=True)
def generate_report(self, report_id):
    # Celery workers run outside a request, so push an application
    # context explicitly before touching the database
    from myapp import create_app
    app = create_app()
    with app.app_context():
        report = db.session.get(Report, report_id)
        if not report:
            raise ValueError(f"Report {report_id} not found")
        for i in range(5):
            self.update_state(state='PROGRESS', meta={'current': i + 1, 'total': 5})
            time.sleep(1)
        data = np.random.rand(10).tolist()
        report.data = str(data)
        db.session.commit()
        return f"Report {report_id} generated"

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp.tasks import generate_report
from myapp.models import Report
from myapp import db

bp = Blueprint('main', __name__)

@bp.route('/create_report')
def create_report():
    report = Report(data="Pending")
    db.session.add(report)
    db.session.commit()
    task = generate_report.delay(report.id)
    return jsonify({'task_id': task.id, 'report_id': report.id})

@bp.route('/status/<task_id>')
def status(task_id):
    task = generate_report.AsyncResult(task_id)
    if task.state == 'PROGRESS':
        return jsonify({'status': task.state, 'progress': task.info.get('current', 0)})
    elif task.state == 'FAILURE':
        return jsonify({'status': task.state, 'error': str(task.get(propagate=False))})
    return jsonify({'status': task.state, 'result': task.result})

@bp.route('/report/<int:report_id>')
def get_report(report_id):
    report = db.session.get(Report, report_id)
    if report:
        return jsonify({'data': report.data})
    return jsonify({'error': 'Report not found'})
Run Celery Worker:
celery -A myapp.celery worker --loglevel=info
Output (curl http://localhost:5000/create_report):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000",
"report_id": 1
}
Output (curl http://localhost:5000/report/1 after task):
{
"data": "[0.23, 0.45, ..., 0.89]"
}
Explanation:
- The application factory keeps the app modular.
- Celery provides progress tracking and error handling.
- Database integration persists results.
3.2 Practices to Avoid
- Avoid using threading for complex or production tasks.
Example: Unscalable Threading
# app.py (Incorrect)
from flask import Flask
import threading
import time

app = Flask(__name__)

def heavy_task():
    time.sleep(10)  # Simulate heavy task
    return "Done"

@app.route('/process')
def process():
    thread = threading.Thread(target=heavy_task)
    thread.start()
    return {'status': 'Started'}

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, threaded=True)
Output (curl http://localhost:5000/process):
No way to track task status or results.
- Threading lacks scalability and task management.
- Solution: Use Celery for robust background tasks.
04. Common Use Cases
4.1 Sending Notifications
Send notifications asynchronously to avoid blocking.
Example: Async Notification with Celery
# app.py
from flask import Flask, jsonify
from celery import Celery
import time

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def send_notification(user, message):
    time.sleep(2)  # Simulate notification delay
    return f"Notification sent to {user}: {message}"

@app.route('/notify/<user>/<message>')
def notify(user, message):
    task = send_notification.delay(user, message)
    return jsonify({'task_id': task.id})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/notify/alice/Update):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Explanation:
- Celery handles notification tasks asynchronously.
- Flask responds immediately, improving UX.
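Notification delivery often fails transiently (mail server timeouts, rate limits), so production tasks usually retry. Celery supports this natively via self.retry on a bound task; the underlying idea, sketched with the stdlib (send_with_retry, flaky_send, and the delay values are illustrative):

```python
import time

def send_with_retry(send_fn, max_retries=3, base_delay=0.01):
    # Retry a flaky send function with exponential backoff
    for attempt in range(max_retries + 1):
        try:
            return send_fn()
        except ConnectionError:
            if attempt == max_retries:
                raise  # Out of retries; surface the failure
            time.sleep(base_delay * (2 ** attempt))

attempts = {'n': 0}

def flaky_send():
    # Simulates a delivery that fails twice before succeeding
    attempts['n'] += 1
    if attempts['n'] < 3:
        raise ConnectionError('temporary outage')
    return 'Notification sent'

print(send_with_retry(flaky_send))  # Notification sent
```

In Celery the equivalent is roughly `@celery.task(bind=True, max_retries=3)` with `raise self.retry(exc=exc, countdown=...)` in the except block, which re-queues the task instead of sleeping in the worker.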
4.2 Data Processing
Process large datasets in the background.
Example: Background Data Processing
# app.py
from flask import Flask, jsonify
from celery import Celery
import numpy as np

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def process_data(size):
    data = np.random.rand(size)
    return float(np.sum(data))

@app.route('/process/<int:size>')
def process(size):
    task = process_data.delay(size)
    return jsonify({'task_id': task.id})

@app.route('/status/<task_id>')
def task_status(task_id):
    task = process_data.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker:
celery -A app.celery worker --loglevel=info
Output (curl http://localhost:5000/process/1000000):
{
"task_id": "123e4567-e89b-12d3-a456-426614174000"
}
Explanation:
- Celery processes large NumPy arrays in the background.
- Results are stored in Redis for retrieval.
Conclusion
Running background tasks in Flask enhances application performance by offloading heavy operations. Key takeaways:
- Use Celery for scalable, production-ready tasks.
- Threading or AsyncIO for lightweight or I/O-bound tasks.
- Schedule periodic tasks with Celery Beat.
- Monitor tasks for user feedback and debugging.
- Avoid blocking Flask routes with long-running tasks.
With these techniques, Flask applications can handle complex background tasks efficiently, ensuring responsiveness and scalability!