# -*-coding:utf-8 -*
"""Simple session-based authentication/authorization helpers for bottle.

The current user and its roles are kept in the beaker session found in the
WSGI environ under ``'beaker.session'``.  Passwords are stored by the backend
as ``"<algorithm>/<salt hex>/<derived key hex>"`` (PBKDF2-HMAC).
"""

import functools
import hashlib
import hmac
import logging
import os

import bottle

# NOTE: getLogger() without a name is the root logger; kept as in the
# original so existing logging configuration keeps applying.
logger = logging.getLogger()

# Number of PBKDF2 rounds.  The stored password string does not record the
# iteration count, so changing this value invalidates every stored password.
PBKDF2_ITERATIONS = 100000


def get_session():
    """Return the beaker session attached to the current bottle request."""
    return bottle.request.environ['beaker.session']


def _session_value(key):
    """Return session[key], or None when it is missing, None or ''."""
    session = get_session()
    if key not in session:
        return None
    value = session[key]
    if value is None or value == '':
        return None
    return value


def get_username():
    """Return the username stored in the session, or None if not logged in."""
    return _session_value('username')


def get_roles():
    """Return the roles stored in the session, or None if there are none."""
    return _session_value('roles')


def _redirect(location):
    """Redirect to *location*, or abort with HTTP 401 when it is None.

    Both bottle.abort() and bottle.redirect() work by raising, so code
    placed after a call to this function is normally not executed.
    """
    if location is None:
        bottle.abort(401, "Sorry, access denied.")
    else:
        bottle.redirect(location)


def _derive_key_hex(hashing_algorithm, password, salt):
    """Return the hex PBKDF2-HMAC key of *password* (str) for *salt* (bytes).

    Raises ValueError if *hashing_algorithm* is not supported by hashlib.
    """
    derived = hashlib.pbkdf2_hmac(hashing_algorithm,
                                  password.encode('utf-8'),
                                  salt,
                                  PBKDF2_ITERATIONS)
    return derived.hex()


class SAuth(object):
    """ Simple Auth class with decorators for bottle
    """

    def __init__(self, backend=None, backend_infos=None,
                 hashing_algorithm=None):
        """Auth/Authorization/Accounting class

        backend: backend to be used (str); only 'json' (the default) exists
        backend_infos: dict of keyword arguments passed to the backend
                       constructor (None means no arguments)
        hashing_algorithm: hashing algorithm used for new passwords,
                           sha256 by default

        Raises ValueError if the requested backend is unknown.
        """
        if hashing_algorithm in hashlib.algorithms_available:
            self.hashing_algorithm = hashing_algorithm
        else:
            if hashing_algorithm is not None:
                # Do not fall back silently: a typo here would otherwise
                # go unnoticed.
                logger.warning(
                    "Hashing algorithm {} not available, using sha256".format(
                        hashing_algorithm))
            # sha256 is one of hashlib's guaranteed algorithms, so the
            # attribute is always defined after construction.
            self.hashing_algorithm = 'sha256'

        # Setup JsonBackend by default
        # Currently no other backend available
        if backend is None or backend == 'json':
            from sauth import json_backend
            logger.info("Selected backend : json")
            self._backend = json_backend.JsonBackend(**(backend_infos or {}))
        else:
            raise ValueError("Selected backend not found")

    def make_auth_decorator(self, username=None, role=None,
                            fail_redirect='/login'):
        '''
        Create a decorator factory to be used for authentication and
        authorization.  The arguments given here become the defaults of the
        returned factory; each of them can be overridden per route.

        :param username: A resource can be protected for a specific user
        :param role: Role the logged-in user must have
        :param fail_redirect: The URL to redirect to if a login is required.
        '''
        session_manager = self

        def auth_require(username=username, role=role,
                         fail_redirect=fail_redirect):
            def decorator(func):
                @functools.wraps(func)
                def wrapper(*a, **ka):
                    # Raises (redirect / 401) when the check fails, so
                    # func only runs for authorized requests.
                    session_manager._require(username=username, role=role,
                                             fail_redirect=fail_redirect)
                    return func(*a, **ka)
                return wrapper
            return decorator
        return auth_require

    def get_user(self, username=None):
        """ return a User object for the current user or a specified username
            return None if user not logged in or not found
        """
        if username is None:
            username = get_username()
            if username is None:
                return None
        return self._backend.get_user(username)

    def get_user_roles(self, username=None):
        """ return the roles of a user as given by the backend
            (defaults to the current user)
            return None if not logged in or not found
        """
        if username is None:
            username = get_username()
            if username is None:
                return None
        return self._backend.get_user_roles(username)

    def _require(self, username=None, role=None, fail_redirect=None):
        """ Check authentication/authorization for the decorator.

            Redirects to fail_redirect (or aborts with 401 when it is None)
            if nobody is logged in, if username is given and differs from
            the logged-in user, or if role is given and is not among the
            roles stored in the session.
        """
        # Authentication
        user = get_username()
        if user is None:
            _redirect(fail_redirect)

        if username is not None and user != username:
            _redirect(fail_redirect)

        if role is not None:
            # get_roles() returns None when the session holds no roles;
            # treat that as "no roles" instead of failing on `in None`.
            roles = get_roles() or ()
            if role not in roles:
                _redirect(fail_redirect)

        return  # success

    def _check_password(self, username, password):
        """ Return True if password matches the one stored for username.

            Any missing or malformed piece (no username, no password,
            unknown user, stored value not of the form
            "algorithm/salt/key", invalid hex salt, unsupported algorithm)
            counts as a mismatch.
        """
        # A None username would make get_user() fall back to the session
        # user, so query the backend directly and reject empty input.
        if not username or password is None:
            return False
        user = self._backend.get_user(username)
        if user is None:
            return False

        stored = user.password
        if stored is None:
            return False
        parts = stored.split('/')
        if len(parts) != 3:
            return False
        hashing_algorithm, salt_hex, expected_hex = parts

        try:
            salt = bytes.fromhex(salt_hex)
            actual_hex = _derive_key_hex(hashing_algorithm, password, salt)
        except ValueError:
            # Corrupted salt or digest name hashlib does not know.
            return False

        # Constant-time comparison; compared as bytes because
        # compare_digest only accepts ASCII-only str arguments.
        return hmac.compare_digest(actual_hex.encode('ascii'),
                                   expected_hex.encode('utf-8'))

    def login(self, username, password, success_redirect=None,
              fail_redirect=None):
        """ Login function
            username : str
            password : str
            success_redirect and fail_redirect : locations on which to
                redirect if login is successful or not

            On success the username and the user's roles are stored in the
            session, then the request is redirected to success_redirect if
            given, otherwise True is returned.
            On failure the request is redirected to fail_redirect, or
            aborted with 401 when fail_redirect is None.
        """
        ip = bottle.request.environ.get('REMOTE_ADDR')

        if self._check_password(username, password):
            session = get_session()
            session['username'] = username
            session['roles'] = self.get_user_roles(username)
            logger.info("Login OK user {} from {}".format(username, ip))
            if success_redirect is not None:
                _redirect(success_redirect)
            return True

        logger.warning("Login FAIL user {} from {}".format(username, ip))
        _redirect(fail_redirect)
        return False

    def logout(self, redirect=None):
        """ Logout function. Destroys session.
            Redirects to redirect when it is given.
        """
        # Read the username before the session is wiped, for the log line.
        usn = get_username()
        ip = bottle.request.environ.get('REMOTE_ADDR')
        session = get_session()
        session['username'] = None
        session.delete()
        logger.info("Logout user {} from {}".format(usn, ip))
        if redirect is not None:
            _redirect(redirect)
        return

    def add_user(self, username, password):
        """ Add a new user
            username: str (return False if already exists)
            password: str
            return True once the user is created and its password set
        """
        if self.get_user(username) is not None:
            return False
        logger.info("Adduser {}".format(username))
        self._backend.add_user(username)
        self.set_password(username, password)
        return True

    def add_role(self, role, subrole=None):
        """ Add a role or a subrole (role given by another) """
        self._backend.add_role(role, subrole=subrole)
        return

    def add_user_role(self, username, role):
        """ Add a role to user """
        return self._backend.add_user_role(username, role)

    def rm_user(self, username):
        """ Delete a user by name """
        return self._backend.rm_user(username)

    def rm_role(self, role, subrole=None):
        """ Delete a role.
            If subrole is not None, delete the subrole of role
        """
        return self._backend.rm_role(role, subrole=subrole)

    def rm_user_role(self, username, role):
        """ Remove a role from user """
        return self._backend.rm_user_role(username, role)

    def update_user(self, username=None, **kwargs):
        """ Update fields of a user through the backend.
            username: user to update, the current session user when None
            kwargs: passed unchanged to the backend's update_user
        """
        if username is None:
            username = get_username()
        self._backend.update_user(username, **kwargs)
        return

    def set_password(self, username, password):
        """ Set a new password to user
            username: username of the user (str)
            password: new password (str)

            A fresh random 16-byte salt is generated and the value stored
            is "algorithm/salt hex/derived key hex".
            Returns whatever the backend's update_user returns.
        """
        logger.info("Change password {}".format(username))
        salt = os.urandom(16)
        key_hex = _derive_key_hex(self.hashing_algorithm, password, salt)
        stored = "/".join((self.hashing_algorithm, salt.hex(), key_hex))
        return self._backend.update_user(username, password=stored)

    def get_all_usernames(self):
        """ Return the users known to the backend (backend.get_users()) """
        return self._backend.get_users()

    def get_all_roles(self):
        """ Return the roles known to the backend (backend.get_roles()) """
        return self._backend.get_roles()

    def refresh(self):
        """ Call the backend's refresh() if the backend provides one """
        backend_refresh = getattr(self._backend, 'refresh', None)
        if backend_refresh is not None:
            backend_refresh()
        return