Flask: Scheduling Tasks
Scheduling tasks in Flask enables applications to execute operations at specific times or intervals, such as sending reminders, generating reports, or cleaning up databases. By integrating scheduling tools like Celery Beat or the APScheduler library, Flask can handle recurring and time-based tasks efficiently without blocking the main application. This guide explores scheduling tasks in Flask, covering key techniques, best practices, and practical applications for building automated, scalable systems.


01. Why Schedule Tasks in Flask?

Scheduled tasks are essential for automating repetitive processes, such as sending daily emails or updating analytics, without manual intervention. Flask, when paired with scheduling tools, ensures these tasks run reliably in the background while the application stays responsive. Combined with libraries like NumPy for data-intensive work, scheduled tasks let Flask support automation workflows such as notification systems and data pipelines.

Example: Basic Scheduled Task with Celery Beat

# app.py
import time

from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'log-every-minute': {
        'task': 'app.log_message',
        'schedule': crontab(minute='*'),  # Run every minute
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def log_message():
    with open('log.txt', 'a') as f:
        f.write(f"Task executed at {time.ctime()}\n")
    return "Logged"

@app.route('/log')
def get_log():
    try:
        with open('log.txt', 'r') as f:
            return jsonify({'log': f.read()})
    except FileNotFoundError:
        return jsonify({'log': 'No log yet'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker and Beat:

celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info

Output (curl http://localhost:5000/log after a few minutes):

{
  "log": "Task executed at Sun May 11 10:00:00 2025\nTask executed at Sun May 11 10:01:00 2025\n"
}

Explanation:

  • CELERYBEAT_SCHEDULE - Defines tasks to run on a cron-like schedule (Celery 4+ prefers the lowercase beat_schedule setting name).
  • crontab(minute='*') - Triggers the task every minute.
  • Redis serves as the message broker and result backend.
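Under the hood, a beat scheduler repeatedly computes the next fire time from the cron spec. A minimal stdlib sketch of what `crontab(minute='*')` resolves to (illustrative only, not Celery's actual implementation):

```python
from datetime import datetime, timedelta

def next_minute_boundary(now: datetime) -> datetime:
    """Next fire time for crontab(minute='*'): second 0 of the following minute."""
    return now.replace(second=0, microsecond=0) + timedelta(minutes=1)

print(next_minute_boundary(datetime(2025, 5, 11, 10, 0, 30)))  # 2025-05-11 10:01:00
```

Beat sleeps until this boundary, enqueues the task, then recomputes, so fire times stay aligned to the clock rather than drifting with task runtime.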

02. Key Task Scheduling Techniques

Flask supports multiple tools for scheduling tasks, each with distinct strengths. The table below summarizes key techniques and their applications:

Technique            Description                               Use Case
Celery Beat          Schedule tasks with Celery and a broker   Distributed, production-grade scheduling
APScheduler          Lightweight in-process scheduler          Simple, single-instance apps
Dynamic Scheduling   Schedule tasks programmatically           User-defined or one-off tasks
Task Monitoring      Track scheduled task execution            Debugging, user feedback
Error Handling       Handle failures in scheduled tasks        Reliable automation


2.1 Celery Beat

Example: Scheduled Database Cleanup with Celery Beat

# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery
from celery.schedules import crontab

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    app.config['CELERYBEAT_SCHEDULE'] = {
        'cleanup-daily': {
            'task': 'myapp.tasks.cleanup_db',
            'schedule': crontab(hour=0, minute=0),  # Run daily at midnight
        }
    }
    
    db.init_app(app)
    celery.conf.update(
        broker_url=app.config['CELERY_BROKER_URL'],
        result_backend=app.config['CELERY_RESULT_BACKEND'],
        beat_schedule=app.config['CELERYBEAT_SCHEDULE'],
    )

    # Run tasks inside the Flask app context so current_app and db work
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    celery.Task = ContextTask
    
    from myapp.routes import bp
    app.register_blueprint(bp)
    
    with app.app_context():
        db.create_all()
    
    return app

# myapp/models.py
from myapp import db
from datetime import datetime

class Log(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    message = db.Column(db.String(200))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

# myapp/tasks.py
from myapp import db, celery
from flask import current_app
from myapp.models import Log
from datetime import datetime, timedelta

@celery.task
def cleanup_db():
    with current_app.app_context():
        threshold = datetime.utcnow() - timedelta(days=7)
        deleted = db.session.query(Log).filter(Log.created_at < threshold).delete()
        db.session.commit()
        return f"Deleted {deleted} old logs"

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp.models import Log

bp = Blueprint('main', __name__)

@bp.route('/logs')
def get_logs():
    logs = Log.query.all()
    return jsonify([{'id': log.id, 'message': log.message, 'created_at': log.created_at.isoformat()} for log in logs])

@bp.route('/add_log/<message>')
def add_log(message):
    log = Log(message=message)
    db.session.add(log)
    db.session.commit()
    return jsonify({'status': 'Log added'})

Run Celery Worker and Beat (the app must be created first so the schedule is registered; e.g. a small celery_worker.py entry module containing `from myapp import create_app, celery` followed by `app = create_app()`):

celery -A celery_worker.celery worker --loglevel=info
celery -A celery_worker.celery beat --loglevel=info

Output (curl http://localhost:5000/logs after cleanup):

[
  {"id": 1, "message": "Recent log", "created_at": "2025-05-11T00:00:00"}
]

Explanation:

  • crontab(hour=0, minute=0) - Schedules daily cleanup.
  • Task deletes logs older than 7 days, accessing Flask context.
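The core of `cleanup_db` is a retention-window comparison; the same logic in plain Python, with illustrative names (the real task pushes this predicate into the SQL query via `filter`):

```python
from datetime import datetime, timedelta

def expired(created_at: datetime, now: datetime, days: int = 7) -> bool:
    """True if a record falls outside the retention window."""
    return created_at < now - timedelta(days=days)

now = datetime(2025, 5, 11)
created_dates = [datetime(2025, 5, 1), datetime(2025, 5, 10)]
kept = [d for d in created_dates if not expired(d, now)]
print(len(kept))  # 1 -- only the recent record survives
```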

2.2 APScheduler

Example: Scheduled Task with APScheduler

# app.py
from flask import Flask, jsonify
from apscheduler.schedulers.background import BackgroundScheduler
import time

app = Flask(__name__)
scheduler = BackgroundScheduler()

def log_message():
    with open('log.txt', 'a') as f:
        f.write(f"Task executed at {time.ctime()}\n")

@app.route('/log')
def get_log():
    try:
        with open('log.txt', 'r') as f:
            return jsonify({'log': f.read()})
    except FileNotFoundError:
        return jsonify({'log': 'No log yet'})

if __name__ == '__main__':
    scheduler.add_job(log_message, 'interval', minutes=1)
    scheduler.start()
    app.run(host='0.0.0.0', port=5000)

Output (curl http://localhost:5000/log after a few minutes):

{
  "log": "Task executed at Sun May 11 10:00:00 2025\nTask executed at Sun May 11 10:01:00 2025\n"
}

Explanation:

  • BackgroundScheduler - Runs tasks in-process, ideal for lightweight apps.
  • add_job - Schedules the task every minute.
  • Requires apscheduler (pip install apscheduler).
  • Note: with Flask's debug reloader enabled, the process starts twice and jobs can be duplicated; disable the reloader or guard scheduler startup.
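An interval trigger like `add_job(log_message, 'interval', minutes=1)` fires at fixed offsets from the start time. A toy stdlib version of that calculation (not APScheduler's actual code) shows the semantics:

```python
from datetime import datetime, timedelta

def interval_fire_times(start: datetime, minutes: int, count: int):
    """First `count` fire times of a fixed-interval trigger, offset from start."""
    return [start + timedelta(minutes=minutes * i) for i in range(1, count + 1)]

times = interval_fire_times(datetime(2025, 5, 11, 10, 0), 1, 3)
print(times[-1])  # 2025-05-11 10:03:00
```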

2.3 Dynamic Scheduling

Example: User-Defined Scheduled Task

# app.py
from flask import Flask, jsonify, request
from apscheduler.schedulers.background import BackgroundScheduler
from datetime import datetime
import time

app = Flask(__name__)
scheduler = BackgroundScheduler()

def send_reminder(task_id, message):
    with open('reminders.txt', 'a') as f:
        f.write(f"Reminder {task_id}: {message} at {time.ctime()}\n")

@app.route('/schedule', methods=['POST'])
def schedule_task():
    data = request.get_json()
    task_id = data['task_id']
    message = data['message']
    run_time = datetime.fromisoformat(data['run_time'])  # e.g., "2025-05-11T10:00:00"
    scheduler.add_job(send_reminder, 'date', run_date=run_time, args=[task_id, message], id=task_id)
    return jsonify({'status': f'Task {task_id} scheduled'})

@app.route('/reminders')
def get_reminders():
    try:
        with open('reminders.txt', 'r') as f:
            return jsonify({'reminders': f.read()})
    except FileNotFoundError:
        return jsonify({'reminders': 'No reminders yet'})

if __name__ == '__main__':
    scheduler.start()
    app.run(host='0.0.0.0', port=5000)

Output (curl -X POST -H "Content-Type: application/json" -d '{"task_id":"1","message":"Meeting","run_time":"2025-05-11T10:00:00"}' http://localhost:5000/schedule):

{
  "status": "Task 1 scheduled"
}

Output (curl http://localhost:5000/reminders after 10:00):

{
  "reminders": "Reminder 1: Meeting at Sun May 11 10:00:00 2025\n"
}

Explanation:

  • add_job('date') - Schedules one-off task at a specific time.
  • Users can define custom schedules via API.
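A production `/schedule` endpoint should validate the ISO timestamp and reject past times before calling `add_job`; a hedged sketch of that validation step (the function name is illustrative):

```python
from datetime import datetime

def parse_run_time(raw: str, now: datetime) -> datetime:
    """Parse an ISO-8601 run time, rejecting malformed or past values."""
    run_time = datetime.fromisoformat(raw)  # raises ValueError if malformed
    if run_time <= now:
        raise ValueError("run_time must be in the future")
    return run_time

now = datetime(2025, 5, 11, 9, 0)
print(parse_run_time("2025-05-11T10:00:00", now))  # 2025-05-11 10:00:00
```

Returning a 400 response from the caught ValueError keeps bad input out of the scheduler.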

2.4 Task Monitoring

Example: Monitoring Celery Scheduled Tasks

# app.py
from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab
import time

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'log-every-minute': {
        'task': 'app.log_message',
        'schedule': crontab(minute='*'),
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def log_message():
    with open('log.txt', 'a') as f:
        f.write(f"Task executed at {time.ctime()}\n")
    return f"Logged at {time.ctime()}"

@app.route('/status/<task_id>')
def task_status(task_id):
    task = log_message.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker and Beat:

celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info

Output (curl http://localhost:5000/status/<task_id> with a completed task's id):

{
  "status": "SUCCESS",
  "result": "Logged at Sun May 11 10:00:00 2025"
}

Explanation:

  • AsyncResult - Looks up a task's state and result by id (ids of beat-triggered tasks appear in the worker logs).
  • Useful for debugging or user feedback.
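Conceptually, the result backend is a mapping from task id to state and result; `AsyncResult` is a lookup into it. A toy in-process registry illustrating the same pattern (names are illustrative, Celery stores this in Redis):

```python
results = {}  # task_id -> (state, result); Celery keeps this in the result backend

def record(task_id, state, result=None):
    results[task_id] = (state, result)

def status(task_id):
    """Unknown ids report PENDING, matching AsyncResult's behaviour."""
    return results.get(task_id, ('PENDING', None))

record('abc123', 'SUCCESS', 'Logged at Sun May 11 10:00:00 2025')
print(status('abc123'))   # ('SUCCESS', 'Logged at Sun May 11 10:00:00 2025')
print(status('missing'))  # ('PENDING', None)
```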

2.5 Error Handling

Example: Error Handling in Scheduled Tasks

# app.py
from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'check-file-every-minute': {
        'task': 'app.check_file',
        'schedule': crontab(minute='*'),
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def check_file():
    try:
        with open('nonexistent.txt', 'r') as f:
            return f.read()
    except FileNotFoundError:
        raise ValueError("File not found")

@app.route('/status/<task_id>')
def task_status(task_id):
    task = check_file.AsyncResult(task_id)
    if task.state == 'FAILURE':
        return jsonify({'status': task.state, 'error': str(task.get(propagate=False))})
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker and Beat:

celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info

Output (curl http://localhost:5000/status/<task_id> with the failed task's id):

{
  "status": "FAILURE",
  "error": "File not found"
}

Explanation:

  • try/except - Handles task-specific errors.
  • task.get(propagate=False) - Retrieves error details safely.
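Celery tasks can also retry failed runs (via `bind=True` and `self.retry()`); the underlying retry-with-backoff idea, sketched in plain Python:

```python
import time

def run_with_retries(fn, max_retries=3, base_delay=0.01):
    """Call fn, retrying failed attempts with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the failure
            time.sleep(base_delay * (2 ** attempt))

attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError("transient failure")
    return "ok"

result = run_with_retries(flaky)
print(result, len(attempts))  # ok 3
```

Backoff keeps transient failures (a briefly unreachable file server, a locked database) from permanently failing a scheduled run.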

2.6 Incorrect Scheduling Approach

Example: Blocking Scheduled Task

# app.py (Incorrect)
from flask import Flask, jsonify
import threading
import time

app = Flask(__name__)

def scheduled_task():
    while True:
        time.sleep(60)  # Run every minute
        with open('log.txt', 'a') as f:
            f.write(f"Task executed at {time.ctime()}\n")

@app.route('/log')
def get_log():
    try:
        with open('log.txt', 'r') as f:
            return jsonify({'log': f.read()})
    except FileNotFoundError:
        return jsonify({'log': 'No log yet'})

if __name__ == '__main__':
    threading.Thread(target=scheduled_task, daemon=True).start()
    app.run(host='0.0.0.0', port=5000)

Output (curl http://localhost:5000/log):

Logs appear, but scheduling is unreliable and lacks management.

Explanation:

  • Ad-hoc threading lacks robustness and monitoring.
  • Solution: Use Celery Beat or APScheduler for reliable scheduling.
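Besides lacking monitoring, a `time.sleep(60)` loop also drifts: each cycle adds the task's own runtime on top of the sleep. Real schedulers compute absolute fire times instead. A simulation of the difference:

```python
# Simulate three runs of a 60-second schedule where the task itself takes 5s.
interval, task_duration = 60, 5

# sleep-based loop: each cycle is sleep + task time, so runs slip later and later
sleep_loop_times, t = [], 0
for _ in range(3):
    t += interval + task_duration
    sleep_loop_times.append(t)

# absolute schedule (Celery Beat / APScheduler): fire times are fixed offsets
absolute_times = [interval * n for n in range(1, 4)]

print(sleep_loop_times)  # [65, 130, 195] -- 15s of drift after three runs
print(absolute_times)    # [60, 120, 180]
```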

03. Effective Usage

3.1 Recommended Practices

  • Use Celery Beat for distributed, production-grade scheduling.

Example: Scalable Scheduled Task System

# myapp/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from celery import Celery
from celery.schedules import crontab

db = SQLAlchemy()
celery = Celery()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///test.db'
    app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
    app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
    app.config['CELERYBEAT_SCHEDULE'] = {
        'generate-report-daily': {
            'task': 'myapp.tasks.generate_report',
            'schedule': crontab(hour=0, minute=0),
        }
    }
    
    db.init_app(app)
    celery.conf.update(
        broker_url=app.config['CELERY_BROKER_URL'],
        result_backend=app.config['CELERY_RESULT_BACKEND'],
        beat_schedule=app.config['CELERYBEAT_SCHEDULE'],
    )

    # Run tasks inside the Flask app context so current_app and db work
    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)
    celery.Task = ContextTask
    
    from myapp.routes import bp
    app.register_blueprint(bp)
    
    with app.app_context():
        db.create_all()
    
    return app

# myapp/models.py
from myapp import db
from datetime import datetime

class Report(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    data = db.Column(db.String(200))
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

# myapp/tasks.py
from myapp import db, celery
from flask import current_app
import numpy as np

@celery.task
def generate_report():
    with current_app.app_context():
        try:
            data = np.random.rand(10).tolist()
            report = Report(data=str(data))
            db.session.add(report)
            db.session.commit()
            return f"Report {report.id} generated"
        except Exception as e:
            return f"Error: {str(e)}"

# myapp/routes.py
from flask import Blueprint, jsonify
from myapp.models import Report
from myapp.tasks import generate_report

bp = Blueprint('main', __name__)

@bp.route('/reports')
def get_reports():
    reports = Report.query.all()
    return jsonify([{'id': r.id, 'data': r.data, 'created_at': r.created_at.isoformat()} for r in reports])

@bp.route('/trigger_report')
def trigger_report():
    task = generate_report.delay()
    return jsonify({'task_id': task.id})

@bp.route('/status/<task_id>')
def status(task_id):
    task = generate_report.AsyncResult(task_id)
    if task.state == 'FAILURE':
        return jsonify({'status': task.state, 'error': str(task.get(propagate=False))})
    return jsonify({'status': task.state, 'result': task.result})

Run Celery Worker and Beat (the app must be created first so the schedule is registered; e.g. a small celery_worker.py entry module containing `from myapp import create_app, celery` followed by `app = create_app()`):

celery -A celery_worker.celery worker --loglevel=info
celery -A celery_worker.celery beat --loglevel=info

Output (curl http://localhost:5000/reports after a day):

[
  {"id": 1, "data": "[0.23, 0.45, ..., 0.89]", "created_at": "2025-05-11T00:00:00"}
]

Explanation:

  • Application factory ensures modularity.
  • Celery Beat schedules daily reports with error handling.
  • Database stores results for persistence.

3.2 Practices to Avoid

  • Avoid scheduling tasks without a proper scheduler.

Example: Manual Scheduling Loop

# app.py (Incorrect)
from flask import Flask, jsonify
import threading
import time

app = Flask(__name__)

def manual_schedule():
    while True:
        time.sleep(60)
        with open('log.txt', 'a') as f:
            f.write(f"Task executed at {time.ctime()}\n")

@app.route('/log')
def get_log():
    try:
        with open('log.txt', 'r') as f:
            return jsonify({'log': f.read()})
    except FileNotFoundError:
        return jsonify({'log': 'No log yet'})

if __name__ == '__main__':
    threading.Thread(target=manual_schedule, daemon=True).start()
    app.run(host='0.0.0.0', port=5000)

Output (curl http://localhost:5000/log):

Unreliable scheduling, no error handling or monitoring.

Explanation:

  • Manual loops are error-prone and hard to manage.
  • Solution: Use Celery Beat or APScheduler.

04. Common Use Cases

4.1 Sending Scheduled Notifications

Send periodic notifications to users.

Example: Daily Email Reminders

# app.py
from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab
from flask_mail import Mail, Message

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'send-reminder-daily': {
        'task': 'app.send_reminder',
        'schedule': crontab(hour=8, minute=0),  # 8 AM daily
    }
}
app.config['MAIL_SERVER'] = 'smtp.example.com'
app.config['MAIL_PORT'] = 587
app.config['MAIL_USERNAME'] = 'user@example.com'
app.config['MAIL_PASSWORD'] = 'password'
app.config['MAIL_USE_TLS'] = True

mail = Mail(app)
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def send_reminder():
    with app.app_context():
        msg = Message("Daily Reminder", recipients=['user@example.com'], body="Don't forget to check your tasks!")
        mail.send(msg)
        return "Reminder sent"

@app.route('/status/<task_id>')
def task_status(task_id):
    task = send_reminder.AsyncResult(task_id)
    return jsonify({'status': task.state, 'result': task.result})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker and Beat:

celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info

Output (worker logs at 8 AM):

[2025-05-11 08:00:00] Task app.send_reminder succeeded: Reminder sent

Explanation:

  • Celery Beat schedules daily emails.
  • Flask-Mail integrates for reliable delivery.
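Flask-Mail wraps Python's smtplib; the message it builds can be sketched with the stdlib `email` package (the sender address and SMTP host below are placeholders, matching the config above):

```python
from email.message import EmailMessage

def build_reminder(recipient: str) -> EmailMessage:
    """Construct the reminder as a stdlib message object."""
    msg = EmailMessage()
    msg['Subject'] = "Daily Reminder"
    msg['From'] = 'user@example.com'   # placeholder sender
    msg['To'] = recipient
    msg.set_content("Don't forget to check your tasks!")
    return msg

msg = build_reminder('user@example.com')
print(msg['Subject'])  # Daily Reminder
# Delivery would use smtplib.SMTP('smtp.example.com', 587) with starttls().
```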

4.2 Generating Periodic Reports

Generate and store reports on a schedule.

Example: Weekly Data Report

# app.py
from flask import Flask, jsonify
from celery import Celery
from celery.schedules import crontab
import numpy as np

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
    'generate-report-weekly': {
        'task': 'app.generate_report',
        'schedule': crontab(hour=0, minute=0, day_of_week=0),  # Sunday midnight
    }
}

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task
def generate_report():
    data = np.random.rand(10).tolist()
    with open('reports.txt', 'a') as f:
        f.write(f"Weekly report: {data}\n")
    return f"Report generated: {data}"

@app.route('/reports')
def get_reports():
    try:
        with open('reports.txt', 'r') as f:
            return jsonify({'reports': f.read()})
    except FileNotFoundError:
        return jsonify({'reports': 'No reports yet'})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Run Celery Worker and Beat:

celery -A app.celery worker --loglevel=info
celery -A app.celery beat --loglevel=info

Output (curl http://localhost:5000/reports after a week):

{
  "reports": "Weekly report: [0.23, 0.45, ..., 0.89]\n"
}

Explanation:

  • Celery Beat schedules weekly reports with NumPy data.
  • Results are stored for later retrieval.
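`crontab(hour=0, minute=0, day_of_week=0)` fires at the next Sunday midnight; the equivalent date arithmetic in stdlib Python (illustrative, not Celery's code; note `datetime.weekday()` numbers Sunday as 6):

```python
from datetime import datetime, timedelta

def next_sunday_midnight(now: datetime) -> datetime:
    """Next Sunday 00:00 strictly after `now`."""
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    candidate = midnight + timedelta(days=(6 - now.weekday()) % 7)
    if candidate <= now:
        candidate += timedelta(days=7)  # already past this week's slot
    return candidate

# 2025-05-11 is a Sunday, so from mid-morning the next run is a week later
print(next_sunday_midnight(datetime(2025, 5, 11, 10, 0)))  # 2025-05-18 00:00:00
```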

Conclusion

Scheduling tasks in Flask automates repetitive processes, enhancing application functionality. Key takeaways:

  • Use Celery Beat for distributed, scalable scheduling.
  • APScheduler for lightweight, in-process scheduling.
  • Support dynamic scheduling for user-defined tasks.
  • Monitor tasks and handle errors for reliability.
  • Avoid manual scheduling loops for production.

With Celery Beat or APScheduler, Flask applications can efficiently manage scheduled tasks, making them ideal for automation-driven systems!
