wy_qcos.task_manager package

Submodules

wy_qcos.task_manager.task_manager module

class wy_qcos.task_manager.task_manager.TaskFlowManager

Bases: object

Task manager based on the Prefect framework.

static convert_to_qcos_state(state)

Convert a Prefect state to a qcos state.

Parameters:

state -- Prefect state

Returns:

qcos state.

static convert_to_prefect_states(states)

Convert qcos states to Prefect states.

Parameters:

states -- list of qcos states

Returns:

list of Prefect states.
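The two converters above translate between Prefect state names and qcos job states. The following is a minimal sketch of such a two-way mapping, not the package's actual table. The Prefect names are standard Prefect state types, but the qcos names `QUEUED` and `RUNNING`, the helper names, and the fallback to `FAILED` are all assumptions made for illustration (only COMPLETED, CANCELLED, DELETED, and FAILED are named in this documentation).

```python
# Hypothetical mapping; the real table lives inside TaskFlowManager.
PREFECT_TO_QCOS = {
    "SCHEDULED": "QUEUED",
    "PENDING": "QUEUED",
    "RUNNING": "RUNNING",
    "COMPLETED": "COMPLETED",
    "FAILED": "FAILED",
    "CRASHED": "FAILED",
    "CANCELLING": "CANCELLED",
    "CANCELLED": "CANCELLED",
}


def to_qcos_state(prefect_state: str) -> str:
    """Map one Prefect state name to a qcos state.

    Unknown names fall back to FAILED (an assumption of this sketch).
    """
    return PREFECT_TO_QCOS.get(prefect_state.upper(), "FAILED")


def to_prefect_states(qcos_states: list[str]) -> list[str]:
    """Expand qcos states into every Prefect state name that maps onto them."""
    wanted = {state.upper() for state in qcos_states}
    return [p for p, q in PREFECT_TO_QCOS.items() if q in wanted]
```

Because several Prefect states can collapse into one qcos state, the reverse direction returns a list that may be longer than its input, which is consistent with `convert_to_prefect_states` taking and returning lists.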

generate_deployment_configs(device_names)

Generate deployment configs.

Parameters:

device_names -- device names

Returns:

deployment configs

start()

Create work pools, queues and start workers.

set_driver_manager(driver_manager)

Set driver manager.

Parameters:

driver_manager -- driver manager

set_device_manager(device_manager)

Set device manager.

Parameters:

device_manager -- device manager

check_connection()

Check the connection to the Prefect server.

async create_pools(pool_names)

Create all work pools. Each device has its own work pool.

Parameters:

pool_names -- pool names

async create_pool(pool_name, concurrency_limit=None)

Create a work pool via the Prefect client.

Parameters:
  • pool_name -- work pool name (the device name is used)

  • concurrency_limit -- concurrency limit

async create_queues(queue_names)

Create all work queues under the work pool.

Each priority has its own work queue.

Parameters:

queue_names -- queue names

async create_deployments(deployment_configs)

Create deployments via the Prefect client.

Parameters:

deployment_configs -- deployment configs

get_deployment(deployment_name)

Get deployment.

Parameters:

deployment_name -- deployment name

kill_workers()

Kill workers.

start_workers()

Start workers using multiprocessing.

run_device_monitor()

Run the device monitor via Prefect.

static get_prefect_configs()

Get Prefect configs.

static start_work(process_name, pool_name, concurrency_limit)

Start a worker via the Prefect client.

Parameters:
  • process_name -- process name

  • pool_name -- work pool name

  • concurrency_limit -- maximum number of jobs running at the same time

static start_device_monitor_work(process_name, pool_name, concurrency_limit)

Start a device monitor worker via the Prefect client.

Parameters:
  • process_name -- process name

  • pool_name -- work pool name

  • concurrency_limit -- maximum number of jobs running at the same time

static start_device_mgr_work(process_name, pool_name, concurrency_limit)

Start a device manager worker via the Prefect client.

Parameters:
  • process_name -- process name

  • pool_name -- work pool name

  • concurrency_limit -- maximum number of jobs running at the same time

async wait_workers()

Start all workers for work pool.

run_flow(deployment_id, args, tags=None, work_queue_name=None)

Run a flow.

Parameters:
  • deployment_id -- deployment UUID

  • args (dict[str, Any]) -- flow function arguments as a dict

  • tags -- Prefect flow tags

  • work_queue_name -- work queue name

Returns:

flow run UUID

run_manage_task_flow(deployment_id, args, work_queue_name=None)

Run a manage task flow.

Parameters:
  • deployment_id -- deployment UUID

  • args (dict[str, Any]) -- flow function arguments as a dict

  • work_queue_name -- work queue name

Returns:

flow run UUID

get_flow_run_id_by_job_id(job_id, tags=None)

async run_flow_by_client(deployment_id, args, tags=None, work_queue_name=None)

Run a flow via the Prefect client.

Parameters:
  • deployment_id -- deployment UUID

  • args (dict[str, Any]) -- flow function arguments as a dict

  • tags -- Prefect flow tags

  • work_queue_name -- work queue name

Returns:

flow run ids

async run_manage_task_flow_by_client(deployment_id, args, work_queue_name=None)

Run a manage task flow via the Prefect client.

Parameters:
  • deployment_id -- deployment UUID

  • args (dict[str, Any]) -- flow function arguments as a dict

  • work_queue_name -- work queue name

get_flow_result(job_id, tags=None)

Get the state and result of a flow run.

Parameters:
  • job_id -- job UUID

  • tags -- Prefect flow tags

Returns:

state, parameters, result, err_msg

update_flow(flow_run_id, name=None, parameters=None, variables=None)

Update a flow run.

Parameters:
  • flow_run_id -- flow run id

  • name -- flow name (Default value = None)

  • parameters -- flow parameters (Default value = None)

  • variables -- flow variables (Default value = None)

Returns:

whether the flow exists

get_flow_result_by_client(flow_run_id)

Get the state and result of a flow run via the Prefect client.

Parameters:

flow_run_id -- flow run UUID

Returns:

state_name, parameters, result, state_message

get_flow_list(tags=None)

Get the list of flow runs.

Parameters:

tags -- Prefect flow tags

Returns:

flow run list

get_flow_list_by_client(tags=None, sort_fields=['-created'], reverse=False)

Get the list of flow runs via the Prefect client.

Parameters:
  • sort_fields -- sort fields (Default value = ['-created'])

  • reverse -- reverse order

  • tags -- Prefect flow tags

Returns:

flow run list
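The default `sort_fields=['-created']` suggests that a leading `-` marks a descending key. The sketch below shows one way such a sort specification could be applied to plain dict records. The helper name `sort_runs`, the record layout, and the way `reverse` flips the final order are assumptions for illustration, not the behavior of `get_flow_list_by_client` itself.

```python
def sort_runs(runs, sort_fields=("-created",), reverse=False):
    """Sort dict records by several fields; a '-' prefix means descending."""
    ordered = list(runs)
    # Apply keys from least to most significant; Python's sort is stable,
    # so earlier (more significant) fields win on ties.
    for field in reversed(sort_fields):
        descending = field.startswith("-")
        key = field.lstrip("-")
        ordered.sort(key=lambda run, k=key: run[k], reverse=descending)
    if reverse:
        ordered.reverse()
    return ordered


runs = [
    {"id": "a", "created": 1},
    {"id": "b", "created": 3},
    {"id": "c", "created": 2},
]
newest_first = [run["id"] for run in sort_runs(runs)]
oldest_first = [run["id"] for run in sort_runs(runs, reverse=True)]
```

The sketch uses a tuple as the default value so that the default cannot be mutated between calls.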

get_flow_run(flow_run_id, tags=None)

Get a flow run.

Parameters:
  • flow_run_id -- flow run id

  • tags -- Prefect flow tags

Returns:

flow run.

delete_flow_runs(flow_run_ids)

Delete flow runs via the Prefect client.

Parameters:

flow_run_ids -- list of flow run UUIDs

Returns:

list of successfully deleted flow runs.

cancel_flow_runs(flow_run_ids, tags=None)

Cancel flow runs.

Parameters:
  • flow_run_ids -- list of flow run ids

  • tags -- Prefect flow tags

Returns:

list of successfully cancelled flow runs.

delete_task_flow_by_name(flow_name)

Delete a flow by name.

Parameters:

flow_name -- flow name

Returns:

flow_id of the successfully deleted flow.

get_flow_runs_with_filters(states=None, tags=None, pool_name=None)

Get flow runs with filters.

Parameters:
  • states -- flow states

  • tags -- Prefect flow tags

  • pool_name -- pool name

Returns:

flow runs.

async process_aggregation_job()

Process aggregation job.

wy_qcos.task_manager.task_scheduler module

class wy_qcos.task_manager.task_scheduler.TaskScheduler

Bases: object

Task scheduler.

start_taskmanager()

Start TaskManager.

set_driver_manager(driver_manager)

Set driver manager.

Parameters:

driver_manager -- driver manager

get_task_manager()

Get task manager.

Returns:

task manager

get_driver_manager()

Get driver manager.

Returns:

driver manager

set_transpiler_manager(transpiler_manager)

Set transpiler manager.

Parameters:

transpiler_manager -- transpiler manager

get_transpiler_manager()

Get transpiler manager.

Returns:

transpiler manager

set_device_manager(device_manager)

Set device manager.

Parameters:

device_manager -- device manager

set_db_engine(db_engine)

Set database engine.

Parameters:

db_engine -- database engine

get_device_manager()

Get device manager.

Returns:

device manager

submit(job_info, tags=None, extra_job_data_info={})

Submit a job to the scheduler.

Parameters:
  • job_info -- job info

  • tags -- Prefect flow tags

  • extra_job_data_info -- extra job data info

Returns:

submitted job info, error messages

submit_manage_job(job_info)

Submit a manage job to the scheduler.

Parameters:

job_info -- job info

Returns:

submitted job info, error messages

delete_flows(flow_run_ids)

Delete flows.

Parameters:

flow_run_ids -- flow run id list

Returns:

flow list

cancel_flows(flow_run_ids, tags=None)

Cancel jobs.

Parameters:
  • flow_run_ids -- flow run id list

  • tags -- Prefect flow tags

Returns:

flow list

update_flow(flow_run_id, name=None, parameters=None, variables=None)

Update job.

Parameters:
  • flow_run_id -- flow run id

  • name -- job name (Default value = None)

  • parameters -- job parameters (Default value = None)

  • variables -- job variables (Default value = None)

Returns:

whether the flow exists

process_unfinished_jobs()

Process unfinished jobs in database.

Checks all jobs in database. If job_status is not one of the final states (COMPLETED, CANCELLED, DELETED, FAILED), sets it to FAILED. This handles jobs that may have been interrupted or left in intermediate states during system restart.
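The rule described above can be sketched on in-memory records. This is only an illustration of the logic: the real method works against the database, and the helper name `fail_unfinished_jobs` and the `job_id` field are assumptions; `job_status` and the four final states come from the description.

```python
# Final states listed in the description of process_unfinished_jobs().
FINAL_STATES = {"COMPLETED", "CANCELLED", "DELETED", "FAILED"}


def fail_unfinished_jobs(jobs):
    """Mark every job that is not in a final state as FAILED.

    `jobs` is a list of dicts standing in for database rows.
    Returns the ids of the jobs that were changed.
    """
    changed = []
    for job in jobs:
        if job["job_status"] not in FINAL_STATES:
            job["job_status"] = "FAILED"
            changed.append(job["job_id"])
    return changed


jobs = [
    {"job_id": "j1", "job_status": "RUNNING"},
    {"job_id": "j2", "job_status": "COMPLETED"},
    {"job_id": "j3", "job_status": "QUEUED"},
]
changed_ids = fail_unfinished_jobs(jobs)
```

After the call, `j1` and `j3` are FAILED while `j2` is left untouched, which matches the intent of cleaning up jobs interrupted by a restart.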

process_callbacks()

Process unfinished callbacks from database.

Reads job records where callbacks are not empty and is_callback_success is False, then executes the callbacks. Updates is_callback_success to True on successful execution.
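A minimal sketch of this retry pass, again on in-memory records rather than the database. The fields `callbacks` and `is_callback_success` come from the description; the helper name `run_pending_callbacks`, the `job_id` field, and the injected `execute` function are assumptions for illustration.

```python
def run_pending_callbacks(jobs, execute):
    """Run callbacks for jobs that have some and have not yet succeeded.

    `execute(job)` is a caller-supplied function that performs the callbacks
    and raises on failure. Returns the ids of jobs whose callbacks succeeded.
    """
    done = []
    for job in jobs:
        if not job["callbacks"] or job["is_callback_success"]:
            continue  # nothing to do, or already delivered
        try:
            execute(job)
        except Exception:
            continue  # leave the flag False so a later pass can retry
        job["is_callback_success"] = True
        done.append(job["job_id"])
    return done


def fake_execute(job):
    """Stand-in executor that fails for one specific job."""
    if job["job_id"] == "j2":
        raise RuntimeError("callback endpoint unreachable")


jobs = [
    {"job_id": "j1", "callbacks": ["cb"], "is_callback_success": False},
    {"job_id": "j2", "callbacks": ["cb"], "is_callback_success": False},
    {"job_id": "j3", "callbacks": [], "is_callback_success": False},
    {"job_id": "j4", "callbacks": ["cb"], "is_callback_success": True},
]
succeeded = run_pending_callbacks(jobs, fake_execute)
```

Only `j1` is marked successful here: `j2` fails and stays eligible for the next pass, `j3` has no callbacks, and `j4` was already delivered.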

class wy_qcos.task_manager.task_scheduler.PrioritySchedulingPolicy(task_manager)

Bases: object

Priority scheduling policy.

Parameters:

task_manager (TaskFlowManager)

exec_task(deployment, job_info, tags=None)

Execute a task.

Parameters:
  • deployment -- deployment info

  • job_info -- job info

  • tags -- Prefect flow tags

Returns:

flow run id

exec_manage_task(deployment, device_mgr_info)

Execute a manage task.

Parameters:
  • deployment -- deployment info

  • device_mgr_info -- device manager info

Returns:

job UUID

Module contents