scheduler Module
Background processes made simple
-
class gluon.scheduler.JobGraph(db, job_name)
Bases: object
Experimental: with JobGraph you can specify dependencies among tasks.
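JobGraph's own methods are not documented here, so the following is only a minimal sketch of what "dependencies among tasks" implies: a task may run only after every task it depends on has run. It does not use JobGraph at all; it uses the standard library's graphlib.TopologicalSorter (Python 3.9+), and the task names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical dependencies: each key may run only after every task in its set.
dependencies = {
    "send_report": {"build_report"},
    "build_report": {"fetch_data", "clean_data"},
    "clean_data": {"fetch_data"},
}

# static_order() yields each task only after all of its predecessors.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['fetch_data', 'clean_data', 'build_report', 'send_report']
```

Here only one valid ordering exists; with independent branches, several orderings would satisfy the same dependencies.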
-
class gluon.scheduler.MetaScheduler
Bases: threading.Thread
Base class documenting the scheduler’s base methods.
-
class gluon.scheduler.Scheduler(db, tasks=None, migrate=True, worker_name=None, group_names=None, heartbeat=3, max_empty_runs=0, discard_results=False, utc_time=False)
Bases: gluon.scheduler.MetaScheduler
Scheduler object
Parameters:
- db – DAL connection where the Scheduler will create its tables
- tasks (dict) – either a dict mapping name -> func, or None. If None, functions will be searched for in the environment
- migrate (bool) – turn migration on/off for the Scheduler’s tables
- worker_name (str) – force a worker_name to identify each process. Leave it as None to autoassign a name (hostname#pid)
- group_names (list) – process only tasks belonging to these groups; defaults to [‘main’] if nothing is passed
- heartbeat (int) – how many seconds the worker sleeps between one execution and the next. Indirectly, it sets how many seconds pass between checks for new tasks
- max_empty_runs (int) – how many loops are allowed to pass without processing any tasks before the process exits. 0 keeps the process alive indefinitely
- discard_results (bool) – the Scheduler stores executions’ details in the scheduler_run table. By default, the details are kept only if there is a result. Setting this to True discards results even for tasks that return something
- utc_time (bool) – do all datetime calculations assuming UTC as the timezone. Remember to pass start_time and stop_time to tasks accordingly
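The worker_name and group_names defaults described above can be sketched as follows. resolve_worker_settings is a hypothetical helper, not part of gluon.scheduler; only the hostname#pid shape and the [‘main’] default come from the parameter list.

```python
import os
import socket


def resolve_worker_settings(worker_name=None, group_names=None):
    """Hypothetical helper mirroring the documented Scheduler() defaults."""
    if worker_name is None:
        # Documented autoassigned format: hostname#pid
        worker_name = "%s#%s" % (socket.gethostname(), os.getpid())
    if group_names is None:
        # Documented default group
        group_names = ["main"]
    return worker_name, group_names


print(resolve_worker_settings())
print(resolve_worker_settings("worker-1", ["main", "reports"]))
```

Embedding the pid keeps names unique when several workers run on the same host.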
-
assign_tasks(db)
Assigns tasks to workers, which can then pop them from the queue.
Deals with the group_name(s) logic, in order to assign tasks linearly to the available workers for those groups.
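The text only says that tasks are assigned "linearly" to the available workers of each group. One plausible reading, sketched here as an assumption rather than the scheduler's actual algorithm, is a round-robin: the queued tasks of a group are dealt out in turn to the workers serving that group. The function name and data shapes are hypothetical.

```python
from itertools import cycle


def assign_round_robin(queued, workers):
    """queued: list of (task_id, group); workers: dict worker_name -> list of groups.

    Returns a dict task_id -> worker_name. Tasks whose group has no worker
    stay unassigned.
    """
    assignments = {}
    rotations = {}  # one independent rotation of workers per group
    for task_id, group in queued:
        if group not in rotations:
            serving = [w for w, groups in workers.items() if group in groups]
            rotations[group] = cycle(serving) if serving else None
        rotation = rotations[group]
        if rotation is not None:
            assignments[task_id] = next(rotation)
    return assignments


workers = {"w1#1": ["main"], "w2#2": ["main", "reports"]}
queued = [(1, "main"), (2, "main"), (3, "reports"), (4, "main"), (5, "other")]
print(assign_round_robin(queued, workers))
# {1: 'w1#1', 2: 'w2#2', 3: 'w2#2', 4: 'w1#1'}
```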
-
being_a_ticker()
Elects a TICKER process that assigns tasks to available workers. It does its best to elect a worker that is not busy processing other tasks, so that tasks are distributed properly among all active workers as soon as possible.
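A minimal sketch of the election preference described above, under assumed rules: an existing ACTIVE ticker is kept; otherwise an ACTIVE worker that is not busy is preferred, falling back to any ACTIVE worker. The function, the dict layout, and the tie-break by name are hypothetical; the real election works on database records.

```python
def elect_ticker(workers, current_ticker=None):
    """workers: dict worker_name -> {"status": str, "busy": bool}.

    Hypothetical policy: keep an existing ACTIVE ticker; otherwise prefer an
    ACTIVE worker that is not busy, falling back to any ACTIVE worker.
    """
    active = [name for name, info in sorted(workers.items())
              if info["status"] == "ACTIVE"]
    if current_ticker in active:
        return current_ticker
    idle = [name for name in active if not workers[name]["busy"]]
    if idle:
        return idle[0]
    return active[0] if active else None
```

Preferring an idle worker means the process dispatching tasks is not also stuck executing a long task, so assignments keep flowing.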
-
disable(group_names=None, limit=None, worker_name=None)
Sets DISABLED on the workers processing group_names tasks. A DISABLED worker is kept alive but won’t process any waiting tasks, essentially putting it to sleep. By default, all the group_names of the Scheduler’s instantiation are selected.
-
get_workers(only_ticker=False)
Returns a dict of worker_name : {**columns} representing all “registered” workers. If only_ticker is True, returns only the workers running as a TICKER, if there are any.
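The shape of the returned dict can be sketched over plain dicts standing in for worker records. The function name and the worker_name/is_ticker keys are assumptions used for illustration.

```python
def workers_by_name(rows, only_ticker=False):
    """rows: list of dicts, each a worker record with at least the
    hypothetical keys 'worker_name' and 'is_ticker'."""
    return {
        row["worker_name"]: dict(row)  # worker_name : {**columns}
        for row in rows
        if not only_ticker or row["is_ticker"]
    }


rows = [
    {"worker_name": "host#101", "is_ticker": True, "status": "ACTIVE"},
    {"worker_name": "host#102", "is_ticker": False, "status": "ACTIVE"},
]
print(workers_by_name(rows, only_ticker=True))
```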
-
kill(group_names=None, limit=None, worker_name=None)
Sets KILL as the worker status. The worker will be killed even if it’s processing a task.
-
loop(worker_name=None)
Main loop.
This works basically as a never-ending loop that:
- checks if the worker is ready to process tasks (is not DISABLED)
- pops a task from the queue
- if there is a task:
  - spawns the executor background process
  - waits for the process to finish
  - sleeps heartbeat seconds
- if there is no task:
  - checks for max_empty_runs
  - sleeps heartbeat seconds
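The steps above can be modeled as a simplified, single-process sketch. run_worker and its callables are hypothetical stand-ins: pop_task for the queue, execute for spawning the executor and waiting on it, is_disabled for the worker status check. Resetting the empty-run counter after a processed task is an assumption; max_loops is only a safety valve for this sketch.

```python
import time


def run_worker(pop_task, execute, is_disabled, heartbeat=3, max_empty_runs=0,
               sleep=time.sleep, max_loops=None):
    """Simplified model of the documented main loop.

    Returns the number of tasks processed. max_loops=None loops forever
    (unless max_empty_runs stops it).
    """
    empty_runs = 0
    processed = 0
    loops = 0
    while max_loops is None or loops < max_loops:
        loops += 1
        if is_disabled():
            # A DISABLED worker stays alive but takes no tasks.
            sleep(heartbeat)
            continue
        task = pop_task()
        if task is not None:
            execute(task)  # stands in for spawning the executor and waiting
            processed += 1
            empty_runs = 0  # assumption: only consecutive empty runs count
        else:
            empty_runs += 1
            # max_empty_runs == 0 means never exit because of empty runs.
            if max_empty_runs and empty_runs >= max_empty_runs:
                break
        sleep(heartbeat)
    return processed


pending = ["task-1", "task-2"]
done = []
n = run_worker(lambda: pending.pop(0) if pending else None, done.append,
               lambda: False, heartbeat=0, max_empty_runs=2,
               sleep=lambda seconds: None)
print(n, done)
```

Injecting sleep keeps the sketch testable without real delays.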
-
queue_task(function, pargs=[], pvars={}, **kwargs)
Queues tasks. This takes care of validating all the parameters.
Parameters:
- function – the function (anything callable with a __name__)
- pargs – “raw” args to be passed to the function. Automatically jsonified.
- pvars – “raw” kwargs to be passed to the function. Automatically jsonified.
- kwargs – all the parameters available (basically, every scheduler_task column). If args and vars are here, they should already be jsonified, and they will override pargs and pvars
Returns: a dict just like a normal validate_and_insert(), plus a uuid key holding the uuid of the queued task. If validation does not pass (i.e. some parameters are invalid), both id and uuid will be None, and you’ll get an “error” dict holding the errors found.
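The argument handling described above (pargs and pvars JSON-encoded, pre-encoded args and vars in kwargs taking precedence) can be sketched without a database. prepare_task_fields is hypothetical, as is the exact set of field names; validation and insertion are left out. The sketch uses None defaults instead of the mutable [] and {} in the documented signature, a common Python idiom.

```python
import json
import uuid


def prepare_task_fields(function, pargs=None, pvars=None, **kwargs):
    """Hypothetical sketch of queue_task's argument handling (no validation)."""
    fields = {
        "function_name": function.__name__,  # anything callable with a __name__
        "args": json.dumps(pargs or []),     # "raw" args, jsonified here
        "vars": json.dumps(pvars or {}),     # "raw" kwargs, jsonified here
        "uuid": str(uuid.uuid4()),
    }
    # Explicit columns win; args/vars passed here must already be JSON strings.
    fields.update(kwargs)
    return fields


def add(a, b):
    return a + b


print(prepare_task_fields(add, pargs=[1, 2], pvars={"verbose": True}))
```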
-
report_task(task, task_report)
Takes care of storing the result according to preferences, and deals with the logic for repeating tasks.
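Only the discard_results rule from the Scheduler parameters is modeled here, as a minimal sketch: run details are kept when the task returned something, unless discard_results is True. The function name is hypothetical, treating None as "no result" is an assumption, and other cases (failures, repeating tasks) are not covered.

```python
def should_store_run(result, discard_results=False):
    """Hypothetical: decide whether a run's details go to scheduler_run."""
    if discard_results:
        return False  # discard even for tasks that return something
    return result is not None  # default: keep only when there is a result


print(should_store_run(42), should_store_run(None), should_store_run(42, True))
```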
-
resume(group_names=None, limit=None, worker_name=None)
Wakes a worker up (it will be able to process queued tasks again).
-
send_heartbeat(counter)
This function is vital for proper coordination among available workers. It:
- sends the heartbeat
- elects a ticker among available workers (the only process that effectively dispatches tasks to workers)
- deals with the workers’ statuses
- does “housecleaning” for dead workers
- triggers the assignment of tasks to workers
-
set_worker_status(group_names=None, action='ACTIVE', exclude=None, limit=None, worker_name=None)
Internal function to set the workers’ status.
-
sleep()
Calculates the number of seconds to sleep according to the worker’s status and the heartbeat parameter.
-
stop_task(ref)
Shortcut for task termination.
If the task is RUNNING, it will be terminated, meaning that its status will be set to FAILED.
If the task is QUEUED, its stop_time will be set to “now”, the enabled flag will be set to False, and the status to STOPPED.
Parameters: ref – can be
- an integer : lookup will be done by scheduler_task.id
- a string : lookup will be done by scheduler_task.uuid
Returns:
- 1 if the task was stopped (meaning an update has been done)
- None if the task was not found, or if the task was neither RUNNING nor QUEUED
Note: Experimental
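The documented transitions can be modeled over plain dicts standing in for scheduler_task rows. stop_task_sketch is hypothetical: it only flips fields, whereas the real method also has to terminate the running executor.

```python
from datetime import datetime


def stop_task_sketch(tasks, ref, now=None):
    """tasks: list of dicts with 'id', 'uuid', 'status', 'enabled', 'stop_time'.

    ref: an int (looked up by id) or a str (looked up by uuid).
    Returns 1 if the task was stopped, otherwise None.
    """
    key = "id" if isinstance(ref, int) else "uuid"
    task = next((t for t in tasks if t[key] == ref), None)
    if task is None:
        return None
    if task["status"] == "RUNNING":
        # The real scheduler also terminates the running task here.
        task["status"] = "FAILED"
        return 1
    if task["status"] == "QUEUED":
        task["stop_time"] = now if now is not None else datetime.now()
        task["enabled"] = False
        task["status"] = "STOPPED"
        return 1
    return None  # neither RUNNING nor QUEUED
```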
-
task_status(ref, output=False)
Retrieves the task status and, optionally, the result of the task.
Parameters:
- ref – can be
  - an integer : lookup will be done by scheduler_task.id
  - a string : lookup will be done by scheduler_task.uuid
  - a Query : lookup as you wish, e.g. db.scheduler_task.task_name == 'test1'
- output (bool) – if True, also fetch the scheduler_run record
Returns: a single Row object, for the last queued task. If output == True, also returns the last scheduler_run record. The scheduler_run record is fetched by a left join, so all of its fields can be None.
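A minimal sketch of the lookup and left-join behavior described above, over plain dicts instead of DAL tables. task_status_sketch, the callable standing in for a Query, the RUN_FIELDS subset, and treating the highest id as "last queued" are all assumptions; it also returns a (task, run) pair where the real method returns a single joined Row.

```python
RUN_FIELDS = ("id", "task_id", "status", "run_result")  # hypothetical subset


def task_status_sketch(tasks, runs, ref, output=False):
    """ref: int (matched against id), str (matched against uuid), or a
    callable predicate standing in for a DAL Query."""
    if callable(ref):
        matching = [t for t in tasks if ref(t)]
    else:
        key = "id" if isinstance(ref, int) else "uuid"
        matching = [t for t in tasks if t[key] == ref]
    if not matching:
        return None
    # Assumption: the "last queued" task is the one with the highest id.
    task = max(matching, key=lambda t: t["id"])
    if not output:
        return task
    task_runs = [r for r in runs if r["task_id"] == task["id"]]
    if task_runs:
        run = max(task_runs, key=lambda r: r["id"])
    else:
        # Left-join semantics: no matching run means every run field is None.
        run = dict.fromkeys(RUN_FIELDS)
    return task, run
```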
-
terminate(group_names=None, limit=None, worker_name=None)
Sets TERMINATE as the worker status. The worker will wait for any currently running tasks to finish and then exit gracefully.
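The worker statuses set by disable(), resume(), terminate() and kill() lead to different behavior. The mapping below is a hypothetical summary of those descriptions. It assumes resume() sets the status back to ACTIVE (the default action of set_worker_status) and covers only these four statuses.

```python
def worker_action(status, running_task):
    """Hypothetical: what a worker does next, given its status."""
    if status == "KILL":
        return "exit now"  # even if a task is running
    if status == "TERMINATE":
        # Graceful: let the current task finish first.
        return "finish task, then exit" if running_task else "exit"
    if status == "DISABLED":
        return "sleep"  # alive, but takes no waiting tasks
    if status == "ACTIVE":
        return "process tasks"
    raise ValueError("status not covered by this sketch: %r" % status)
```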
-
wrapped_assign_tasks(db)
Convenience function to call assign_tasks and trap exceptions. If an exception is raised, it assumes this happened because of database contention and retries assign_tasks after 0.5 seconds.
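The retry behavior can be sketched generically. call_with_retry is hypothetical; the 0.5-second delay comes from the description, while the bounded number of retries is an assumption, since the text does not say how often it retries.

```python
import time


def call_with_retry(func, *args, retries=3, delay=0.5, sleep=time.sleep):
    """Call func(*args); on any exception, wait `delay` seconds and retry.

    After `retries` failed retries, the last exception propagates.
    """
    for attempt in range(retries + 1):
        try:
            return func(*args)
        except Exception:
            if attempt == retries:
                raise
            sleep(delay)  # e.g. give a contended database time to free up
```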
-
class gluon.scheduler.TYPE(myclass=<type 'list'>, parse=False)
Bases: object
Validator that checks whether a field is valid JSON and validates its type. Used for the args and vars of the scheduler_task table.
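A minimal stand-in for such a validator follows the usual web2py convention of returning a (value, error) pair, where error is None on success. The class name and error messages are hypothetical. The behavior of parse (returning the decoded object instead of the original string) is an assumption.

```python
import json


class JsonTypeValidator(object):
    """Hypothetical stand-in for TYPE: accept a JSON string whose decoded
    value is an instance of myclass; return (value, error)."""

    def __init__(self, myclass=list, parse=False):
        self.myclass = myclass
        self.parse = parse  # assumption: return decoded object instead of string

    def __call__(self, value):
        try:
            obj = json.loads(value)
        except (TypeError, ValueError):  # not a string / not valid JSON
            return value, "invalid json"
        if not isinstance(obj, self.myclass):
            return value, "Not of type %s" % self.myclass.__name__
        return (obj if self.parse else value), None
```

A list checker fits the args column and a dict checker fits vars, matching the '[]' and '{}' defaults of Task.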
-
class gluon.scheduler.Task(app, function, timeout, args='[]', vars='{}', **kwargs)
Bases: object
Defines a “task” object that gets passed from the main thread to the executor’s thread.
-
class gluon.scheduler.TaskReport(status, result=None, output=None, tb=None)
Bases: object
Defines a “task report” object that gets passed from the executor’s thread to the main one.
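The hand-off between Task and TaskReport can be sketched with hypothetical dataclasses that mirror their documented constructor arguments. For simplicity the sketch runs the executor in a thread (the loop description mentions a background process). The FUNCTIONS registry and the COMPLETED/FAILED/TIMEOUT status strings are illustrative assumptions.

```python
import json
import queue
import threading
import traceback
from dataclasses import dataclass
from typing import Any, Optional

# Hypothetical registry standing in for the functions the scheduler looks up.
FUNCTIONS = {"add": lambda a, b: a + b}


@dataclass
class TaskSketch:
    """Mirrors the documented Task(app, function, timeout, args, vars) arguments."""
    app: str
    function: str
    timeout: float
    args: str = "[]"
    vars: str = "{}"


@dataclass
class TaskReportSketch:
    """Mirrors the documented TaskReport(status, result, output, tb) arguments."""
    status: str
    result: Optional[Any] = None
    output: Optional[str] = None
    tb: Optional[str] = None


def executor(task, reports):
    """Executor side: run the task and put exactly one report on the queue."""
    try:
        func = FUNCTIONS[task.function]
        result = func(*json.loads(task.args), **json.loads(task.vars))
        reports.put(TaskReportSketch("COMPLETED", result=json.dumps(result)))
    except Exception:
        reports.put(TaskReportSketch("FAILED", tb=traceback.format_exc()))


def run_one(task):
    """Main side: hand the task to an executor thread and collect its report."""
    reports = queue.Queue()
    worker = threading.Thread(target=executor, args=(task, reports), daemon=True)
    worker.start()
    worker.join(timeout=task.timeout)
    if worker.is_alive():
        # A thread cannot be killed from outside, so this sketch only reports it.
        return TaskReportSketch("TIMEOUT")
    return reports.get()


print(run_one(TaskSketch("myapp", "add", 5, args="[1, 2]")))
```

Passing plain data objects both ways keeps the executor isolated: it gets only what it needs to run, and the main side gets back only a status, a result, and a traceback.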