wy_qcos.task_manager package

Submodules

wy_qcos.task_manager.task_manager module

class wy_qcos.task_manager.task_manager.TaskFlowManager

基类:ABC

Task manager based on prefect framework.

static convert_to_qcos_state(state)

Convert to qcos state.

参数:

state -- prefect state

返回:

qcos state.

static convert_to_prefect_states(states)

Convert qcos states to prefect states.

参数:

states -- qcos states list

返回:

prefect states list.

generate_deployment_configs(device_names)

Generate deployment configs.

参数:

device_names -- device names

返回:

deployment configs

start()

Create work pools, queues and start workers.

set_driver_manager(driver_manager)

Set driver manager.

参数:

driver_manager -- driver manager

set_device_manager(device_manager)

Set device manager.

参数:

device_manager -- device manager

check_connection()

Check connection to prefect server.

async create_pools(pool_names)

Create all work pools, each device has own work pools.

参数:

pool_names -- pool names

async create_pool(pool_name, concurrency_limit=None)

Create work pool by prefect client.

参数:
  • pool_name -- work pool name, using device name

  • concurrency_limit -- concurrency limit

async create_queues(queue_names)

Create all work queues under work pool.

each priority has own work queue.

参数:

queue_names -- queue names

async create_deployments(deployment_configs)

Create deployment by prefect client.

参数:

deployment_configs -- deployment configs

get_deployment(deployment_name)

Get deployment.

参数:

deployment_name -- deployment name

kill_workers()

Kill workers.

start_workers()

Start workers using multiprocessing.

run_device_monitor()

Run device monitor by prefect.

static get_prefect_configs()

Set prefect configs.

static start_work(process_name, pool_name, concurrency_limit)

Start worker by prefect client.

参数:
  • process_name -- process name

  • pool_name -- work pool name

  • concurrency_limit -- max num of jobs running at the same time

static start_device_monitor_work(process_name, pool_name, concurrency_limit)

Start device monitor worker by prefect client.

参数:
  • process_name -- process name

  • pool_name -- work pool name

  • concurrency_limit -- max num of jobs running at the same time

static start_device_mgr_work(process_name, pool_name, concurrency_limit)

Start device mgr worker by prefect client.

参数:
  • process_name -- process name

  • pool_name -- work pool name

  • concurrency_limit -- max num of jobs running at the same time

async wait_workers()

Start all workers for work pool.

run_task_flow(deployment_id, args, tags=None, work_queue_name=None)

Run flow.

参数:
  • deployment_id -- deploy uuid

  • args (dict[str, Any]) -- flow function args in dict

  • tags -- prefect flow tags

  • work_queue_name -- work queue name

返回:

flow run uuid

run_manage_task_flow(deployment_id, args, work_queue_name=None)

Run flow.

参数:
  • deployment_id -- deploy uuid

  • args (dict[str, Any]) -- flow function args in dict

  • tags -- prefect flow tags

  • work_queue_name -- work queue name

返回:

flow run uuid

get_flow_run_id_by_job_id(job_id, tags=None)
async run_task_flow_by_client(deployment_id, args, tags=None, work_queue_name=None)

Run flow by prefect client.

参数:
  • deployment_id -- deploy uuid

  • args (dict[str, Any]) -- flow function args in dict

  • tags -- prefect flow tags

  • work_queue_name -- work queue name

返回:

job uuid

返回类型:

job_id

async run_manage_task_flow_by_client(deployment_id, args, work_queue_name=None)

Run flow by prefect client.

参数:
  • deployment_id -- deploy uuid

  • args (dict[str, Any]) -- flow function args in dict

  • work_queue_name -- work queue name

get_task_flow_result(job_id, tags=None)

Get flow run state and result.

参数:
  • job_id -- job uuid

  • tags -- prefect flow tags

返回:

state, parameters, result, err_msg

delete_flow_artifacts(flow_run_id)

Delete flow artifacts.

参数:

flow_run_id -- flow run id

get_job_artifact(job_id)

Get job artifact.

参数:

job_id -- job id

返回:

artifact

get_job_artifact_by_client(job_id)

Get job artifact by client.

参数:

job_id -- job id

返回:

artifact

has_flow(job_id)

Check if flow exists.

参数:

job_id -- job uuid

返回:

if job exists

update_flow(job_id, name=None, parameters=None, variables=None)

Update flow.

参数:
  • job_id -- job uuid

  • name -- flow name (Default value = None)

  • parameters -- flow parameters (Default value = None)

  • variables -- flow variables

返回:

if flow exists (Default value = None)

get_task_flow_result_by_client(flow_run_id)

Get flow run state and result by prefect client.

参数:

flow_run_id -- flow run uuid

返回:

state_name, parameters, result, state_message

get_task_flow_list(tags=None)

Get flow run list.

参数:

tags -- prefect flow tags

返回:

flow run list

get_task_flow_list_by_client(tags=None, sort_fields=['-created'], reverse=False)

Get flow run list by prefect client.

参数:
  • sort_fields -- sort fields (Default value = ['-created'])

  • reverse -- reverse order

  • tags -- prefect flow tags

返回:

flow run list

get_task_flow_run(job_id, tags=None)

Get flow run.

参数:
  • job_id -- job id

  • tags -- prefect flow tags

返回:

flow run.

delete_task_flow_run(job_ids, tags=None)

Delete flow run.

参数:
  • job_ids -- job uuid list

  • tags -- prefect flow tags

返回:

success list.

delete_task_flow_run_by_client(flow_run_ids)

Delete flow run by client.

参数:

flow_run_ids -- flow run uuid list

返回:

success_list.

cancel_task_flow_run(job_ids, tags=None)

Cancel flow run.

参数:
  • job_ids -- job uuid list

  • tags -- prefect flow tags

返回:

success list.

cancel_task_flow_run_by_client(flow_run_ids)

Cancel flow run by client.

参数:

flow_run_ids -- flow run uuid list

返回:

success list.

delete_task_flow_by_name(flow_name)

Delete flow .

参数:

flow_name -- flow name

返回:

success flow_id.

get_flow_runs_with_filters(states=None, tags=None, pool_name=None)

Get flow runs with filters.

参数:
  • states -- flow states

  • tags -- prefect flow tags

  • pool_name -- pool_name

返回:

flow runs.

run_callbacks(data, callbacks)

Run callbacks for job.

参数:
  • data -- data to send

  • callbacks -- callbacks.

async process_aggregation_job()

Process aggregation job.

wy_qcos.task_manager.task_scheduler module

class wy_qcos.task_manager.task_scheduler.TaskScheduler

基类:ABC

Task scheduler.

start_taskmanager()

Start TaskManager.

set_driver_manager(driver_manager)

Set driver manager.

参数:

driver_manager -- driver manager

get_task_manager()

Get task manager.

返回:

task manager

get_driver_manager()

Get driver manager.

返回:

driver manager

set_transpiler_manager(transpiler_manager)

Set transpiler manager.

参数:

transpiler_manager -- transpiler manager

get_transpiler_manager()

Get transpiler manager.

返回:

transpiler manager

set_device_manager(device_manager)

Set device manager.

参数:

device_manager -- device manager

get_device_manager()

Get device manager.

返回:

device manager

add(job_info, tags=None)

Add job to scheduler.

参数:
  • job_info -- job info

  • tags -- prefect flow tags

返回:

added job info, error messages

add_manage_job(job_info)

Add manage job to scheduler.

参数:

job_info -- job info

返回:

added job info, error messages

get_result_by_id(job_id, tags=None)

Get result by job id.

参数:
  • job_id -- job id

  • tags -- prefect flow tags

返回:

flow info

has_job(job_id)

Check if flow exists.

参数:

job_id -- job id

返回:

if flow exists

get_jobs(tags=None)

Get job list.

参数:

tags -- prefect flow tags

返回:

job list

async aget_jobs(tags=None)

Asynchronously get job list.

This method is designed to be used in async contexts to avoid coroutine object issues when accessing state.result().

参数:

tags -- prefect flow tags

返回:

job list

delete_jobs(ids, tags=None)

Delete jobs.

参数:
  • ids -- job id list

  • tags -- prefect flow tags

返回:

flow list

cancel_jobs(ids, tags=None)

Cancel jobs.

参数:
  • ids -- job id list

  • tags -- prefect flow tags

返回:

flow list

update_job(job_id, name=None, parameters=None, variables=None, tags=None)

Update job.

参数:
  • job_id -- job id

  • name -- job name (Default value = None)

  • parameters -- job parameters (Default value = None)

  • variables -- job variables

  • tags -- prefect flow tags

返回:

if flow exists

run_callbacks(data, callbacks)

Run callbacks for job.

参数:
  • data -- data to send

  • callbacks -- callbacks

process_callbacks()

Process unfinished callbacks.

static get_job_status(job_status, flow_results, flow_parameters)

Get job status by combining flow state and user defined task status.

参数:
  • job_status -- job status

  • flow_results -- flow results

  • flow_parameters -- parameters

返回:

job status

class wy_qcos.task_manager.task_scheduler.PrioritySchedulingPolicy(task_manager)

基类:ABC

Priority Scheduling Policy.

参数:

task_manager (TaskFlowManager)

exec_task(deployment, job_info, tags=None)

Execute task.

参数:
  • deployment -- deployment info

  • job_info -- job info

  • tags -- prefect flow tags

返回:

job uuid

exec_manage_task(deployment, device_mgr_info)

Execute task.

参数:
  • deployment -- deployment info

  • device_mgr_info -- device_mgr_info

返回:

job uuid

calculate_priority(job_info)

Calculate priority.

参数:

job_info -- job info

返回:

job priority

Module contents