scheduler Module

This file is part of the web2py Web Framework
Copyrighted by Massimo Di Pierro <mdipierro@cs.depaul.edu>

Background processes made simple

class gluon.scheduler.JobGraph(db, job_name)[source]

Bases: object

Experimental: dependencies among tasks

add_deps(task_parent, task_child)[source]

Create a dependency between task_parent and task_child.

validate(job_name=None)[source]

Validates whether all the tasks of job_name can be completed.

Checks that there are no mutual dependencies among tasks. Commits at the end if successful, otherwise rolls back the entire transaction. Handle with care!
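The mutual-dependency check amounts to detecting cycles in the parent -> child graph. A minimal stand-alone sketch of that idea (Kahn's topological sort over plain id pairs; not the actual JobGraph implementation, which works against the scheduler_task table):

```python
from collections import defaultdict, deque

def has_cycle(deps):
    """deps: list of (parent, child) id pairs.
    Returns True if the dependency graph contains a cycle
    (i.e. validation should fail and roll back)."""
    graph = defaultdict(list)
    indegree = defaultdict(int)
    nodes = set()
    for parent, child in deps:
        graph[parent].append(child)
        indegree[child] += 1
        nodes.update((parent, child))
    # start from tasks that depend on nothing
    queue = deque(n for n in nodes if indegree[n] == 0)
    visited = 0
    while queue:
        node = queue.popleft()
        visited += 1
        for nxt in graph[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    # if some node was never reached, it sits on a cycle
    return visited != len(nodes)
```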

class gluon.scheduler.MetaScheduler[source]

Bases: threading.Thread

Base class documenting scheduler’s base methods

async(task)[source]

Start the background process.

Parameters: task – a Task object
Returns: a tuple containing one of:
  • ('ok', result, output)
  • ('error', exception, None)
  • ('timeout', None, None)
  • ('terminated', None, None)
Return type: tuple

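A small hedged helper showing how calling code might branch on the documented tuple shape (the helper itself is hypothetical, only the tuple layout comes from the docs above):

```python
def summarize(report):
    """Turn a (status, payload, output) tuple, as documented for
    MetaScheduler.async, into a short human-readable line."""
    status, payload, output = report
    if status == 'ok':
        return 'completed: %r' % (payload,)
    if status == 'error':
        return 'failed: %s' % payload
    # 'timeout' and 'terminated' carry no payload
    return status
```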
die()[source]

Forces termination of the worker process along with any running task

give_up()[source]

Waits for any running task to be executed, then exits the worker process

loop()[source]

Main loop, fetching tasks and starting executor’s background processes

pop_task()[source]

Fetches a task ready to be executed

report_task(task, task_report)[source]

Creates a task report

run()[source]

This is executed by the main thread to send heartbeats

send_heartbeat(counter)[source]
sleep()[source]
start_heartbeats()[source]
terminate_process()[source]

Terminates any running tasks (internal use only)

class gluon.scheduler.Scheduler(db, tasks=None, migrate=True, worker_name=None, group_names=None, heartbeat=3, max_empty_runs=0, discard_results=False, utc_time=False)[source]

Bases: gluon.scheduler.MetaScheduler

Scheduler object

Parameters:
  • db – DAL connection where the Scheduler will create its tables
  • tasks (dict) – either a dict mapping name -> func, or None. If None, functions will be searched for in the environment
  • migrate (bool) – turn migration on/off for the Scheduler’s tables
  • worker_name (str) – force worker_name to identify each process. Leave it as None to autoassign a name (hostname#pid)
  • group_names (list) – process only tasks belonging to these groups. Defaults to ['main'] if nothing is passed
  • heartbeat (int) – how many seconds the worker sleeps between one execution and the next. Indirectly sets how many seconds pass between checks for new tasks
  • max_empty_runs (int) – how many loops may pass without processing any task before the process exits. 0 keeps the process alive indefinitely
  • discard_results (bool) – the Scheduler stores execution details in the scheduler_run table. By default, details are kept only when there is a result. Setting this to True discards results even for tasks that return something
  • utc_time (bool) – do all datetime calculations assuming UTC as the timezone. Remember to pass start_time and stop_time to tasks accordingly
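Two of the defaults above can be sketched in isolation, assuming only the documented semantics (hostname#pid autoassignment and the ['main'] group fallback; the function names are illustrative, not the Scheduler's internals):

```python
import os
import socket

def default_worker_name(worker_name=None):
    # autoassigned name when worker_name is left as None: hostname#pid
    return worker_name or '%s#%s' % (socket.gethostname(), os.getpid())

def effective_groups(group_names=None):
    # group_names defaults to ['main'] if nothing gets passed
    return group_names or ['main']
```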
adj_hibernation()[source]

Used to increase the “sleep” interval for DISABLED workers.

assign_tasks(db)[source]

Assigns tasks to workers, which can then pop them from the queue.

Deals with group_names logic, in order to assign tasks linearly to the available workers for those groups

being_a_ticker()[source]

Elect a TICKER process that assigns tasks to available workers.

Does its best to elect a worker that is not busy processing other tasks to allow a proper distribution of tasks among all active workers ASAP

define_tables(db, migrate)[source]

Define Scheduler tables structure.

disable(group_names=None, limit=None, worker_name=None)[source]

Set DISABLED on the workers processing group_names tasks.

A DISABLED worker will be kept alive but it won’t be able to process any waiting tasks, essentially putting it to sleep. By default, all group_names of the Scheduler’s instantiation are selected

get_workers(only_ticker=False)[source]

Returns a dict holding worker_name : {**columns} representing all “registered” workers. If only_ticker is True, returns only the workers running as a TICKER, if there are any

kill(group_names=None, limit=None, worker_name=None)[source]

Sets KILL as worker status. The worker will be killed even if it’s processing a task.

loop(worker_name=None)[source]

Main loop.

This works basically as a never-ending loop that:

  • checks if the worker is ready to process tasks (is not DISABLED)
  • pops a task from the queue
  • if there is a task:
    • spawns the executor background process
    • waits for the process to be finished
    • sleeps heartbeat seconds
  • if there is no task:
    • checks for max_empty_runs
    • sleeps heartbeat seconds
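The bullet points above can be sketched as a simplified skeleton (hypothetical names; in the real Scheduler the executor runs in a background process and statuses live in the database):

```python
import time

def worker_loop(pop_task, execute, heartbeat=3, max_empty_runs=0,
                sleep=time.sleep):
    """Simplified skeleton of Scheduler.loop's flow."""
    empty_runs = 0
    while True:
        task = pop_task()
        if task is not None:
            empty_runs = 0
            execute(task)      # spawn the executor and wait for it
        else:
            empty_runs += 1
            # 0 means never give up; otherwise exit after too many
            # consecutive empty loops
            if max_empty_runs and empty_runs >= max_empty_runs:
                break
        sleep(heartbeat)       # wait heartbeat seconds between runs
```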
now()[source]

Shortcut that fetches current time based on UTC preferences.
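A minimal sketch of what such a UTC-aware shortcut looks like, assuming the documented utc_time semantics (not necessarily the exact implementation):

```python
import datetime

def now(utc_time=False):
    # all datetime calculations assume UTC when utc_time is True
    return (datetime.datetime.utcnow() if utc_time
            else datetime.datetime.now())
```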

pop_task(db)[source]

Grab a task ready to be executed from the queue.

queue_task(function, pargs=[], pvars={}, **kwargs)[source]

Queue tasks. This takes care of handling the validation of all parameters

Parameters:
  • function – the function (anything callable with a __name__)
  • pargs – “raw” args to be passed to the function. Automatically jsonified.
  • pvars – “raw” kwargs to be passed to the function. Automatically jsonified
  • kwargs – all the parameters available (basically, every scheduler_task column). If args and vars are here, they should be jsonified already, and they will override pargs and pvars
Returns:

a dict just like a normal validate_and_insert(), plus a uuid key holding the uuid of the queued task. If validation does not pass (i.e. some parameters are invalid), both id and uuid will be None, and you’ll get an “error” dict holding the errors found.
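Calling code should check the returned dict before relying on the queued task. A hedged helper following the shape described above (the sample dicts in the usage are hypothetical, constructed to match that description):

```python
def queued_ok(r):
    """r: the dict returned by queue_task. True only when validation
    passed, i.e. both id and uuid were assigned."""
    return r.get('id') is not None and r.get('uuid') is not None
```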

report_task(task, task_report)[source]

Take care of storing the result according to preferences.

Deals with logic for repeating tasks.

resume(group_names=None, limit=None, worker_name=None)[source]

Wakes a worker up (it will be able to process queued tasks)

send_heartbeat(counter)[source]

Coordination among available workers.

It:

  • sends the heartbeat
  • elects a ticker among available workers (the only process that effectively dispatches tasks to workers)
  • deals with workers’ statuses
  • does “housecleaning” for dead workers
  • triggers task assignment to workers
set_requirements(scheduler_task)[source]

Called to set defaults for lazy_tables connections.

set_worker_status(group_names=None, action='ACTIVE', exclude=None, limit=None, worker_name=None)[source]

Internal function to set worker’s status.

sleep()[source]

Calculate the number of seconds to sleep.

stop_task(ref)[source]

Shortcut for task termination.

If the task is RUNNING it will terminate it, meaning that status will be set as FAILED.

If the task is QUEUED, its stop_time will be set to “now”, the enabled flag will be set to False, and the status to STOPPED.

Parameters: ref – can be

  • an integer : lookup will be done by scheduler_task.id
  • a string : lookup will be done by scheduler_task.uuid
Returns:
  • 1 if the task was stopped (meaning an update has been done)
  • None if the task was not found, or if it was not RUNNING or QUEUED

Note

Experimental
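The ref dispatch used by stop_task (and task_status) can be sketched as follows, with hypothetical lookup callables standing in for the actual DAL queries:

```python
def resolve_ref(ref, by_id, by_uuid):
    """Dispatch on ref's type: an int is looked up by
    scheduler_task.id, a str by scheduler_task.uuid."""
    if isinstance(ref, int):
        return by_id(ref)
    if isinstance(ref, str):
        return by_uuid(ref)
    raise ValueError('ref must be an int (id) or a str (uuid)')
```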

task_status(ref, output=False)[source]

Retrieves task status and optionally the result of the task

Parameters:
  • ref

    can be

    • an integer : lookup will be done by scheduler_task.id
    • a string : lookup will be done by scheduler_task.uuid
    • a Query : lookup as you wish, e.g.
      db.scheduler_task.task_name == 'test1'
      
  • output (bool) – if True, fetch also the scheduler_run record
Returns:

a single Row object, for the last queued task. If output == True, it also returns the last scheduler_run record. The scheduler_run record is fetched via a left join, so all of its fields can be None

terminate(group_names=None, limit=None, worker_name=None)[source]

Sets TERMINATE as the worker status. The worker will wait for any currently running task to finish and then exit gracefully

static total_seconds(td)[source]

Backport for py2.6.
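timedelta.total_seconds() only appeared in Python 2.7; the standard backport formula, which this presumably mirrors, is:

```python
import datetime

def total_seconds(td):
    # equivalent of td.total_seconds() for Python 2.6
    return (td.microseconds
            + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10.0 ** 6
```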

update_dependencies(db, task_id)[source]

Unblock execution paths for Jobs.

wrapped_assign_tasks(db)[source]

Commodity function to call assign_tasks and trap exceptions.

If an exception is raised, assumes it happened because of database contention and retries assign_tasks after 0.5 seconds

wrapped_pop_task()[source]

Commodity function to call pop_task and trap exceptions.

If an exception is raised, assumes it happened because of database contention and retries pop_task after 0.5 seconds

wrapped_report_task(task, task_report)[source]

Commodity function to call report_task and trap exceptions.

If an exception is raised, assumes it happened because of database contention and retries report_task after 0.5 seconds

class gluon.scheduler.TYPE(myclass=<type 'list'>, parse=False)[source]

Bases: object

Validator that checks whether the field is valid JSON and validates its type. Used for the args and vars of the scheduler_task table

class gluon.scheduler.Task(app, function, timeout, args='[]', vars='{}', **kwargs)[source]

Bases: object

Defines a “task” object that gets passed from the main thread to the executor’s one

class gluon.scheduler.TaskReport(status, result=None, output=None, tb=None)[source]

Bases: object

Defines a “task report” object that gets passed from the executor’s thread to the main one

gluon.scheduler.executor(queue, task, out)[source]

The function used to execute tasks in the background process.

gluon.scheduler.main()[source]

Allows running a worker without python web2py.py … by simply:

python gluon/scheduler.py