wy_qcos.metrics package
Submodules
wy_qcos.metrics.metrics_collector module
- class wy_qcos.metrics.metrics_collector.JobMetrics
Bases: object
Class for job metrics management.
- class JobMetricsData(total=0, completed=0, failed=0, running=0, queued=0, cancelling=0, cancelled=0, deleted=0, unknown=0)
Bases: object
Job metrics data.
- Parameters:
total (int)
completed (int)
failed (int)
running (int)
queued (int)
cancelling (int)
cancelled (int)
deleted (int)
unknown (int)
- total
- completed
- failed
- running
- queued
- cancelling
- cancelled
- deleted
- unknown
- update(data)
Update Prometheus metrics by status.
- Parameters:
data (JobMetricsData)
- get_values()
Get current values of all job metrics.
This method returns the internally tracked values, which is faster than collecting from Prometheus metrics.
- Returns:
JobMetricsData object with current metric values
- Return type:
JobMetricsData
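The update() / get_values() pair above can be pictured with a minimal sketch. It assumes JobMetricsData behaves like a dataclass and that the tracker keeps a cached copy of the last snapshot; JobMetricsSketch is a hypothetical stand-in, and the Prometheus gauge updates the real class performs are only indicated in a comment.

```python
from dataclasses import asdict, dataclass


@dataclass
class JobMetricsData:
    """Stand-in mirroring the documented fields; not the real class."""
    total: int = 0
    completed: int = 0
    failed: int = 0
    running: int = 0
    queued: int = 0
    cancelling: int = 0
    cancelled: int = 0
    deleted: int = 0
    unknown: int = 0


class JobMetricsSketch:
    """Hypothetical tracker that caches the last snapshot passed to update()."""

    def __init__(self):
        self._current = JobMetricsData()

    def update(self, data: JobMetricsData) -> None:
        # The real class would also set one Prometheus metric per status here (assumed).
        self._current = JobMetricsData(**asdict(data))

    def get_values(self) -> JobMetricsData:
        # Return a copy so callers cannot mutate the cached snapshot.
        return JobMetricsData(**asdict(self._current))


metrics = JobMetricsSketch()
metrics.update(JobMetricsData(total=5, completed=3, running=2))
snapshot = metrics.get_values()
```

Reading the cached snapshot avoids walking the Prometheus registry, which is the speed advantage the get_values() description refers to.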
- class wy_qcos.metrics.metrics_collector.SystemHealthMetrics
Bases: object
System health metrics.
- class SystemHealthMetricsData(heartbeat_timestamp=0, worker_healthy=False, prefect_healthy=False, fastapi_healthy=False, redis_healthy=False)
Bases: object
System health metrics data.
- Parameters:
heartbeat_timestamp (int)
worker_healthy (bool)
prefect_healthy (bool)
fastapi_healthy (bool)
redis_healthy (bool)
- heartbeat_timestamp
- worker_healthy
- prefect_healthy
- fastapi_healthy
- redis_healthy
- overall_healthy
- update(data)
Update system health metrics.
- Parameters:
data (SystemHealthMetricsData) -- SystemHealthMetricsData object with system health status
- get_values()
Get current system health status.
This method returns the internally tracked values, which is faster than collecting from Prometheus metrics.
- Returns:
SystemHealthMetricsData object with current health status
- Return type:
SystemHealthMetricsData
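The overall_healthy attribute is listed on SystemHealthMetricsData but is not a constructor argument, which suggests it is derived. The sketch below assumes it is the logical AND of the four component flags; that rule is an assumption, not something this reference confirms.

```python
from dataclasses import dataclass


@dataclass
class SystemHealthMetricsData:
    """Stand-in mirroring the documented constructor; not the real class."""
    heartbeat_timestamp: int = 0
    worker_healthy: bool = False
    prefect_healthy: bool = False
    fastapi_healthy: bool = False
    redis_healthy: bool = False

    @property
    def overall_healthy(self) -> bool:
        # Assumption: the system is healthy only when every component is.
        return all((
            self.worker_healthy,
            self.prefect_healthy,
            self.fastapi_healthy,
            self.redis_healthy,
        ))


all_up = SystemHealthMetricsData(1700000000, True, True, True, True)
redis_down = SystemHealthMetricsData(1700000000, True, True, True, False)
```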
- class wy_qcos.metrics.metrics_collector.APIMetrics
Bases: object
API metrics.
- class APIMetricsData(module, method, endpoint, status_code, duration)
Bases: object
API metrics data.
- Parameters:
module (str)
method (str)
endpoint (str)
status_code (int)
duration (float)
- module
- method
- endpoint
- status_code
- duration
- record_api_request(data)
Record an API request.
- Parameters:
data (APIMetricsData) -- API request data, with the fields:
module: Module name
method: HTTP method
endpoint: API endpoint
status_code: HTTP status code
duration: Request duration in seconds
- increment_api_requests_in_progress()
Increment the counter of in-progress API requests.
- decrement_api_requests_in_progress()
Decrement the counter of in-progress API requests.
- get_api_stats()
Get API statistics for different time windows.
- Returns:
Dictionary containing:
total_requests: Total API requests
last_hour_requests: Requests in the last hour
last_day_requests: Requests in the last day
- Return type:
dict
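One way to produce the three counters returned by get_api_stats() is to keep a running total plus a queue of recent request timestamps. This is a minimal sketch under that assumption; ApiStatsSketch and its record() method are hypothetical names, and the real class may compute the windows differently.

```python
import time
from collections import deque


class ApiStatsSketch:
    """Hypothetical in-memory request log; not the wy_qcos implementation."""

    def __init__(self):
        self._total = 0
        self._timestamps = deque()  # request times, oldest first

    def record(self, now=None):
        now = time.time() if now is None else now
        self._total += 1
        self._timestamps.append(now)

    def get_api_stats(self, now=None):
        now = time.time() if now is None else now
        # Drop entries older than one day; timestamps are appended in order.
        while self._timestamps and self._timestamps[0] <= now - 86400:
            self._timestamps.popleft()
        last_hour = sum(1 for t in self._timestamps if t > now - 3600)
        return {
            "total_requests": self._total,
            "last_hour_requests": last_hour,
            "last_day_requests": len(self._timestamps),
        }


stats = ApiStatsSketch()
stats.record(now=0)        # older than a day at query time
stats.record(now=90_000)   # within the last day, not the last hour
stats.record(now=99_000)   # within the last hour
result = stats.get_api_stats(now=100_000)
```

Passing now explicitly keeps the example deterministic; production code would rely on the clock.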
- class wy_qcos.metrics.metrics_collector.MetricsCollector
Bases: object
Singleton class for collecting and exposing Prometheus metrics.
- update_job_metrics(data)
Update job-related metrics.
- Parameters:
data (JobMetricsData) -- JobMetrics.JobMetricsData
- record_api_request(data)
Record an API request.
- Parameters:
data (APIMetricsData) -- APIMetrics.APIMetricsData, with the fields:
module: Module name
method: HTTP method
endpoint: API endpoint
status_code: HTTP status code
duration: Request duration in seconds
- record_api_requests_in_progress(is_increment)
Increment or decrement the counter of in-progress API requests.
- Parameters:
is_increment (bool)
- update_system_health(data)
Update system health metrics.
- Parameters:
data (SystemHealthMetricsData) -- SystemHealthMetrics.SystemHealthMetricsData, with the fields:
heartbeat_timestamp: Timestamp of the last heartbeat
worker_healthy: Whether all workers are healthy
prefect_healthy: Whether Prefect is healthy
fastapi_healthy: Whether FastAPI is healthy
redis_healthy: Whether Redis is healthy
- get_system_health_status()
Get system health status.
- Returns:
System health status
- Return type:
- get_metrics()
Generate Prometheus metrics output.
- Returns:
Prometheus metrics in text format
- Return type:
bytes
- get_content_type()
Get the content type for Prometheus metrics.
- Returns:
Content type string
- Return type:
str
wy_qcos.metrics.metrics_middleware module
- class wy_qcos.metrics.metrics_middleware.MetricsMiddleware(app, dispatch=None)
Bases: BaseHTTPMiddleware
FastAPI middleware for collecting API access metrics.
- Parameters:
app (ASGIApp)
dispatch (DispatchFunction | None)
- EXCLUDED_PATHS = {'/favicon.ico', '/health', '/metrics'}
- MODULES = {'device', 'driver', 'job', 'system', 'transpiler'}
- async dispatch(request, call_next)
Process each request and collect metrics.
- Parameters:
request (fastapi.Request) -- FastAPI request object
call_next (Callable) -- Next middleware/handler in chain
- Returns:
Response object
- Return type:
fastapi.Response
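The role of EXCLUDED_PATHS and MODULES in dispatch() can be illustrated without the web framework. The sketch below is framework-free and rests on two assumptions: excluded paths are passed through unmeasured, and the module label is the first path segment found in MODULES. The helper names, the "other" fallback label, and the /v1/job/42 URL are hypothetical.

```python
import time

EXCLUDED_PATHS = {"/favicon.ico", "/health", "/metrics"}
MODULES = {"device", "driver", "job", "system", "transpiler"}


def resolve_module(path):
    """Return the first path segment that names a known module (assumed rule)."""
    for segment in path.strip("/").split("/"):
        if segment in MODULES:
            return segment
    return "other"  # hypothetical fallback label


def observe(path, method, handler, recorded):
    """Time handler() and append one sample unless the path is excluded."""
    if path in EXCLUDED_PATHS:
        return handler()
    start = time.perf_counter()
    status_code = handler()
    recorded.append({
        "module": resolve_module(path),
        "method": method,
        "endpoint": path,
        "status_code": status_code,
        "duration": time.perf_counter() - start,
    })
    return status_code


samples = []
observe("/metrics", "GET", lambda: 200, samples)    # excluded, nothing recorded
observe("/v1/job/42", "GET", lambda: 200, samples)  # recorded under "job"
```

Each recorded sample carries the same five fields as APIMetricsData, so in the real middleware it could be handed to record_api_request().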
wy_qcos.metrics.metrics_scheduler module
- class wy_qcos.metrics.metrics_scheduler.MetricsScheduler
Bases: object
Periodic metrics scheduler powered by APScheduler.
APScheduler guarantees serial execution and a minimum interval via trigger="interval" with coalesce=True, max_instances=1 and seconds=self._interval.
- async start()
Start the APScheduler and register the metrics job.
- async stop()
Shutdown the APScheduler gracefully.
wy_qcos.metrics.metrics_server module
- class wy_qcos.metrics.metrics_server.PrometheusHandler(request, client_address, server)
Bases: BaseHTTPRequestHandler
HTTP handler for the Prometheus metrics server.
- DEFAULT_SERVER_METRICS_PATH = '/metrics'
- HEADER_CONTENT_TYPE = 'Content-Type'
- HEADER_CONTENT_LENGTH = 'Content-Length'
- CONTENT_TYPE_TEXT_UTF8 = 'text/plain; charset=utf-8'
- do_GET()
Handle GET requests.
- log_message(format, *args)
Log messages for the http server.
- Parameters:
format (str) -- log message format
*args (tuple) -- log message arguments
- class wy_qcos.metrics.metrics_server.MetricsServer(ip=None, port=None)
Bases: object
Prometheus metrics server.
- async start()
Start the metrics server.
- async stop()
Stop the metrics server gracefully.
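A handler of this shape can be sketched with the standard library alone. The example assumes that only the metrics path is served and that other paths return 404; the payload is a placeholder rather than real collector output, and the server runs in a background thread here instead of through the async start()/stop() methods documented above.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

METRICS_PATH = "/metrics"
CONTENT_TYPE_TEXT_UTF8 = "text/plain; charset=utf-8"


class MetricsHandlerSketch(BaseHTTPRequestHandler):
    """Hypothetical handler; not the wy_qcos PrometheusHandler."""

    def do_GET(self):
        if self.path != METRICS_PATH:
            self.send_error(404)
            return
        # Placeholder payload; the real server would return collector output.
        body = b"# HELP demo_up Demo gauge\n# TYPE demo_up gauge\ndemo_up 1\n"
        self.send_response(200)
        self.send_header("Content-Type", CONTENT_TYPE_TEXT_UTF8)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # silence the default stderr access log


server = HTTPServer(("127.0.0.1", 0), MetricsHandlerSketch)  # port 0 picks a free port
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()

conn = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
conn.request("GET", METRICS_PATH)
response = conn.getresponse()
status, payload = response.status, response.read()
conn.close()

server.shutdown()       # stop serve_forever()
server.server_close()   # release the socket
```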
wy_qcos.metrics.metrics_task module
- async wy_qcos.metrics.metrics_task.get_redis_client()
Get a singleton Redis client for health checks.
- async wy_qcos.metrics.metrics_task.clear_redis_client()
- async wy_qcos.metrics.metrics_task.call_sync_with_timeout(func, timeout=3.0, *args, **kwargs)
Execute a synchronous function in a thread pool with a timeout.
- Parameters:
func (callable) -- The synchronous function to execute.
timeout (float, optional) -- Timeout in seconds. Defaults to 3.0.
*args -- Arguments to pass to the function.
**kwargs -- Keyword arguments to pass to the function.
- Returns:
The result of the function execution.
- Return type:
Any
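A helper with this signature could be built from asyncio.to_thread and asyncio.wait_for, as in the minimal sketch below. This is one plausible implementation, not necessarily the one used in wy_qcos; slow_add is a hypothetical function added for the demonstration.

```python
import asyncio
import time


async def call_sync_with_timeout(func, timeout=3.0, *args, **kwargs):
    """Run func(*args, **kwargs) in a worker thread, waiting at most timeout seconds."""
    return await asyncio.wait_for(asyncio.to_thread(func, *args, **kwargs), timeout)


def slow_add(a, b, delay=0.0):
    time.sleep(delay)  # simulate blocking work
    return a + b


async def main():
    fast = await call_sync_with_timeout(slow_add, 1.0, 2, 3)
    try:
        await call_sync_with_timeout(slow_add, 0.05, 2, 3, delay=0.3)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast, timed_out


fast_result, timed_out = asyncio.run(main())
```

Because timeout precedes *args in the signature, positional arguments for func can only be passed when timeout is given explicitly. Note also that a timeout stops the waiting, not the worker thread: the blocking call keeps running until it returns.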
- wy_qcos.metrics.metrics_task.require_sync_client(func)
Decorator for synchronous task functions.
Ensure task_manager and _sync_client exist, and inject sync_client as keyword argument.
- Parameters:
func (callable) -- The function to be decorated.
- Returns:
The decorated function.
- Return type:
callable
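The guard-and-inject behavior described for require_sync_client can be sketched as follows. TaskManagerStub, the module-level task_manager variable, and the choice to return None when no client is available are all assumptions made for illustration; the real decorator may log, raise, or return a different default.

```python
import functools


class TaskManagerStub:
    """Hypothetical stand-in for the real task manager."""

    def __init__(self, sync_client=None):
        self._sync_client = sync_client


task_manager = None  # module-level handle, assumed to be set at startup


def require_sync_client(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Look up the client at call time so late initialization is picked up.
        client = getattr(task_manager, "_sync_client", None)
        if client is None:
            return None  # assumed behavior: skip when no client is available
        return func(*args, sync_client=client, **kwargs)
    return wrapper


@require_sync_client
def count_jobs(sync_client):
    return len(sync_client)


skipped = count_jobs()  # task_manager is still None
task_manager = TaskManagerStub(sync_client=["job-1", "job-2"])
counted = count_jobs()  # client injected as a keyword argument
```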
- async wy_qcos.metrics.metrics_task.update_job_metrics()
Update job metrics from task scheduler.
- async wy_qcos.metrics.metrics_task.check_worker_health(*args, **kwargs)
- async wy_qcos.metrics.metrics_task.check_prefect_health(*args, **kwargs)
- async wy_qcos.metrics.metrics_task.check_fastapi_health()
Check FastAPI service health.
The FastAPI service runs in the same process, so it is treated as healthy by default.
- async wy_qcos.metrics.metrics_task.check_redis_health()
Check Redis connectivity with a short timeout.
- async wy_qcos.metrics.metrics_task.update_system_health_metrics()
Update system health metrics.
Execute all component health checks in parallel and update the metrics.
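Running the component checks in parallel might look like the sketch below, which uses asyncio.gather. The check bodies are simplified stand-ins (the Redis check is made to fail on purpose), and treating any exception as "unhealthy" is an assumption about how failures are mapped to flags.

```python
import asyncio


async def check_fastapi_health():
    return True  # same process, so treated as healthy


async def check_redis_health():
    raise ConnectionError("redis unreachable")  # simulated failure


async def check_worker_health():
    await asyncio.sleep(0.01)  # simulated remote call
    return True


async def collect_health():
    checks = {
        "worker_healthy": check_worker_health(),
        "fastapi_healthy": check_fastapi_health(),
        "redis_healthy": check_redis_health(),
    }
    # return_exceptions=True returns a failing check's exception as its result
    # instead of raising it, so every check still reports.
    results = await asyncio.gather(*checks.values(), return_exceptions=True)
    # Anything other than True (including an exception) counts as unhealthy.
    return {name: result is True for name, result in zip(checks, results)}


health = asyncio.run(collect_health())
```

The resulting dictionary has the same keys as the boolean fields of SystemHealthMetricsData, so it could be unpacked into that object before calling update_system_health().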
- async wy_qcos.metrics.metrics_task.update_metrics_task_async()
Asynchronously update task metrics from task scheduler.