mirror of
https://github.com/tmdinosaurcenter/kiosk-guestbook.git
synced 2026-06-04 03:50:14 -06:00
feat: add role-based access control with database-backed users
This commit is contained in:
@@ -1,7 +1,8 @@
|
||||
from flask import Flask, render_template, request, redirect, url_for, jsonify, abort, Response
|
||||
from flask import Flask, render_template, request, redirect, url_for, jsonify, abort, Response, g
|
||||
from flask_limiter import Limiter
|
||||
from flask_limiter.util import get_remote_address
|
||||
from email_validator import validate_email, EmailNotValidError
|
||||
from werkzeug.security import generate_password_hash, check_password_hash
|
||||
from functools import wraps
|
||||
import sqlite3
|
||||
import logging
|
||||
@@ -70,6 +71,15 @@ MIGRATIONS = [
|
||||
'CREATE INDEX IF NOT EXISTS idx_guests_id ON guests (id DESC)',
|
||||
'CREATE INDEX IF NOT EXISTS idx_guests_email ON guests (email)',
|
||||
],
|
||||
# v2 — user accounts for admin interface (role: 'admin' or 'viewer')
|
||||
[
|
||||
'''CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
username TEXT NOT NULL UNIQUE,
|
||||
password_hash TEXT NOT NULL,
|
||||
role TEXT NOT NULL CHECK(role IN ('admin', 'viewer'))
|
||||
)''',
|
||||
],
|
||||
]
|
||||
|
||||
def migrate_db():
|
||||
@@ -178,26 +188,64 @@ def index():
|
||||
logger.info("Rendering index with %d guests.", len(guests))
|
||||
return render_template('index.html', error=error, guests=guests)
|
||||
|
||||
def require_admin_auth(f):
|
||||
def _authenticate():
|
||||
"""Returns (username, role) for the current request, or None if unauthenticated.
|
||||
Role is 'superadmin', 'admin', or 'viewer'."""
|
||||
auth = request.authorization
|
||||
if not auth:
|
||||
return None
|
||||
admin_user = os.environ.get('ADMIN_USER')
|
||||
admin_password = os.environ.get('ADMIN_PASSWORD')
|
||||
if admin_user and auth.username == admin_user and auth.password == admin_password:
|
||||
return (auth.username, 'superadmin')
|
||||
try:
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
c = conn.cursor()
|
||||
row = c.execute(
|
||||
'SELECT password_hash, role FROM users WHERE username = ?', (auth.username,)
|
||||
).fetchone()
|
||||
conn.close()
|
||||
if row and check_password_hash(row[0], auth.password):
|
||||
return (auth.username, row[1])
|
||||
except sqlite3.Error as e:
|
||||
logger.error("Database error during authentication: %s", e)
|
||||
return None
|
||||
|
||||
def _unauthorized():
|
||||
return Response('Authentication required.', 401, {'WWW-Authenticate': 'Basic realm="Admin"'})
|
||||
|
||||
def require_any_auth(f):
|
||||
"""Allows superadmin, admin, and viewer roles."""
|
||||
@wraps(f)
|
||||
def decorated(*args, **kwargs):
|
||||
if not os.environ.get('ADMIN_USER') or not os.environ.get('ADMIN_PASSWORD'):
|
||||
logger.error("ADMIN_USER and ADMIN_PASSWORD must be set to enable the admin interface.")
|
||||
abort(503)
|
||||
user = _authenticate()
|
||||
if user is None:
|
||||
return _unauthorized()
|
||||
g.current_user, g.current_role = user
|
||||
return f(*args, **kwargs)
|
||||
return decorated
|
||||
|
||||
def require_superadmin(f):
|
||||
"""Allows only the bootstrap superadmin."""
|
||||
@wraps(f)
|
||||
def decorated(*args, **kwargs):
|
||||
admin_user = os.environ.get('ADMIN_USER')
|
||||
admin_password = os.environ.get('ADMIN_PASSWORD')
|
||||
if not admin_user or not admin_password:
|
||||
logger.error("ADMIN_USER and ADMIN_PASSWORD must be set to enable the admin interface.")
|
||||
abort(503)
|
||||
auth = request.authorization
|
||||
if not auth or auth.username != admin_user or auth.password != admin_password:
|
||||
return Response(
|
||||
'Authentication required.',
|
||||
401,
|
||||
{'WWW-Authenticate': 'Basic realm="Admin"'}
|
||||
)
|
||||
return _unauthorized()
|
||||
g.current_user = auth.username
|
||||
g.current_role = 'superadmin'
|
||||
return f(*args, **kwargs)
|
||||
return decorated
|
||||
|
||||
@app.route('/admin')
|
||||
@require_admin_auth
|
||||
@require_any_auth
|
||||
def admin():
|
||||
page = request.args.get('page', 1, type=int)
|
||||
per_page = 25
|
||||
@@ -217,11 +265,14 @@ def admin():
|
||||
guests = []
|
||||
total = 0
|
||||
total_pages = (total + per_page - 1) // per_page
|
||||
return render_template('admin.html', guests=guests, page=page, total_pages=total_pages, total=total)
|
||||
return render_template('admin.html', guests=guests, page=page, total_pages=total_pages,
|
||||
total=total, current_role=g.current_role)
|
||||
|
||||
@app.route('/admin/delete/<int:entry_id>', methods=['POST'])
|
||||
@require_admin_auth
|
||||
@require_any_auth
|
||||
def admin_delete(entry_id):
|
||||
if g.current_role == 'viewer':
|
||||
abort(403)
|
||||
try:
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
c = conn.cursor()
|
||||
@@ -233,6 +284,57 @@ def admin_delete(entry_id):
|
||||
logger.error("Database error deleting guest %d: %s", entry_id, e)
|
||||
return redirect(url_for('admin', page=request.args.get('page', 1)))
|
||||
|
||||
@app.route('/admin/users')
|
||||
@require_superadmin
|
||||
def admin_users():
|
||||
try:
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
c = conn.cursor()
|
||||
users = c.execute('SELECT id, username, role FROM users ORDER BY username').fetchall()
|
||||
conn.close()
|
||||
except sqlite3.Error as e:
|
||||
logger.error("Database error in admin_users: %s", e)
|
||||
users = []
|
||||
return render_template('admin_users.html', users=users)
|
||||
|
||||
@app.route('/admin/users/add', methods=['POST'])
|
||||
@require_superadmin
|
||||
def admin_users_add():
|
||||
username = request.form.get('username', '').strip()
|
||||
password = request.form.get('password', '').strip()
|
||||
role = request.form.get('role', '').strip()
|
||||
if not username or not password or role not in ('admin', 'viewer'):
|
||||
return redirect(url_for('admin_users'))
|
||||
try:
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
c = conn.cursor()
|
||||
c.execute(
|
||||
'INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)',
|
||||
(username, generate_password_hash(password), role)
|
||||
)
|
||||
conn.commit()
|
||||
conn.close()
|
||||
logger.info("Superadmin added user '%s' with role '%s'", username, role)
|
||||
except sqlite3.IntegrityError:
|
||||
logger.warning("Attempted to add duplicate username '%s'", username)
|
||||
except sqlite3.Error as e:
|
||||
logger.error("Database error adding user: %s", e)
|
||||
return redirect(url_for('admin_users'))
|
||||
|
||||
@app.route('/admin/users/delete/<int:user_id>', methods=['POST'])
|
||||
@require_superadmin
|
||||
def admin_users_delete(user_id):
|
||||
try:
|
||||
conn = sqlite3.connect(DATABASE)
|
||||
c = conn.cursor()
|
||||
c.execute('DELETE FROM users WHERE id = ?', (user_id,))
|
||||
conn.commit()
|
||||
conn.close()
|
||||
logger.info("Superadmin deleted user id=%d", user_id)
|
||||
except sqlite3.Error as e:
|
||||
logger.error("Database error deleting user %d: %s", user_id, e)
|
||||
return redirect(url_for('admin_users'))
|
||||
|
||||
@app.route('/api/guests', methods=['GET'])
|
||||
def api_guests():
|
||||
api_key = request.headers.get('X-API-Key')
|
||||
|
||||
Reference in New Issue
Block a user