3. API Reference

3.1. Graph

class artron.graph.Graph

Graph represents an oriented graph, i.e. a directed graph with no bidirected edges.

For a complete course on graphs in Python, with code and more, see http://www.python-course.eu/graphs_python.php

Parameters:tasks (dict) – dict with key as task id and value as task obj
size

int – Graph’s tasks size.

Examples

>>> from artron import Task, Graph
>>>
>>> task1 = Task('for_test-tid1', {'msg': 'hello1'}, 'for_test')
>>> task2 = Task('for_test-tid2', {'msg': 'hello2'}, 'for_test')
>>> task3 = Task('for_test-tid3', {'msg': 'hello3'}, 'for_test')
>>> task4 = Task('for_test-tid4', {'msg': 'hello4'}, 'for_test')
>>>
>>> task1.add_require(task2.tid)
>>> task1.add_require(task3.tid)
>>> task1.add_require(task4.tid)
>>> task2.add_require(task4.tid)
>>>
>>> tasks = {
...     task1.tid: task1,
...     task2.tid: task2,
...     task3.tid: task3,
...     task4.tid: task4
... }
>>>
>>> graph = Graph(tasks)
>>> graph
{
    'for_test-tid4': [],
    'for_test-tid1': ['for_test-tid2', 'for_test-tid3', 'for_test-tid4'],
    'for_test-tid3': [],
    'for_test-tid2': ['for_test-tid4']
}
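
The mapping printed above can be approximated with a minimal sketch. It assumes each task exposes the `tid` and `require` attributes documented for Task; `FakeTask` and `build_adjacency` are hypothetical names used only for illustration and are not part of artron.

```python
from collections import namedtuple

# Hypothetical stand-in for artron's Task: only `tid` and `require` are modelled.
FakeTask = namedtuple("FakeTask", ["tid", "require"])


def build_adjacency(tasks):
    """Map each task id to a copy of the list of task ids it requires."""
    return {tid: list(task.require or []) for tid, task in tasks.items()}


tasks = {
    "t1": FakeTask("t1", ["t2", "t3", "t4"]),
    "t2": FakeTask("t2", ["t4"]),
    "t3": FakeTask("t3", None),  # `require` may default to None
    "t4": FakeTask("t4", []),
}
adjacency = build_adjacency(tasks)
```

Copying each `require` list keeps later changes to the mapping from leaking back into the tasks.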
edges()

Generate oriented graph’s edges

Yields:

tuple

a tuple of the two vertices of the edge, i.e. (A, B), where A depends on B

Examples

>>> list(graph.edges())
[
    ('for_test-tid1', 'for_test-tid2'),
    ('for_test-tid1', 'for_test-tid3'),
    ('for_test-tid1', 'for_test-tid4'),
    ('for_test-tid2', 'for_test-tid4'),
    ('for_test-tid3', 'for_test-tid3'),
    ('for_test-tid4', 'for_test-tid4')
]
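
The edge generation can be sketched over a plain dependency mapping. This is an illustration, not artron's implementation: `iter_edges` is a hypothetical helper, and pairing a dependency-free vertex with itself is an assumption taken from the sample output above.

```python
def iter_edges(adjacency):
    """Yield (A, B) pairs where A depends on B.

    Assumption: a vertex without dependencies yields (A, A),
    mirroring the sample output shown in this reference.
    """
    for vertex in sorted(adjacency):
        dependencies = adjacency[vertex]
        if not dependencies:
            yield (vertex, vertex)
        for dependency in dependencies:
            yield (vertex, dependency)


adjacency = {
    "for_test-tid4": [],
    "for_test-tid1": ["for_test-tid2", "for_test-tid3", "for_test-tid4"],
    "for_test-tid3": [],
    "for_test-tid2": ["for_test-tid4"],
}
edge_list = list(iter_edges(adjacency))
```

Sorting the vertices only makes the order deterministic for the example; the real class may iterate in another order.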
isolated_vertices()

Generate the isolated vertices (i.e. vertices with no dependency)

Yields:str – isolated vertices.

Examples

>>> list(graph.edges())
[
    ('for_test-tid1', 'for_test-tid2'),
    ('for_test-tid1', 'for_test-tid3'),
    ('for_test-tid1', 'for_test-tid4'),
    ('for_test-tid2', 'for_test-tid4'),
    ('for_test-tid3', 'for_test-tid3'),
    ('for_test-tid4', 'for_test-tid4')
]
>>> list(graph.isolated_vertices())
['for_test-tid4', 'for_test-tid3']
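
As a rough sketch, isolated vertices are the ones whose dependency list is empty. `iter_isolated` is a hypothetical helper over a plain dict and only approximates what the method yields.

```python
def iter_isolated(adjacency):
    """Yield every vertex that has no dependency."""
    for vertex, dependencies in adjacency.items():
        if not dependencies:
            yield vertex


adjacency = {
    "for_test-tid4": [],
    "for_test-tid1": ["for_test-tid2", "for_test-tid3", "for_test-tid4"],
    "for_test-tid3": [],
    "for_test-tid2": ["for_test-tid4"],
}
isolated = list(iter_isolated(adjacency))
```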
remove_vertex(vertex)

Remove vertex from graph and all children.

Parameters:vertex (str) – vertex to remove.
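
One plausible reading of this method, sketched on a plain dict: the vertex entry is deleted and the vertex is dropped from the dependency lists of the vertices that required it. This interpretation is an assumption, and the `remove_vertex` function below is an illustration rather than artron's code.

```python
def remove_vertex(adjacency, vertex):
    """Delete `vertex` and drop it from every remaining dependency list (in place)."""
    adjacency.pop(vertex, None)
    for dependencies in adjacency.values():
        if vertex in dependencies:
            dependencies.remove(vertex)


adjacency = {"a": ["b", "c"], "b": ["c"], "c": []}
remove_vertex(adjacency, "c")
```

After the call, `b` has no dependency left and would itself count as an isolated vertex.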
size = None


3.2. Manager

class artron.manager.Manager

Manager class is the central piece of Artron. It takes care of creating, starting and queueing jobs.

builder

obj – Builder object with the func to run.

lock

multiprocessing.Lock – Lock on resource access.

max_retry

int – Number of retries when a task fails.

progress

obj – Progress bar.

queue

multiprocessing.Manager.Queue – queue can be shared between subprocesses.

sleep

int – sleep value in seconds when iterating edges.

tasks

multiprocessing.managers.DictProxy – proxy for a shared dict object.

timeout

int – timestamp when timeout will be triggered.

workers

list – list of artron.worker.Worker

Parameters:
  • builder (obj) – Builder object with the func to run.
  • nb_workers (int) – number of workers
  • workers (list) – list of artron.worker.Worker
  • queue (multiprocessing.Manager.Queue) – queue can be shared between subprocesses.
  • run_timeout (int) – number of seconds before timeout. It is added to the current timestamp to set the timeout attribute.
  • sleep (int) – sleep value in seconds when iterating edges.
  • tasks (multiprocessing.managers.DictProxy) – proxy for a shared dict object. Defaults to None.
  • max_retry (int) – Number of retries when a task fails.
  • progress (obj) – Progress bar.
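
The relation between run_timeout and the timeout attribute can be sketched as a deadline computation. `compute_deadline` and `is_timed_out` are hypothetical helpers; how the Manager actually checks the deadline is not documented here.

```python
import time


def compute_deadline(run_timeout, now=None):
    """Return the timestamp at which the run should time out."""
    now = time.time() if now is None else now
    return now + run_timeout


def is_timed_out(deadline, now=None):
    """Tell whether the deadline has passed."""
    now = time.time() if now is None else now
    return now >= deadline


# Fixed timestamps keep the example deterministic.
deadline = compute_deadline(30, now=1000.0)
```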

Examples

>>> manager = Manager(builder, max_retry=2)
add(task)

Add task to manager

Parameters:task – task to add to the manager.
start()

Start manager

Returns:
run result, in the following form.
>>> {
    'date_end': '2018-08-29T20:02:54.640Z',
    'date_start': '2018-08-29T20:02:39.606Z',
    'elapsed': '00:00:15',
    'exit_code': 1,
    'results': {
        'aborted': 0,
        'deps': 2,
        'failures': 1,
        'nrun': 0,
        'ready': 0,
        'success': 3
    },
    'tasks': [
        {
            'date_created': '2018-08-29T20:02:39.605Z',
            'date_end': None,
            'date_start': None,
            'func': 'builder_func_1',
            'inputs': {'msg': 'task-1-msg'},
            'require': ['task-id-2', 'task-id-4'],
            'results': None,
            'state': -2,
            'tid': 'task-id-1',
            'time_duration': 0.0,
            'time_duration_str': '00:00:00'
        },{
            'date_created': '2018-08-29T20:02:39.605Z',
            'date_end': '2018-08-29T20:02:41.617Z',
            'date_start': '2018-08-29T20:02:40.615Z',
            'func': 'builder_func_3',
            'inputs': {'msg': 'task-3-msg'},
            'require': [],
            'results': 'ERROR builder_func_3',
            'state': -1,
            'tid': 'task-id-3',
            'time_duration': 1.0019969940185547,
            'time_duration_str': '00:00:01'
        }
    ]
}
Return type:dict
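
A short sketch of how a caller might read this dict. The sample below is trimmed from the result shown above; `elapsed_hms` is a hypothetical helper, and the timestamp format is inferred from the sample values rather than stated by artron.

```python
from datetime import datetime

# Format inferred from the sample timestamps (assumption).
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def elapsed_hms(date_start, date_end):
    """Return the whole-second duration between two timestamps as HH:MM:SS."""
    start = datetime.strptime(date_start, TIMESTAMP_FORMAT)
    end = datetime.strptime(date_end, TIMESTAMP_FORMAT)
    seconds = int((end - start).total_seconds())
    hours, remainder = divmod(seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return "{:02d}:{:02d}:{:02d}".format(hours, minutes, secs)


result = {
    "date_end": "2018-08-29T20:02:54.640Z",
    "date_start": "2018-08-29T20:02:39.606Z",
    "elapsed": "00:00:15",
    "exit_code": 1,
    "results": {"failures": 1, "success": 3},
}
recomputed = elapsed_hms(result["date_start"], result["date_end"])
had_failures = result["results"]["failures"] > 0
```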

3.3. Task

class artron.task.Task

A task can run on a builder.

The builder does not manage tasks; instead, a task can safely be run on any builder.


Parameters:
  • tasks (dict) – dict with key as task id and value as task obj
  • tid (str) – unique task identifier.
  • inputs (dict) – keyword arguments to pass to the func
  • func (str) – name of the function to use on the builder. The builder is not stored on the object but passed as an argument to run, because the task should be runnable on different builders.
  • require (Optional[list]) – List of required task ids. Defaults to None.
tid

str – unique task identifier.

inputs

dict – keyword arguments to pass to the func

func

str – name of the function to use on the builder. The builder is not stored on the object but passed as an argument to run, because the task should be runnable on different builders.

require

Optional[list] – List of required task ids. Defaults to None.

state

int – Task state, one of the STATE_* attributes.

results

obj – Task’s func results.

date_created

str – date when the task was created.

date_start

str – date when the task started.

date_end

str – date when the task ended.

time_duration

float – Task run duration.

time_duration_str

str – Task run duration in form ‘00:00:00’.

STATE_WRONG

int – status for a wrong execution, such as running a task with requirements.

STATE_DEPENDENCY

int – status for a dependency error, i.e. a parent task failed.

STATE_ERROR

int – status for failed task.

STATE_INIT

int – status for fresh task.

STATE_READY

int – status for a task that is ready to run, i.e. has no dependencies.

STATE_RUNNING

int – status for running task.

STATE_SUCCESS

int – status for a task that finished successfully.

add_require(task_id)

Add dependency

Parameters:task_id (str) – task id to add as dependency
del_require(task_id)

Delete dependency

Parameters:task_id (str) – task id to remove as dependency
is_finished()

Check if a task is finished

Returns:True if the task is not in state init or ready
Return type:bool
is_runnable()

Check if the task is runnable

Returns:True if the task is in state init and has no dependencies
Return type:bool
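
The dependency helpers and the two predicates above can be sketched together. `MiniTask` is a hypothetical stand-in, and the numeric STATE_* values are placeholders, since this reference does not give the real ones. The predicates follow the descriptions literally, so a running task counts as finished in this sketch.

```python
# Hypothetical values: the real STATE_* constants are not given in this reference.
STATE_INIT, STATE_READY, STATE_RUNNING, STATE_SUCCESS = 0, 1, 2, 3


class MiniTask:
    """Illustrative stand-in for Task, limited to dependencies and state checks."""

    def __init__(self, tid, require=None):
        self.tid = tid
        self.require = list(require or [])
        self.state = STATE_INIT

    def add_require(self, task_id):
        """Add a dependency, ignoring duplicates."""
        if task_id not in self.require:
            self.require.append(task_id)

    def del_require(self, task_id):
        """Delete a dependency if it is present."""
        if task_id in self.require:
            self.require.remove(task_id)

    def is_runnable(self):
        """True if the state is init and there are no dependencies."""
        return self.state == STATE_INIT and not self.require

    def is_finished(self):
        """True if the state is neither init nor ready."""
        return self.state not in (STATE_INIT, STATE_READY)


task = MiniTask("for_test-tid1")
task.add_require("for_test-tid2")
blocked = task.is_runnable()
task.del_require("for_test-tid2")
unblocked = task.is_runnable()
```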
run(builder, retry)

Run task on specified builder.

Parameters:
  • builder (obj) – Builder object with the func to run.
  • retry (bool) – number of retries.
Raises:

TaskDependenciesError – If the task has dependencies.
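
A minimal sketch of running a function by name on a builder. It assumes the func name is resolved with `getattr` and that retry is an integer count of extra attempts; neither detail is confirmed by this reference. `run_task` and `FlakyBuilder` are hypothetical, and `TaskDependenciesError` is redefined locally so the example is self-contained.

```python
class TaskDependenciesError(Exception):
    """Local stand-in for the exception named in this reference."""


def run_task(builder, func, inputs, require, retry=0):
    """Call builder.<func>(**inputs), retrying up to `retry` extra times."""
    if require:
        raise TaskDependenciesError("unresolved dependencies: %s" % require)
    method = getattr(builder, func)
    last_error = None
    for _attempt in range(retry + 1):
        try:
            return method(**inputs)
        except Exception as exc:  # keep the error and try again
            last_error = exc
    raise last_error


class FlakyBuilder:
    """Hypothetical builder whose function fails on the first call only."""

    def __init__(self):
        self.calls = 0

    def for_test(self, msg):
        self.calls += 1
        if self.calls == 1:
            raise RuntimeError("transient failure")
        return msg.upper()


builder = FlakyBuilder()
outcome = run_task(builder, "for_test", {"msg": "hello1"}, require=[], retry=1)
```

Passing the builder at call time, rather than storing it on the task, matches the stated goal that a task stays runnable on different builders.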

update_childs(_tasks)

Update the task's children

When a task's state changes, update its children. If the task succeeded, remove the dependency on it from its children; otherwise, mark the children's states with STATE_DEPENDENCY

Parameters:tasks (dict) – dict with key as task id and value as task obj
Yields:dict – updated {taskid: task} item.
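
The propagation rule can be sketched with plain dicts in place of Task objects. The STATE_* values and the `update_children` function are hypothetical, and the sketch assumes a child is any task whose require list contains the parent's id.

```python
# Hypothetical values: the real STATE_* constants are not given in this reference.
STATE_DEPENDENCY, STATE_ERROR, STATE_INIT, STATE_SUCCESS = 10, 11, 12, 13


def update_children(parent, tasks):
    """Yield {task_id: task} for every child updated after `parent` changed state."""
    for task_id, child in tasks.items():
        if parent["tid"] not in child["require"]:
            continue  # not a child of this parent
        if parent["state"] == STATE_SUCCESS:
            child["require"].remove(parent["tid"])
        else:
            child["state"] = STATE_DEPENDENCY
        yield {task_id: child}


parent = {"tid": "t4", "state": STATE_SUCCESS, "require": []}
tasks = {
    "t1": {"tid": "t1", "state": STATE_INIT, "require": ["t2", "t4"]},
    "t2": {"tid": "t2", "state": STATE_INIT, "require": ["t4"]},
    "t3": {"tid": "t3", "state": STATE_INIT, "require": []},
}
updated = list(update_children(parent, tasks))
```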

3.4. Utils

3.5. Worker

class artron.worker.Worker

This module provides a queue implementation. It provides a convenient way of moving Python objects between different subprocesses.

Parameters:
  • builder (obj) – Builder object with the func to run.
  • queue (multiprocessing.Manager.Queue) – queue can be shared between subprocesses.
  • name (str) – Worker name.
  • tasks (multiprocessing.managers.DictProxy) – shared dict object and return a proxy for it.
  • max_retry (str) – Number of retries when a task fails.
builder

obj – Builder object with the func to run.

queue

multiprocessing.Manager.Queue – queue can be shared between subprocesses.

name

str – Worker name.

tasks

multiprocessing.managers.DictProxy – shared dict object and return a proxy for it.

max_retry

str – Number of retries when a task fails.

lock

multiprocessing.Lock – Lock on resource access.

run()

Loop indefinitely, receiving either a marker value or something to execute
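
A common shape for such a loop is sketched below with the standard library `queue.Queue`, in a single process for simplicity. Treating the marker as a stop signal is an assumption, and `STOP_MARKER` and `worker_loop` are hypothetical names, not artron's.

```python
import queue

STOP_MARKER = None  # hypothetical sentinel meaning "no more work"


def worker_loop(work_queue, handle):
    """Consume items until the stop marker arrives; return the handled results."""
    handled = []
    while True:
        item = work_queue.get()
        if item is STOP_MARKER:
            break
        handled.append(handle(item))
    return handled


work_queue = queue.Queue()
for message in ("hello1", "hello2"):
    work_queue.put(message)
work_queue.put(STOP_MARKER)
handled = worker_loop(work_queue, str.upper)
```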

stop()

Stop the worker