mybuddy/babybuddy/management/commands/createuser.py

175 lines
5.8 KiB
Python

# -*- coding: utf-8 -*-
"""
Management utility to create users
Example usage:
manage.py createuser \
--username test \
--email test@test.test \
"""
import sys
import getpass
from django.contrib.auth import get_user_model, models
from django.contrib.auth.password_validation import validate_password
from django.core import exceptions
from django.core.management.base import BaseCommand, CommandError
from django.db import DEFAULT_DB_ALIAS
from django.utils.functional import cached_property
from django.utils.text import capfirst
class NotRunningInTTYException(Exception):
pass
class Command(BaseCommand):
help = "Used to create a user"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.UserModel = get_user_model()
self.username_field = self.UserModel._meta.get_field(
self.UserModel.USERNAME_FIELD
)
def add_arguments(self, parser):
parser.add_argument(
f"--{self.UserModel.USERNAME_FIELD}",
help="Specifies the login for a user.",
)
parser.add_argument(
"--email",
dest="email",
default="",
help="Specifies the email for the user. Optional.",
)
parser.add_argument(
"--password",
dest="password",
help="Specifies the password for the user. Optional.",
)
parser.add_argument(
"--read-only",
action="store_true",
default=False,
help="Specifies read-only privileges for the user. Default is False.",
)
parser.add_argument(
"--is-staff",
dest="is_staff",
action="store_true",
default=False,
help="Specifies the staff status for the user. Default is False.",
)
def handle(self, *args, **options):
username = options.get(self.UserModel.USERNAME_FIELD)
password = options.get("password")
is_read_only = options.get("read_only")
user_data = {}
user_password = options.get("password")
verbose_field_name = self.username_field.verbose_name
try:
error_msg = self._validate_username(
username, verbose_field_name, DEFAULT_DB_ALIAS
)
if error_msg:
raise CommandError(error_msg)
user_data[self.UserModel.USERNAME_FIELD] = username
# Prompt for a password interactively (if password not set via arg)
while password is None:
password = getpass.getpass()
password2 = getpass.getpass("Password (again): ")
if password.strip() == "":
self.stderr.write("Error: Blank passwords aren't allowed.")
password = None
continue
if password != password2:
self.stderr.write("Error: Your passwords didn't match.")
password = None
continue
try:
validate_password(password, self.UserModel(**user_data))
except exceptions.ValidationError as err:
self.stderr.write("\n".join(err.messages))
response = input(
"Bypass password validation and create user anyway? [y/N]: "
)
if response.lower() != "y":
password = None
continue
user_password = password
user = self.UserModel._default_manager.db_manager(
DEFAULT_DB_ALIAS
).create_user(**user_data, password=user_password)
user.email = options.get("email")
user.is_staff = options.get("is_staff")
if is_read_only:
user.is_superuser = False
user.save()
group = models.Group.objects.get(name="read_only")
user.groups.add(group)
else:
user.is_superuser = True
user.save()
if options.get("verbosity") > 0:
self.stdout.write(f"User {username} created successfully.")
except KeyboardInterrupt:
self.stderr.write("\nOperation cancelled.")
sys.exit(1)
except exceptions.ValidationError as e:
raise CommandError("; ".join(e.messages))
except NotRunningInTTYException:
self.stdout.write(
"User creation skipped due to not running in a TTY. "
"You can run `manage.py createuser` in your project "
"to create one manually."
)
@cached_property
def username_is_unique(self):
"""
Check if username is unique.
"""
if self.username_field.unique:
return True
return any(
len(unique_constraint.fields) == 1
and unique_constraint.fields[0] == self.username_field.name
for unique_constraint in self.UserModel._meta.total_unique_constraints
)
def _validate_username(self, username, verbose_field_name, database):
"""
Validate username. If invalid, return a string error message.
"""
if self.username_is_unique:
try:
self.UserModel._default_manager.db_manager(database).get_by_natural_key(
username
)
except self.UserModel.DoesNotExist:
pass
else:
return f"Error: The {verbose_field_name} is already taken."
if not username:
return f"{capfirst(verbose_field_name)} cannot be blank."
try:
self.username_field.clean(username, None)
except exceptions.ValidationError as e:
return "; ".join(e.messages)