mirror of
https://github.com/tmdinosaurcenter/kiosk-guestbook.git
synced 2026-06-04 01:28:46 -06:00
272 lines
9.8 KiB
Python
272 lines
9.8 KiB
Python
from flask import Flask, render_template, request, redirect, url_for, jsonify, abort, Response
|
|
from flask_limiter import Limiter
|
|
from flask_limiter.util import get_remote_address
|
|
from email_validator import validate_email, EmailNotValidError
|
|
from functools import wraps
|
|
import sqlite3
|
|
import logging
|
|
import os
|
|
import re
|
|
|
|
# Set up logging: root logger at INFO, plus a module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__)
# Path of the SQLite database file; the DATABASE_PATH environment variable
# overrides the default 'guestbook.db'.
DATABASE = os.environ.get('DATABASE_PATH', 'guestbook.db')
# Rate limiter keyed by get_remote_address. No default limits are configured,
# so only routes that declare a limit explicitly are throttled.
limiter = Limiter(get_remote_address, app=app, default_limits=[])
|
|
|
|
def load_banned_words():
    """Load the profanity list used to screen guest comments.

    Reads ``en.txt`` from the directory containing this module, one entry per
    line. Entries are stripped and lower-cased; blank lines are skipped.

    Returns:
        set[str]: The entries read from the file, or a small built-in
        fallback list when the file is missing or cannot be read.
    """
    # Single definition of the fallback so both failure paths stay in sync.
    fallback = {"fuck", "shit", "damn", "bitch", "asshole", "cunt", "dick", "piss", "crap", "hell"}
    file_path = os.path.join(os.path.dirname(__file__), 'en.txt')
    # Open directly instead of checking os.path.exists() first: the file could
    # disappear between the check and the open.
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            stripped = (line.strip().lower() for line in f)
            banned_words = {word for word in stripped if word}
    except FileNotFoundError:
        logger.warning("Banned words file not found. Using fallback list.")
        return fallback
    except (OSError, UnicodeError) as e:
        # Unreadable or badly encoded file: discard any partial result.
        logger.error("Error reading banned words file: %s", e)
        return fallback
    logger.info("Loaded %d banned words from file.", len(banned_words))
    return banned_words


# Loaded once at import time and shared by every request.
BANNED_WORDS = load_banned_words()
|
|
|
|
def contains_banned_words(text, banned_words=None):
    """Report whether *text* contains any banned word.

    Two passes are made over the lower-cased text:

    1. Whole-word check: each whitespace-separated token, with surrounding
       punctuation stripped, is looked up in the banned set.
    2. Normalized substring check: every character other than a-z is removed
       and each banned word is searched for as a substring. This catches
       spacing tricks ("f u c k") and embedded forms ("fucking"), but may
       produce false positives on words that merely contain a banned word
       (e.g. "classic" contains "ass").

    Args:
        text: The text to screen. ``None`` or an empty string is treated as
            clean.
        banned_words: Optional collection of lower-case banned words. Defaults
            to the module-level ``BANNED_WORDS``.

    Returns:
        bool: True if a banned word was detected, False otherwise.
    """
    if not text:
        return False
    if banned_words is None:
        banned_words = BANNED_WORDS

    lower = text.lower()
    # Pass 1: exact token matches after trimming common punctuation.
    if any(token.strip(".,!?;:\"'") in banned_words for token in lower.split()):
        return True

    # Pass 2: substring search over the letters-only form of the text.
    normalized = re.sub(r'[^a-z]', '', lower)
    return any(banned in normalized for banned in banned_words)
|
|
|
|
# Each entry is a list of SQL statements for that schema version.
# To add a column or index in the future, append a new list — never modify existing entries.
MIGRATIONS = [
    # v1 — initial schema
    [
        '''CREATE TABLE IF NOT EXISTS guests (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            first_name TEXT NOT NULL,
            last_name TEXT NOT NULL,
            email TEXT,
            location TEXT NOT NULL,
            comment TEXT,
            newsletter_opt_in BOOLEAN DEFAULT 1,
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )''',
        'CREATE INDEX IF NOT EXISTS idx_guests_id ON guests (id DESC)',
        'CREATE INDEX IF NOT EXISTS idx_guests_email ON guests (email)',
    ],
]


def migrate_db():
    """Bring the SQLite schema up to the latest version in ``MIGRATIONS``.

    The current version is stored in the single-column ``schema_version``
    table (created and seeded with 0 on first run). Every entry of
    ``MIGRATIONS`` past that version is applied in order, and the stored
    version is bumped after each one.

    Each migration runs inside its own explicit transaction, so a failing
    statement rolls back that migration's earlier statements and leaves the
    recorded version untouched. The connection is closed on every path.

    Raises:
        sqlite3.Error: If bootstrapping or any migration statement fails.
    """
    conn = sqlite3.connect(DATABASE)
    try:
        c = conn.cursor()

        # Bootstrap the version table and seed it at 0 if empty
        c.execute('CREATE TABLE IF NOT EXISTS schema_version (version INTEGER NOT NULL)')
        if c.execute('SELECT COUNT(*) FROM schema_version').fetchone()[0] == 0:
            c.execute('INSERT INTO schema_version VALUES (0)')
            conn.commit()

        current = c.execute('SELECT version FROM schema_version').fetchone()[0]
        pending = MIGRATIONS[current:]

        if not pending:
            logger.info("Database schema is up to date at v%d.", current)
            return

        for statements in pending:
            target = current + 1
            logger.info("Applying migration v%d...", target)
            try:
                # The sqlite3 module does not open a transaction for DDL on
                # its own, so begin one explicitly to make the migration
                # all-or-nothing.
                c.execute('BEGIN')
                for sql in statements:
                    c.execute(sql)
                c.execute('UPDATE schema_version SET version = ?', (target,))
                conn.commit()
            except sqlite3.Error:
                conn.rollback()
                raise
            current = target

        logger.info("Database migrated to v%d.", current)
    finally:
        conn.close()
|
|
|
|
def is_valid_email(email):
    """Return True if *email* is accepted by ``validate_email``, else False.

    Validation is run with ``check_deliverability=False``.
    """
    try:
        validate_email(email, check_deliverability=False)
    except EmailNotValidError:
        return False
    else:
        return True
|
|
|
|
# Apply any pending schema migrations as soon as this module is imported,
# inside a Flask application context.
with app.app_context():
    migrate_db()
|
|
|
|
def _load_recent_guests():
    """Return up to 100 newest ``(first_name, location)`` rows, or ``[]`` on a database error."""
    try:
        conn = sqlite3.connect(DATABASE)
        try:
            c = conn.cursor()
            c.execute('SELECT first_name, location FROM guests ORDER BY id DESC LIMIT 100')
            return c.fetchall()
        finally:
            # Close even when the query raises, so the handle is not leaked.
            conn.close()
    except sqlite3.Error as e:
        logger.error("Database error loading guests: %s", e)
        return []


@app.route('/', methods=['GET', 'POST'])
@limiter.limit("5 per minute", methods=["POST"])
def index():
    """Render the guestbook page (GET) or accept a new entry (POST).

    POST submissions are limited to 5 per minute. First name, last name and
    location are required; the email, when given, must validate; the comment,
    when given, must pass the banned-word screen. A rejected submission
    re-renders the page with an error message, and a stored one redirects back
    to this view.

    Returns:
        The rendered ``index.html`` template, or a redirect to this view after
        a successful insert.
    """
    if request.method != 'POST':
        guests = _load_recent_guests()
        logger.info("Rendering index with %d guests.", len(guests))
        return render_template('index.html', error=None, guests=guests)

    logger.info("Received POST request.")
    first_name = request.form.get('first_name', '').strip()
    last_name = request.form.get('last_name', '').strip()
    email = request.form.get('email', '').strip()
    location = request.form.get('location', '').strip()
    comment = request.form.get('comment', '').strip()
    # A checked HTML checkbox is submitted as the value 'on'.
    newsletter_opt_in = request.form.get('newsletter_opt_in') == 'on'

    error = None
    if not (first_name and last_name and location):
        error = "First name, last name, and location are required."
        logger.warning("Missing required fields.")
    elif email and not is_valid_email(email):
        error = "Invalid email address."
        logger.warning("Invalid email: %s", email)
    elif comment and contains_banned_words(comment):
        error = "Your comment contains inappropriate language. Please revise."
        logger.warning("Profanity detected in comment.")

    if error:
        return render_template('index.html', error=error, guests=_load_recent_guests())

    try:
        conn = sqlite3.connect(DATABASE)
        try:
            c = conn.cursor()
            c.execute(
                '''
                INSERT INTO guests (first_name, last_name, email, location, comment, newsletter_opt_in)
                VALUES (?, ?, ?, ?, ?, ?)
                ''',
                (first_name, last_name, email, location, comment, newsletter_opt_in)
            )
            conn.commit()
        finally:
            conn.close()
    except sqlite3.Error as e:
        logger.error("Database error saving guest: %s", e)
        return render_template('index.html',
                               error="Unable to save your entry. Please try again.",
                               guests=[])

    logger.info("Added guest: %s %s from %s", first_name, last_name, location)
    # Redirect after POST so a browser refresh does not resubmit the form.
    return redirect(url_for('index'))
|
|
|
|
def require_admin_auth(f):
    """Decorator that protects a view with HTTP Basic authentication.

    The supplied credentials must match the ``ADMIN_USER`` and
    ``ADMIN_PASSWORD`` environment variables, which are read on every request.
    Access is always refused when ``ADMIN_PASSWORD`` is unset or empty;
    otherwise empty Basic credentials would match the empty defaults.

    Args:
        f: The view function to protect.

    Returns:
        The wrapped view. It answers 401 with a ``WWW-Authenticate`` challenge
        when authentication fails, and calls ``f`` otherwise.
    """
    import hmac  # function-scope import: only this decorator needs it

    def _matches(supplied, expected):
        # Constant-time comparison. Encoding to bytes lets compare_digest
        # accept non-ASCII text; a missing value is compared as ''.
        return hmac.compare_digest((supplied or '').encode('utf-8'),
                                   expected.encode('utf-8'))

    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        admin_user = os.environ.get('ADMIN_USER', '')
        admin_password = os.environ.get('ADMIN_PASSWORD', '')

        # Evaluate both comparisons unconditionally so the response time does
        # not reveal which field was wrong.
        user_ok = auth is not None and _matches(auth.username, admin_user)
        password_ok = auth is not None and _matches(auth.password, admin_password)

        if not (admin_password and user_ok and password_ok):
            return Response(
                'Authentication required.',
                401,
                {'WWW-Authenticate': 'Basic realm="Admin"'}
            )
        return f(*args, **kwargs)

    return decorated
|
|
|
|
@app.route('/admin')
@require_admin_auth
def admin():
    """Render the paginated admin listing of guest entries (25 per page).

    The ``page`` query parameter selects the page. It is clamped to the range
    1..total_pages, so zero, negative or oversized values never reach the
    query as an invalid or overflowing OFFSET. On a database error an empty
    listing is rendered.

    Returns:
        The rendered ``admin.html`` template.
    """
    per_page = 25
    # Non-integer input falls back to the default of 1; then enforce the floor.
    page = max(request.args.get('page', 1, type=int), 1)
    guests = []
    total = 0
    total_pages = 0
    try:
        conn = sqlite3.connect(DATABASE)
        try:
            c = conn.cursor()
            total = c.execute('SELECT COUNT(*) FROM guests').fetchone()[0]
            # Ceiling division; 0 when the table is empty.
            total_pages = (total + per_page - 1) // per_page
            page = min(page, max(total_pages, 1))
            offset = (page - 1) * per_page
            c.execute('''
                SELECT id, first_name, last_name, email, location, comment, newsletter_opt_in, timestamp
                FROM guests ORDER BY id DESC LIMIT ? OFFSET ?
            ''', (per_page, offset))
            guests = c.fetchall()
        finally:
            conn.close()
    except sqlite3.Error as e:
        logger.error("Database error in admin: %s", e)
        guests = []
        total = 0
        total_pages = 0
    return render_template('admin.html', guests=guests, page=page, total_pages=total_pages, total=total)
|
|
|
|
@app.route('/admin/delete/<int:entry_id>', methods=['POST'])
@require_admin_auth
def admin_delete(entry_id):
    """Delete one guest entry, then return to the admin listing.

    Args:
        entry_id: Primary key of the ``guests`` row to delete.

    Returns:
        A redirect to the admin view, carrying over the ``page`` query
        parameter (default 1). Database errors are logged, not raised.
    """
    try:
        conn = sqlite3.connect(DATABASE)
        try:
            c = conn.cursor()
            c.execute('DELETE FROM guests WHERE id = ?', (entry_id,))
            conn.commit()
        finally:
            # Close even if the delete or commit raised.
            conn.close()
        logger.info("Admin deleted guest entry id=%d", entry_id)
    except sqlite3.Error as e:
        logger.error("Database error deleting guest %d: %s", entry_id, e)
    return redirect(url_for('admin', page=request.args.get('page', 1)))
|
|
|
|
@app.route('/api/guests', methods=['GET'])
def api_guests():
    """Return every guest entry that has an email address, as JSON.

    The request must carry an ``X-API-Key`` header equal to the ``API_KEY``
    environment variable. When ``API_KEY`` is unset or empty, every request is
    refused; otherwise a request without the header would be let through.

    Returns:
        A JSON array of guest objects, newest first, or a 503 JSON error
        when the database cannot be read. Aborts with 403 on a missing or
        wrong key.
    """
    import hmac  # function-scope import: only this view needs it

    expected_key = os.environ.get("API_KEY")
    api_key = request.headers.get('X-API-Key')
    # Constant-time comparison on bytes, so non-ASCII input cannot raise.
    if (
        not expected_key
        or api_key is None
        or not hmac.compare_digest(api_key.encode('utf-8'), expected_key.encode('utf-8'))
    ):
        abort(403)

    try:
        conn = sqlite3.connect(DATABASE)
        try:
            c = conn.cursor()
            c.execute('''
                SELECT first_name, last_name, email, location, comment, newsletter_opt_in, timestamp
                FROM guests
                WHERE email IS NOT NULL AND email != ''
                ORDER BY id DESC
            ''')
            rows = c.fetchall()
        finally:
            conn.close()
    except sqlite3.Error as e:
        logger.error("Database error in api_guests: %s", e)
        return jsonify({"error": "Database unavailable"}), 503

    guests = [
        {
            "first_name": row[0],
            "last_name": row[1],
            "email": row[2],
            "location": row[3],
            "comment": row[4],
            # Stored as 0/1 in SQLite; expose a real JSON boolean.
            "newsletter_opt_in": bool(row[5]),
            "timestamp": row[6]
        }
        for row in rows
    ]
    return jsonify(guests)
|
|
|
|
if __name__ == '__main__':
    # Runs only when this file is executed directly, not when it is imported.
    # NOTE(review): migrate_db() is also called at module level (under
    # app.app_context()), so this call presumably finds nothing pending.
    # Confirm before removing it.
    migrate_db()
    logger.info("Starting development server at http://0.0.0.0:8000")
    # Listen on all network interfaces, port 8000.
    app.run(host='0.0.0.0', port=8000)
|