wy_qcos.task_manager package
Submodules
wy_qcos.task_manager.task_manager module
- class wy_qcos.task_manager.task_manager.TaskFlowManager
基类:
ABCTask manager based on prefect framework.
- static convert_to_qcos_state(state)
Convert to qcos state.
- 参数:
state -- prefect state
- 返回:
qcos state.
- static convert_to_prefect_states(states)
Convert qcos states to prefect states.
- 参数:
states -- qcos states list
- 返回:
prefect states list.
- generate_deployment_configs(device_names)
Generate deployment configs.
- 参数:
device_names -- device names
- 返回:
deployment configs
- start()
Create work pools, queues and start workers.
- set_driver_manager(driver_manager)
Set driver manager.
- 参数:
driver_manager -- driver manager
- set_device_manager(device_manager)
Set device manager.
- 参数:
device_manager -- device manager
- check_connection()
Check connection to prefect server.
- async create_pools(pool_names)
Create all work pools, each device has own work pools.
- 参数:
pool_names -- pool names
- async create_pool(pool_name, concurrency_limit=None)
Create work pool by prefect client.
- 参数:
pool_name -- work pool name, using device name
concurrency_limit -- concurrency limit
- async create_queues(queue_names)
Create all work queues under work pool.
each priority has own work queue.
- 参数:
queue_names -- queue names
- async create_deployments(deployment_configs)
Create deployment by prefect client.
- 参数:
deployment_configs -- deployment configs
- get_deployment(deployment_name)
Get deployment.
- 参数:
deployment_name -- deployment name
- kill_workers()
Kill workers.
- start_workers()
Start workers using multiprocessing.
- run_device_monitor()
Run device monitor by prefect.
- static get_prefect_configs()
Set prefect configs.
- static start_work(process_name, pool_name, concurrency_limit)
Start worker by prefect client.
- 参数:
process_name -- process name
pool_name -- work pool name
concurrency_limit -- max num of jobs running at the same time
- static start_device_monitor_work(process_name, pool_name, concurrency_limit)
Start device monitor worker by prefect client.
- 参数:
process_name -- process name
pool_name -- work pool name
concurrency_limit -- max num of jobs running at the same time
- async wait_workers()
Start all workers for work pool.
- run_task_flow(deployment_id, args, tags=None, work_queue_name=None)
Run flow.
- 参数:
deployment_id -- deploy uuid
args (dict[str, Any]) -- flow function args in dict
tags -- prefect flow tags
work_queue_name -- work queue name
- 返回:
flow run uuid
- get_flow_run_id_by_job_id(job_id, tags=None)
- async run_task_flow_by_client(deployment_id, args, tags=None, work_queue_name=None)
Run flow by prefect client.
- 参数:
deployment_id -- deploy uuid
args (dict[str, Any]) -- flow function args in dict
tags -- prefect flow tags
work_queue_name -- work queue name
- 返回:
job uuid
- 返回类型:
job_id
- get_task_flow_result(job_id, tags=None)
Get flow run state and result.
- 参数:
job_id -- job uuid
tags -- prefect flow tags
- 返回:
state, parameters, result, err_msg
- delete_flow_artifacts(flow_run_id)
Delete flow artifacts.
- 参数:
flow_run_id -- flow run id
- get_job_artifact(job_id)
Get job artifact.
- 参数:
job_id -- job id
- 返回:
artifact
- get_job_artifact_by_client(job_id)
Get job artifact by client.
- 参数:
job_id -- job id
- 返回:
artifact
- has_flow(job_id)
Check if flow exists.
- 参数:
job_id -- job uuid
- 返回:
if job exists
- update_flow(job_id, name=None, parameters=None, variables=None)
Update flow.
- 参数:
job_id -- job uuid
name -- flow name (Default value = None)
parameters -- flow parameters (Default value = None)
variables -- flow variables
- 返回:
if flow exists (Default value = None)
- get_task_flow_result_by_client(flow_run_id)
Get flow run state and result by prefect client.
- 参数:
flow_run_id -- flow run uuid
- 返回:
state_name, parameters, result, state_message
- get_task_flow_list(tags=None)
Get flow run list.
- 参数:
tags -- prefect flow tags
- 返回:
flow run list
- get_task_flow_list_by_client(tags=None, sort_fields=['-created'], reverse=False)
Get flow run list by prefect client.
- 参数:
sort_fields -- sort fields (Default value = ['-created'])
reverse -- reverse order
tags -- prefect flow tags
- 返回:
flow run list
- get_task_flow_run(job_id, tags=None)
Get flow run.
- 参数:
job_id -- job id
tags -- prefect flow tags
- 返回:
flow run.
- delete_task_flow_run(job_ids, tags=None)
Delete flow run.
- 参数:
job_ids -- job uuid list
tags -- prefect flow tags
- 返回:
success list.
- delete_task_flow_run_by_client(flow_run_ids)
Delete flow run by client.
- 参数:
flow_run_ids -- flow run uuid list
- 返回:
success_list.
- cancel_task_flow_run(job_ids, tags=None)
Cancel flow run.
- 参数:
job_ids -- job uuid list
tags -- prefect flow tags
- 返回:
success list.
- cancel_task_flow_run_by_client(flow_run_ids)
Cancel flow run by client.
- 参数:
flow_run_ids -- flow run uuid list
- 返回:
success list.
- delete_task_flow_by_name(flow_name)
Delete flow .
- 参数:
flow_name -- flow name
- 返回:
success flow_id.
- get_flow_runs_with_filters(states=None, tags=None)
Get flow runs with filters.
- 参数:
states -- flow states
tags -- prefect flow tags
- 返回:
flow runs.
- run_callbacks(data, callbacks)
Run callbacks for job.
- 参数:
data -- data to send
callbacks -- callbacks.
- async process_aggregation_job()
Process aggregation job.
wy_qcos.task_manager.task_scheduler module
- class wy_qcos.task_manager.task_scheduler.TaskScheduler
基类:
ABCTask scheduler.
- start_taskmanager()
Start TaskManager.
- set_driver_manager(driver_manager)
Set driver manager.
- 参数:
driver_manager -- driver manager
- get_task_manager()
Get task manager.
- 返回:
task manager
- get_driver_manager()
Get driver manager.
- 返回:
driver manager
- set_transpiler_manager(transpiler_manager)
Set transpiler manager.
- 参数:
transpiler_manager -- transpiler manager
- get_transpiler_manager()
Get transpiler manager.
- 返回:
transpiler manager
- set_device_manager(device_manager)
Set device manager.
- 参数:
device_manager -- device manager
- get_device_manager()
Get device manager.
- 返回:
device manager
- add(job_info, tags=None)
Add job to scheduler.
- 参数:
job_info -- job info
tags -- prefect flow tags
- 返回:
added job info, error messages
- get_result_by_id(job_id, tags=None)
Get result by job id.
- 参数:
job_id -- job id
tags -- prefect flow tags
- 返回:
flow info
- has_job(job_id)
Check if flow exists.
- 参数:
job_id -- job id
- 返回:
if flow exists
- get_jobs(tags=None)
Get job list.
- 参数:
tags -- prefect flow tags
- 返回:
job list
- delete_jobs(ids, tags=None)
Delete jobs.
- 参数:
ids -- job id list
tags -- prefect flow tags
- 返回:
flow list
- cancel_jobs(ids, tags=None)
Cancel jobs.
- 参数:
ids -- job id list
tags -- prefect flow tags
- 返回:
flow list
- update_job(job_id, name=None, parameters=None, variables=None, tags=None)
Update job.
- 参数:
job_id -- job id
name -- job name (Default value = None)
parameters -- job parameters (Default value = None)
variables -- job variables
tags -- prefect flow tags
- 返回:
if flow exists
- run_callbacks(data, callbacks)
Run callbacks for job.
- 参数:
data -- data to send
callbacks -- callbacks
- process_callbacks()
Process unfinished callbacks.
- static get_job_status(job_status, flow_results, flow_parameters)
Get job status by combining flow state and user defined task status.
- 参数:
job_status -- job status
flow_results -- flow results
flow_parameters -- parameters
- 返回:
job status
- class wy_qcos.task_manager.task_scheduler.PrioritySchedulingPolicy(task_manager)
基类:
ABCPriority Scheduling Policy.
- 参数:
task_manager (TaskFlowManager)
- exec_task(deployment, job_info, tags=None)
Execute task.
- 参数:
deployment -- deployment info
job_info -- job info
tags -- prefect flow tags
- 返回:
job uuid
- calculate_priority(job_info)
Calculate priority.
- 参数:
job_info -- job info
- 返回:
job priority