#! /opt/imh-python/bin/python3
""" Database functions for working with CMS. """
from rads import prompt_y_n
from cms_tools.helpers import (
from cms_tools.cms import CMSStatus, CMSError
LOGGER = logging.getLogger(__name__)
def import_cms_db(the_cms, database_file):
Import database, using known credentials.
Also, make backup if necessary.
"Attempting to import %s into %s", database_file, the_cms.db_name
database = the_cms.db_name
if database not in the_cms.modified_dbs:
CMSStatus.critical, "Unable to backup %s" % the_cms.db_name
raise CMSError("Unable to backup %s" % the_cms.db_name)
LOGGER.info("Modifying database %s", database)
the_cms.modified_dbs[database] = dump_file
the_cms.db_user, the_cms.db_pass, the_cms.db_name, database_file
def test_db_connection(the_cms):
Test the database connection and return errors
LOGGER.debug("Testing db connection")
password=the_cms.db_pass,
database=the_cms.db_name,
with conn.cursor() as cursor:
if cursor.execute("SHOW TABLES") == 0:
return None # No tables, but connected
except pymysql.Error as err:
LOGGER.debug("Connection error")
def simple_query(the_cms, field, table, search_field='', search_pattern=''):
Search database returning specific field with optional search parameters
if the_cms.status < CMSStatus.db_has_matching_tables:
"Database %s has not yet been confrmed working, "
LOGGER.warning("Search field given, but no pattern given")
# MySQL identifiers can't be escaped by execute() like literals can
prefix = the_cms.db_pref.replace('`', '``')
escaped_table = f"`{prefix}{table.replace('`', '``')}`"
escaped_field = f"`{field.replace('`', '``')}`"
query = f"SELECT {escaped_field} FROM {escaped_table};"
escaped_search = f"`{search_field.replace('`', '``')}`"
f"SELECT {escaped_field} FROM {escaped_table} "
f"WHERE {escaped_search} LIKE %s"
password=the_cms.db_pass,
database=the_cms.db_name,
with conn.cursor() as cursor:
result = cursor.execute(query, args)
if result < 1: # No tables, but connected
return cursor.fetchall()[0]
except pymysql.Error as err:
def check_db_auth(the_cms):
Check whether the database user and password is correct
# First, see whether the name uses a valid format
"%s_[a-z0-9]{1,%d}" % (the_cms.dbprefix, 15 - len(the_cms.dbprefix)),
LOGGER.info("Database username '%s' is not correct", the_cms.db_user)
new_name = make_valid_db_name(
the_cms.cpuser, the_cms.dbprefix, the_cms.db_user, name_type="user"
# If we did not get a new name, allow the user to make one
new_name = common_get_string(
"What new name would you like? ",
% (the_cms.dbprefix, 15 - len(the_cms.dbprefix)),
if not the_cms.set_variable('db_user', new_name):
the_cms.db_user = the_cms.get_variable('db_user')
LOGGER.info("Username set to %s", the_cms.db_user)
LOGGER.error("Username '%s' not reset!", the_cms.db_user)
# We got a new name. Prompt to set it
if the_cms.ilevel < 1 or prompt_y_n("Set name to %s? " % new_name):
if not the_cms.set_variable('db_user', new_name):
the_cms.db_user = the_cms.get_variable('db_user')
LOGGER.info("Username set to %s", the_cms.db_user)
new_name = common_get_string(
"Use what database user name: ", 'database'
if not the_cms.set_variable('db_user', new_name):
the_cms.db_user = the_cms.get_variable('db_user')
LOGGER.info("Username set to %s", the_cms.db_user)
# Username is a valid format
# Does it exist and work?
# We can just check whether it exists, and if not, create it
if not db_user_exists(the_cms.cpuser, the_cms.db_user):
if the_cms.ilevel < 1 or prompt_y_n(
"Database user '%s' does not exist. Create it?" % the_cms.db_user
create_db_user(the_cms.cpuser, the_cms.db_user, the_cms.db_pass)
# Check just in case it's not really added
if not db_user_exists(the_cms.cpuser, the_cms.db_user):
"Failed to create database user '%s'" % the_cms.db_user,
CMSStatus.error, "Could not create %s" % the_cms.db_user
# So, the db user exists. Does the pw match?
result = test_db_connection(the_cms)
LOGGER.debug("Authorization fixed")
if the_cms.ilevel < 1 or prompt_y_n(
"Password for user '%s' doesn't match. Reset it?" % the_cms.db_user
the_cms.cpuser, the_cms.db_user, the_cms.db_pass
LOGGER.error("Could reset password for %s.", the_cms.db_user)
LOGGER.error("Could not fix password for %s.", the_cms.db_user)
# The user isn't associated, but this confirms auth worked
# Some other error, so we'll assume this is not the issue
(errno, sterror) = result
"Database connection failing. "
"Receiving error:\n(%d): %s",
def check_db_access(the_cms):
Check whether the database exists and the user has privileges
# First, see whether the name uses a valid format
"%s_[a-z0-9]{1,%d}" % (the_cms.dbprefix, 15 - len(the_cms.dbprefix)),
LOGGER.info("Database name '%s' is not correct", the_cms.db_name)
new_name = make_valid_db_name(
the_cms.cpuser, the_cms.dbprefix, the_cms.db_name
# If we did not get a new name, allow the user to make one
new_name = common_get_string(
"What new database name would you like? ",
% (the_cms.dbprefix, 15 - len(the_cms.dbprefix)),
if not the_cms.set_variable('db_name', new_name):
the_cms.db_name = the_cms.get_variable('db_name')
LOGGER.info("Database name set to %s", the_cms.db_name)
the_cms.set_status(CMSStatus.error, "Database name not correct")
# We got a new name. Prompt to set it
if the_cms.ilevel < 1 or prompt_y_n("Set name to %s?" % new_name):
if not the_cms.set_variable('db_name', new_name):
the_cms.db_name = the_cms.get_variable('db_name')
LOGGER.info("Database name set to %s", the_cms.db_name)
new_name = common_get_string(
"Use what database name: ", 'database'
if not the_cms.set_variable('db_name', new_name):
the_cms.db_name = the_cms.get_variable('db_name')
LOGGER.info("Database name set to %s", the_cms.db_name)
# Database name is a valid format
if not db_exists(the_cms.cpuser, the_cms.db_name):
if the_cms.ilevel < 1 or prompt_y_n(
"Database '%s' does not exist. Create it?" % the_cms.db_name
create_db(the_cms.cpuser, the_cms.db_name)
if not db_exists(the_cms.cpuser, the_cms.db_name):
"Failed to create database '%s'" % the_cms.db_name,
the_cms.set_status(CMSStatus.error, "Database could not be created")
# Did adding the database fix the problem?
result = test_db_connection(the_cms)
LOGGER.debug("Database connection fixed.")
errno, sterror = get_mysql_err(result)
if errno not in (1044, 1049):
# Not certain, but not the same error, so pretend that it did.
"Still could not connect to '%s'. New error: %d: %s",
"The database exists, but the user cannot access the database."
# If we've made it here, we can assign the user
if the_cms.ilevel < 1 or prompt_y_n(
"Associate database user '%s' with database '%s'?"
% (the_cms.db_user, the_cms.db_name)
associate_db_user(the_cms.cpuser, the_cms.db_name, the_cms.db_user)
CMSStatus.error, "Could not associate database user."
LOGGER.warning("Could not associate database user.")
def check_db_error(the_cms):
Check for database connection errors.
Return None if no error or number if there was an errror
# Make sure everything was set up first
if the_cms.status < CMSStatus.db_is_set:
"Database credentials haven't been set. Last status: %s",
# Make sure that we're checking the local db first
if 'localhost' != the_cms.db_host:
LOGGER.info("Databse host is set to '%s'.", the_cms.db_host)
if the_cms.ilevel < 1 or prompt_y_n("Set database host to localhost?"):
if not the_cms.set_variable('db_host', "localhost"):
the_cms.db_host = the_cms.get_variable('db_host')
LOGGER.debug("Database host has been fixed")
result = test_db_connection(the_cms)
LOGGER.debug("Database connection working")
return get_mysql_err(result)[0]
def fix_db_error(the_cms, error_number):
Check whether the database connection is working
LOGGER.info("There was a database error for %s", the_cms.db_name)
LOGGER.info("The username or password is incorrect")
return check_db_auth(the_cms)
if error_number in (1044, 1049):
LOGGER.info("The user cannot access the database")
return check_db_access(the_cms)
"MySQL server has gone away. May need to be researched manually"
LOGGER.error("Unknown error.")
LOGGER.error(error_number)
Check whether the database connection is working
# Make sure everything was set up first
if the_cms.status < CMSStatus.db_is_set:
"Database credentials haven't been set. Last status: %s",
# Make sure that we're checking the local db first
if 'localhost' != the_cms.db_host:
LOGGER.info("Databse host is set to '%s'.", the_cms.db_host)
if the_cms.ilevel < 1 or prompt_y_n("Set database host to localhost?"):
if not the_cms.set_variable('db_host', "localhost"):
the_cms.db_host = the_cms.get_variable('db_host')
LOGGER.debug("Database host has been fixed")
db_error = check_db_error(the_cms)
while None is not db_error:
if not fix_db_error(the_cms, db_error):
LOGGER.info("Could not resolve database error %s", db_error)
LOGGER.error("Too many database errors. Giving up.")
db_error = check_db_error(the_cms)
CMSStatus.db_is_connecting, "Database confirmed connected"
def check_db_data(the_cms):
Check database to ensure that is not empty, and that it has tables
matching the prefix. If not, attempt to import.
if the_cms.status < CMSStatus.db_is_connecting:
if not check_db(the_cms):
"Database has not been confirmed to connect. "
"Cannot check database data."
password=the_cms.db_pass,
database=the_cms.db_name,
with conn.cursor() as cursor:
if cursor.execute("SHOW TABLES") == 0:
LOGGER.info("No tables in '%s'", the_cms.db_name)
return fix_empty_db(the_cms)
CMSStatus.db_has_tables, "Database has tables"
"SHOW TABLES LIKE %s%%", (the_cms.db_pref,)
"Database '%s' has tables, "
"but none matching the '%s' prefix.",
return fix_empty_db(the_cms)
except pymysql.Error as err:
raise CMSError(f"Database query error: {err}") from err