scheduler Module

This file is part of the web2py Web Framework
Copyrighted by Massimo Di Pierro

Background processes made simple

class gluon.scheduler.JobGraph(db, job_name)

Bases: object

Experimental: with JobGraph you can specify dependencies among tasks

add_deps(task_parent, task_child)

Creates a dependency between task_parent and task_child


Validates whether all tasks of job_name can be completed, i.e. there are no mutual dependencies among tasks. Commits at the end if successful; otherwise it rolls back the entire transaction. Handle with care!
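Checking that no tasks depend on each other amounts to checking that the dependency graph has no cycles. The following is a minimal sketch of that idea using Kahn's topological sort. It is not web2py's implementation; the name can_complete and the (parent, child) pair format are assumptions made for illustration.

```python
from collections import defaultdict, deque


def can_complete(deps):
    """Return True if the (parent, child) pairs contain no cycle.

    Hypothetical helper: deps is an iterable of (task_parent, task_child)
    tuples, mirroring the arguments of add_deps.
    """
    children = defaultdict(set)
    indegree = defaultdict(int)
    nodes = set()
    for parent, child in deps:
        nodes.update((parent, child))
        if child not in children[parent]:
            children[parent].add(child)
            indegree[child] += 1

    # Start from tasks that do not wait on anything.
    ready = deque(n for n in nodes if indegree[n] == 0)
    done = 0
    while ready:
        node = ready.popleft()
        done += 1
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Tasks never reached are in a cycle or wait on one.
    return done == len(nodes)
```

For example, can_complete([(1, 2), (2, 3)]) is True, while can_complete([(1, 2), (2, 1)]) is False because the two tasks wait on each other.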

class gluon.scheduler.MetaScheduler

Bases: threading.Thread

Base class documenting scheduler’s base methods


Starts the background process

Parameters: task – a Task object
Return type: tuple

Forces termination of the worker process along with any running task


Waits for any running task to be executed, then exits the worker process


Main loop, fetching tasks and starting executor’s background processes


Fetches a task ready to be executed

report_task(task, task_report)

Creates a task report


This is executed by the main thread to send heartbeats


Terminates any running tasks (internal use only)

class gluon.scheduler.Scheduler(db, tasks=None, migrate=True, worker_name=None, group_names=None, heartbeat=3, max_empty_runs=0, discard_results=False, utc_time=False)

Bases: gluon.scheduler.MetaScheduler

Scheduler object

  • db – DAL connection where Scheduler will create its tables
  • tasks (dict) – either a dict mapping name -> func, or None. If None, functions will be searched in the environment
  • migrate (bool) – turn migration on/off for the Scheduler’s tables
  • worker_name (str) – force worker_name to identify each process. Leave it to None to autoassign a name (hostname#pid)
  • group_names (list) – process tasks belonging to these groups; defaults to ['main'] if nothing is passed
  • heartbeat (int) – how many seconds the worker sleeps between one execution and the following one. Indirectly sets how many seconds will pass between checks for new tasks
  • max_empty_runs (int) – how many loops are allowed to pass without processing any tasks before exiting the process. Use 0 to keep the process alive indefinitely
  • discard_results (bool) – Scheduler stores execution details in the scheduler_run table. By default, the details are kept only if there is a result. Setting this to True discards results even for tasks that return something
  • utc_time (bool) – do all datetime calculations assuming UTC as the timezone. Remember to pass start_time and stop_time to tasks accordingly

Used to increase the “sleep” interval for DISABLED workers


Assigns tasks to workers, which can then pop them from the queue

Deals with the group_name(s) logic in order to assign tasks linearly to the workers available for those groups
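To illustrate what a linear, per-group assignment could look like, here is a minimal round-robin sketch. It is an assumption about the general idea, not the Scheduler's actual code; assign_round_robin and the data layouts are hypothetical.

```python
from itertools import cycle


def assign_round_robin(tasks, workers):
    """Distribute tasks over the workers serving each task's group.

    tasks:   list of (task_id, group_name) tuples (hypothetical layout)
    workers: dict mapping worker_name -> list of group_names it serves
    Returns a dict task_id -> worker_name; tasks whose group has no
    worker stay unassigned.
    """
    # Build one independent rotation of worker names per group.
    names_by_group = {}
    for name, groups in sorted(workers.items()):
        for group in groups:
            names_by_group.setdefault(group, []).append(name)
    rotations = {g: cycle(names) for g, names in names_by_group.items()}

    assignment = {}
    for task_id, group in tasks:
        if group in rotations:
            assignment[task_id] = next(rotations[group])
    return assignment
```

Keeping a separate rotation per group means a busy group does not starve the workers of another group.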


Elects a TICKER process that assigns tasks to available workers. Does its best to elect a worker that is not busy processing other tasks to allow a proper distribution of tasks among all active workers ASAP

define_tables(db, migrate)

Defines Scheduler tables structure

disable(group_names=None, limit=None, worker_name=None)

Sets DISABLED on the workers processing group_names tasks. A DISABLED worker is kept alive but cannot process any waiting tasks, which essentially puts it to sleep. By default, all group_names given at the Scheduler's instantiation are selected


Returns a dict holding worker_name : {**columns} representing all “registered” workers. If only_ticker is set, returns only the workers running as a TICKER, if there are any

kill(group_names=None, limit=None, worker_name=None)

Sets KILL as worker status. The worker will be killed even if it’s processing a task.


Main loop

This basically works as a never-ending loop that:

  • checks if the worker is ready to process tasks (is not DISABLED)
  • pops a task from the queue
  • if there is a task:
    • spawns the executor background process
    • waits for the process to be finished
    • sleeps heartbeat seconds
  • if there is not a task:
    • checks for max_empty_runs
    • sleeps heartbeat seconds
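The loop above can be sketched in plain Python as follows. This is a simplified illustration, not the Scheduler's code: run_loop, is_disabled and the injectable sleep are hypothetical, tasks run inline instead of in a background process, and the exact comparison used for max_empty_runs is an assumption.

```python
import time
from collections import deque


def run_loop(queue, heartbeat=3, max_empty_runs=0,
             is_disabled=lambda: False, sleep=time.sleep):
    """Simplified worker loop; returns the results of processed tasks.

    queue is a deque of zero-argument callables standing in for tasks.
    """
    results = []
    empty_runs = 0
    while True:
        if is_disabled():
            # A DISABLED worker stays alive but processes nothing.
            sleep(heartbeat)
            continue
        task = queue.popleft() if queue else None
        if task is not None:
            empty_runs = 0
            # The real worker spawns an executor process and waits for it.
            results.append(task())
        else:
            empty_runs += 1
            # max_empty_runs == 0 means the loop never exits for idleness.
            if max_empty_runs and empty_runs >= max_empty_runs:
                break
        sleep(heartbeat)
    return results
```

Passing a fake sleep (for example a list's append method) makes the loop easy to exercise without waiting for real heartbeats.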

Shortcut that fetches current time based on UTC preferences


Grabs a task ready to be executed from the queue

queue_task(function, pargs=[], pvars={}, **kwargs)

Queues a task. This takes care of validating all parameters

  • function – the function (anything callable with a __name__)
  • pargs – “raw” args to be passed to the function. Automatically jsonified.
  • pvars – “raw” kwargs to be passed to the function. Automatically jsonified
  • kwargs – all the parameters available (basically, every scheduler_task column). If args and vars are here, they should be jsonified already, and they will override pargs and pvars

Returns a dict just like a normal validate_and_insert(), plus a uuid key holding the uuid of the queued task. If validation fails (i.e. some parameters are invalid), both id and uuid will be None, and you’ll get an “error” dict holding the errors found.
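The parameter rules above (pargs and pvars are serialised to JSON, while args and vars passed through kwargs are taken as already serialised and win) can be sketched as follows. build_task_fields is a hypothetical helper written for illustration, and the function_name key is an assumption about the column name; no validation or database insert is performed.

```python
import json


def build_task_fields(function, pargs=None, pvars=None, **kwargs):
    """Hypothetical helper mirroring the argument rules described above."""
    fields = {
        "function_name": function.__name__,  # assumed column name
        "args": json.dumps(pargs or []),
        "vars": json.dumps(pvars or {}),
    }
    # Everything in kwargs is treated as a column value, so pre-serialised
    # args/vars replace the ones built from pargs/pvars.
    fields.update(kwargs)
    return fields
```

With a function named add, build_task_fields(add, pargs=[1], args="[5]") yields an args value of "[5]", because the explicit args overrides pargs.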

report_task(task, task_report)

Takes care of storing the result according to preferences and deals with logic for repeating tasks

resume(group_names=None, limit=None, worker_name=None)

Wakes a worker up (it will be able to process queued tasks)


This function is vital for proper coordination among available workers. It:

  • sends the heartbeat

  • elects a ticker among available workers (the only process that effectively dispatches tasks to workers)

  • deals with worker’s statuses

  • does “housecleaning” for dead workers

  • triggers tasks assignment to workers


Called to set defaults for lazy_tables connections

set_worker_status(group_names=None, action='ACTIVE', exclude=None, limit=None, worker_name=None)

Internal function to set worker’s status


Calculates the number of seconds to sleep according to worker’s status and heartbeat parameter


Shortcut for task termination.

If the task is RUNNING it will terminate it, meaning that status will be set as FAILED.

If the task is QUEUED, its stop_time will be set to “now”, the enabled flag will be set to False, and the status to STOPPED

can be

  • an integer : lookup will be done by
  • a string : lookup will be done by scheduler_task.uuid

Returns:

  • 1 if task was stopped (meaning an update has been done)
  • None if task was not found, or if task was not RUNNING or QUEUED



task_status(ref, output=False)

Retrieves task status and optionally the result of the task

  • ref

    can be

    • an integer : lookup will be done by
    • a string : lookup will be done by scheduler_task.uuid
    • a Query : lookup as you wish, e.g.
      db.scheduler_task.task_name == 'test1'
  • output (bool) – if True, also fetch the scheduler_run record

Returns a single Row object, for the last queued task. If output == True, also returns the last scheduler_run record. The scheduler_run record is fetched by a left join, so all of its fields can be None

terminate(group_names=None, limit=None, worker_name=None)

Sets TERMINATE as worker status. The worker will wait for any currently running tasks to be executed and then it will exit gracefully

update_dependencies(db, task_id)

Convenience function that calls assign_tasks and traps exceptions. If an exception is raised, it assumes the cause was database contention and retries assign_tasks after 0.5 seconds


Convenience function that calls pop_task and traps exceptions. If an exception is raised, it assumes the cause was database contention and retries pop_task after 0.5 seconds

wrapped_report_task(task, task_report)

Convenience function that calls report_task and traps exceptions. If an exception is raised, it assumes the cause was database contention and retries report_task after 0.5 seconds
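The trap-and-retry pattern shared by these wrapper functions can be sketched generically as follows. call_with_retry is a hypothetical name, and the attempts limit is an assumption, since the descriptions only mention the 0.5 second pause; the real wrappers also work with the database connection, which is omitted here.

```python
import time


def call_with_retry(func, *args, attempts=3, delay=0.5,
                    sleep=time.sleep, **kwargs):
    """Call func, retrying after `delay` seconds when it raises.

    The last failure is re-raised once `attempts` calls have been made.
    """
    for attempt in range(1, attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Assumed to be transient database contention.
            if attempt == attempts:
                raise
            sleep(delay)
```

Bounding the number of attempts keeps a persistent failure from turning into an endless retry loop.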

class gluon.scheduler.TYPE(myclass=<type 'list'>, parse=False)

Bases: object

Validator that checks whether field is valid json and validates its type. Used for args and vars of the scheduler_task table
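A validator of this kind can be sketched as a plain function. This is not the TYPE class itself: validate_json_type is a hypothetical name, the error messages are invented, and the meaning given to parse (return the decoded object instead of the original string) is an assumption. The (value, error) return shape follows the usual web2py validator convention.

```python
import json


def validate_json_type(value, myclass=list, parse=False):
    """Return (value, error); error is None when value is acceptable.

    value must be a JSON string that decodes to an instance of myclass.
    """
    try:
        obj = json.loads(value)
    except (TypeError, ValueError):
        # TypeError: value is not a string; ValueError: malformed JSON.
        return value, "invalid json"
    if not isinstance(obj, myclass):
        return value, "not of type %s" % myclass.__name__
    return (obj if parse else value), None
```

Under these assumptions, args would be checked with myclass=list and vars with myclass=dict.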

class gluon.scheduler.Task(app, function, timeout, args='[]', vars='{}', **kwargs)

Bases: object

Defines a “task” object that gets passed from the main thread to the executor’s one

class gluon.scheduler.TaskReport(status, result=None, output=None, tb=None)

Bases: object

Defines a “task report” object that gets passed from the executor’s thread to the main one

gluon.scheduler.demo_function(*argv, **kwargs)

test function

gluon.scheduler.executor(queue, task, out)

The function used to execute tasks in the background process


Allows running a worker without python .... by simply:

python gluon/