refactor: migrate admin auth from HTTP Basic to Flask-Login sessions

Replaces browser-cached Basic Auth credentials with proper server-side
session management. Logout now fully invalidates the session. Adds an
HTML login form at /admin/login, SECRET_KEY env var support, and updates
README with key generation instructions and role table.
This commit is contained in:
2026-03-10 11:41:16 -06:00
parent 94d6690e57
commit 2d4eac6583
6 changed files with 214 additions and 82 deletions
+136 -74
View File
@@ -1,13 +1,16 @@
from flask import Flask, render_template, request, redirect, url_for, jsonify, abort, Response, g
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from email_validator import validate_email, EmailNotValidError
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
import sqlite3
import logging
import os
import re
import sqlite3
from email_validator import validate_email, EmailNotValidError
from flask import Flask, render_template, request, redirect, url_for, jsonify, abort
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_login import (
LoginManager, UserMixin, login_user, logout_user, login_required, current_user
)
from werkzeug.security import generate_password_hash, check_password_hash
# Set up logging
logging.basicConfig(level=logging.INFO)
@@ -15,8 +18,53 @@ logger = logging.getLogger(__name__)
app = Flask(__name__)
DATABASE = os.environ.get('DATABASE_PATH', 'guestbook.db')
app.secret_key = os.environ.get('SECRET_KEY', 'dev-secret-key-change-in-production')
limiter = Limiter(get_remote_address, app=app, default_limits=[])
login_manager = LoginManager(app)
login_manager.login_view = 'admin_login'
# ---------------------------------------------------------------------------
# User model
# ---------------------------------------------------------------------------
class User(UserMixin):
"""Lightweight user object stored in the session."""
def __init__(self, user_id, username, role):
# user_id format: 's:<username>' for superadmin, 'u:<db_id>' for DB users
self.id = user_id
self.username = username
self.role = role
@login_manager.user_loader
def load_user(user_id):
if user_id.startswith('s:'):
username = user_id[2:]
admin_user = os.environ.get('ADMIN_USER')
if admin_user and username == admin_user:
return User(user_id, username, 'superadmin')
return None
if user_id.startswith('u:'):
db_id = user_id[2:]
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
row = c.execute(
'SELECT id, username, role FROM users WHERE id = ?', (db_id,)
).fetchone()
conn.close()
if row:
return User(f'u:{row[0]}', row[1], row[2])
except sqlite3.Error as e:
logger.error("Database error in user_loader: %s", e)
return None
# ---------------------------------------------------------------------------
# Profanity filter
# ---------------------------------------------------------------------------
def load_banned_words():
banned_words = set()
file_path = os.path.join(os.path.dirname(__file__), 'en.txt')
@@ -53,6 +101,10 @@ def contains_banned_words(text):
return True
return False
# ---------------------------------------------------------------------------
# Database migrations
# ---------------------------------------------------------------------------
# Each entry is a list of SQL statements for that schema version.
# To add a column or index in the future, append a new list — never modify existing entries.
MIGRATIONS = [
@@ -121,6 +173,10 @@ def is_valid_email(email):
with app.app_context():
migrate_db()
# ---------------------------------------------------------------------------
# Public routes
# ---------------------------------------------------------------------------
@app.route('/', methods=['GET', 'POST'])
@limiter.limit("5 per minute", methods=["POST"])
def index():
@@ -188,65 +244,62 @@ def index():
logger.info("Rendering index with %d guests.", len(guests))
return render_template('index.html', error=error, guests=guests)
def _authenticate():
"""Returns (username, role) for the current request, or None if unauthenticated.
Role is 'superadmin', 'admin', or 'viewer'."""
auth = request.authorization
if not auth:
return None
admin_user = os.environ.get('ADMIN_USER')
admin_password = os.environ.get('ADMIN_PASSWORD')
if admin_user and auth.username == admin_user and auth.password == admin_password:
return (auth.username, 'superadmin')
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
row = c.execute(
'SELECT password_hash, role FROM users WHERE username = ?', (auth.username,)
).fetchone()
conn.close()
if row and check_password_hash(row[0], auth.password):
return (auth.username, row[1])
except sqlite3.Error as e:
logger.error("Database error during authentication: %s", e)
return None
# ---------------------------------------------------------------------------
# Admin auth routes
# ---------------------------------------------------------------------------
def _unauthorized():
return Response('Authentication required.', 401, {'WWW-Authenticate': 'Basic realm="Admin"'})
def _admin_configured():
return bool(os.environ.get('ADMIN_USER') and os.environ.get('ADMIN_PASSWORD'))
def require_any_auth(f):
"""Allows superadmin, admin, and viewer roles."""
@wraps(f)
def decorated(*args, **kwargs):
if not os.environ.get('ADMIN_USER') or not os.environ.get('ADMIN_PASSWORD'):
logger.error("ADMIN_USER and ADMIN_PASSWORD must be set to enable the admin interface.")
abort(503)
user = _authenticate()
if user is None:
return _unauthorized()
g.current_user, g.current_role = user
return f(*args, **kwargs)
return decorated
def require_superadmin(f):
"""Allows only the bootstrap superadmin."""
@wraps(f)
def decorated(*args, **kwargs):
@app.route('/admin/login', methods=['GET', 'POST'])
def admin_login():
if not _admin_configured():
abort(503)
if current_user.is_authenticated:
return redirect(url_for('admin'))
error = None
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
admin_user = os.environ.get('ADMIN_USER')
admin_password = os.environ.get('ADMIN_PASSWORD')
if not admin_user or not admin_password:
abort(503)
auth = request.authorization
if not auth or auth.username != admin_user or auth.password != admin_password:
return _unauthorized()
g.current_user = auth.username
g.current_role = 'superadmin'
return f(*args, **kwargs)
return decorated
# Check superadmin first
if admin_user and username == admin_user and password == admin_password:
login_user(User(f's:{username}', username, 'superadmin'))
logger.info("Superadmin '%s' logged in.", username)
return redirect(request.args.get('next') or url_for('admin'))
# Check DB users
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
row = c.execute(
'SELECT id, password_hash, role FROM users WHERE username = ?', (username,)
).fetchone()
conn.close()
if row and check_password_hash(row[1], password):
login_user(User(f'u:{row[0]}', username, row[2]))
logger.info("User '%s' (role=%s) logged in.", username, row[2])
return redirect(request.args.get('next') or url_for('admin'))
except sqlite3.Error as e:
logger.error("Database error during login: %s", e)
error = 'Invalid username or password.'
logger.warning("Failed login attempt for username '%s'.", username)
return render_template('admin_login.html', error=error)
@app.route('/admin/logout')
def admin_logout():
logout_user()
return redirect(url_for('admin_login'))
# ---------------------------------------------------------------------------
# Admin routes
# ---------------------------------------------------------------------------
@app.route('/admin')
@require_any_auth
@login_required
def admin():
if not _admin_configured():
abort(503)
page = request.args.get('page', 1, type=int)
per_page = 25
offset = (page - 1) * per_page
@@ -266,12 +319,14 @@ def admin():
total = 0
total_pages = (total + per_page - 1) // per_page
return render_template('admin.html', guests=guests, page=page, total_pages=total_pages,
total=total, current_role=g.current_role)
total=total)
@app.route('/admin/delete/<int:entry_id>', methods=['POST'])
@require_any_auth
@login_required
def admin_delete(entry_id):
if g.current_role == 'viewer':
if not _admin_configured():
abort(503)
if current_user.role == 'viewer':
abort(403)
try:
conn = sqlite3.connect(DATABASE)
@@ -284,18 +339,13 @@ def admin_delete(entry_id):
logger.error("Database error deleting guest %d: %s", entry_id, e)
return redirect(url_for('admin', page=request.args.get('page', 1)))
@app.route('/admin/logout')
def admin_logout():
return Response(
'<p style="font-family:sans-serif">You have been logged out. '
'<a href="/admin">Log in again</a></p>',
401,
{'WWW-Authenticate': 'Basic realm="Admin"', 'Content-Type': 'text/html'}
)
@app.route('/admin/users')
@require_superadmin
@login_required
def admin_users():
if not _admin_configured():
abort(503)
if current_user.role != 'superadmin':
abort(403)
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
@@ -307,8 +357,12 @@ def admin_users():
return render_template('admin_users.html', users=users)
@app.route('/admin/users/add', methods=['POST'])
@require_superadmin
@login_required
def admin_users_add():
if not _admin_configured():
abort(503)
if current_user.role != 'superadmin':
abort(403)
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
role = request.form.get('role', '').strip()
@@ -331,8 +385,12 @@ def admin_users_add():
return redirect(url_for('admin_users'))
@app.route('/admin/users/delete/<int:user_id>', methods=['POST'])
@require_superadmin
@login_required
def admin_users_delete(user_id):
if not _admin_configured():
abort(503)
if current_user.role != 'superadmin':
abort(403)
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
@@ -344,6 +402,10 @@ def admin_users_delete(user_id):
logger.error("Database error deleting user %d: %s", user_id, e)
return redirect(url_for('admin_users'))
# ---------------------------------------------------------------------------
# API
# ---------------------------------------------------------------------------
@app.route('/api/guests', methods=['GET'])
def api_guests():
api_key = request.headers.get('X-API-Key')