wy_qcos.metrics package

Submodules

wy_qcos.metrics.metrics_collector module

class wy_qcos.metrics.metrics_collector.JobMetrics

Bases: object

Manages job metrics.

class JobMetricsData(total=0, completed=0, failed=0, running=0, queued=0, cancelling=0, cancelled=0, deleted=0, unknown=0)

Bases: object

Job metrics data.

Parameters:
  • total (int)

  • completed (int)

  • failed (int)

  • running (int)

  • queued (int)

  • cancelling (int)

  • cancelled (int)

  • deleted (int)

  • unknown (int)

total
completed
failed
running
queued
cancelling
cancelled
deleted
unknown
update(data)

Update Prometheus metrics by status.

Parameters:

data (JobMetricsData)

get_values()

Get current values of all job metrics.

This method returns the internally tracked values, which is faster than collecting from Prometheus metrics.

Returns:

JobMetricsData object with current metric values

Return type:

JobMetricsData
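The pattern described above, keeping a plain in-memory snapshot next to the Prometheus metrics so that reads do not have to collect from them, can be sketched roughly as follows. This is a minimal stand-in, not the wy_qcos implementation: the dataclass mirrors the documented fields, while `JobMetricsSketch` and its `_gauges` dict (used here in place of real Prometheus gauges) are hypothetical.

```python
from dataclasses import dataclass, asdict, replace


@dataclass
class JobMetricsData:
    """Mirror of the documented fields; all counters default to 0."""
    total: int = 0
    completed: int = 0
    failed: int = 0
    running: int = 0
    queued: int = 0
    cancelling: int = 0
    cancelled: int = 0
    deleted: int = 0
    unknown: int = 0


class JobMetricsSketch:
    """Hypothetical stand-in for JobMetrics (no Prometheus dependency)."""

    def __init__(self):
        # Assumption: one gauge per status; a dict stands in for the gauges.
        self._gauges = {}
        self._current = JobMetricsData()

    def update(self, data):
        # Push each status count to its "gauge" and remember the snapshot.
        for status, count in asdict(data).items():
            self._gauges[status] = count
        self._current = replace(data)  # store a copy, not the caller's object

    def get_values(self):
        # Return a copy of the tracked snapshot without touching the gauges.
        return replace(self._current)


metrics = JobMetricsSketch()
metrics.update(JobMetricsData(total=5, completed=3, running=2))
snapshot = metrics.get_values()
```

Returning a copy from `get_values()` is a design choice of this sketch: callers can modify the result without affecting the tracked state.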

class wy_qcos.metrics.metrics_collector.SystemHealthMetrics

Bases: object

System health metrics.

class SystemHealthMetricsData(heartbeat_timestamp=0, worker_healthy=False, prefect_healthy=False, fastapi_healthy=False, redis_healthy=False)

Bases: object

System health metrics data.

Parameters:
  • heartbeat_timestamp (int)

  • worker_healthy (bool)

  • prefect_healthy (bool)

  • fastapi_healthy (bool)

  • redis_healthy (bool)

heartbeat_timestamp
worker_healthy
prefect_healthy
fastapi_healthy
redis_healthy
overall_healthy
update(data)

Update system health metrics.

Parameters:

data (SystemHealthMetricsData) -- SystemHealthMetricsData object with system health status

get_values()

Get current system health status.

This method returns the internally tracked values, which is faster than collecting from Prometheus metrics.

Returns:

SystemHealthMetricsData object with current health status

Return type:

SystemHealthMetricsData

class wy_qcos.metrics.metrics_collector.APIMetrics

Bases: object

API metrics.

class APIMetricsData(module, method, endpoint, status_code, duration)

Bases: object

API metrics data.

Parameters:
  • module (str)

  • method (str)

  • endpoint (str)

  • status_code (int)

  • duration (float)

module
method
endpoint
status_code
duration
record_api_request(data)

Record an API request.

Parameters:

data (APIMetricsData) -- API request data, with the following fields:

  • module: Module name

  • method: HTTP method

  • endpoint: API endpoint

  • status_code: HTTP status code

  • duration: Request duration in seconds

increment_api_requests_in_progress()

Increment the counter of in-progress API requests.

decrement_api_requests_in_progress()

Decrement the counter of in-progress API requests.

get_api_stats()

Get API statistics for different time windows.

Returns:

Dictionary containing:

  • total_requests: Total API requests

  • last_hour_requests: Requests in the last hour

  • last_day_requests: Requests in the last day

Return type:

dict
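One way to produce per-window counts like these is to keep request timestamps in a queue and count those that fall inside each window. The sketch below is an illustration under that assumption; `ApiStatsSketch`, `record()` and the explicit `now` argument are hypothetical and only the three result keys come from the documentation above.

```python
import time
from collections import deque


class ApiStatsSketch:
    """Hypothetical request counter with sliding time windows."""

    HOUR = 3600
    DAY = 86400

    def __init__(self):
        self._total = 0
        self._timestamps = deque()  # request times, oldest first

    def record(self, now=None):
        now = time.time() if now is None else now
        self._total += 1
        self._timestamps.append(now)

    def get_api_stats(self, now=None):
        now = time.time() if now is None else now
        # Drop entries older than the largest window to bound memory.
        while self._timestamps and now - self._timestamps[0] > self.DAY:
            self._timestamps.popleft()
        last_hour = sum(1 for t in self._timestamps if now - t <= self.HOUR)
        return {
            "total_requests": self._total,
            "last_hour_requests": last_hour,
            "last_day_requests": len(self._timestamps),
        }


stats = ApiStatsSketch()
stats.record(now=0)        # older than a day at query time
stats.record(now=90_000)   # within the last day, not the last hour
stats.record(now=99_000)   # within the last hour
result = stats.get_api_stats(now=100_000)
```

Passing `now` explicitly keeps the example deterministic; in normal use both methods would fall back to `time.time()`.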

class wy_qcos.metrics.metrics_collector.MetricsCollector

Bases: object

Singleton class for collecting and exposing Prometheus metrics.

update_job_metrics(data)

Update job-related metrics.

Parameters:

data (JobMetricsData) -- JobMetrics.JobMetricsData

record_api_request(data)

Record an API request.

Parameters:

data (APIMetricsData) -- APIMetrics.APIMetricsData, with the following fields:

  • module: Module name

  • method: HTTP method

  • endpoint: API endpoint

  • status_code: HTTP status code

  • duration: Request duration in seconds

record_api_requests_in_progress(is_increment)

Record the counter of in-progress API requests.

Parameters:

is_increment (bool)

update_system_health(data)

Update system health metrics.

Parameters:

data (SystemHealthMetricsData) -- SystemHealthMetrics.SystemHealthMetricsData, with the following fields:

  • heartbeat_timestamp: Timestamp of the last heartbeat

  • worker_healthy: Whether all workers are healthy

  • prefect_healthy: Whether Prefect is healthy

  • fastapi_healthy: Whether FastAPI is healthy

  • redis_healthy: Whether Redis is healthy

get_system_health_status()

Get system health status.

Returns:

System health status

Return type:

SystemHealthMetricsData

get_metrics()

Generate Prometheus metrics output.

Returns:

Prometheus metrics in text format

Return type:

bytes

get_content_type()

Get the content type for Prometheus metrics.

Returns:

Content type string

Return type:

str
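The singleton behaviour mentioned for MetricsCollector can be illustrated with a small standard-library sketch. How wy_qcos actually enforces a single instance is not stated here, so the `__new__`-based approach, the lock, and the `requests_in_progress` attribute below are assumptions made for illustration.

```python
import threading


class CollectorSketch:
    """Hypothetical singleton; not the wy_qcos MetricsCollector."""

    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Double-checked locking so concurrent first calls build one instance.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    instance = super().__new__(cls)
                    instance.requests_in_progress = 0
                    cls._instance = instance
        return cls._instance

    def record_api_requests_in_progress(self, is_increment):
        # One method covers both directions, selected by the flag.
        self.requests_in_progress += 1 if is_increment else -1


a = CollectorSketch()
b = CollectorSketch()
a.record_api_requests_in_progress(True)
```

Because `a` and `b` refer to the same object, the increment made through `a` is visible through `b`.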

wy_qcos.metrics.metrics_middleware module

class wy_qcos.metrics.metrics_middleware.MetricsMiddleware(app, dispatch=None)

Bases: BaseHTTPMiddleware

FastAPI middleware for collecting API access metrics.

Parameters:
  • app (ASGIApp)

  • dispatch (DispatchFunction | None)

EXCLUDED_PATHS = {'/favicon.ico', '/health', '/metrics'}
MODULES = {'device', 'driver', 'job', 'system', 'transpiler'}
async dispatch(request, call_next)

Process each request and collect metrics.

Parameters:
  • request (fastapi.Request) -- FastAPI request object

  • call_next (Callable) -- Next middleware/handler in chain

Returns:

Response object

Return type:

fastapi.Response
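How EXCLUDED_PATHS and MODULES might be used when deciding whether and how to label a request can be sketched as a plain function. The two sets are copied from the class attributes above; the matching rule (first path segment found in MODULES, with an "other" fallback) and the name `resolve_module` are assumptions, and the real middleware may resolve modules differently.

```python
EXCLUDED_PATHS = {"/favicon.ico", "/health", "/metrics"}
MODULES = {"device", "driver", "job", "system", "transpiler"}


def resolve_module(path):
    """Return the module label for a path, or None if it should be skipped.

    Assumption: the module is the first path segment that matches MODULES;
    unmatched paths are labelled "other".
    """
    if path in EXCLUDED_PATHS:
        return None  # excluded paths are not recorded at all
    for segment in path.strip("/").split("/"):
        if segment in MODULES:
            return segment
    return "other"
```

Skipping `/metrics` itself avoids counting the scrape requests that read the metrics.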

wy_qcos.metrics.metrics_scheduler module

class wy_qcos.metrics.metrics_scheduler.MetricsScheduler

Bases: object

Periodic metrics scheduler powered by APScheduler.

The metrics job is registered with trigger="interval", seconds=self._interval, coalesce=True and max_instances=1, so APScheduler runs it serially, at most once per interval, and merges any missed runs into a single execution.

async start()

Start the APScheduler and register the metrics job.

async stop()

Shutdown the APScheduler gracefully.

wy_qcos.metrics.metrics_server module

class wy_qcos.metrics.metrics_server.PrometheusHandler(request, client_address, server)

Bases: BaseHTTPRequestHandler

HTTP Handler for Prometheus Metrics Server.

DEFAULT_SERVER_METRICS_PATH = '/metrics'
HEADER_CONTENT_TYPE = 'Content-Type'
HEADER_CONTENT_LENGTH = 'Content-Length'
CONTENT_TYPE_TEXT_UTF8 = 'text/plain; charset=utf-8'
do_GET()

Handle GET requests.

log_message(format, *args)

Log messages for the http server.

Parameters:
  • format (str) -- log message format

  • *args (tuple) -- log message arguments

class wy_qcos.metrics.metrics_server.MetricsServer(ip=None, port=None)

Bases: object

Prometheus Metrics Server.

async start()

Start the metrics server.

async stop()

Stop the metrics server gracefully.
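A handler and server of this shape can be approximated with the standard library alone. The sketch below is hypothetical: `HandlerSketch`, its fixed `PAYLOAD` and the helper `fetch_once` are illustrative names, and the real server presumably takes its payload from the metrics collector rather than a constant.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class HandlerSketch(BaseHTTPRequestHandler):
    """Hypothetical handler; serves a fixed payload on /metrics."""

    METRICS_PATH = "/metrics"
    # Assumption: real data would come from the metrics collector.
    PAYLOAD = b"# placeholder metrics payload\n"

    def do_GET(self):
        if self.path != self.METRICS_PATH:
            self.send_error(404)
            return
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(self.PAYLOAD)))
        self.end_headers()
        self.wfile.write(self.PAYLOAD)

    def log_message(self, format, *args):
        # Silence the default stderr access log in this sketch.
        pass


def fetch_once(path):
    """Start a server on a free port, GET one path, then shut down."""
    server = HTTPServer(("127.0.0.1", 0), HandlerSketch)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    try:
        conn = http.client.HTTPConnection("127.0.0.1", server.server_port, timeout=5)
        conn.request("GET", path)
        response = conn.getresponse()
        result = (response.status, response.read())
        conn.close()
        return result
    finally:
        server.shutdown()       # stops serve_forever in the worker thread
        server.server_close()
        thread.join()


status, body = fetch_once("/metrics")
missing_status, _ = fetch_once("/other")
```

Binding to port 0 lets the operating system pick a free port, which keeps the example from colliding with a running service.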

wy_qcos.metrics.metrics_task module

async wy_qcos.metrics.metrics_task.get_redis_client()

Get a singleton Redis client for health checks.

async wy_qcos.metrics.metrics_task.clear_redis_client()
async wy_qcos.metrics.metrics_task.call_sync_with_timeout(func, timeout=3.0, *args, **kwargs)

Execute a synchronous function in a thread pool with a timeout.

Parameters:
  • func (callable) -- The synchronous function to execute.

  • timeout (float, optional) -- Timeout in seconds. Defaults to 3.0.

  • *args -- Arguments to pass to the function.

  • **kwargs -- Keyword arguments to pass to the function.

Returns:

The result of the function execution.

Return type:

Any
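A helper with this signature could be built from `asyncio.wait_for` and `loop.run_in_executor`, as in the sketch below. It is not the wy_qcos source: in particular, letting `asyncio.TimeoutError` propagate on timeout is an assumption, and `slow_add` and `demo` are hypothetical names used only to exercise it.

```python
import asyncio
import functools
import time


async def call_sync_with_timeout(func, timeout=3.0, *args, **kwargs):
    """Run a blocking callable in the default thread pool, bounded by timeout.

    Assumption: asyncio.TimeoutError propagates to the caller on timeout.
    """
    loop = asyncio.get_running_loop()
    bound = functools.partial(func, *args, **kwargs)
    return await asyncio.wait_for(loop.run_in_executor(None, bound), timeout)


def slow_add(a, b, delay=0.0):
    # Blocking stand-in for a synchronous client call.
    time.sleep(delay)
    return a + b


async def demo():
    fast = await call_sync_with_timeout(slow_add, 1.0, 2, 3)
    try:
        await call_sync_with_timeout(slow_add, 0.05, 2, 3, delay=0.3)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return fast, timed_out


fast_result, timed_out = asyncio.run(demo())
```

Because `timeout` is the second positional parameter, positional arguments for `func` can only be passed after an explicit timeout value. Note also that a timeout stops the caller from waiting but does not interrupt the worker thread, which runs to completion in the background.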

wy_qcos.metrics.metrics_task.require_sync_client(func)

Decorator for synchronous task functions.

Ensure task_manager and _sync_client exist, and inject sync_client as keyword argument.

Parameters:

func (callable) -- The function to be decorated.

Returns:

The decorated function.

Return type:

callable
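The guard-and-inject behaviour described for this decorator can be sketched as follows. Only the `sync_client` keyword and the `_sync_client` attribute name come from the description above; `TaskManagerSketch`, the module-level `task_manager` variable, and returning None when no client is available are assumptions for illustration.

```python
import functools


class TaskManagerSketch:
    """Hypothetical holder for a synchronous client."""

    def __init__(self, sync_client=None):
        self._sync_client = sync_client


task_manager = None  # assumption: a module-level manager set at startup


def require_sync_client(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Covers both a missing manager and a manager without a client.
        client = getattr(task_manager, "_sync_client", None)
        if client is None:
            # Assumption: a missing client yields None instead of raising.
            return None
        return func(*args, sync_client=client, **kwargs)
    return wrapper


@require_sync_client
def ping(sync_client):
    return f"ping via {sync_client}"


missing = ping()  # no manager yet, so the wrapped function is not called
task_manager = TaskManagerSketch(sync_client="client-1")
present = ping()
```

The lookup happens at call time rather than at decoration time, so a manager that is created after import is still picked up.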

async wy_qcos.metrics.metrics_task.update_job_metrics()

Update job metrics from task scheduler.

async wy_qcos.metrics.metrics_task.check_worker_health(*args, **kwargs)
async wy_qcos.metrics.metrics_task.check_prefect_health(*args, **kwargs)
async wy_qcos.metrics.metrics_task.check_fastapi_health()

Check FastAPI service health.

The FastAPI service runs in the same process, so it is reported as healthy by default.

async wy_qcos.metrics.metrics_task.check_redis_health()

Check Redis connectivity with short timeout.

async wy_qcos.metrics.metrics_task.update_system_health_metrics()

Update system health metrics.

Execute all component health checks concurrently and update the metrics.
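Running the component checks side by side is commonly done with `asyncio.gather`. The sketch below illustrates that approach with stand-in checks; `HealthSnapshot`, `check_ok`, `check_broken` and `collect_health` are hypothetical, and treating `overall_healthy` as the logical AND of the component flags is an assumption rather than documented behaviour.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class HealthSnapshot:
    """Hypothetical stand-in for SystemHealthMetricsData."""
    worker_healthy: bool = False
    prefect_healthy: bool = False
    fastapi_healthy: bool = False
    redis_healthy: bool = False

    @property
    def overall_healthy(self):
        # Assumption: overall health is the AND of all component flags.
        return all((self.worker_healthy, self.prefect_healthy,
                    self.fastapi_healthy, self.redis_healthy))


async def check_ok():
    await asyncio.sleep(0.01)  # simulate a short network round trip
    return True


async def check_broken():
    raise ConnectionError("service unreachable")


async def collect_health():
    # return_exceptions=True returns a failing check's exception as a
    # result instead of raising it out of gather.
    results = await asyncio.gather(
        check_ok(), check_ok(), check_ok(), check_broken(),
        return_exceptions=True,
    )
    # Anything other than a literal True (including exceptions) is unhealthy.
    flags = [result is True for result in results]
    return HealthSnapshot(*flags)


health = asyncio.run(collect_health())
```

With this arrangement one unreachable component marks only its own flag as unhealthy, and the total wait is bounded by the slowest check rather than the sum of all checks.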

async wy_qcos.metrics.metrics_task.update_metrics_task_async()

Asynchronously update task metrics from task scheduler.

Module contents