261 lines
8.0 KiB
Python
261 lines
8.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import logging
|
|
logger = logging.getLogger()
|
|
|
|
import bottle
|
|
import hashlib
|
|
import os
|
|
|
|
|
|
def get_session():
    """Return the session object stored on the current bottle request.

    The session is read from the ``'beaker.session'`` key of the request's
    WSGI environ (presumably installed there by Beaker's session
    middleware -- confirm against the application setup).
    """
    environ = bottle.request.environ
    return environ['beaker.session']
|
|
|
|
def get_username():
    """Return the username stored in the current session, or None.

    None is returned when the session has no ``'username'`` key, or when
    the stored value is None or the empty string.
    """
    session = get_session()
    if 'username' not in session:
        return None
    # Reuse the session already fetched instead of calling get_session()
    # a second time.
    username = session['username']
    if username is None or username == '':
        return None
    return username
|
|
|
|
def get_roles():
    """Return the roles stored in the current session, or None.

    None is returned when the session has no ``'roles'`` key, or when the
    stored value is None or the empty string. Any other value (including
    an empty list) is returned unchanged.
    """
    session = get_session()
    if 'roles' not in session:
        return None
    # Reuse the session already fetched instead of calling get_session()
    # a second time.
    roles = session['roles']
    if roles is None or roles == '':
        return None
    return roles
|
|
|
|
def _redirect(location):
    """Send the client to ``location``, or deny access when there is none.

    location: target URL passed to ``bottle.redirect``; when it is None the
        request is aborted with a 401 status instead.
    """
    if location is not None:
        bottle.redirect(location)
    else:
        bottle.abort(401, "Sorry, access denied.")
|
|
|
|
class SAuth(object):
    """
    Simple Auth class with decorators for bottle.

    Users and roles are stored by a backend object; the logged-in username
    and its roles are kept in the request session. Passwords are stored as
    ``"<algorithm>/<salt hex>/<derived key hex>"`` where the derived key is
    computed with PBKDF2-HMAC.
    """

    # Iteration count used for PBKDF2. It is not part of the stored password
    # string, so changing it would invalidate every existing password.
    _PBKDF2_ITERATIONS = 100000

    def __init__(self, backend=None, backend_infos=None, hashing_algorithm=None):
        """Auth/Authorization/Accounting class

        backend: backend to be used (str); only 'json' (also selected when
            None) is currently available
        backend_infos: dict of keyword arguments passed to the backend
            constructor (None means no arguments)
        hashing_algorithm: hashing algorithm to use, sha256 by default or
            when the requested one is not in hashlib.algorithms_available

        Raises ValueError when ``backend`` names an unknown backend.
        """
        if hashing_algorithm in hashlib.algorithms_available:
            self.hashing_algorithm = hashing_algorithm
        else:
            # Always leave the attribute defined so set_password() cannot
            # fail later with an AttributeError.
            self.hashing_algorithm = 'sha256'

        # Setup JsonBackend by default
        # Currently no other backend available
        if backend is not None and backend != 'json':
            raise ValueError("Selected backend not found")

        from sauth import json_backend
        logger.info("Selected backend : json")
        # A None default replaces the former shared mutable ``{}`` default.
        self._backend = json_backend.JsonBackend(**(backend_infos or {}))

    def make_auth_decorator(self, username=None, role=None, fail_redirect='/login'):
        """
        Create a decorator factory used for authentication and authorization.

        The returned callable takes the same three keyword arguments (with
        the values given here as defaults) and returns the actual decorator.

        :param username: A resource can be protected for a specific user
        :param role: Role required for authorization
        :param fail_redirect: The URL to redirect to if a login is required.
        """
        import functools

        session_manager = self

        def auth_require(username=username, role=role, fail_redirect=fail_redirect):
            def decorator(func):
                @functools.wraps(func)
                def wrapper(*a, **ka):
                    # Access is checked on every call, before the wrapped
                    # function runs.
                    session_manager._require(
                        username=username, role=role, fail_redirect=fail_redirect
                    )
                    return func(*a, **ka)
                return wrapper
            return decorator
        return auth_require

    def get_user(self, username=None):
        """
        Return the backend's user object for the current user or for a
        specified username.

        Return None if username is None and no user is logged in; otherwise
        return whatever the backend returns for that username.
        """
        if username is None:
            username = get_username()
            if username is None:
                return None
        return self._backend.get_user(username)

    def get_user_roles(self, username=None):
        """
        Return the roles of a user as reported by the backend (main roles
        and inherited ones).

        Return None if username is None and no user is logged in.
        """
        if username is None:
            username = get_username()
            if username is None:
                return None
        return self._backend.get_user_roles(username)

    def _require(self, username=None, role=None, fail_redirect=None):
        """
        Check authentication and authorization for the decorator.

        The request is redirected to ``fail_redirect`` (or aborted with 401
        when it is None) if nobody is logged in, if ``username`` is given and
        differs from the logged-in user, or if ``role`` is given and is not
        among the session's roles. Returns None on success.
        """
        # NOTE: _redirect() relies on bottle.redirect()/bottle.abort() raising
        # to stop the request, so nothing after a failed check is executed.
        user = get_username()
        if user is None:
            _redirect(fail_redirect)

        if username is not None and user != username:
            _redirect(fail_redirect)

        if role is not None:
            roles = get_roles()
            # A session without roles must be denied; testing membership in
            # None would raise TypeError.
            if roles is None or role not in roles:
                _redirect(fail_redirect)

        return  # success

    @staticmethod
    def _verify_password(stored, password):
        """Return True if ``password`` matches the ``stored`` password string.

        ``stored`` must look like "<algorithm>/<salt hex>/<derived key hex>".
        None values, a wrong number of fields, an invalid hex salt or a
        digest name rejected by hashlib all count as a failed verification.
        """
        import hmac

        if stored is None or password is None:
            return False
        fields = stored.split('/')
        if len(fields) != 3:
            return False
        algorithm, salt_hex, expected = fields
        try:
            salt = bytes.fromhex(salt_hex)
            derived = hashlib.pbkdf2_hmac(
                algorithm, password.encode('utf-8'), salt, SAuth._PBKDF2_ITERATIONS
            )
        except ValueError:
            # Corrupted entry: refuse the login rather than crash the request.
            return False
        # Compare as bytes in constant time to avoid leaking, through timing,
        # how much of the hash matched.
        return hmac.compare_digest(derived.hex().encode('ascii'), expected.encode('utf-8'))

    def _hash_password(self, password):
        """Return the storable "<algorithm>/<salt hex>/<key hex>" string for
        ``password``, using a fresh 16-byte random salt."""
        salt = os.urandom(16)
        derived = hashlib.pbkdf2_hmac(
            self.hashing_algorithm, password.encode('utf-8'), salt, self._PBKDF2_ITERATIONS
        )
        return "/".join((self.hashing_algorithm, salt.hex(), derived.hex()))

    def login(self, username, password, success_redirect=None, fail_redirect=None):
        """
        Login function

        username : str
        password : str
        success_redirect and fail_redirect : locations on which to redirect if
        login is successful or not

        On success the username and its roles are stored in the session and
        True is returned (after redirecting, if success_redirect is set).
        On failure the request goes through _redirect(fail_redirect), which
        aborts with 401 when fail_redirect is None.
        """
        ip = bottle.request.environ.get('REMOTE_ADDR')
        # A missing username must never resolve to the currently logged-in
        # user, which is what get_user(None) would return.
        user = self.get_user(username) if username is not None else None

        if user is not None and self._verify_password(user.password, password):
            session = get_session()
            session['username'] = username
            session['roles'] = self.get_user_roles(username)
            logger.info("Login OK user %s from %s", username, ip)
            if success_redirect is not None:
                _redirect(success_redirect)
            return True

        logger.warning("Login FAIL user %s from %s", username, ip)
        _redirect(fail_redirect)
        return False

    def logout(self, redirect=None):
        """
        Logout function. Clears the username and deletes the session, then
        redirects to ``redirect`` when it is not None.
        """
        usn = get_username()
        ip = bottle.request.environ.get('REMOTE_ADDR')
        session = get_session()
        session['username'] = None
        session.delete()
        logger.info("Logout user %s from %s", usn, ip)
        if redirect is not None:
            _redirect(redirect)
        return

    def add_user(self, username, password):
        """
        Add a new user

        username: str (return False if it already exists)
        password: str

        Return True once the user is created and its password set.
        """
        if self.get_user(username) is not None:
            return False
        logger.info("Adduser %s", username)
        self._backend.add_user(username)
        self.set_password(username, password)
        return True

    def add_role(self, role, subrole=None):
        """
        Add a role or a subrole
        (role given by another)
        """
        self._backend.add_role(role, subrole=subrole)
        return

    def add_user_role(self, username, role):
        """
        Add a role to a user; return the backend's result.
        """
        return self._backend.add_user_role(username, role)

    def rm_user(self, username):
        """
        Delete a user by name; return the backend's result.
        """
        return self._backend.rm_user(username)

    def rm_role(self, role, subrole=None):
        """
        Delete a role.
        If subrole is not None, delete the subrole of role
        """
        return self._backend.rm_role(role, subrole=subrole)

    def rm_user_role(self, username, role):
        """
        Remove a role from a user; return the backend's result.
        """
        return self._backend.rm_user_role(username, role)

    def update_user(self, username=None, **kwargs):
        """
        Forward ``kwargs`` to the backend's update_user for ``username``.

        When username is None the logged-in username is used (which is
        itself None if nobody is logged in).
        """
        if username is None:
            username = get_username()
        self._backend.update_user(username, **kwargs)
        return

    def set_password(self, username, password):
        """
        Set a new password for a user

        username: username of the user (str)
        password: new password (str)

        Return the result of the backend's update_user call.
        """
        logger.info("Change password %s", username)
        return self._backend.update_user(username, password=self._hash_password(password))

    def get_all_usernames(self):
        """Return what the backend's get_users() returns."""
        return self._backend.get_users()

    def get_all_roles(self):
        """Return what the backend's get_roles() returns."""
        return self._backend.get_roles()

    def refresh(self):
        """Call the backend's refresh() if it provides one; otherwise do nothing."""
        backend_refresh = getattr(self._backend, 'refresh', None)
        if callable(backend_refresh):
            backend_refresh()
        return
|
|
|