import logging import os import json from dataclasses import dataclass from common.ddb_service.client import DynamoDbUtilsService from _types import Role, PARTITION_KEYS from common.response import bad_request, ok from utils import check_user_existence, get_permissions_by_username, get_user_roles user_table = os.environ.get('MULTI_USER_TABLE') logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) ddb_service = DynamoDbUtilsService(logger=logger) @dataclass class UpsertRoleEvent: role_name: str permissions: [str] creator: str # POST /role def upsert_role(raw_event, ctx): event = UpsertRoleEvent(**json.loads(raw_event['body'])) # check if creator exist if check_user_existence(ddb_service=ddb_service, user_table=user_table, username=event.creator): return bad_request(message=f'creator {event.creator} not exist') if not ctx or 'from_sd_local' not in vars(ctx): # should check the creator permission contains role:all creator_permissions = get_permissions_by_username(ddb_service, user_table, event.creator) if 'role' not in creator_permissions or \ ('all' not in creator_permissions['role'] and 'create' not in creator_permissions['role']): return bad_request(message=f'creator {event.creator} not have permission to create role') if 'all' not in creator_permissions['role']: target_role = _get_role_by_name(event.role_name) if target_role and target_role.creator != event.creator: return bad_request( message=f'creator {event.creator} not have permission to update role {target_role.sort_key}') for permission_str in event.permissions: permission_parts = permission_str.split(':') resource = permission_parts[0] action = permission_parts[1] if resource not in creator_permissions or \ ('all' not in creator_permissions[resource] and action not in creator_permissions[resource]): return bad_request( message=f'creator {event.creator} have not permission to create role with permission: [{permission_str}]') ddb_service.put_items(user_table, Role( kind=PARTITION_KEYS.role, sort_key=event.role_name, permissions=event.permissions, creator=event.creator, ).__dict__) data = { 'role_name': event.role_name, 'permissions': event.permissions, 'creator': event.creator, } return ok(message='role created', data=data) # GET /roles?last_evaluated_key=xxx&limit=10&role=ROLE_NAME&filter=key:value,key:value def list_roles(event, ctx): logger.info(json.dumps(event)) _filter = {} parameters = event['queryStringParameters'] requestor_name = event['requestContext']['authorizer']['username'] requestor_permissions = get_permissions_by_username(ddb_service, user_table, requestor_name) requestor_roles = get_user_roles(ddb_service=ddb_service, user_table_name=user_table, username=requestor_name) role = 0 if parameters: role = parameters['role'] if 'role' in parameters and parameters['role'] else 0 last_token = None if not role: result = ddb_service.query_items(user_table, key_values={ 'kind': PARTITION_KEYS.role }) scan_rows = result if type(result) is tuple: scan_rows = result[0] last_token = result[1] else: scan_rows = ddb_service.query_items(user_table, key_values={ 'kind': PARTITION_KEYS.role, 'sort_key': role }) result = [] for row in scan_rows: r = Role(**(ddb_service.deserialize(row))) role_dto = { 'role_name': r.sort_key, 'creator': r.creator, 'permissions': r.permissions } if 'role' in requestor_permissions and 'all' in requestor_permissions['role']: result.append(role_dto) elif 'role' in requestor_permissions and \ 'list' in requestor_permissions['role'] and r.creator == requestor_name: result.append(role_dto) elif r.sort_key in requestor_roles and 'role' in requestor_permissions and \ 'list' in requestor_permissions['role']: result.append(role_dto) data = { 'roles': result, 'previous_evaluated_key': 'not_applicable', 'last_evaluated_key': last_token } return ok(data=data) def _get_role_by_name(role_name): role_raw = ddb_service.query_items(table=user_table, key_values={ 'kind': PARTITION_KEYS.role, 'sort_key': role_name, }) if not role_raw or len(role_raw) == 0: return None return Role(**(ddb_service.deserialize(role_raw)))