wy_qcos.user package
Submodules
wy_qcos.user.permission_manager module
- class wy_qcos.user.permission_manager.PermissionManager(access_control_model_file, access_control_policy_file)
Bases: object
Permission manager using Casbin for access control.
- init_enforcer()
Initialize the Casbin Enforcer.
- enforce(sub, obj, act)
Enforce a permission check.
- Parameters:
sub (str) -- subject
obj (str) -- object (resource)
act (str) -- action
- Returns:
policy enforcement result (True or False)
- Return type:
bool
- add_policy(sub, obj, act)
Add a permission policy.
- Parameters:
sub (str) -- subject
obj (str) -- object (resource)
act (str) -- action
- Returns:
policy add result (True or False)
- Return type:
bool
- remove_policy(sub, obj=None, act=None)
Remove a permission policy (obj and act are optional).
- Parameters:
sub (str) -- subject
obj (str | None) -- object (resource)
act (str | None) -- action
- Returns:
policy removal result (True or False)
- Return type:
bool
- remove_role(role_name)
Remove a permission role.
- Parameters:
role_name -- role name
- Returns:
role removal result (True or False)
- Return type:
bool
- get_for_role(role)
Get all permissions for a role.
- Parameters:
role (str) -- role
- Returns:
role permissions
- Return type:
list
- add_role_for_user(user, role)
Add a permission role for a user.
- Parameters:
user (str) -- user
role (str) -- role
- Return type:
bool
- delete_role_for_user(user, role=None)
Delete a permission role for a user (role is optional).
- Parameters:
user (str) -- user
role (str | None) -- role
- Returns:
role deletion result (True or False)
- Return type:
bool
- reload_policy()
Reload all policies from the policy file.
This method reloads the access control policies from the policy file, ensuring that any changes to role permissions are reflected in the system.
- Returns:
True if reload successful, False otherwise
- Return type:
bool
- reload_policy_from_db(roles_repo=None)
Reload all policies from the database.
This method clears all policies and reloads them from the database; useful when role permissions are updated in the database.
- Parameters:
roles_repo -- RoleRepository instance to load roles and permissions from
- Returns:
True if reload successful, False otherwise
- Return type:
bool
- clear_policy()
Clear all policies from memory.
This method clears all policies from the Casbin enforcer; useful when policies need to be refreshed from the database.
- Returns:
True if clear successful, False otherwise
- Return type:
bool
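The PermissionManager above wraps a Casbin enforcer, so its behaviour depends on the model and policy files it is given. The following is an added example, not code from wy_qcos or from Casbin: a pure-Python stand-in with the same method names, assuming a Casbin-style RBAC model in which a policy is a (sub, obj, act) triple, role links attach users (or roles) to roles, and anything without a matching policy is denied. It goes slightly beyond the page by showing role-to-role links, wildcard removal when obj/act are left as None, and what remove_role and clear_policy take away. All names and the sample policies are constructed.

```python
from __future__ import annotations

from collections import defaultdict


class ToyPermissionManager:
    """Pure-Python stand-in for a Casbin-style RBAC enforcer (constructed example).

    A policy is a (sub, obj, act) triple where sub is a user or role name; role
    links map a subject to the roles it holds. Checks are default-deny: a request
    passes only if a policy matches the subject or a role it transitively holds.
    """

    def __init__(self) -> None:
        self._policies: set[tuple[str, str, str]] = set()
        self._roles: dict[str, set[str]] = defaultdict(set)  # subject -> roles held

    def add_policy(self, sub: str, obj: str, act: str) -> bool:
        before = len(self._policies)
        self._policies.add((sub, obj, act))
        return len(self._policies) > before  # False when the policy already existed

    def remove_policy(self, sub: str, obj: str | None = None, act: str | None = None) -> bool:
        # obj/act left as None act as wildcards, mirroring the optional parameters
        doomed = {p for p in self._policies
                  if p[0] == sub and (obj is None or p[1] == obj) and (act is None or p[2] == act)}
        self._policies -= doomed
        return bool(doomed)

    def add_role_for_user(self, user: str, role: str) -> bool:
        if role in self._roles[user]:
            return False
        self._roles[user].add(role)
        return True

    def delete_role_for_user(self, user: str, role: str | None = None) -> bool:
        if user not in self._roles or not self._roles[user]:
            return False
        if role is None:                      # drop every role the user holds
            self._roles[user].clear()
            return True
        had = role in self._roles[user]
        self._roles[user].discard(role)
        return had

    def remove_role(self, role_name: str) -> bool:
        removed = self.remove_policy(role_name)   # the role's own policies ...
        for held in self._roles.values():         # ... and every link pointing at it
            if role_name in held:
                held.discard(role_name)
                removed = True
        return removed

    def get_for_role(self, role: str) -> list[list[str]]:
        return sorted([list(p) for p in self._policies if p[0] == role])

    def clear_policy(self) -> bool:
        self._policies.clear()
        self._roles.clear()
        return True

    def _subjects(self, sub: str) -> set[str]:
        """sub plus every role reachable through role links (role-to-role links included)."""
        seen: set[str] = set()
        stack = [sub]
        while stack:
            s = stack.pop()
            if s in seen:
                continue
            seen.add(s)
            stack.extend(self._roles.get(s, ()))  # .get avoids creating defaultdict entries
        return seen

    def enforce(self, sub: str, obj: str, act: str) -> bool:
        return any((s, obj, act) in self._policies for s in self._subjects(sub))


def demo() -> dict[str, bool]:
    """Constructed policies: admin inherits viewer; alice is admin, bob is viewer."""
    pm = ToyPermissionManager()
    pm.add_policy("admin", "/api/users", "call")
    pm.add_policy("viewer", "/api/jobs", "read")
    pm.add_role_for_user("alice", "admin")
    pm.add_role_for_user("admin", "viewer")        # role-to-role link
    pm.add_role_for_user("bob", "viewer")
    results = {
        "alice_users": pm.enforce("alice", "/api/users", "call"),   # via admin
        "alice_jobs": pm.enforce("alice", "/api/jobs", "read"),     # via admin -> viewer
        "bob_users": pm.enforce("bob", "/api/users", "call"),       # viewer lacks it
        "unknown": pm.enforce("mallory", "/api/users", "call"),     # default deny
    }
    pm.remove_role("viewer")   # drops viewer's policies and every link to viewer
    results["alice_jobs_after"] = pm.enforce("alice", "/api/jobs", "read")
    results["bob_jobs_after"] = pm.enforce("bob", "/api/jobs", "read")
    pm.clear_policy()
    results["alice_users_after_clear"] = pm.enforce("alice", "/api/users", "call")
    return results


if __name__ == "__main__":
    print(demo())
```

The point to carry over to the real manager: after remove_role or clear_policy the enforcer denies everything that depended on the removed state, which is why the page's reload_policy / reload_policy_from_db exist to repopulate it from the policy file or the database.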
wy_qcos.user.project_manager module
- class wy_qcos.user.project_manager.ProjectManager(db_session=None, projects_repo=None)
Bases: object
Project manager for CRUD operations on projects.
- Parameters:
db_session (Session | None)
projects_repo (ProjectRepository | None)
- validate_project_name(project_name)
Validate a project name.
- Parameters:
project_name (str) -- project name
- Raises:
ValueError -- if the project name is invalid
- Return type:
None
- validate_description(description)
Validate a description.
- Parameters:
description (str | None) -- description
- Raises:
ValueError -- if the description is invalid
- Return type:
None
- create_project(project_name, description=None, project_id=None)
Create a new project.
- Parameters:
project_name (str) -- project name (required)
description (str | None) -- project description (optional)
project_id (str | None) -- project ID (UUID, optional; auto-generated if not provided)
- Returns:
created project
- Raises:
ValueError -- if the project name is invalid or already exists
- update_project(project_id, project_name=None, description=None)
Update a project.
- Parameters:
project_id (str) -- project ID (UUID)
project_name (str | None) -- new project name (optional)
description (str | None) -- new project description (optional)
- Returns:
updated project
- Raises:
ValueError -- if the project is not found or validation fails
- get_project(project_name=None)
Get a project by name.
- Parameters:
project_name (str | None) -- project name
- Returns:
project object, or None if not found
- get_project_by_id(project_id)
Get a project by ID.
- Parameters:
project_id (str) -- project ID (UUID)
- Returns:
project object, or None if not found
- get_projects(filters=None)
Get all projects with optional filtering.
- Parameters:
filters (dict | None) -- dictionary of filter conditions (e.g. {'name': 'admin'})
- Returns:
projects keyed by project_id, optionally filtered
- Return type:
dict[str, object]
- delete_project(project_id)
Delete a project.
- Parameters:
project_id (str) -- project ID (UUID)
- Returns:
deleted project
- Raises:
ValueError -- if the project is not found or is a reserved project
- project_exists(project_id)
Check whether a project exists.
- Parameters:
project_id (str) -- project ID (UUID)
- Returns:
True if the project exists, False otherwise
- Return type:
bool
- find_projects_by_name(pattern)
Find projects by name pattern.
- Parameters:
pattern (str) -- name pattern to search for (case-insensitive)
- Returns:
list of project names matching the pattern
- Return type:
list[str]
wy_qcos.user.security_manager module
- class wy_qcos.user.security_manager.SecurityManager(user_manager)
Bases: object
Enhanced security manager with advanced authentication features.
- Parameters:
user_manager (UserManager)
- static verify_password(plain_password, hashed_password)
Verify a password against its hash.
- Parameters:
plain_password (str) -- plain-text password
hashed_password (str) -- hashed password
- Returns:
True if the password matches
- Return type:
bool
- static get_password_hash(password)
Hash a password.
- Parameters:
password (str) -- plain-text password
- Returns:
hashed password
- Return type:
str
- check_account_lockout(user_name)
Check whether an account is locked due to failed login attempts.
- Parameters:
user_name (str) -- user name
- Returns:
True if the account is locked
- Return type:
bool
- record_failed_attempt(user_name)
Record a failed login attempt.
- Parameters:
user_name (str) -- user name
- Return type:
None
- record_successful_login(user_name, ip_address, user_agent)
Record a successful login.
- Parameters:
user_name (str) -- user name
ip_address (str) -- IP address
user_agent (str) -- user agent string
- Return type:
None
- create_access_token(data, expires_delta=None)
Create a JWT access token.
- Parameters:
data (dict) -- data to encode in the token
expires_delta (timedelta | None) -- token expiration time
- Returns:
JWT token string
- Return type:
str
- create_refresh_token(data)
Create a JWT refresh token.
- Parameters:
data (dict) -- data to encode in the token
- Returns:
JWT refresh token string
- Return type:
str
- verify_token(token)
Verify a JWT token.
- Parameters:
token (str) -- JWT token string
- Returns:
decoded token data
- Raises:
HTTPException -- if the token is invalid
- Return type:
dict
- authenticate_user(user_name, password, ip_address, user_agent)
Authenticate a user with enhanced security.
- Parameters:
user_name (str) -- user name
password (str) -- password
ip_address (str) -- IP address
user_agent (str) -- user agent string
- Returns:
authenticated user
- Raises:
HTTPException -- if authentication fails
- Return type:
- get_current_user(credentials=Depends(security))
Get the current authenticated user.
- Parameters:
credentials (HTTPAuthorizationCredentials) -- HTTP authorization credentials
- Returns:
current user
- Raises:
HTTPException -- if authentication fails
- Return type:
- get_current_active_user(current_user=Depends(get_current_user))
Get the current active user.
- check_permissions(user, resource, action='call')
Check whether a user has permission for a resource.
- Parameters:
user (User) -- user
resource (str) -- resource to check
action (str) -- action to check (default: "call")
- Returns:
True if the user has permission
- Return type:
bool
- wy_qcos.user.security_manager.get_security_manager(request)
Get the security manager from the request state.
- Parameters:
request -- FastAPI request
- Returns:
security manager
- Return type:
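SecurityManager.create_access_token / verify_token above issue and check JWTs, and UserManager (further down) keeps a blacklist keyed by the token's jti claim together with the time the token would have expired. The following is an added example, not wy_qcos code, that shows how those pieces fit: a minimal HS256 token issuer whose verifier pins the algorithm, compares the signature in constant time, enforces exp, and then consults a jti blacklist that prunes entries once the token they belong to has expired on its own. A TokenError stands in for the HTTPException the page's verify_token raises; the secret, claims and timestamps are constructed, and the `now` parameters exist only so expiry can be exercised deterministically. In production the JWT encoding and decoding would come from a vetted library rather than being written by hand as here.

```python
from __future__ import annotations

import base64
import hashlib
import hmac
import json
import secrets
import time


class TokenError(Exception):
    """Stand-in for the HTTPException that verify_token raises on an invalid token."""


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


class ToySecurityManager:
    """Constructed example: HS256 access tokens plus a jti blacklist for revocation."""

    def __init__(self, secret: bytes) -> None:
        self._secret = secret
        self._blacklist: dict[str, int] = {}   # jti -> the token's own exp (epoch seconds)

    def create_access_token(self, data: dict, expires_in: int = 900, now: float | None = None) -> str:
        now = time.time() if now is None else now
        # Every token gets a unique jti so that it can be revoked individually later.
        payload = {**data, "iat": int(now), "exp": int(now) + expires_in, "jti": secrets.token_hex(16)}
        header = {"alg": "HS256", "typ": "JWT"}
        signing_input = (_b64url(json.dumps(header, separators=(",", ":")).encode())
                         + "." + _b64url(json.dumps(payload, separators=(",", ":")).encode()))
        sig = hmac.new(self._secret, signing_input.encode("ascii"), hashlib.sha256).digest()
        return signing_input + "." + _b64url(sig)

    def add_to_blacklist(self, token_jti: str, expires_at: int) -> None:
        self._blacklist[token_jti] = expires_at

    def is_blacklisted(self, token_jti: str, now: float | None = None) -> bool:
        now = time.time() if now is None else now
        # A blacklisted token that has passed its own exp is rejected by the expiry
        # check anyway, so its entry can be dropped; this keeps the blacklist bounded.
        for jti, exp in list(self._blacklist.items()):
            if exp <= now:
                del self._blacklist[jti]
        return token_jti in self._blacklist

    def verify_token(self, token: str, now: float | None = None) -> dict:
        now = time.time() if now is None else now
        parts = token.split(".")
        if len(parts) != 3 or not token.isascii():
            raise TokenError("malformed token")
        header_b64, payload_b64, sig_b64 = parts
        try:
            header = json.loads(_b64url_decode(header_b64))
            signature = _b64url_decode(sig_b64)
        except ValueError:                       # covers bad base64 and bad JSON
            raise TokenError("malformed token") from None
        # Pin the algorithm on the server side; the token never gets to choose it.
        if not isinstance(header, dict) or header.get("alg") != "HS256":
            raise TokenError("unexpected algorithm")
        expected = hmac.new(self._secret, f"{header_b64}.{payload_b64}".encode("ascii"),
                            hashlib.sha256).digest()
        if not hmac.compare_digest(expected, signature):   # constant-time comparison
            raise TokenError("bad signature")
        try:                                      # parse the payload only after the signature holds
            payload = json.loads(_b64url_decode(payload_b64))
        except ValueError:
            raise TokenError("malformed token") from None
        if (not isinstance(payload, dict) or not isinstance(payload.get("exp"), int)
                or payload["exp"] <= now):
            raise TokenError("token expired")
        if self.is_blacklisted(str(payload.get("jti", "")), now):
            raise TokenError("token revoked")
        return payload

    def revoke(self, token: str, now: float | None = None) -> None:
        """Logout: blacklist a still-valid token's jti until that token would expire anyway."""
        payload = self.verify_token(token, now)
        self.add_to_blacklist(payload["jti"], payload["exp"])
```

The check order matters: signature before payload parsing, expiry before the blacklist lookup, so that an expired token never has to be looked up and blacklist entries can be dropped the moment their token expires.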
wy_qcos.user.user_manager module
- class wy_qcos.user.user_manager.UserManager(access_control_model_file, access_control_policy_file, all_api, db_session=None)
Bases: object
User manager.
- Parameters:
access_control_model_file (str)
access_control_policy_file (str)
db_session (Session)
- get_permissions_list(policies)
Get a permissions list.
- Parameters:
policies -- permission policies
- Returns:
permission list
- init_db()
Initialize the database (idempotent; safe to run multiple times).
Creates the default projects, roles, and admin user.
- load_role_permissions()
Load all role permissions from the database.
This method reads all roles and their permissions from the database and adds them to the Casbin-based permission manager.
- perms_enforce(sub, obj, act)
Enforce a permission check.
- Parameters:
sub (str) -- subject
obj (str) -- object (resource)
act (str) -- action
- Returns:
policy enforcement result (True or False)
- Return type:
bool
- perms_add_policy(sub, obj, act)
Add a permission policy.
- Parameters:
sub (str) -- subject
obj (str) -- object (resource)
act (str) -- action
- Returns:
policy add result (True or False)
- Return type:
bool
- perms_remove_policy(sub, obj=None, act=None)
Remove a permission policy (obj and act are optional).
- Parameters:
sub (str) -- subject
obj (str | None) -- object (resource)
act (str | None) -- action
- Returns:
policy removal result (True or False)
- Return type:
bool
- perms_remove_role(role_name)
Remove a permission role.
- Parameters:
role_name -- role name
- Returns:
role removal result (True or False)
- Return type:
bool
- perms_get_for_role(role)
Get all permissions for a role.
- Parameters:
role (str) -- role
- Returns:
role permissions
- Return type:
list
- perms_add_role_for_user(user, role)
Add a permission role for a user.
- Parameters:
user (str) -- user
role (str) -- role
- Return type:
bool
- perms_delete_role_for_user(user, role=None)
Delete a permission role for a user (role is optional).
- Parameters:
user (str) -- user
role (str | None) -- role
- Returns:
role deletion result (True or False)
- Return type:
bool
- reload_role_permissions_from_db()
Reload all role permissions from the database into the permission system.
This method clears all policies and reloads them from the database, ensuring that any role permission changes are reflected in Casbin.
- Returns:
True if reload successful, False otherwise
- Return type:
bool
- fetch_default_policies(role=None)
Fetch the default policies for a role.
- Parameters:
role -- role
- Returns:
default policies
- get_default_policies(role=None, simple=False)
Get the default policies for a role.
- Parameters:
role -- role
simple -- return a simple list
- Returns:
default policies
- validate_user_name(user_name)
Validate a user name.
- Parameters:
user_name (str) -- user name
- Raises:
ValueError -- if the user name is invalid
- Return type:
None
- validate_password(password)
Validate a password.
- Parameters:
password (str) -- password
- Raises:
ValueError -- if the password is invalid
- Return type:
None
- validate_role_name(role_name)
Validate a role name.
- Parameters:
role_name (str) -- role name
- Raises:
ValueError -- if the role name is invalid
- Return type:
None
- validate_description(description)
Validate a description.
- Parameters:
description (str | None) -- description
- Raises:
ValueError -- if the description is invalid
- Return type:
None
- validate_roles(roles)
Validate a list of roles.
- Parameters:
roles (list[str]) -- list of roles
- Raises:
ValueError -- if the roles are invalid
- Return type:
None
- validate_permissions(permissions)
Validate a list of permissions.
- Parameters:
permissions (list[str]) -- list of permissions
- Raises:
ValueError -- if the permissions are invalid
- Return type:
None
- create_role(role_name, permissions, description=None)
Create a new role.
- Parameters:
role_name (str) -- role name
permissions (list[str] | None) -- permissions
description (str | None) -- description
- Returns:
created role
- Raises:
ValueError -- if the role name is invalid or already exists
- Return type:
- update_role(role_id, permissions=None, description=None, role_name=None)
Update a role.
- Parameters:
role_id (str) -- role ID (primary identifier)
permissions (list[str] | None) -- permissions
description (str | None) -- description
role_name (str | None) -- role name
- Returns:
updated role
- Raises:
ValueError -- if the role is not found or validation fails
- Return type:
- get_role(role_name)
Get a role by name.
- Parameters:
role_name (str) -- name of the role
- Returns:
role
- Return type:
Role | None
- get_role_by_id(role_id)
Get a role by ID.
- Parameters:
role_id (str) -- role ID (UUID or role name)
- Returns:
role
- Return type:
Role | None
- get_roles(filters=None)
Get roles keyed by role ID with optional filtering.
- Parameters:
filters (dict | None) -- dictionary of filter conditions (e.g. {'role_name': 'admin'})
- Returns:
roles keyed by role_id, filtered by the given criteria
- Return type:
dict[str, Role]
- delete_role(role_id)
Delete a role by ID.
- Parameters:
role_id (str) -- role ID (UUID)
- Returns:
deleted role
- Raises:
ValueError -- if the role is not found, is the admin role, or is used by users
- Return type:
- create_user(project_id, user_name, password, roles, is_enabled, is_locked, password_expiry_days, description=None, user_id=None)
Create a user.
- Parameters:
project_id (str) -- project ID (required; defaults to DEFAULT_PROJECT_ID)
user_name (str) -- user name
password (str) -- password
roles (list[str]) -- roles
is_enabled (bool) -- whether the user is enabled
is_locked (bool) -- whether the user is locked
password_expiry_days (int) -- password expiry in days
description (str | None) -- description
user_id (str | None) -- user ID
- Returns:
created user
- update_user(user_id, roles=None, is_enabled=None, is_locked=None, password_expiry_days=None, description=None, user_name=None)
Update a user.
- Parameters:
user_id (str) -- user ID (primary identifier)
roles (list[str] | None) -- roles
is_enabled (bool | None) -- whether the user is enabled
is_locked (bool | None) -- whether the user is locked
password_expiry_days (int | None) -- password expiry in days
description (str | None) -- description
user_name (str | None) -- user name (optional, kept for compatibility)
- Returns:
updated user
- change_password(user_id, old_password=None, new_password=None)
Change a user's password.
- Parameters:
user_id (str) -- user ID (UUID)
old_password (str | None) -- current password (required for non-admin users)
new_password (str | None) -- new password
- Returns:
updated user
- Raises:
ValueError -- if the user is not found, password validation fails, etc.
- get_user(user_name=None)
Get a user by name.
- Parameters:
user_name (str | None) -- name of the user
- Returns:
user
- Return type:
User | None
- get_user_by_id(user_id)
Get a user by ID.
- Parameters:
user_id (str) -- user ID (UUID or user name)
- Returns:
user
- Return type:
User | None
- get_user_by_name(user_name)
Get a user by name.
- Parameters:
user_name (str) -- name of the user
- Returns:
user
- Return type:
User | None
- get_users(filters=None)
Get users with optional filtering.
- Parameters:
filters (dict | None) -- dictionary of filter conditions (e.g. {'user_name': 'admin', 'is_enabled': True})
- Returns:
users keyed by user_id, filtered by the given criteria
- Return type:
dict[str, User]
- delete_user(user_id, force=False)
Delete a user by user_id.
- Parameters:
user_id (str) -- user ID (primary identifier)
force (bool) -- force delete (cascade-delete related jobs if True)
- Returns:
deleted user
- Raises:
ValueError -- if the user is not found, is the admin user, or has related jobs (when force=False)
- Return type:
- find_users_by_role(role_name)
Find users by role name.
- Parameters:
role_name (str) -- role name
- Returns:
list of user names
- Return type:
list[str]
- log_login_attempt(user_name, ip_address, success, failure_reason=None, user_agent=None)
Log a login attempt.
- Parameters:
user_name (str) -- user name
ip_address (str) -- IP address
success (bool) -- whether the login was successful
failure_reason (str | None) -- reason for the failure if not successful
user_agent (str | None) -- user agent
- get_login_logs(user_id=None, user_name=None, start_time=None, end_time=None, limit=100, offset=0)
Get login logs with optional filtering.
- Parameters:
user_id (str | None) -- filter logs by user_id (UUID, optional)
user_name (str | None) -- filter logs by user_name (optional)
start_time (datetime | None) -- filter logs after this time (optional)
end_time (datetime | None) -- filter logs before this time (optional)
limit (int) -- maximum number of logs to return; use -1 to get all logs without limit (default: 100)
offset (int) -- number of logs to skip (default: 0)
- Returns:
list of login logs in response format
- Raises:
ValueError -- if both user_id and user_name are provided, or if the user is not found
- Return type:
list
- clear_login_logs(user_id=None, user_name=None)
Clear login logs (all, or those of a specific user).
- Parameters:
user_id (str | None) -- user ID (UUID) to clear logs for (optional)
user_name (str | None) -- user name to clear logs for (optional)
- Returns:
dictionary with the count of deleted logs
- Raises:
ValueError -- if both user_id and user_name are provided
- Return type:
dict
- add_to_blacklist(token_jti, expires_at)
Add a token to the blacklist.
- Parameters:
token_jti (str) -- unique identifier of the token (JWT 'jti' claim)
expires_at (datetime) -- when the token would have expired
- Return type:
None
- is_blacklisted(token_jti)
Check whether a token is blacklisted.
- Parameters:
token_jti (str) -- unique identifier of the token (JWT 'jti' claim)
- Returns:
True if the token is blacklisted, False otherwise
- Return type:
bool
- static is_password_expired(user)
Check whether the user's password has expired.
- Parameters:
user (User) -- user
- Returns:
True if the password has expired
- Return type:
bool
- static hash_password(password)
Hash a password.
- Parameters:
password (str) -- password
- Returns:
hashed password
- Return type:
str
- static check_password(password, password_hash)
Check whether a password matches its hash.
- Parameters:
password (str) -- password
password_hash (str) -- hashed password
- Returns:
True if the passwords match
- Return type:
bool
- auto_unlock_user(user_id)
Auto-unlock a user after the lockout period expires.
- Parameters:
user_id (str) -- user ID (UUID)
- Returns:
True if successful, False otherwise
- Return type:
bool
- increment_failed_login_attempts(user_id)
Increment the failed login attempt count for a user.
- Parameters:
user_id (str) -- user ID (UUID)
- Returns:
True if successful, False otherwise
- Return type:
bool
- lock_user(user_id, locked_until)
Lock a user account until the specified time.
- Parameters:
user_id (str) -- user ID (UUID)
locked_until (datetime) -- time until which the user is locked
- Return type:
bool
- update_successful_login(user_id)
Update a user after a successful login.
Resets failed login attempts, updates the last login time, and unlocks the account if needed.
- Parameters:
user_id (str) -- user ID (UUID)
- Returns:
True if successful, False otherwise
- Return type:
bool
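The last group of UserManager methods (is_password_expired, auto_unlock_user, increment_failed_login_attempts, lock_user, update_successful_login) together with SecurityManager.check_account_lockout form an account-lockout state machine, but the page does not show the order in which a login flow applies them. The following is an added example, not wy_qcos code, that implements the same method names over an in-memory user record and wires them into a login() routine. The lockout policy (how many failures, how long) and the user fields are assumptions; the page's real methods take the threshold and duration from elsewhere and persist through a database. `now` is passed explicitly so the expiry boundaries can be exercised deterministically.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class ToyUser:
    """Constructed user record with only the fields the lockout logic needs."""
    user_id: str
    password_changed_at: datetime
    password_expiry_days: int = 90
    failed_login_attempts: int = 0
    is_locked: bool = False
    locked_until: datetime | None = None
    last_login: datetime | None = None


class ToyLockoutManager:
    """Constructed example of the lockout helpers; assumed policy: max_attempts
    consecutive failures lock the account for `lockout`, after which it unlocks itself."""

    def __init__(self, max_attempts: int = 5, lockout: timedelta = timedelta(minutes=15)) -> None:
        self.max_attempts = max_attempts
        self.lockout = lockout
        self._users: dict[str, ToyUser] = {}

    def add_user(self, user: ToyUser) -> None:
        self._users[user.user_id] = user

    def check_account_lockout(self, user_id: str, now: datetime) -> bool:
        user = self._users.get(user_id)
        if user is None:
            return False
        if user.is_locked and user.locked_until is not None and user.locked_until <= now:
            self.auto_unlock_user(user_id)          # lockout period has expired
        return user.is_locked

    def increment_failed_login_attempts(self, user_id: str, now: datetime) -> bool:
        user = self._users.get(user_id)
        if user is None:
            return False
        user.failed_login_attempts += 1
        if user.failed_login_attempts >= self.max_attempts:
            self.lock_user(user_id, now + self.lockout)
        return True

    def lock_user(self, user_id: str, locked_until: datetime) -> bool:
        user = self._users.get(user_id)
        if user is None:
            return False
        user.is_locked = True
        user.locked_until = locked_until
        return True

    def auto_unlock_user(self, user_id: str) -> bool:
        user = self._users.get(user_id)
        if user is None:
            return False
        user.is_locked = False
        user.locked_until = None
        user.failed_login_attempts = 0              # fresh counting window after the lock clears
        return True

    def update_successful_login(self, user_id: str, now: datetime) -> bool:
        user = self._users.get(user_id)
        if user is None:
            return False
        user.failed_login_attempts = 0
        user.last_login = now
        if user.is_locked:                          # "unlocks the account if needed"
            self.auto_unlock_user(user_id)
        return True

    @staticmethod
    def is_password_expired(user: ToyUser, now: datetime) -> bool:
        return now >= user.password_changed_at + timedelta(days=user.password_expiry_days)

    def login(self, user_id: str, password_ok: bool, now: datetime) -> str:
        """Apply the checks in login order; returns a status string for the caller."""
        if user_id not in self._users:
            return "failed"                         # same answer as a wrong password
        if self.check_account_lockout(user_id, now):
            return "locked"                         # the password is not evaluated at all
        if not password_ok:
            self.increment_failed_login_attempts(user_id, now)
            return "locked" if self._users[user_id].is_locked else "failed"
        self.update_successful_login(user_id, now)
        if self.is_password_expired(self._users[user_id], now):
            return "password_expired"
        return "ok"
```

Two details a practitioner should carry over: a correct password does not bypass an active lock (the lockout check runs first), and the attempt counter resets on unlock, so the window starts again rather than re-locking on the first failure after the lock clears.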