Flask: Scheduling Tasks
Scheduling tasks in Flask enables applications to execute operations at specific times or intervals, such as sending reminders, generating reports, or cleaning up databases. By integrating scheduling tools like Celery Beat or Python’s APScheduler, Flask can handle recurring and time-based tasks efficiently without blocking the main application. This guide explores scheduling tasks in Flask, covering key techniques, best practices, and practical applications for building automated, scalable systems.
01. Why Schedule Tasks in Flask?
Scheduled tasks are essential for automating repetitive processes, such as sending daily emails or updating analytics, without manual intervention. Flask, when paired with scheduling tools, ensures these tasks run reliably in the background while the application stays responsive. Combined with NumPy array operations for data-intensive scheduled tasks, Flask can support complex automation workflows in applications like notification systems or data pipelines.
Example: Basic Scheduled Task with Celery Beat
# app.py
import time

from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'log-every-minute': {
        'task': 'app.log_message',
        'schedule': crontab(minute='*'),  # Run every minute
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def log_message():
    with open('log.txt', 'a') as f:
        f.write(f"Task executed at {time.ctime()}\n")
    return "Logged"

@app.route('/log')
def get_log():
    try:
        with open('log.txt', 'r') as f:
            return jsonify({'log': f.read()})
    except FileNotFoundError:
        return jsonify({'log': 'No log yet'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker and Beat:
celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info
Output (curl http://localhost:5000/log after a few minutes):
{
"log": "Task executed at Sun May 11 10:00:00 2025\nTask executed at Sun May 11 10:01:00 2025\n"
}
Explanation:
- CELERYBEAT_SCHEDULE defines tasks to run on a cron-like schedule.
- crontab(minute='*') triggers the task every minute.
- Redis serves as the message broker and result backend.
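Beat schedules are not limited to crontab expressions; a plain timedelta also works as a fixed interval. The schedule mapping can be sketched on its own like this (the entry names and task paths are illustrative):

```python
from datetime import timedelta

# Hypothetical beat entries: a schedule may be a plain timedelta interval
# instead of a crontab expression (crontab(hour=9, day_of_week='mon-fri')
# would cover calendar-style rules such as "weekday mornings").
CELERYBEAT_SCHEDULE = {
    'every-30-seconds': {
        'task': 'app.log_message',
        'schedule': timedelta(seconds=30),  # run every 30 seconds
    },
}

print(CELERYBEAT_SCHEDULE['every-30-seconds']['schedule'].total_seconds())  # 30.0
```

timedelta intervals are convenient for sub-minute or irregular periods that crontab cannot express.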
02. Key Task Scheduling Techniques
Flask supports multiple tools for scheduling tasks, each with distinct strengths. The table below summarizes key techniques and their applications:
| Technique | Description | Use Case |
|---|---|---|
| Celery Beat | Schedule tasks with Celery and a broker | Distributed, production-grade scheduling |
| APScheduler | Lightweight in-process scheduler | Simple, single-instance apps |
| Dynamic Scheduling | Schedule tasks programmatically | User-defined or one-off tasks |
| Task Monitoring | Track scheduled task execution | Debugging, user feedback |
| Error Handling | Handle failures in scheduled tasks | Reliable automation |
2.1 Celery Beat
Example: Scheduled Database Cleanup with Celery Beat
# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery
from celery.schedules import crontab

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    app.config['CELERYBEAT_SCHEDULE'] = {
        'cleanup-daily': {
            'task': 'myapp.tasks.cleanup_db',
            'schedule': crontab(hour=0, minute=0),  # Run daily at midnight
        }
    }
    db.init_app(app)
    # Pass the full config so the beat schedule reaches Celery,
    # not just the broker and backend settings
    celery.conf.update(app.config)
    from myapp.routes import bp
    app.register_blueprint(bp)
    with app.app_context():
        db.create_all()
    return app

# myapp/models.py
from datetime import datetime
from myapp import db

class Log(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    message = db.Column(db.String(200))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

# myapp/tasks.py
from datetime import datetime, timedelta
from myapp import db, celery, create_app
from myapp.models import Log

@celery.task
def cleanup_db():
    # current_app is not available inside a worker process,
    # so build an app to supply the task's context
    app = create_app()
    with app.app_context():
        threshold = datetime.utcnow() - timedelta(days=7)
        deleted = db.session.query(Log).filter(Log.created_at < threshold).delete()
        db.session.commit()
        return f"Deleted {deleted} old logs"

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp import db
from myapp.models import Log

bp = Blueprint('main', __name__)

@bp.route('/logs')
def get_logs():
    logs = Log.query.all()
    return jsonify([{'id': log.id, 'message': log.message,
                     'created_at': log.created_at.isoformat()} for log in logs])

@bp.route('/add_log/<message>')
def add_log(message):
    log = Log(message=message)
    db.session.add(log)
    db.session.commit()
    return jsonify({'status': 'Log added'})
Run Celery Worker and Beat:
celery -A myapp.celery worker --loglevel=info
celery -A myapp.celery beat --loglevel=info
Output (curl http://localhost:5000/logs after cleanup):
[
{"id": 1, "message": "Recent log", "created_at": "2025-05-11T00:00:00"}
]
Explanation:
- crontab(hour=0, minute=0) schedules the cleanup daily at midnight.
- The task deletes logs older than 7 days inside a Flask application context.
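The retention rule itself can be isolated and unit-tested without Celery or a database; a minimal sketch (the `is_stale` helper is illustrative, not part of the app above):

```python
from datetime import datetime, timedelta

def is_stale(created_at, now=None, days=7):
    """True when a log entry falls outside the retention window."""
    now = now or datetime.utcnow()
    return created_at < now - timedelta(days=days)

# An eight-day-old entry is stale; a one-day-old one is not
now = datetime(2025, 5, 11)
print(is_stale(datetime(2025, 5, 3), now=now))   # True
print(is_stale(datetime(2025, 5, 10), now=now))  # False
```

Keeping the cutoff logic in a small pure function makes the scheduled task easier to test than the full Celery round trip.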
2.2 APScheduler
Example: Scheduled Task with APScheduler
# app.py
import time

from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler

app = Flask(__name__)
scheduler = BackgroundScheduler()

def log_message():
    with open('log.txt', 'a') as f:
        f.write(f"Task executed at {time.ctime()}\n")

@app.route('/log')
def get_log():
    try:
        with open('log.txt', 'r') as f:
            return jsonify({'log': f.read()})
    except FileNotFoundError:
        return jsonify({'log': 'No log yet'})

if __name__ == '__main__':
    scheduler.add_job(log_message, 'interval', minutes=1)
    scheduler.start()
    app.run(host='0.0.0.0', port=5000)
Output (curl http://localhost:5000/log after a few minutes):
{
"log": "Task executed at Sun May 11 10:00:00 2025\nTask executed at Sun May 11 10:01:00 2025\n"
}
Explanation:
- BackgroundScheduler runs tasks in-process, ideal for lightweight apps.
- add_job schedules the task to run every minute.
- Requires the apscheduler package (pip install apscheduler).
2.3 Dynamic Scheduling
Example: User-Defined Scheduled Task
# app.py
import time
from datetime import datetime

from flask import Flask, jsonify, request
from apscheduler.schedulers.background import BackgroundScheduler

app = Flask(__name__)
scheduler = BackgroundScheduler()

def send_reminder(task_id, message):
    with open('reminders.txt', 'a') as f:
        f.write(f"Reminder {task_id}: {message} at {time.ctime()}\n")

@app.route('/schedule', methods=['POST'])
def schedule_task():
    data = request.get_json()
    task_id = data['task_id']
    message = data['message']
    run_time = datetime.fromisoformat(data['run_time'])  # e.g., "2025-05-11T10:00:00"
    scheduler.add_job(send_reminder, 'date', run_date=run_time,
                      args=[task_id, message], id=task_id)
    return jsonify({'status': f'Task {task_id} scheduled'})

@app.route('/reminders')
def get_reminders():
    try:
        with open('reminders.txt', 'r') as f:
            return jsonify({'reminders': f.read()})
    except FileNotFoundError:
        return jsonify({'reminders': 'No reminders yet'})

if __name__ == '__main__':
    scheduler.start()
    app.run(host='0.0.0.0', port=5000)
Output (curl -X POST -H "Content-Type: application/json" -d '{"task_id":"1","message":"Meeting","run_time":"2025-05-11T10:00:00"}' http://localhost:5000/schedule):
{
"status": "Task 1 scheduled"
}
Output (curl http://localhost:5000/reminders after 10:00):
{
"reminders": "Reminder 1: Meeting at Sun May 11 10:00:00 2025\n"
}
Explanation:
- add_job with the 'date' trigger schedules a one-off task at a specific time.
- Users can define custom schedules via the API.
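User-supplied run times deserve validation before they reach add_job; here is a hedged sketch of a helper the /schedule route could call first (the name `validate_run_time` is illustrative):

```python
from datetime import datetime

def validate_run_time(value):
    """Parse an ISO-8601 timestamp and reject times that are already past."""
    try:
        run_time = datetime.fromisoformat(value)
    except (TypeError, ValueError):
        raise ValueError(f"invalid ISO timestamp: {value!r}")
    if run_time <= datetime.now():
        raise ValueError("run_time must be in the future")
    return run_time

print(validate_run_time("2999-01-01T10:00:00"))  # 2999-01-01 10:00:00
```

Returning a 400 response with the ValueError message keeps malformed or past timestamps out of the scheduler.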
2.4 Task Monitoring
Example: Monitoring Celery Scheduled Tasks
# app.py
import time

from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'log-every-minute': {
        'task': 'app.log_message',
        'schedule': crontab(minute='*'),
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def log_message():
    with open('log.txt', 'a') as f:
        f.write(f"Task executed at {time.ctime()}\n")
    return f"Logged at {time.ctime()}"

@app.route('/status/<task_id>')
def task_status(task_id):
    task = log_message.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker and Beat:
celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info
Output (curl http://localhost:5000/status/<task_id>, using a task id from the worker logs):
{
"status": "SUCCESS",
"result": "Logged at Sun May 11 10:00:00 2025"
}
Explanation:
- AsyncResult tracks task execution status.
- Useful for debugging or user feedback.
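AsyncResult states such as PENDING, STARTED, SUCCESS, FAILURE, and RETRY are plain strings, so translating them into user-facing messages is straightforward; a small sketch (the mapping and wording are illustrative):

```python
# Hypothetical mapping from Celery task states to user-facing messages
STATE_MESSAGES = {
    'PENDING': 'Task is queued or unknown',
    'STARTED': 'Task is running',
    'SUCCESS': 'Task finished',
    'FAILURE': 'Task failed',
    'RETRY': 'Task is being retried',
}

def describe(state):
    """Return a readable description for a Celery task state."""
    return STATE_MESSAGES.get(state, 'Unrecognized state')

print(describe('SUCCESS'))  # Task finished
```

A status endpoint could return describe(task.state) alongside the raw state for friendlier client output.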
2.5 Error Handling
Example: Error Handling in Scheduled Tasks
# app.py
from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'check-file-every-minute': {
        'task': 'app.check_file',
        'schedule': crontab(minute='*'),
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def check_file():
    try:
        with open('nonexistent.txt', 'r') as f:
            return f.read()
    except FileNotFoundError:
        raise ValueError("File not found")

@app.route('/status/<task_id>')
def task_status(task_id):
    task = check_file.AsyncResult(task_id)
    if task.state == 'FAILURE':
        return jsonify({'status': task.state, 'error': str(task.get(propagate=False))})
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker and Beat:
celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info
Output (curl http://localhost:5000/status/<task_id>, using a task id from the worker logs):
{
"status": "FAILURE",
"error": "File not found"
}
Explanation:
- try/except handles task-specific errors.
- task.get(propagate=False) retrieves error details without re-raising.
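Celery can also retry failing tasks (for example, a task declared with bind=True and max_retries that calls self.retry). As a framework-free sketch of the same retry-with-backoff idea, with a decorator name (`with_retries`) that is illustrative:

```python
import functools
import time

def with_retries(max_retries=3, delay=0.0):
    """Retry a function up to max_retries times, re-raising the final failure."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise  # out of retries: surface the error
                    if delay:
                        time.sleep(delay * (2 ** attempt))  # exponential backoff
        return wrapper
    return decorator

calls = {'n': 0}

@with_retries(max_retries=3)
def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise RuntimeError("transient failure")
    return "ok"

print(flaky())      # ok
print(calls['n'])   # 3
```

The same shape applies to a Celery task: transient failures (a busy SMTP server, a locked file) are retried, and only the final failure is recorded as FAILURE.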
2.6 Incorrect Scheduling Approach
Example: Blocking Scheduled Task
# app.py (Incorrect)
import threading
import time

from flask import Flask, jsonify

app = Flask(__name__)

def scheduled_task():
    while True:
        time.sleep(60)  # Run every minute
        with open('log.txt', 'a') as f:
            f.write(f"Task executed at {time.ctime()}\n")

@app.route('/log')
def get_log():
    try:
        with open('log.txt', 'r') as f:
            return jsonify({'log': f.read()})
    except FileNotFoundError:
        return jsonify({'log': 'No log yet'})

if __name__ == '__main__':
    threading.Thread(target=scheduled_task, daemon=True).start()
    app.run(host='0.0.0.0', port=5000)
Output (curl http://localhost:5000/log):
Logs appear, but scheduling is unreliable and lacks management.
Explanation:
- Ad-hoc threading lacks robustness and monitoring.
- Solution: Use Celery Beat or APScheduler for reliable scheduling.
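Part of what a dedicated scheduler provides is drift correction: it sleeps toward absolute target times rather than for a fixed interval, so a slow task does not push every later run back. A minimal stdlib sketch of that idea (not a substitute for a real scheduler):

```python
import time

def run_on_schedule(task, interval, iterations):
    """Fire task at absolute targets spaced `interval` seconds apart,
    so the sleep absorbs however long the task itself took."""
    next_run = time.monotonic()
    for _ in range(iterations):
        task()
        next_run += interval
        time.sleep(max(0.0, next_run - time.monotonic()))

ticks = []
run_on_schedule(lambda: ticks.append(time.monotonic()), interval=0.01, iterations=3)
print(len(ticks))  # 3
```

The naive `time.sleep(60)` loop above instead accumulates the task's own runtime into every cycle, which is one of several reasons it is unreliable.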
03. Effective Usage
3.1 Recommended Practices
- Use Celery Beat for distributed, production-grade scheduling.
Example: Scalable Scheduled Task System
# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery
from celery.schedules import crontab

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    app.config['CELERYBEAT_SCHEDULE'] = {
        'generate-report-daily': {
            'task': 'myapp.tasks.generate_report',
            'schedule': crontab(hour=0, minute=0),
        }
    }
    db.init_app(app)
    # Pass the full config so the beat schedule reaches Celery
    celery.conf.update(app.config)
    from myapp.routes import bp
    app.register_blueprint(bp)
    with app.app_context():
        db.create_all()
    return app

# myapp/models.py
from datetime import datetime
from myapp import db

class Report(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    data = db.Column(db.String(200))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

# myapp/tasks.py
import numpy as np
from myapp import db, celery, create_app
from myapp.models import Report

@celery.task
def generate_report():
    # Build an app for the task's context; current_app is unavailable in the worker
    app = create_app()
    with app.app_context():
        try:
            data = np.random.rand(10).tolist()
            report = Report(data=str(data))
            db.session.add(report)
            db.session.commit()
            return f"Report {report.id} generated"
        except Exception as e:
            return f"Error: {str(e)}"

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp.models import Report
from myapp.tasks import generate_report

bp = Blueprint('main', __name__)

@bp.route('/reports')
def get_reports():
    reports = Report.query.all()
    return jsonify([{'id': r.id, 'data': r.data,
                     'created_at': r.created_at.isoformat()} for r in reports])

@bp.route('/trigger_report')
def trigger_report():
    task = generate_report.delay()
    return jsonify({'task_id': task.id})

@bp.route('/status/<task_id>')
def status(task_id):
    task = generate_report.AsyncResult(task_id)
    if task.state == 'FAILURE':
        return jsonify({'status': task.state, 'error': str(task.get(propagate=False))})
    return jsonify({'status': task.state, 'result': task.result})
Run Celery Worker and Beat:
celery -A myapp.celery worker --loglevel=info
celery -A myapp.celery beat --loglevel=info
Output (curl http://localhost:5000/reports after a day):
[
{"id": 1, "data": "[0.23, 0.45, ..., 0.89]", "created_at": "2025-05-11T00:00:00"}
]
- Application factory ensures modularity.
- Celery Beat schedules daily reports with error handling.
- Database stores results for persistence.
3.2 Practices to Avoid
- Avoid scheduling tasks without a proper scheduler.
Example: Manual Scheduling Loop
# app.py (Incorrect)
import threading
import time

from flask import Flask, jsonify

app = Flask(__name__)

def manual_schedule():
    while True:
        time.sleep(60)
        with open('log.txt', 'a') as f:
            f.write(f"Task executed at {time.ctime()}\n")

@app.route('/log')
def get_log():
    try:
        with open('log.txt', 'r') as f:
            return jsonify({'log': f.read()})
    except FileNotFoundError:
        return jsonify({'log': 'No log yet'})

if __name__ == '__main__':
    threading.Thread(target=manual_schedule, daemon=True).start()
    app.run(host='0.0.0.0', port=5000)
Output (curl http://localhost:5000/log):
Unreliable scheduling, no error handling or monitoring.
- Manual loops are error-prone and hard to manage.
- Solution: Use Celery Beat or APScheduler.
04. Common Use Cases
4.1 Sending Scheduled Notifications
Send periodic notifications to users.
Example: Daily Email Reminders
# app.py
from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab
from flask_mail import Mail, Message

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'send-reminder-daily': {
        'task': 'app.send_reminder',
        'schedule': crontab(hour=8, minute=0),  # 8 AM daily
    }
}
app.config['MAIL_SERVER'] = 'smtp.example.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USERNAME'] = 'user@example.com'
app.config['MAIL_PASSWORD'] = 'password'
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_DEFAULT_SENDER'] = 'user@example.com'  # needed unless each Message sets a sender

mail = Mail(app)
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def send_reminder():
    with app.app_context():
        msg = Message("Daily Reminder", recipients=['user@example.com'],
                      body="Don't forget to check your tasks!")
        mail.send(msg)
    return "Reminder sent"

@app.route('/status/<task_id>')
def task_status(task_id):
    task = send_reminder.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker and Beat:
celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info
Output (worker logs at 8 AM):
[2025-05-11 08:00:00] Task app.send_reminder succeeded: Reminder sent
Explanation:
- Celery Beat schedules daily emails.
- Flask-Mail integrates for reliable delivery.
4.2 Generating Periodic Reports
Generate and store reports on a schedule.
Example: Weekly Data Report
# app.py
import numpy as np
from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'generate-report-weekly': {
        'task': 'app.generate_report',
        'schedule': crontab(hour=0, minute=0, day_of_week=0),  # Sunday midnight
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def generate_report():
    data = np.random.rand(10).tolist()
    with open('reports.txt', 'a') as f:
        f.write(f"Weekly report: {data}\n")
    return f"Report generated: {data}"

@app.route('/reports')
def get_reports():
    try:
        with open('reports.txt', 'r') as f:
            return jsonify({'reports': f.read()})
    except FileNotFoundError:
        return jsonify({'reports': 'No reports yet'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
Run Celery Worker and Beat:
celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info
Output (curl http://localhost:5000/reports after a week):
{
"reports": "Weekly report: [0.23, 0.45, ..., 0.89]\n"
}
Explanation:
- Celery Beat schedules weekly reports with NumPy data.
- Results are stored for later retrieval.
Conclusion
Scheduling tasks in Flask automates repetitive processes, enhancing application functionality. Key takeaways:
- Use Celery Beat for distributed, scalable scheduling.
- Use APScheduler for lightweight, in-process scheduling.
- Support dynamic scheduling for user-defined tasks.
- Monitor tasks and handle errors for reliability.
- Avoid manual scheduling loops for production.
With Celery Beat or APScheduler, Flask applications can efficiently manage scheduled tasks, making them ideal for automation-driven systems!