wy_qcos.user package

Submodules

wy_qcos.user.permission_manager module

class wy_qcos.user.permission_manager.PermissionManager(access_control_model_file, access_control_policy_file)

Bases: object

Permission manager using Casbin for access control.

init_enforcer()

Initialize Casbin Enforcer.

enforce(sub, obj, act)

Permission enforce.

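Parameters:
  • sub (str) -- subject (user or role name)

  • obj (str) -- object (resource being accessed)

  • act (str) -- action

Returns:

policy enforcement result (True if allowed, False otherwise)

Return type:

bool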

add_policy(sub, obj, act)

Add permission policy.

Parameters:
  • sub (str) -- subject (user or role name)

  • obj (str) -- object (resource being accessed)

  • act (str) -- action

Returns:

True if the policy was added, False otherwise

Return type:

bool

remove_policy(sub, obj=None, act=None)

Remove permission policy.

Parameters:
  • sub (str) -- subject (user or role name)

  • obj (str | None) -- object (resource being accessed)

  • act (str | None) -- action

Returns:

True if the policy was removed, False otherwise

Return type:

bool

remove_role(role_name)

Remove permission role.

Parameters:

role_name -- role name

Returns:

True if the role was removed, False otherwise

Return type:

bool

get_for_role(role)

Get all permissions for role.

Parameters:

role (str) -- role name

Returns:

role permissions

Return type:

list

add_role_for_user(user, role)

Add permission role for user.

Parameters:
  • user (str) -- user name

  • role (str) -- role name

Return type:

bool

delete_role_for_user(user, role=None)

Delete permission role for user.

Parameters:
  • user (str) -- user name

  • role (str | None) -- role name

Returns:

True if the role was deleted for the user, False otherwise

Return type:

bool
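
The subject/object/action model behind the methods above can be illustrated with a small in-memory stand-in. This is only a sketch of the idea, not the Casbin enforcer that PermissionManager wraps: the ToyEnforcer class, the "operator" role, and the "/jobs" resource are all hypothetical, and the real matching rules come from the access control model file.

```python
class ToyEnforcer:
    """In-memory stand-in for a policy enforcer (illustration only, not Casbin)."""

    def __init__(self):
        self.policies = set()   # (sub, obj, act) triples
        self.user_roles = {}    # user name -> set of role names

    def add_policy(self, sub, obj, act):
        # Return False when the rule already exists, True when it is added.
        if (sub, obj, act) in self.policies:
            return False
        self.policies.add((sub, obj, act))
        return True

    def add_role_for_user(self, user, role):
        roles = self.user_roles.setdefault(user, set())
        if role in roles:
            return False
        roles.add(role)
        return True

    def delete_role_for_user(self, user, role=None):
        roles = self.user_roles.get(user, set())
        if role is None:
            # No role given: drop every role the user holds.
            removed = bool(roles)
            roles.clear()
            return removed
        if role in roles:
            roles.remove(role)
            return True
        return False

    def enforce(self, sub, obj, act):
        # A request is allowed if the subject itself, or any role it holds,
        # has a matching (obj, act) policy.
        subjects = {sub} | self.user_roles.get(sub, set())
        return any((s, obj, act) in self.policies for s in subjects)


enforcer = ToyEnforcer()
enforcer.add_policy("operator", "/jobs", "call")
enforcer.add_role_for_user("alice", "operator")
allowed_before = enforcer.enforce("alice", "/jobs", "call")
enforcer.delete_role_for_user("alice")
allowed_after = enforcer.enforce("alice", "/jobs", "call")
```

In this sketch a user gains access only through a role, so removing the role revokes the permission without touching the policy itself.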

reload_policy()

Reload all policies from policy file.

This method reloads the access control policies from the policy file, ensuring that any changes to role permissions are reflected in the system.

Returns:

True if reload successful, False otherwise

Return type:

bool

reload_policy_from_db(roles_repo=None)

Reload all policies from database.

This method clears all policies and reloads them from database, useful when role permissions are updated in the database.

Parameters:

roles_repo -- RoleRepository instance to load roles and permissions from

Returns:

True if reload successful, False otherwise

Return type:

bool

clear_policy()

Clear all policies from memory.

This method clears all policies from the Casbin enforcer, useful when policies need to be refreshed from database.

Returns:

True if clear successful, False otherwise

Return type:

bool

wy_qcos.user.security_manager module

class wy_qcos.user.security_manager.SecurityManager(user_manager)

Bases: object

Enhanced security manager with advanced authentication features.

Parameters:

user_manager (UserManager)

static verify_password(plain_password, hashed_password)

Verify password against hash.

Parameters:
  • plain_password (str) -- plain text password

  • hashed_password (str) -- hashed password

Returns:

True if password matches

Return type:

bool

static get_password_hash(password)

Hash password.

Parameters:

password (str) -- plain text password

Returns:

hashed password

Return type:

str
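
The hash-and-verify pair above can be sketched with the standard library alone. The reference does not say which hashing scheme SecurityManager uses, so the PBKDF2 algorithm, the iteration count, and the "salt$digest" storage format below are assumptions chosen for illustration.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # assumed work factor, not taken from wy_qcos


def get_password_hash(password: str) -> str:
    """Hash a password with a fresh random salt (PBKDF2-HMAC-SHA256)."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, ITERATIONS
    )
    # Store salt and digest together so verification can recompute the hash.
    return f"{salt.hex()}${digest.hex()}"


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = hashed_password.split("$", 1)
    candidate = hashlib.pbkdf2_hmac(
        "sha256", plain_password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate.hex(), digest_hex)
```

Because each call draws a new salt, hashing the same password twice yields different strings; only verify_password can tell whether a candidate matches.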

check_account_lockout(user_name)

Check if account is locked due to failed login attempts.

Parameters:

user_name (str) -- user name

Returns:

True if account is locked

Return type:

bool

record_failed_attempt(user_name)

Record a failed login attempt.

Parameters:

user_name (str) -- user name

Return type:

None

record_successful_login(user_name, ip_address, user_agent)

Record a successful login.

Parameters:
  • user_name (str) -- user name

  • ip_address (str) -- IP address

  • user_agent (str) -- user agent string

Return type:

None
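
One common way to combine the three lockout methods above is a sliding window of recent failures. The sketch below assumes that approach; the threshold, the window length, and the LockoutTracker class are hypothetical and may differ from what SecurityManager actually does.

```python
import time

MAX_FAILED_ATTEMPTS = 5     # assumed threshold
LOCKOUT_SECONDS = 15 * 60   # assumed lockout window


class LockoutTracker:
    """Track failed logins per user within a sliding time window."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._failures = {}  # user_name -> list of failure timestamps

    def _recent(self, user_name):
        # Keep only failures that are still inside the lockout window.
        cutoff = self._clock() - LOCKOUT_SECONDS
        recent = [t for t in self._failures.get(user_name, []) if t > cutoff]
        self._failures[user_name] = recent
        return recent

    def check_account_lockout(self, user_name):
        return len(self._recent(user_name)) >= MAX_FAILED_ATTEMPTS

    def record_failed_attempt(self, user_name):
        self._recent(user_name).append(self._clock())

    def record_successful_login(self, user_name):
        # A successful login resets the failure counter.
        self._failures.pop(user_name, None)
```

Passing the clock in as a parameter keeps the sketch testable: a test can advance a fake clock instead of sleeping through the lockout window.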

create_access_token(data, expires_delta=None)

Create JWT access token.

Parameters:
  • data (dict) -- data to encode in token

  • expires_delta (timedelta | None) -- token expiration time

Returns:

JWT token string

Return type:

str

create_refresh_token(data)

Create JWT refresh token.

Parameters:

data (dict) -- data to encode in token

Returns:

JWT refresh token string

Return type:

str

verify_token(token)

Verify JWT token.

Parameters:

token (str) -- JWT token string

Returns:

decoded token data

Raises:

HTTPException -- if token is invalid

Return type:

dict

authenticate_user(user_name, password, ip_address, user_agent)

Authenticate user with enhanced security.

Parameters:
  • user_name (str) -- user name

  • password (str) -- password

  • ip_address (str) -- IP address

  • user_agent (str) -- user agent string

Returns:

authenticated user

Raises:

HTTPException -- if authentication fails

Return type:

User

get_current_user(credentials=Depends(security))

Get current authenticated user.

Parameters:

credentials (HTTPAuthorizationCredentials) -- HTTP authorization credentials

Returns:

current user

Raises:

HTTPException -- if authentication fails

Return type:

User

get_current_active_user(current_user=Depends(get_current_user))

Get current active user.

Parameters:

current_user (User) -- current user

Returns:

active user

Raises:

HTTPException -- if user is inactive

Return type:

User

check_permissions(user, resource, action='call')

Check if user has permission for resource.

Parameters:
  • user (User) -- user

  • resource (str) -- resource to check

  • action (str) -- action to check (default: "call")

Returns:

True if user has permission

Return type:

bool

wy_qcos.user.security_manager.get_security_manager(request)

Get security manager from request state.

Parameters:

request -- FastAPI request

Returns:

security manager

Return type:

SecurityManager

wy_qcos.user.user_manager module

class wy_qcos.user.user_manager.UserManager(access_control_model_file, access_control_policy_file, all_api, db_session=None)

Bases: object

User manager.

Parameters:
  • access_control_model_file (str)

  • access_control_policy_file (str)

  • db_session (Session)

get_permissions_list(policies)

Get permissions list.

Parameters:

policies -- permission policies

Returns:

permission list

init_db()

Init database (idempotent - safe to run multiple times).

Creates default projects, roles, and admin user.

load_role_permissions()

Load all role permissions from database.

This method reads all roles and their permissions from the database and adds them to the casbin-based permission manager.

perms_enforce(sub, obj, act)

Permission enforce.

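Parameters:
  • sub (str) -- subject (user or role name)

  • obj (str) -- object (resource being accessed)

  • act (str) -- action

Returns:

policy enforcement result (True if allowed, False otherwise)

Return type:

bool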

perms_add_policy(sub, obj, act)

Add permission policy.

Parameters:
  • sub (str) -- subject (user or role name)

  • obj (str) -- object (resource being accessed)

  • act (str) -- action

Returns:

True if the policy was added, False otherwise

Return type:

bool

perms_remove_policy(sub, obj=None, act=None)

Remove permission policy.

Parameters:
  • sub (str) -- subject (user or role name)

  • obj (str | None) -- object (resource being accessed)

  • act (str | None) -- action

Returns:

True if the policy was removed, False otherwise

Return type:

bool

perms_remove_role(role_name)

Remove permission role.

Parameters:

role_name -- role name

Returns:

True if the role was removed, False otherwise

Return type:

bool

perms_get_for_role(role)

Get all permissions for role.

Parameters:

role (str) -- role name

Returns:

role permissions

Return type:

list

perms_add_role_for_user(user, role)

Add permission role for user.

Parameters:
  • user (str) -- user name

  • role (str) -- role name

Return type:

bool

perms_delete_role_for_user(user, role=None)

Delete permission role for user.

Parameters:
  • user (str) -- user name

  • role (str | None) -- role name

Returns:

True if the role was deleted for the user, False otherwise

Return type:

bool

reload_role_permissions_from_db()

Reload all role permissions from database to permission system.

This method clears all policies and reloads them from the database, ensuring that any role permission changes are reflected in Casbin.

Returns:

True if reload successful, False otherwise

Return type:

bool

fetch_default_policies(role=None)

Fetch default policies based on role.

Parameters:

role -- role name

Returns:

default policies

get_default_policies(role=None, simple=False)

Get default policies based on role.

Parameters:
  • role -- role name

  • simple -- return a simple list

Returns:

default policies

validate_user_name(user_name)

Validate user name.

Parameters:

user_name (str) -- user name

Raises:

ValueError -- if user name is invalid

Return type:

None

validate_password(password)

Validate password.

Parameters:

password (str) -- password

Raises:

ValueError -- if password is invalid

Return type:

None
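
The validate_* methods share one contract: return None on success and raise ValueError otherwise. The sketch below shows that contract for user names and passwords. The actual rules are not documented here, so the name pattern and the minimum password length are purely hypothetical.

```python
import re

# Assumed rules, for illustration only; the real limits may differ.
USER_NAME_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9_]{2,31}")
MIN_PASSWORD_LENGTH = 8


def validate_user_name(user_name: str) -> None:
    """Raise ValueError unless the name matches the assumed pattern."""
    if not USER_NAME_PATTERN.fullmatch(user_name):
        raise ValueError(f"invalid user name: {user_name!r}")


def validate_password(password: str) -> None:
    """Raise ValueError if the password is shorter than the assumed minimum."""
    if len(password) < MIN_PASSWORD_LENGTH:
        raise ValueError("password is too short")
```

A caller would typically run these checks before create_user and turn a ValueError into a client-facing error response.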

validate_role_name(role_name)

Validate role name.

Parameters:

role_name (str) -- role name

Raises:

ValueError -- if role name is invalid

Return type:

None

validate_description(description)

Validate description.

Parameters:

description (str | None) -- description

Raises:

ValueError -- if description is invalid

Return type:

None

validate_roles(roles)

Validate roles.

Parameters:

roles (list[str]) -- list of roles

Raises:

ValueError -- if roles are invalid

Return type:

None

validate_permissions(permissions)

Validate permissions.

Parameters:

permissions (list[str]) -- list of permissions

Raises:

ValueError -- if permissions are invalid

Return type:

None

create_role(role_name, permissions, description=None)

Create role.

Parameters:
  • role_name (str) -- role name

  • permissions (list[str] | None) -- permissions

  • description (str | None) -- description

Returns:

created role

Return type:

Role

update_role(role_name, permissions, description=None)

Update role.

Parameters:
  • role_name (str) -- role name

  • permissions (list[str] | None) -- permissions

  • description (str | None) -- description

Returns:

updated role

Return type:

Role

get_role(role_name)

Get role by name.

Parameters:

role_name (str) -- name of the role

Returns:

role

Return type:

Role | None

get_roles()

Get roles keyed by role_name.

Returns:

roles keyed by role_name

Return type:

dict[str, Role]

delete_role(role_name)

Delete role.

Parameters:

role_name (str) -- role name

Return type:

Role

create_user(project_id, user_name, password, roles, is_enabled, is_locked, password_expiry_days, description=None)

Create user.

Parameters:
  • project_id (str) -- project id (required, defaults to DEFAULT_PROJECT_ID)

  • user_name (str) -- user name

  • password (str) -- password

  • roles (list[str]) -- roles

  • is_enabled (bool) -- whether the user is enabled

  • is_locked (bool) -- whether the user is locked

  • password_expiry_days (int) -- password expiry days

  • description (str | None) -- description

Returns:

created user

update_user(user_name, roles=None, is_enabled=None, is_locked=None, password_expiry_days=None, description=None)

Update user.

Parameters:
  • user_name (str) -- user name

  • roles (list[str] | None) -- roles

  • is_enabled (bool | None) -- whether the user is enabled

  • is_locked (bool | None) -- whether the user is locked

  • password_expiry_days (int | None) -- password expiry days

  • description (str | None) -- description

Returns:

updated user

get_user(user_name=None)

Get user by name.

Parameters:

user_name (str | None) -- name of the user

Returns:

user

Return type:

User | None

get_user_by_id(user_id)

Get user by ID.

Parameters:

user_id (str) -- user ID

Returns:

user

Return type:

User | None

get_user_by_name(user_name)

Get user by name.

Parameters:

user_name (str) -- name of the user

Returns:

user

Return type:

User | None

get_users()

Get users.

Returns:

users

Return type:

dict[str, User]

delete_user(user_name)

Delete user.

Parameters:

user_name (str) -- user name

Return type:

User

find_users_by_role(role_name)

Find users by role name.

Parameters:

role_name (str) -- role name

Returns:

list of user names

Return type:

list[str]

log_login_attempt(user_name, ip_address, success, failure_reason=None, user_agent=None)

Log login attempt.

Parameters:
  • user_name (str) -- user name

  • ip_address (str) -- ip address

  • success (bool) -- whether login was successful

  • failure_reason (str | None) -- reason for failure if not successful

  • user_agent (str | None) -- user agent

get_login_logs(user_name=None, limit=100)

Get login logs with optional user_name filter.

Parameters:
  • user_name (str | None) -- Filter logs by user name (optional)

  • limit (int) -- Maximum number of logs to return

Returns:

login logs

clear_login_logs(user_id=None, user_name=None)

Clear login logs (all or for a specific user).

Parameters:
  • user_id (str | None) -- User ID (UUID) to clear logs for (optional)

  • user_name (str | None) -- User name to clear logs for (optional)

Returns:

Dictionary with count of deleted logs

Return type:

dict
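
The three login-log methods above fit together as append, filtered read, and bulk delete. The following in-memory sketch mirrors their parameters under stated assumptions: the LoginLogStore class, the newest-first ordering, and the "deleted_count" key are hypothetical, and the real UserManager presumably persists logs through its database session. The user_id filter is omitted for brevity.

```python
from datetime import datetime, timezone


class LoginLogStore:
    """Hypothetical in-memory login log (illustration only)."""

    def __init__(self):
        self._logs = []

    def log_login_attempt(self, user_name, ip_address, success,
                          failure_reason=None, user_agent=None):
        self._logs.append({
            "user_name": user_name,
            "ip_address": ip_address,
            "success": success,
            "failure_reason": failure_reason,
            "user_agent": user_agent,
            "created_at": datetime.now(timezone.utc),
        })

    def get_login_logs(self, user_name=None, limit=100):
        matching = [log for log in self._logs
                    if user_name is None or log["user_name"] == user_name]
        # Assumption: newest entries are returned first.
        return list(reversed(matching))[:limit]

    def clear_login_logs(self, user_name=None):
        before = len(self._logs)
        if user_name is None:
            self._logs = []  # no filter: clear everything
        else:
            self._logs = [log for log in self._logs
                          if log["user_name"] != user_name]
        # Assumed shape of the result dictionary.
        return {"deleted_count": before - len(self._logs)}
```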

add_to_blacklist(token_jti, expires_at)

Add a token to the blacklist.

Parameters:
  • token_jti (str) -- Unique identifier of the token (JWT 'jti' claim)

  • expires_at (datetime) -- When the token would have expired

Return type:

None

is_blacklisted(token_jti)

Check if a token is blacklisted.

Parameters:

token_jti (str) -- Unique identifier of the token (JWT 'jti' claim)

Returns:

True if token is blacklisted, False otherwise

Return type:

bool
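
Storing expires_at alongside the jti means a blacklist entry only has to live as long as the token it revokes. The sketch below illustrates that idea with an in-memory dictionary; the TokenBlacklist class and the pruning on lookup are assumptions, not the documented behavior of UserManager.

```python
from datetime import datetime, timedelta, timezone


class TokenBlacklist:
    """Hypothetical in-memory blacklist keyed by the JWT 'jti' claim."""

    def __init__(self):
        self._entries = {}  # token_jti -> expires_at

    def add_to_blacklist(self, token_jti, expires_at):
        self._entries[token_jti] = expires_at

    def is_blacklisted(self, token_jti, now=None):
        now = now or datetime.now(timezone.utc)
        # Entries for tokens that have expired anyway can be dropped:
        # an expired token is already rejected by signature/expiry checks.
        self._entries = {
            jti: expires_at
            for jti, expires_at in self._entries.items()
            if expires_at > now
        }
        return token_jti in self._entries
```

A logout handler would call add_to_blacklist with the token's own jti and exp claims, and token verification would consult is_blacklisted before accepting a token.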

static is_password_expired(user)

Check if password has expired.

Parameters:

user (User) -- user

Returns:

True if the password has expired

Return type:

bool
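
A password expiry check usually compares the current time with the last password change plus the allowed number of days. The sketch below assumes that logic; the UserRecord class, its password_changed_at field, and the rule that a non-positive password_expiry_days means "never expires" are hypothetical, since the fields of the real User model are not documented here.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class UserRecord:
    """Hypothetical stand-in for the User model."""
    password_changed_at: datetime
    password_expiry_days: int


def is_password_expired(user: UserRecord, now=None) -> bool:
    now = now or datetime.now(timezone.utc)
    if user.password_expiry_days <= 0:
        # Assumption: zero or negative means the password never expires.
        return False
    deadline = user.password_changed_at + timedelta(days=user.password_expiry_days)
    return now >= deadline
```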

static hash_password(password)

Hash password.

Parameters:

password (str) -- password

Returns:

hashed password

Return type:

str

static check_password(password, password_hash)

Check if password matches hash.

Parameters:
  • password (str) -- password

  • password_hash (str) -- hashed password

Returns:

True if the password matches the hash

Return type:

bool

Module contents