wy_qcos.user package
Submodules
wy_qcos.user.permission_manager module
- class wy_qcos.user.permission_manager.PermissionManager(access_control_model_file, access_control_policy_file)
基类:
objectPermission manager using Casbin for access control.
- init_enforcer()
Initialize Casbin Enforcer.
- enforce(sub, obj, act)
Permission enforce.
- 参数:
sub (str) -- sub
obj (str) -- obj
act (str) -- act
- 返回:
True, False
- 返回类型:
policy enforced results
- add_policy(sub, obj, act)
Add permission policy.
- 参数:
sub (str) -- sub
obj (str) -- obj
act (str) -- act
- 返回:
True, False
- 返回类型:
policy added results
- remove_policy(sub, obj=None, act=None)
Remove permission policy.
- 参数:
sub (str) -- sub
obj (str | None) -- obj
act (str | None) -- act
- 返回:
True, False
- 返回类型:
policy removed results
- remove_role(role_name)
Remove permission role.
- 参数:
role_name -- role name
- 返回:
True, False
- 返回类型:
role removed results
- get_for_role(role)
Get all permissions for role.
- 参数:
role (str) -- role
- 返回:
role permissions
- 返回类型:
list
- add_role_for_user(user, role)
Add permission role for user.
- 参数:
user (str) -- user
role (str) -- role
- 返回类型:
bool
- delete_role_for_user(user, role=None)
Delete permission role for user.
- 参数:
user (str) -- user
role (str | None) -- role
- 返回:
True, False
- 返回类型:
role deleted for user results
- reload_policy()
Reload all policies from policy file.
This method reloads the access control policies from the policy file, ensuring that any changes to role permissions are reflected in the system.
- 返回:
True if reload successful, False otherwise
- 返回类型:
bool
- reload_policy_from_db(roles_repo=None)
Reload all policies from database.
This method clears all policies and reloads them from database, useful when role permissions are updated in the database.
- 参数:
roles_repo -- RoleRepository instance to load roles and permissions from
- 返回:
True if reload successful, False otherwise
- 返回类型:
bool
- clear_policy()
Clear all policies from memory.
This method clears all policies from the Casbin enforcer, useful when policies need to be refreshed from database.
- 返回:
True if clear successful, False otherwise
- 返回类型:
bool
wy_qcos.user.security_manager module
- class wy_qcos.user.security_manager.SecurityManager(user_manager)
基类:
objectEnhanced security manager with advanced authentication features.
- 参数:
user_manager (UserManager)
- static verify_password(plain_password, hashed_password)
Verify password against hash.
- 参数:
plain_password (str) -- plain text password
hashed_password (str) -- hashed password
- 返回:
True if password matches
- 返回类型:
bool
- static get_password_hash(password)
Hash password.
- 参数:
password (str) -- plain text password
- 返回:
hashed password
- 返回类型:
str
- check_account_lockout(user_name)
Check if account is locked due to failed login attempts.
- 参数:
user_name (str) -- user name
- 返回:
True if account is locked
- 返回类型:
bool
- record_failed_attempt(user_name)
Record a failed login attempt.
- 参数:
user_name (str) -- user name
- 返回类型:
None
- record_successful_login(user_name, ip_address, user_agent)
Record a successful login.
- 参数:
user_name (str) -- user name
ip_address (str) -- IP address
user_agent (str) -- user agent string
- 返回类型:
None
- create_access_token(data, expires_delta=None)
Create JWT access token.
- 参数:
data (dict) -- data to encode in token
expires_delta (timedelta | None) -- token expiration time
- 返回:
JWT token string
- 返回类型:
str
- create_refresh_token(data)
Create JWT refresh token.
- 参数:
data (dict) -- data to encode in token
- 返回:
JWT refresh token string
- 返回类型:
str
- verify_token(token)
Verify JWT token.
- 参数:
token (str) -- JWT token string
- 返回:
decoded token data
- 抛出:
HTTPException -- if token is invalid
- 返回类型:
dict
- authenticate_user(user_name, password, ip_address, user_agent)
Authenticate user with enhanced security.
- 参数:
user_name (str) -- user name
password (str) -- password
ip_address (str) -- IP address
user_agent (str) -- user agent string
- 返回:
authenticated user
- 抛出:
HTTPException -- if authentication fails
- 返回类型:
- get_current_user(credentials=Depends(security))
Get current authenticated user.
- 参数:
credentials (HTTPAuthorizationCredentials) -- HTTP authorization credentials
- 返回:
current user
- 抛出:
HTTPException -- if authentication fails
- 返回类型:
- get_current_active_user(current_user=Depends(get_current_user))
Get current active user.
- check_permissions(user, resource, action='call')
Check if user has permission for resource.
- 参数:
user (User) -- user
resource (str) -- resource to check
action (str) -- action to check (default: "call")
- 返回:
True if user has permission
- 返回类型:
bool
- wy_qcos.user.security_manager.get_security_manager(request)
Get security manager from request state.
- 参数:
request -- FastAPI request
- 返回:
security manager
- 返回类型:
wy_qcos.user.user_manager module
- class wy_qcos.user.user_manager.UserManager(access_control_model_file, access_control_policy_file, all_api, db_session=None)
基类:
objectUser manager.
- 参数:
access_control_model_file (str)
access_control_policy_file (str)
db_session (Session)
- get_permissions_list(policies)
Get permissions list.
- 参数:
policies -- permission policies
- 返回:
permission list
- init_db()
Init database (idempotent - safe to run multiple times).
Creates default projects, roles, and admin user.
- load_role_permissions()
Load all role permissions from database.
This method reads all roles and their permissions from the database and adds them to the casbin-based permission manager.
- perms_enforce(sub, obj, act)
Permission enforce.
- 参数:
sub (str) -- sub
obj (str) -- obj
act (str) -- act
- 返回:
True, False
- 返回类型:
policy enforced results
- perms_add_policy(sub, obj, act)
Add permission policy.
- 参数:
sub (str) -- sub
obj (str) -- obj
act (str) -- act
- 返回:
True, False
- 返回类型:
policy added results
- perms_remove_policy(sub, obj=None, act=None)
Remove permission policy.
- 参数:
sub (str) -- sub
obj (str | None) -- obj
act (str | None) -- act
- 返回:
True, False
- 返回类型:
policy removed results
- perms_remove_role(role_name)
Remove permission role.
- 参数:
role_name -- role name
- 返回:
True, False
- 返回类型:
role removed results
- perms_get_for_role(role)
Get all permissions for role.
- 参数:
role (str) -- role
- 返回:
role permissions
- 返回类型:
list
- perms_add_role_for_user(user, role)
Add permission role for user.
- 参数:
user (str) -- user
role (str) -- role
- 返回类型:
bool
- perms_delete_role_for_user(user, role=None)
Delete permission role for user.
- 参数:
user (str) -- user
role (str | None) -- role
- 返回:
True, False
- 返回类型:
role deleted for user results
- reload_role_permissions_from_db()
Reload all role permissions from database to permission system.
This method clears all policies and reloads them from the database, ensuring that any role permission changes are reflected in Casbin.
- 返回:
True if reload successful, False otherwise
- 返回类型:
bool
- fetch_default_policies(role=None)
Fetch default policies based on role.
- 参数:
role -- role
- 返回:
default policies
- get_default_policies(role=None, simple=False)
Get default policies based on role.
- 参数:
role -- role
simple -- return simple list
- 返回:
default policies
- validate_user_name(user_name)
Validate user name.
- 参数:
user_name (str) -- user name
- 抛出:
ValueError -- if user name is invalid
- 返回类型:
None
- validate_password(password)
Validate password.
- 参数:
password (str) -- password
- 抛出:
ValueError -- if password is invalid
- 返回类型:
None
- validate_role_name(role_name)
Validate role name.
- 参数:
role_name (str) -- role name
- 抛出:
ValueError -- if role name is invalid
- 返回类型:
None
- validate_description(description)
Validate description.
- 参数:
description (str | None) -- description
- 抛出:
ValueError -- if description is invalid
- 返回类型:
None
- validate_roles(roles)
Validate roles.
- 参数:
roles (list[str]) -- list of roles
- 抛出:
ValueError -- if roles are invalid
- 返回类型:
None
- validate_permissions(permissions)
Validate permissions.
- 参数:
permissions (list[str]) -- list of permissions
- 抛出:
ValueError -- if permissions are invalid
- 返回类型:
None
- create_role(role_name, permissions, description=None)
Add role.
- 参数:
role_name (str) -- role name
permissions (list[str] | None) -- permissions
description (str | None) -- description
- 返回:
created role
- 返回类型:
- update_role(role_name, permissions, description=None)
Update role.
- 参数:
role_name (str) -- role name
permissions (list[str] | None) -- permissions
description (str | None) -- description
- 返回:
updated role
- 返回类型:
- get_role(role_name)
Get role by name.
- 参数:
role_name (str) -- name of the role
- 返回:
role
- 返回类型:
Role | None
- get_roles()
Get roles keyed by role_name.
- 返回:
roles keyed by role_name
- 返回类型:
dict[str, Role]
- delete_role(role_name)
Delete role.
- 参数:
role_name (str) -- role name
- 返回类型:
- create_user(project_id, user_name, password, roles, is_enabled, is_locked, password_expiry_days, description=None)
Create user.
- 参数:
project_id (str) -- project id (required, defaults to DEFAULT_PROJECT_ID)
user_name (str) -- user name
password (str) -- password
roles (list[str]) -- roles
is_enabled (bool) -- is enabled
is_locked (bool) -- is locked
password_expiry_days (int) -- password expiry days
description (str | None) -- description
- 返回:
created user
- update_user(user_name, roles=None, is_enabled=None, is_locked=None, password_expiry_days=None, description=None)
Update user.
- 参数:
user_name (str) -- user name
roles (list[str] | None) -- roles
is_enabled (bool | None) -- is enabled
is_locked (bool | None) -- is locked
password_expiry_days (int | None) -- password expiry days
description (str | None) -- description
- 返回:
updated user
- get_user(user_name=None)
Get user by name.
- 参数:
user_name (str | None) -- name of the user
- 返回:
user
- 返回类型:
User | None
- get_user_by_id(user_id)
Get user by ID.
- 参数:
user_id (str) -- user ID
- 返回:
user
- 返回类型:
User | None
- get_user_by_name(user_name)
Get user by name.
- 参数:
user_name (str) -- name of the user
- 返回:
user
- 返回类型:
User | None
- get_users()
Get users.
- 返回:
users
- 返回类型:
dict[str, User]
- delete_user(user_name)
Delete user.
- 参数:
user_name (str) -- user name
- 返回类型:
- find_users_by_role(role_name)
Find users by role name.
- 参数:
role_name (str) -- role name
- 返回:
list of usernames
- 返回类型:
list[str]
- log_login_attempt(user_name, ip_address, success, failure_reason=None, user_agent=None)
Log login attempt.
- 参数:
user_name (str) -- user name
ip_address (str) -- ip address
success (bool) -- whether login was successful
failure_reason (str | None) -- reason for failure if not successful
user_agent (str | None) -- user agent
- get_login_logs(user_name=None, limit=100)
Get login logs with optional user_name filter.
- 参数:
user_name (str | None) -- Filter logs by username (optional)
limit (int) -- Maximum number of logs to return
- 返回:
login logs
- clear_login_logs(user_id=None, user_name=None)
Clear login logs (all or for a specific user).
- 参数:
user_id (str | None) -- User ID (UUID) to clear logs for (optional)
user_name (str | None) -- User name to clear logs for (optional)
- 返回:
Dictionary with count of deleted logs
- 返回类型:
dict
- add_to_blacklist(token_jti, expires_at)
Add a token to the blacklist.
- 参数:
token_jti (str) -- Unique identifier of the token (JWT 'jti' claim)
expires_at (datetime) -- When the token would have expired
- 返回类型:
None
- is_blacklisted(token_jti)
Check if a token is blacklisted.
- 参数:
token_jti (str) -- Unique identifier of the token (JWT 'jti' claim)
- 返回:
True if token is blacklisted, False otherwise
- 返回类型:
bool
- static is_password_expired(user)
Check if password has expired.
- 参数:
user (User) -- user
- 返回:
is_password_expired
- 返回类型:
bool
- static hash_password(password)
Hash password.
- 参数:
password (str) -- password
- 返回:
hashed password
- 返回类型:
str
- static check_password(password, password_hash)
Check if password matches hash.
- 参数:
password (str) -- password
password_hash (str) -- hashed password
- 返回:
passwords are matched
- 返回类型:
bool