SAuth/sauth/json_backend.py

134 lines
4.4 KiB
Python
Raw Permalink Normal View History

2019-01-17 18:20:13 +00:00
# -*-coding:utf-8 -*
import json
import collections
from sauth import user
class JsonBackend:
def __init__(self, directory='users', users_fname='users.json', roles_fname='roles.json'):
"""
directory: directory in which configuration files are (str)
users_fname: filename for users (str)
roles_fname: filename for roles (str)
"""
if not (directory[-1]=='/'):
directory = directory + "/"
self.users_f = directory + users_fname
self.roles_f = directory + roles_fname
self.refresh()
def get_user(self, username):
if username in self._json_users :
data = self._json_users[username]
return user.User(username, **data)
else:
return None
def get_user_roles(self, username):
if username in self._json_users :
data = self._json_users[username]
us = user.User(username, **data)
for r in us.roles:
if(r in self._json_roles):
for sr in self._json_roles[r]:
if not sr in us.roles:
us.roles.append(sr)
return us.roles
else:
return None
def get_roles(self):
return self._json_roles
def get_users(self):
return list(self._json_users.keys())
def add_role(self, role, subrole=None):
print("Role :{} subrole:{}".format(role, subrole))
if role not in self._json_roles:
print("Role added")
self._json_roles[role]=[]
if(subrole is not None):
print("Subrole added")
self._json_roles[role].append(subrole)
self._update_json_roles()
return
def add_user_role(self, username, role):
if 'roles' in self._json_users[username]:
if not role in self._json_users[username]['roles']:
self._json_users[username]['roles'].append(role)
self._update_json()
return True
else:
self._json_users[username]['roles'] = [role]
self._update_json()
return True
return False
def rm_user(self, username):
if username in self._json_users :
del self._json_users[username]
self._update_json()
2019-01-17 18:20:13 +00:00
return True
return False
def rm_role(self, role, subrole=None):
if role in self._json_roles:
if subrole is None:
del self._json_roles[role]
self._update_json_roles()
return True
else:
for i in range(len(self._json_roles[role])):
if self._json_roles[role][i]==subrole :
del self._json_roles[role][i]
self._update_json_roles()
return True
break
else:
return False
def rm_user_role(self, username, role):
if 'roles' in self._json_users[username] and role in self._json_users[username]['roles']:
for i in range(len(self._json_users[username]['roles'])):
if self._json_users[username]['roles'][i]==role:
del self._json_users[username]['roles'][i]
self._update_json()
return True
break
return False
def add_user(self, username, **kwargs):
if (username in self._json_users):
return False
self._json_users[username] = {}
self._update_json()
return True
def update_user(self, username, **kwargs):
if(username in self._json_users):
for e in kwargs:
self._json_users[username][e] = kwargs[e]
self._update_json()
return True
else:
return False
def _update_json(self):
with open(self.users_f, 'w') as f:
json.dump(self._json_users, f, indent=2, ensure_ascii=False)
return
def _update_json_roles(self):
with open(self.roles_f, 'w') as f:
json.dump(self._json_roles, f, ensure_ascii=False)
return
def refresh(self):
with open(self.users_f, 'r') as f:
self._json_users = json.load(f, object_pairs_hook=collections.OrderedDict)
with open(self.roles_f, 'r') as f:
self._json_roles = json.load(f, object_pairs_hook=collections.OrderedDict)