Files
kiosk-guestbook/app.py
T
steve 9ad7128619 feat: add security headers, session hardening, and admin cache control
Sets X-Content-Type-Options, X-Frame-Options, and Referrer-Policy on
all responses. Prevents browsers from caching admin pages. Configures
session cookies as HttpOnly and SameSite=Lax with an 8-hour lifetime.
2026-03-28 23:23:53 -06:00

508 lines
18 KiB
Python

import logging
import os
import re
import sqlite3
import threading
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
from email_validator import validate_email, EmailNotValidError
from flask import Flask, render_template, request, redirect, url_for, jsonify, abort
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from flask_login import (
LoginManager, UserMixin, login_user, logout_user, login_required, current_user
)
from werkzeug.security import generate_password_hash, check_password_hash
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = Flask(__name__)
DATABASE = os.environ.get('DATABASE_PATH', 'guestbook.db')
_secret_key = os.environ.get('SECRET_KEY')
if not _secret_key:
raise RuntimeError("SECRET_KEY environment variable must be set")
app.secret_key = _secret_key
limiter = Limiter(get_remote_address, app=app, default_limits=[])
app.config.update(
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Lax',
PERMANENT_SESSION_LIFETIME=timedelta(hours=8),
)
@app.after_request
def set_security_headers(response):
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
if request.path.startswith('/admin'):
response.headers['Cache-Control'] = 'no-store'
return response
_DISPLAY_TZ = ZoneInfo('America/Denver')
@app.template_filter('localtime')
def localtime_filter(value):
if not value:
return value
try:
dt = datetime.strptime(str(value), '%Y-%m-%d %H:%M:%S')
dt = dt.replace(tzinfo=timezone.utc)
return dt.astimezone(_DISPLAY_TZ).strftime('%Y-%m-%d %H:%M')
except ValueError:
return value
login_manager = LoginManager(app)
login_manager.login_view = 'admin_login'
# ---------------------------------------------------------------------------
# User model
# ---------------------------------------------------------------------------
class User(UserMixin):
"""Lightweight user object stored in the session."""
def __init__(self, user_id, username, role):
# user_id format: 's:<username>' for superadmin, 'u:<db_id>' for DB users
self.id = user_id
self.username = username
self.role = role
@login_manager.user_loader
def load_user(user_id):
if user_id.startswith('s:'):
username = user_id[2:]
admin_user = os.environ.get('ADMIN_USER')
if admin_user and username == admin_user:
return User(user_id, username, 'superadmin')
return None
if user_id.startswith('u:'):
db_id = user_id[2:]
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
row = c.execute(
'SELECT id, username, role FROM users WHERE id = ?', (db_id,)
).fetchone()
conn.close()
if row:
return User(f'u:{row[0]}', row[1], row[2])
except sqlite3.Error as e:
logger.error("Database error in user_loader: %s", e)
return None
# ---------------------------------------------------------------------------
# Profanity filter
# ---------------------------------------------------------------------------
def load_banned_words():
banned_words = set()
file_path = os.path.join(os.path.dirname(__file__), 'en.txt')
if os.path.exists(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
word = line.strip().lower()
if word:
banned_words.add(word)
logger.info("Loaded %d banned words from file.", len(banned_words))
except Exception as e:
logger.error("Error reading banned words file: %s", e)
banned_words = {"fuck", "shit", "damn", "bitch", "asshole", "cunt", "dick", "piss", "crap", "hell"}
else:
logger.warning("Banned words file not found. Using fallback list.")
banned_words = {"fuck", "shit", "damn", "bitch", "asshole", "cunt", "dick", "piss", "crap", "hell"}
return banned_words
BANNED_WORDS = load_banned_words()
def contains_banned_words(text):
lower = text.lower()
# Whole-word check (punctuation-stripped) — catches exact matches
for word in lower.split():
if word.strip(".,!?;:\"'") in BANNED_WORDS:
return True
# Normalized substring check — catches spacing tricks (f u c k) and
# embedded forms (fucking). Note: may produce false positives on words
# that contain a banned word as a substring (e.g. "classic" → "ass").
normalized = re.sub(r'[^a-z]', '', lower)
for banned in BANNED_WORDS:
if banned in normalized:
return True
return False
# ---------------------------------------------------------------------------
# Database migrations
# ---------------------------------------------------------------------------
# Each entry is a list of SQL statements for that schema version.
# To add a column or index in the future, append a new list — never modify existing entries.
MIGRATIONS = [
# v1 — initial schema
[
'''CREATE TABLE IF NOT EXISTS guests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
first_name TEXT NOT NULL,
last_name TEXT NOT NULL,
email TEXT,
location TEXT NOT NULL,
comment TEXT,
newsletter_opt_in BOOLEAN DEFAULT 1,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)''',
'CREATE INDEX IF NOT EXISTS idx_guests_id ON guests (id DESC)',
'CREATE INDEX IF NOT EXISTS idx_guests_email ON guests (email)',
],
# v2 — user accounts for admin interface (role: 'admin' or 'viewer')
[
'''CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
role TEXT NOT NULL CHECK(role IN ('admin', 'viewer'))
)''',
],
]
def migrate_db():
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
# Bootstrap the version table and seed it at 0 if empty
c.execute('CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL)')
if c.execute('SELECT COUNT(*) FROM schema_version').fetchone()[0] == 0:
c.execute('INSERT INTO schema_version VALUES (0)')
conn.commit()
current = c.execute('SELECT version FROM schema_version').fetchone()[0]
pending = MIGRATIONS[current:]
if not pending:
logger.info("Database schema is up to date at v%d.", current)
conn.close()
return
for statements in pending:
current += 1
logger.info("Applying migration v%d...", current)
for sql in statements:
c.execute(sql)
c.execute('UPDATE schema_version SET version = ?', (current,))
conn.commit()
logger.info("Database migrated to v%d.", current)
conn.close()
def is_valid_email(email):
try:
validate_email(email, check_deliverability=False)
return True
except EmailNotValidError:
return False
with app.app_context():
migrate_db()
# ---------------------------------------------------------------------------
# Webhook
# ---------------------------------------------------------------------------
def _fire_webhook(payload):
url = os.environ.get("WEBHOOK_URL", "")
if not url:
return
try:
import urllib.request, json as _json
data = _json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
urllib.request.urlopen(req, timeout=5)
except Exception as e:
logger.warning("Webhook delivery failed: %s", e)
# ---------------------------------------------------------------------------
# Public routes
# ---------------------------------------------------------------------------
@app.route('/', methods=['GET', 'POST'])
@limiter.limit("5 per minute", methods=["POST"])
def index():
error = None
if request.method == 'POST':
logger.info("Received POST request.")
first_name = request.form.get('first_name', '').strip()
last_name = request.form.get('last_name', '').strip()
email = request.form.get('email', '').strip()
location = request.form.get('location', '').strip()
comment = request.form.get('comment', '').strip()
newsletter_opt_in = request.form.get('newsletter_opt_in') == 'on'
if not (first_name and last_name and location):
error = "First name, last name, and location are required."
logger.warning("Missing required fields.")
elif email and not is_valid_email(email):
error = "Invalid email address."
logger.warning("Invalid email: %s", email)
elif comment and contains_banned_words(comment):
error = "Your comment contains inappropriate language. Please revise."
logger.warning("Profanity detected in comment.")
if error:
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute('SELECT first_name, location FROM guests ORDER BY id DESC LIMIT 100')
guests = c.fetchall()
conn.close()
except sqlite3.Error as e:
logger.error("Database error loading guests: %s", e)
guests = []
return render_template('index.html', error=error, guests=guests)
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute(
'''
INSERT INTO guests (first_name, last_name, email, location, comment, newsletter_opt_in)
VALUES (?, ?, ?, ?, ?, ?)
''',
(first_name, last_name, email, location, comment, newsletter_opt_in)
)
conn.commit()
conn.close()
except sqlite3.Error as e:
logger.error("Database error saving guest: %s", e)
return render_template('index.html',
error="Unable to save your entry. Please try again.",
guests=[])
logger.info("Added guest: %s %s from %s", first_name, last_name, location)
threading.Thread(target=_fire_webhook, args=({
"first_name": first_name,
"last_name": last_name,
"email": email,
"location": location,
"newsletter_opt_in": newsletter_opt_in,
},), daemon=True).start()
return redirect(url_for('index'))
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute('SELECT first_name, location FROM guests ORDER BY id DESC LIMIT 100')
guests = c.fetchall()
conn.close()
except sqlite3.Error as e:
logger.error("Database error loading guests: %s", e)
guests = []
logger.info("Rendering index with %d guests.", len(guests))
return render_template('index.html', error=error, guests=guests)
# ---------------------------------------------------------------------------
# Admin auth routes
# ---------------------------------------------------------------------------
def _admin_configured():
return bool(os.environ.get('ADMIN_USER') and os.environ.get('ADMIN_PASSWORD'))
@app.route('/admin/login', methods=['GET', 'POST'])
@limiter.limit("10 per minute", methods=["POST"])
def admin_login():
if not _admin_configured():
abort(503)
if current_user.is_authenticated:
return redirect(url_for('admin'))
error = None
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
admin_user = os.environ.get('ADMIN_USER')
admin_password = os.environ.get('ADMIN_PASSWORD')
# Check superadmin first
if admin_user and username == admin_user and password == admin_password:
login_user(User(f's:{username}', username, 'superadmin'))
logger.info("Superadmin '%s' logged in.", username)
return redirect(request.args.get('next') or url_for('admin'))
# Check DB users
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
row = c.execute(
'SELECT id, password_hash, role FROM users WHERE username = ?', (username,)
).fetchone()
conn.close()
if row and check_password_hash(row[1], password):
login_user(User(f'u:{row[0]}', username, row[2]))
logger.info("User '%s' (role=%s) logged in.", username, row[2])
return redirect(request.args.get('next') or url_for('admin'))
except sqlite3.Error as e:
logger.error("Database error during login: %s", e)
error = 'Invalid username or password.'
logger.warning("Failed login attempt for username '%s'.", username)
return render_template('admin_login.html', error=error)
@app.route('/admin/logout')
def admin_logout():
logout_user()
return redirect(url_for('admin_login'))
# ---------------------------------------------------------------------------
# Admin routes
# ---------------------------------------------------------------------------
@app.route('/admin')
@login_required
def admin():
if not _admin_configured():
abort(503)
page = request.args.get('page', 1, type=int)
per_page = 25
offset = (page - 1) * per_page
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
total = c.execute('SELECT COUNT(*) FROM guests').fetchone()[0]
c.execute('''
SELECT id, first_name, last_name, email, location, comment, newsletter_opt_in, timestamp
FROM guests ORDER BY id DESC LIMIT ? OFFSET ?
''', (per_page, offset))
guests = c.fetchall()
conn.close()
except sqlite3.Error as e:
logger.error("Database error in admin: %s", e)
guests = []
total = 0
total_pages = (total + per_page - 1) // per_page
return render_template('admin.html', guests=guests, page=page, total_pages=total_pages,
total=total)
@app.route('/admin/delete/<int:entry_id>', methods=['POST'])
@login_required
def admin_delete(entry_id):
if not _admin_configured():
abort(503)
if current_user.role == 'viewer':
abort(403)
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute('DELETE FROM guests WHERE id = ?', (entry_id,))
conn.commit()
conn.close()
logger.info("Admin deleted guest entry id=%d", entry_id)
except sqlite3.Error as e:
logger.error("Database error deleting guest %d: %s", entry_id, e)
return redirect(url_for('admin', page=request.args.get('page', 1)))
@app.route('/admin/users')
@login_required
def admin_users():
if not _admin_configured():
abort(503)
if current_user.role != 'superadmin':
abort(403)
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
users = c.execute('SELECT id, username, role FROM users ORDER BY username').fetchall()
conn.close()
except sqlite3.Error as e:
logger.error("Database error in admin_users: %s", e)
users = []
return render_template('admin_users.html', users=users)
@app.route('/admin/users/add', methods=['POST'])
@login_required
def admin_users_add():
if not _admin_configured():
abort(503)
if current_user.role != 'superadmin':
abort(403)
username = request.form.get('username', '').strip()
password = request.form.get('password', '').strip()
role = request.form.get('role', '').strip()
if not username or not password or role not in ('admin', 'viewer'):
return redirect(url_for('admin_users'))
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute(
'INSERT INTO users (username, password_hash, role) VALUES (?, ?, ?)',
(username, generate_password_hash(password), role)
)
conn.commit()
conn.close()
logger.info("Superadmin added user '%s' with role '%s'", username, role)
except sqlite3.IntegrityError:
logger.warning("Attempted to add duplicate username '%s'", username)
except sqlite3.Error as e:
logger.error("Database error adding user: %s", e)
return redirect(url_for('admin_users'))
@app.route('/admin/users/delete/<int:user_id>', methods=['POST'])
@login_required
def admin_users_delete(user_id):
if not _admin_configured():
abort(503)
if current_user.role != 'superadmin':
abort(403)
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute('DELETE FROM users WHERE id = ?', (user_id,))
conn.commit()
conn.close()
logger.info("Superadmin deleted user id=%d", user_id)
except sqlite3.Error as e:
logger.error("Database error deleting user %d: %s", user_id, e)
return redirect(url_for('admin_users'))
# ---------------------------------------------------------------------------
# API
# ---------------------------------------------------------------------------
@app.route('/api/guests', methods=['GET'])
@limiter.limit("100 per hour")
def api_guests():
api_key = request.headers.get('X-API-Key')
if api_key != os.environ.get("API_KEY"):
abort(403)
try:
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute('''
SELECT first_name, last_name, email, location, comment, newsletter_opt_in, timestamp
FROM guests
WHERE email IS NOT NULL AND email != ''
ORDER BY id DESC
''')
rows = c.fetchall()
conn.close()
except sqlite3.Error as e:
logger.error("Database error in api_guests: %s", e)
return jsonify({"error": "Database unavailable"}), 503
guests = [
{
"first_name": row[0],
"last_name": row[1],
"email": row[2],
"location": row[3],
"comment": row[4],
"newsletter_opt_in": bool(row[5]),
"timestamp": row[6]
}
for row in rows
]
return jsonify(guests)
if __name__ == '__main__':
migrate_db()
logger.info("Starting development server at http://0.0.0.0:8000")
app.run(host='0.0.0.0', port=8000)