wy_qcos.task_manager package
Submodules
wy_qcos.task_manager.task_manager module
- class wy_qcos.task_manager.task_manager.TaskFlowManager
Bases: object
Task manager based on the Prefect framework.
- static convert_to_qcos_state(state)
Convert to qcos state.
- Parameters:
state -- prefect state
- Returns:
qcos state.
- static convert_to_prefect_states(states)
Convert qcos states to prefect states.
- Parameters:
states -- qcos states list
- Returns:
prefect states list.
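The two converters above can be pictured as a lookup table and its inverse. The following is a minimal sketch, not the package's implementation: the Prefect names follow Prefect's state types, while the qcos names (in particular `QUEUED`) and the exact mapping are assumptions made for illustration.

```python
# Hypothetical mapping; the real qcos state names and the actual
# mapping used by TaskFlowManager are not given in this reference.
PREFECT_TO_QCOS = {
    "SCHEDULED": "QUEUED",
    "PENDING": "QUEUED",
    "RUNNING": "RUNNING",
    "COMPLETED": "COMPLETED",
    "FAILED": "FAILED",
    "CRASHED": "FAILED",
    "CANCELLING": "CANCELLED",
    "CANCELLED": "CANCELLED",
}


def convert_to_qcos_state(state: str) -> str:
    """Map one Prefect state name to a qcos state name."""
    return PREFECT_TO_QCOS[state.upper()]


def convert_to_prefect_states(states: list[str]) -> list[str]:
    """Return every Prefect state name that maps to one of the qcos states."""
    wanted = {s.upper() for s in states}
    return [prefect for prefect, qcos in PREFECT_TO_QCOS.items() if qcos in wanted]
```

Because several Prefect states can collapse into one qcos state, the reverse direction naturally takes and returns lists, which matches the signatures documented above.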
- generate_deployment_configs(device_names)
Generate deployment configs.
- Parameters:
device_names -- device names
- Returns:
deployment configs
- start()
Create work pools, queues and start workers.
- set_driver_manager(driver_manager)
Set driver manager.
- Parameters:
driver_manager -- driver manager
- set_device_manager(device_manager)
Set device manager.
- Parameters:
device_manager -- device manager
- check_connection()
Check connection to prefect server.
- async create_pools(pool_names)
Create all work pools. Each device has its own work pool.
- Parameters:
pool_names -- pool names
- async create_pool(pool_name, concurrency_limit=None)
Create work pool by prefect client.
- Parameters:
pool_name -- work pool name, using device name
concurrency_limit -- concurrency limit
- async create_queues(queue_names)
Create all work queues under a work pool. Each priority has its own work queue.
- Parameters:
queue_names -- queue names
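The layout described for `create_pools` and `create_queues` (one work pool per device, one work queue per priority) can be sketched as a plain planning function. The priority labels and the `<device>-<priority>` queue naming below are hypothetical; only the use of the device name as the pool name comes from this reference.

```python
# Hypothetical priority levels; the real set is not listed in this reference.
PRIORITIES = ("high", "normal", "low")


def plan_pools_and_queues(device_names):
    """Return {pool_name: [queue_name, ...]} with one pool per device.

    The pool name is the device name; the queue naming scheme is an
    assumption made for this sketch.
    """
    return {
        device: [f"{device}-{priority}" for priority in PRIORITIES]
        for device in device_names
    }
```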
- async create_deployments(deployment_configs)
Create deployment by prefect client.
- Parameters:
deployment_configs -- deployment configs
- get_deployment(deployment_name)
Get deployment.
- Parameters:
deployment_name -- deployment name
- kill_workers()
Kill workers.
- start_workers()
Start workers using multiprocessing.
- run_device_monitor()
Run device monitor by prefect.
- static get_prefect_configs()
Set prefect configs.
- static start_work(process_name, pool_name, concurrency_limit)
Start worker by prefect client.
- Parameters:
process_name -- process name
pool_name -- work pool name
concurrency_limit -- max num of jobs running at the same time
- static start_device_monitor_work(process_name, pool_name, concurrency_limit)
Start device monitor worker by prefect client.
- Parameters:
process_name -- process name
pool_name -- work pool name
concurrency_limit -- max num of jobs running at the same time
- static start_device_mgr_work(process_name, pool_name, concurrency_limit)
Start device mgr worker by prefect client.
- Parameters:
process_name -- process name
pool_name -- work pool name
concurrency_limit -- max num of jobs running at the same time
- async wait_workers()
Start all workers for work pool.
- run_flow(deployment_id, args, tags=None, work_queue_name=None)
Run flow.
- Parameters:
deployment_id -- deployment uuid
args (dict[str, Any]) -- flow function args in dict
tags -- prefect flow tags
work_queue_name -- work queue name
- Returns:
flow run uuid
- run_manage_task_flow(deployment_id, args, work_queue_name=None)
Run manage task flow.
- Parameters:
deployment_id -- deployment uuid
args (dict[str, Any]) -- flow function args in dict
work_queue_name -- work queue name
- Returns:
flow run uuid
- get_flow_run_id_by_job_id(job_id, tags=None)
- async run_flow_by_client(deployment_id, args, tags=None, work_queue_name=None)
Run flow by prefect client.
- Parameters:
deployment_id -- deployment uuid
args (dict[str, Any]) -- flow function args in dict
tags -- prefect flow tags
work_queue_name -- work queue name
- Returns:
flow run ids
- Return type:
flow run ids
- async run_manage_task_flow_by_client(deployment_id, args, work_queue_name=None)
Run manage task flow by prefect client.
- Parameters:
deployment_id -- deploy uuid
args (dict[str, Any]) -- flow function args in dict
work_queue_name -- work queue name
- get_flow_result(job_id, tags=None)
Get flow run state and result.
- Parameters:
job_id -- job uuid
tags -- prefect flow tags
- Returns:
state, parameters, result, err_msg
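A caller would typically unpack the four-element return value of `get_flow_result` and poll until the job reaches a final state. The sketch below uses a stand-in class (`FakeTaskFlowManager`) instead of the real manager, and the helper `wait_for_result` is hypothetical. It also assumes the returned state is a plain string using the final-state names listed under `process_unfinished_jobs`.

```python
import time

# Final states as listed under process_unfinished_jobs().
FINAL_STATES = {"COMPLETED", "CANCELLED", "DELETED", "FAILED"}


class FakeTaskFlowManager:
    """Stand-in for TaskFlowManager: reports RUNNING twice, then COMPLETED."""

    def __init__(self):
        self._calls = 0

    def get_flow_result(self, job_id, tags=None):
        self._calls += 1
        if self._calls < 3:
            return "RUNNING", {"job_id": job_id}, None, None
        return "COMPLETED", {"job_id": job_id}, {"counts": {"00": 10}}, None


def wait_for_result(manager, job_id, interval=0.0, max_polls=10):
    """Poll get_flow_result() until a final state or until max_polls is hit."""
    for _ in range(max_polls):
        state, parameters, result, err_msg = manager.get_flow_result(job_id)
        if state in FINAL_STATES:
            return state, result, err_msg
        time.sleep(interval)
    raise TimeoutError(f"job {job_id} did not finish after {max_polls} polls")
```

Bounding the loop with `max_polls` keeps a stuck job from blocking the caller indefinitely.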
- update_flow(flow_run_id, name=None, parameters=None, variables=None)
Update flow.
- Parameters:
flow_run_id -- flow run id
name -- flow name (Default value = None)
parameters -- flow parameters (Default value = None)
variables -- flow variables (Default value = None)
- Returns:
whether the flow exists
- get_flow_result_by_client(flow_run_id)
Get flow run state and result by prefect client.
- Parameters:
flow_run_id -- flow run uuid
- Returns:
state_name, parameters, result, state_message
- get_flow_list(tags=None)
Get flow run list.
- Parameters:
tags -- prefect flow tags
- Returns:
flow run list
- get_flow_list_by_client(tags=None, sort_fields=['-created'], reverse=False)
Get flow run list by prefect client.
- Parameters:
sort_fields -- sort fields (Default value = ['-created'])
reverse -- reverse order (Default value = False)
tags -- prefect flow tags
- Returns:
flow run list
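The default `sort_fields=['-created']` suggests a convention in which a leading `-` means descending order. The reference does not state this, so the sketch below is one plausible reading rather than the documented behaviour. The helper `sort_flow_runs` is hypothetical, flow runs are modelled as plain dicts, and the way `reverse` interacts with the sort fields is likewise an assumption.

```python
def sort_flow_runs(flow_runs, sort_fields=("-created",), reverse=False):
    """Sort dict-like flow runs by several fields; '-' prefix means descending."""
    ordered = list(flow_runs)
    # list.sort() is stable, so sorting by the last field first gives the
    # first field in sort_fields the highest precedence.
    for field in reversed(sort_fields):
        descending = field.startswith("-")
        key = field.lstrip("-")
        ordered.sort(key=lambda run, k=key: run[k], reverse=descending)
    if reverse:
        ordered.reverse()
    return ordered
```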
- get_flow_run(flow_run_id, tags=None)
Get flow run.
- Parameters:
flow_run_id -- flow run id
tags -- prefect flow tags
- Returns:
flow run.
- delete_flow_runs(flow_run_ids)
Delete flow runs by client.
- Parameters:
flow_run_ids -- flow run uuid list
- Returns:
success list.
- cancel_flow_runs(flow_run_ids, tags=None)
Cancel flow runs.
- Parameters:
flow_run_ids -- flow run id list
tags -- prefect flow tags
- Returns:
success list.
- delete_task_flow_by_name(flow_name)
Delete flow by name.
- Parameters:
flow_name -- flow name
- Returns:
id of the deleted flow.
- get_flow_runs_with_filters(states=None, tags=None, pool_name=None)
Get flow runs with filters.
- Parameters:
states -- flow states
tags -- prefect flow tags
pool_name -- pool name
- Returns:
flow runs.
- async process_aggregation_job()
Process aggregation job.
wy_qcos.task_manager.task_scheduler module
- class wy_qcos.task_manager.task_scheduler.TaskScheduler
Bases: object
Task scheduler.
- start_taskmanager()
Start TaskManager.
- set_driver_manager(driver_manager)
Set driver manager.
- Parameters:
driver_manager -- driver manager
- get_task_manager()
Get task manager.
- Returns:
task manager
- get_driver_manager()
Get driver manager.
- Returns:
driver manager
- set_transpiler_manager(transpiler_manager)
Set transpiler manager.
- Parameters:
transpiler_manager -- transpiler manager
- get_transpiler_manager()
Get transpiler manager.
- Returns:
transpiler manager
- set_device_manager(device_manager)
Set device manager.
- Parameters:
device_manager -- device manager
- set_db_engine(db_engine)
Set database engine.
- Parameters:
db_engine -- database engine
- get_device_manager()
Get device manager.
- Returns:
device manager
- submit(job_info, tags=None, extra_job_data_info={})
Submit job to scheduler.
- Parameters:
job_info -- job info
tags -- prefect flow tags
extra_job_data_info -- extra job data info
- Returns:
submitted job info, error messages
- submit_manage_job(job_info)
Submit manage job to scheduler.
- Parameters:
job_info -- job info
- Returns:
submitted job info, error messages
- delete_flows(flow_run_ids)
Delete flows.
- Parameters:
flow_run_ids -- flow run id list
- Returns:
flow list
- cancel_flows(flow_run_ids, tags=None)
Cancel jobs.
- Parameters:
flow_run_ids -- flow run id list
tags -- prefect flow tags
- Returns:
flow list
- update_flow(flow_run_id, name=None, parameters=None, variables=None)
Update job.
- Parameters:
flow_run_id -- flow run id
name -- job name (Default value = None)
parameters -- job parameters (Default value = None)
variables -- job variables (Default value = None)
- Returns:
whether the flow exists
- process_unfinished_jobs()
Process unfinished jobs in database.
Checks all jobs in database. If job_status is not one of the final states (COMPLETED, CANCELLED, DELETED, FAILED), sets it to FAILED. This handles jobs that may have been interrupted or left in intermediate states during system restart.
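The recovery rule described above can be sketched with in-memory records in place of the database. The helper `fail_unfinished_jobs` and the `job_id` field are hypothetical; the `job_status` field and the list of final states come from the description.

```python
# Final states as listed in the description above.
FINAL_STATES = {"COMPLETED", "CANCELLED", "DELETED", "FAILED"}


def fail_unfinished_jobs(jobs):
    """Mark every job that is not in a final state as FAILED.

    `jobs` is a list of dicts standing in for database rows.
    Returns the ids of the jobs that were changed.
    """
    changed = []
    for job in jobs:
        if job["job_status"] not in FINAL_STATES:
            job["job_status"] = "FAILED"
            changed.append(job["job_id"])
    return changed
```

Running this once at startup is idempotent: a second pass finds only final states and changes nothing.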
- process_callbacks()
Process unfinished callbacks from database.
Reads job records where callbacks are not empty and is_callback_success is False, then executes the callbacks. Updates is_callback_success to True on successful execution.
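The callback pass described above can be sketched in the same style. The helper `run_pending_callbacks`, the `execute` function passed in, and the `job_id` field are hypothetical; the `callbacks` and `is_callback_success` fields come from the description. How the real method treats a job whose callbacks only partly succeed is not stated, so this sketch simply leaves the flag unset.

```python
def run_pending_callbacks(jobs, execute):
    """Execute callbacks for jobs whose callbacks have not yet succeeded.

    `jobs` is a list of dicts standing in for database rows and `execute`
    is a caller-supplied function taking (callback, job).
    Returns the ids of the jobs whose callbacks all succeeded.
    """
    succeeded = []
    for job in jobs:
        if not job["callbacks"] or job["is_callback_success"]:
            continue
        try:
            for callback in job["callbacks"]:
                execute(callback, job)
        except Exception:
            # Leave is_callback_success False so a later pass can retry.
            continue
        job["is_callback_success"] = True
        succeeded.append(job["job_id"])
    return succeeded
```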
- class wy_qcos.task_manager.task_scheduler.PrioritySchedulingPolicy(task_manager)
Bases: object
Priority scheduling policy.
- Parameters:
task_manager (TaskFlowManager)
- exec_task(deployment, job_info, tags=None)
Execute task.
- Parameters:
deployment -- deployment info
job_info -- job info
tags -- prefect flow tags
- Returns:
flow run id
- exec_manage_task(deployment, device_mgr_info)
Execute manage task.
- Parameters:
deployment -- deployment info
device_mgr_info -- device manager info
- Returns:
job uuid