#!/opt/imh-python/bin/python3
Reseller audit script. Does the following:
1) Makes sure that all resellers are owned by 'inmotion' or 'hubhost'
2) Resets reseller ACL limits and IP pools
3) Checks for orphaned accounts (accounts that have a non-existent owner)
from collections import defaultdict
from cpapis import whmapi1, CpAPIError
# APIPA (link-local) address; the old moveuser used this for reseller moves
APIPA = '169.254.100.100'
# Short host name: the node name up to (not including) its first dot
HOST = platform.node().partition('.')[0]
# Name of our reseller account, chosen by the server class reported by rads
if rads.IMH_CLASS == 'hub':
    RESELLER = 'hubhost'
else:
    RESELLER = 'inmotion'
# Parse the command-line options of the audit script.
# Returns a (loglevel, noop) tuple: loglevel is the numeric constant looked up
# on the logging module from the chosen level name (one of DEBUG, INFO,
# WARNING, ERROR, CRITICAL), and noop is passed through from args.noop.
# NOTE(review): the add_argument() calls that define the loglevel and noop
# options (flag names, defaults, help text) are assumed from context --
# confirm them before relying on this summary.
def parse_args() -> tuple[int, bool]:
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
args = parser.parse_args()
loglevel = getattr(logging, args.loglevel)
return loglevel, args.noop
def get_dips() -> dict[str, set[str]]:
    """Get a mapping of ipaddr -> resellers from /var/cpanel/dips

    Every file in /var/cpanel/dips is named after a reseller and is read as
    whitespace-separated IP addresses delegated to that reseller.

    Returns:
        A dict mapping each IP address to the set of reseller names whose
        dips file lists it. Empty when /var/cpanel/dips does not exist.
    """
    dips: dict[str, set[str]] = defaultdict(set)
    try:
        for res_path in Path('/var/cpanel/dips').iterdir():
            # a set so that an IP repeated within one file is handled once
            res_ips = set(res_path.read_text('ascii').split())
            for ipaddr in res_ips:
                dips[ipaddr].add(res_path.name)
    except FileNotFoundError:
        # Directory (or a file inside it) is missing: keep what was collected
        pass
    return dips
# Audits reseller IP delegations for addresses that appear in more than one
# reseller's pool. The delegations come from get_dips(); the conflicts are
# first handed to auto_fix_double_dips(), and whatever is left is logged and
# listed in a message addressed to str@imhadmin.net with the subject
# "Reseller IP delegation conflict" (one "ip: reseller, reseller" line per IP).
#
# resellers: reseller names, forwarded to auto_fix_double_dips().
# noop: forwarded to auto_fix_double_dips().
# NOTE(review): the filter that keeps only multiply-delegated IPs, the early
# exit after the `if not double_delegations:` test, and the call that actually
# sends the message are assumed from context -- confirm.
def check_double_ip_delegations(resellers: set[str], noop: bool):
"""Check for IPs which are assigned to more than one reseller"""
for ipaddr, resellers in get_dips().items()
auto_fix_double_dips(resellers, double_delegations, noop)
if not double_delegations:
logging.warning("Double-delegated IP addresses detected - sending ticket")
logging.debug('double delegations: %r', double_delegations)
"The following IP addresses were detected as being delegated to "
"more than one reseller and must be corrected:\n"
for ip_addr, res in double_delegations.items():
body = f"{body}\n{ip_addr}: {', '.join(res)}"
to_addr="str@imhadmin.net",
subject="Reseller IP delegation conflict",
# Inputs read from disk, both parsed with rads.DumbYamlLoader:
#   /etc/userips        -> user_ips (user -> IP), compared with each
#                          conflicting address
#   /etc/trueuserowners -> user_resellers (user -> owner); a user that is
#                          itself in `resellers` is mapped to itself instead
# The loop walks a copy of double_delegations because remove_dip() removes
# entries from the original dict as conflicts get resolved.
# Outcomes visible in the body:
#   * per the inline comment, conflicts involving rads.OUR_RESELLERS are not
#     auto-fixed
#   * when nobody uses the IP it is taken from every reseller except the
#     first element of list(res)
#   * when one reseller uses it, remove_dip() is called for the others
# NOTE(review): `res` is a set, so the element kept by list(res)[1:] is an
# arbitrary one -- presumably fine because the IP is unused; confirm. The
# loop selecting `remove` in the last branch is assumed from context.
def auto_fix_double_dips(
resellers: set[str], double_delegations: dict[str, set[str]], noop: bool
"""Attempt to automatically fix IP double-delegations by checking if the IP
is actually in use, and removing it from resellers which aren't using it"""
user_ips: dict[str, str] = yaml.load(
Path('/etc/userips').read_text('ascii'), rads.DumbYamlLoader
user_resellers: dict[str, str] = yaml.load(
Path('/etc/trueuserowners').read_text('ascii'), rads.DumbYamlLoader
k: k if k in resellers else v for k, v in user_resellers.items()
for ipaddr, res in double_delegations.copy().items():
if res.intersection(rads.OUR_RESELLERS):
# if there's a conflict involving one of our resellers, don't try
# collect resellers actually using the IP
{user_resellers[k] for k, v in user_ips.items() if v == ipaddr}
continue # legit conflict. don't auto-fix
# No one is using this IP. Take it away from all but one reseller.
# If this takes away any reseller's last IP, the next run of this
for remove in list(res)[1:]:
remove_dip(ipaddr, remove, double_delegations, noop)
# else one reseller is using it but it's delegated to multiple
remove_dip(ipaddr, remove, double_delegations, noop)
# remove_dip: takes one IP out of one reseller's delegated pool.
# Steps visible below:
#   1. read the reseller's main IP from /var/cpanel/mainips/<reseller>
#   2. fetch the current pool with whmapi1.getresellerips(reseller)['ip']
#   3. write the pool back with whmapi1.setresellerips(..., delegate=True);
#      CpAPIError is caught and a "Could not remove" message is prepared
#   4. drop the reseller from double_delegations[ipaddr] and remove the whole
#      entry once fewer than two resellers remain (mutates the caller's dict)
# NOTE(review): the comparison against main_ip, the removal of ipaddr from
# `pool`, and the noop guard are assumed from context -- confirm.
double_delegations: dict[str, set[str]],
"""Remove an IP from a reseller's pool to fix a double delegation"""
# make sure it wasn't their main. the calling function already checked that
# the reseller didn't have it assigned
main_ip = Path('/var/cpanel/mainips', reseller).read_text('ascii').strip()
logging.warning("removing %s from %s's IP pool", ipaddr, reseller)
pool = whmapi1.getresellerips(reseller)['ip']
# but the previous lookup had it?
logging.error("Could not remove %s from %s's IP pool", ipaddr, reseller)
whmapi1.setresellerips(reseller, pool, delegate=True)
except CpAPIError as exc:
"Could not remove %s from %s's IP pool: %s",
double_delegations[ipaddr].remove(reseller)
if len(double_delegations[ipaddr]) < 2:
double_delegations.pop(ipaddr)
# ConfigParser subclass for cPanel files that carry no section header.
# __init__ prepends a synthetic "[config]" header to the file text so that
# ConfigParser accepts it; every option is therefore read from the 'config'
# section. Parser settings: allow_no_value=True (keys without a value are
# accepted), interpolation=None ('%' in values is taken literally) and
# strict=False (duplicate keys are tolerated).
# user_conf()/pkg_conf() build an instance from /var/cpanel/users/<user> or
# /var/cpanel/packages/<pkg>; they take `cls`, so they are presumably
# classmethods -- confirm the decorators.
# res_limits() maps each name in imh_keys to the value of the package option
# "imh_<name>", using '' when the option is absent.
# NOTE(review): which exceptions __init__ logs, and whether it re-raises after
# logging, is assumed from context -- confirm.
class CpanelConf(configparser.ConfigParser):
"""Handles reading /var/cpanel/users and /var/cpanel/packages files"""
def __init__(self, path: Path):
super().__init__(allow_no_value=True, interpolation=None, strict=False)
self.read_string(f"[config]\n{path.read_text('utf-8')}")
logging.error('%s - %s: %s', path, type(exc).__name__, exc)
def user_conf(cls, user: str):
"""Read /var/cpanel/users/{user}"""
return cls(Path('/var/cpanel/users', user))
def pkg_conf(cls, pkg: str):
"""Read /var/cpanel/packages/{pkg}"""
return cls(Path('/var/cpanel/packages', pkg))
def res_limits(self) -> dict[str, str]:
"""Read imh custom reseller limits from a cPanel package
(use only with pkg_conf)"""
'enable_resource_limits',
'enable_overselling_bandwidth',
'enable_overselling_diskspace',
x: self.get('config', f'imh_{x}', fallback='') for x in imh_keys
def get_main_ips() -> set[str]:
    """Collect IPs from /var/cpanel/mainip and /var/cpanel/mainips/root

    Both files are read as whitespace-separated IP addresses.

    Returns:
        The union of the addresses found in the two files.

    Raises:
        FileNotFoundError: if /var/cpanel/mainip is missing. A missing
            /var/cpanel/mainips/root is tolerated.
    """
    with open('/var/cpanel/mainip', encoding='ascii') as ip_file:
        ips = set(ip_file.read().split())
    try:
        with open('/var/cpanel/mainips/root', encoding='ascii') as ip_file:
            ips.update(ip_file.read().split())
    except FileNotFoundError:
        # the root override file is optional
        pass
    return ips
"""Get an IP which is not already in use"""
with open('/etc/ipaddrpool', encoding='ascii') as pool:
# not assigned as dedicated, but may be in a reseller pool
unassigned = pool.read().split()
for ip_addr in unassigned:
if not assigned_to_res(ip_addr):
def assigned_to_res(ip_addr: str) -> bool:
    """Determine if an IP is already delegated to a reseller

    Args:
        ip_addr: IP address to look for.

    Returns:
        True if any file in /var/cpanel/dips lists ``ip_addr`` as one of its
        whitespace-separated entries, otherwise False.
    """
    for entry in Path('/var/cpanel/dips').iterdir():
        with entry.open('r', encoding='ascii') as dips:
            # compare whole tokens so 1.2.3.4 does not match 11.2.3.45
            if ip_addr in dips.read().split():
                return True
    return False
# Walks the entries of /var/cpanel/users on a non-reseller server.
# From the calls visible below:
#   * users listed in rads.OUR_RESELLERS get their owner set to 'root' through
#     whmapi1.set_owner(); CpAPIError is caught and an "Error changing owner"
#     message is prepared
#   * for other users the 'owner' option is read from the user's config and
#     handed to set_owner() together with RESELLER as the new owner; a missing
#     option (configparser.NoOptionError) is reported instead
# NOTE(review): the test that produces the "exists. Skipping." warning, the
# condition deciding when an owner change is needed, and the noop guard around
# the direct whmapi1 call are assumed from context -- confirm.
def non_res_checks(noop: bool):
"""Reseller-owner checks on non-reseller servers"""
for path in Path('/var/cpanel/users').iterdir():
logging.warning('%s exists. Skipping.', path)
if user in rads.OUR_RESELLERS:
whmapi1.set_owner(user, 'root')
except CpAPIError as exc:
"Error changing owner of %s to root: %s", user, exc
user_conf = CpanelConf.user_conf(user)
owner = user_conf.get('config', 'owner')
except configparser.NoOptionError:
'%s is missing OWNER and may not be a valid CPanel user file',
set_owner(user, owner, RESELLER, noop)
def get_resellers() -> set[str]:
    """Read resellers from /var/cpanel/resellers

    The reseller name is the text before the first ':' on each line. Lines
    whose name part is empty or only whitespace (e.g. blank lines) are
    skipped.

    Returns:
        The set of reseller names found in the file.
    """
    resellers: set[str] = set()
    with open('/var/cpanel/resellers', encoding='utf-8') as res_file:
        for line in res_file:
            # strip so a blank line does not produce a "\n" reseller name
            if res := line.split(':', maxsplit=1)[0].strip():
                resellers.add(res)
    return resellers
# Script entry point body.
# Flow visible below:
#   * parse the CLI options; two log formats exist, one of which tags every
#     record with "NOOP"
#   * sanity checks on rads.IMH_ROLE ('shared' expected) and on a hostname
#     containing 'res' while rads.IMH_CLASS is not 'reseller'
#   * all_res = resellers from /var/cpanel/resellers + rads.OUR_RESELLERS +
#     "system" + rads.SECURE_USER; it is later given to cleanup_delegations()
#   * when rads.IMH_CLASS is 'reseller': res_checks() per reseller, then
#     check_orphans() per entry of /var/cpanel/users, collecting unsuspended
#     orphans in orphan_storage and failed terminations in term_fails, each
#     reported per owner through orphans_notify() / term_fail_notice()
#   * cleanup_delegations() and check_double_ip_delegations() run last
# NOTE(review): which of these steps sit inside the reseller-only branch, and
# where non_res_checks() is called, is assumed from context -- confirm.
loglevel, noop = parse_args()
logfmt = '%(asctime)s %(levelname)s NOOP %(message)s'
logfmt = '%(asctime)s %(levelname)s %(message)s'
path=None, loglevel=loglevel, fmt=logfmt, print_out=sys.stdout
if rads.IMH_ROLE != 'shared':
# NOTE(review): the message is labelled "rads.IMH_CLASS" but the value logged
# is rads.IMH_ROLE -- looks like a mislabelled message; confirm intent.
logging.critical("rads.IMH_CLASS=%r", rads.IMH_ROLE)
if 'res' in HOST and rads.IMH_CLASS != 'reseller':
"hostname=%r but rads.IMH_CLASS=%r", HOST, rads.IMH_CLASS
resellers = get_resellers()
all_res = resellers | set(rads.OUR_RESELLERS) | {"system", rads.SECURE_USER}
if rads.IMH_CLASS == 'reseller':
main_ips = get_main_ips()
for reseller in resellers:
res_checks(reseller, main_ips, noop)
orphan_storage = defaultdict(list)
term_fails = defaultdict(list)
for entry in Path("/var/cpanel/users").iterdir():
logging.warning("Removing erroneous file at %s", entry)
check_orphans(user, main_ips, orphan_storage, term_fails, noop)
for reseller, orphans in orphan_storage.items():
orphans_notify(reseller, orphans, noop)
for reseller, orphans in term_fails.items():
term_fail_notice(reseller, orphans, noop)
cleanup_delegations(all_res, noop)
check_double_ip_delegations(resellers, noop)
# Compares the name of every entry in /var/cpanel/dips with all_res and logs
# (DEBUG) each entry whose name is not in that set.
# all_res: names that are allowed to keep a delegation file.
# noop: presumably suppresses the deletion -- confirm.
# NOTE(review): the statement that deletes the entry and its noop guard are
# assumed from the docstring and the log text -- confirm.
def cleanup_delegations(all_res: set[str], noop: bool):
"""Remove /var/cpanel/dips (ip delegation) files for deleted resellers"""
for entry in Path('/var/cpanel/dips').iterdir():
if entry.name not in all_res:
logging.debug('deleting %s', entry)
# check_orphans: classifies one user as orphaned or as a regular child account.
# Reads the 'owner' and 'ip' options from /var/cpanel/users/<user>; both fall
# back to None when absent.
# Orphan handling visible below:
#   * no /var/cpanel/suspended/<user> file (FileNotFoundError on stat): the
#     orphan is not suspended and is appended to orphan_storage[owner]
#   * suspended for more than 14 days (by the mtime of that file): terminated
#     through whmapi1.removeacct(user, keepdns=False); a CpAPIError is logged
#     as a warning and the user is appended to term_fails[owner]
#   * suspended for less time: only a log message is prepared
# Non-orphans with no IP, or with an IP contained in main_ips, are passed to
# set_child_owner_ip().
# NOTE(review): the full orphan condition (only two of its terms are visible)
# and the noop guard before removeacct are assumed from context -- confirm.
orphan_storage: defaultdict[list],
term_fails: defaultdict[list],
"""Find orphaned accounts (accounts that have no existing owner)"""
user_conf = CpanelConf.user_conf(user)
owner = user_conf.get('config', 'owner', fallback=None)
ip_address = user_conf.get('config', 'ip', fallback=None)
not Path('/var/cpanel/users', owner).exists()
or owner in rads.OUR_RESELLERS
# this is an orphaned account
susp_time = Path('/var/cpanel/suspended', user).stat().st_mtime
except FileNotFoundError:
# the orphaned account is not suspended
orphan_storage[owner].append(user)
# If the orphan is suspended for more than 14 days, terminate it
if time.time() - susp_time > 14 * 86400:
logging.info("Terminating suspended orphan user %s", user)
whmapi1.removeacct(user, keepdns=False)
except CpAPIError as exc:
logging.warning("Failed to terminate user %s: %s", user, exc)
term_fails[owner].append(user)
"Orphaned user %s has not been suspended long "
"enough for auto-terminate",
# This is a non-orphaned, child account.
# While we're here, make sure the user's IP is correct.
if not ip_address or ip_address in main_ips:
# Assign the user their owner's IP
set_child_owner_ip(user, owner, noop)
# Reports orphans that are not suspended. Logs a summary line for the owner,
# logs the orphan list at DEBUG, and prepares a message to str@imhadmin.net
# with the subject "Orphan accounts on {HOST} with owner {reseller}". The body
# explains the likely causes and embeds a ready-made shell loop that runs
# suspend_user on every listed orphan.
# reseller: owner name the orphans are grouped under.
# orphans: user names of the orphaned accounts.
# noop: presumably suppresses sending -- confirm.
# NOTE(review): the call that sends the message and the noop guard are assumed
# from context -- confirm.
def orphans_notify(reseller: str, orphans: list[str], noop: bool) -> None:
"""Notify for unsuspended orphan accounts"""
'%s orphaned accounts exist under the reseller %s. Sending STR.',
logging.debug('Orphans under %s: %r', reseller, orphans)
The following orphan accounts have been located under owner {reseller}:
They appear to have an owner that does not exist, or is a reseller missing
reseller privileges. If the orphan's owner exists in PowerPanel, please set
their owner to 'inmotion' or 'hubhost' as appropriate. If the orphan's owner is
a reseller, add reseller privileges. If the orphan account does not exist,
please suspend them on the server with the command
"for orphan in {' '.join(orphans)}; do suspend_user $orphan -r orphan; done"
to_addr="str@imhadmin.net",
subject=f"Orphan accounts on {HOST} with owner {reseller}",
# Reports orphans whose automatic termination failed. Logs a summary line,
# logs the affected users at DEBUG, and prepares a message to
# str@imhadmin.net with the subject
# "Failed to auto-term orphans on {HOST} with owner {reseller}" asking for a
# manual removeacct where appropriate.
# reseller: owner name the orphans are grouped under.
# orphans: user names whose termination failed.
# noop: presumably suppresses sending -- confirm.
# NOTE(review): the call that sends the message and the noop guard are assumed
# from context -- confirm.
def term_fail_notice(reseller: str, orphans: list[str], noop: bool) -> None:
"""Separate notification for orphans that failed to auto-term, because
suspending them again won't fix the problem"""
"%s orphaned accounts failed to auto-terminate under the reseller %s. "
logging.debug("terms failed for %r", orphans)
The following orphan accounts were found under owner {reseller} and were
suspended long enough to auto-terminate, but auto-termination failed:
Please investigate and if appropriate, run removeacct on the orphan accounts.
to_addr="str@imhadmin.net",
subject=f"Failed to auto-term orphans on {HOST} with owner {reseller}",
# Reads the owner's 'ip' option from /var/cpanel/users/<owner> and assigns that
# address to `user` through whmapi1.setsiteip(). Two log messages exist: one
# for when the owner's IP cannot be determined and one announcing the change.
# A CpAPIError from setsiteip is caught and an "Error changing IP" message is
# prepared for it.
# user: child account that currently has a shared IP.
# owner: account whose IP should be copied.
# noop: presumably skips the setsiteip call -- confirm.
# NOTE(review): the handling of a missing 'ip' option and the noop guard are
# assumed from context -- confirm.
def set_child_owner_ip(user: str, owner: str, noop: bool) -> None:
"""Assign the user their owner's IP"""
owner_conf = CpanelConf.user_conf(owner)
owner_ipaddr = owner_conf.get('config', 'ip')
"User %s has shared IP, but couldn't determine the IP of "
"the owner %s to assign it to the child account",
"User %s has shared IP. Changing to owner %s's IP of %s",
whmapi1.setsiteip(user, owner_ipaddr)
except CpAPIError as exc:
"Error changing IP of %s to %s: %s", user, owner_ipaddr, exc
# Logs the ownership change at INFO level and then calls
# whmapi1.set_owner(user, new). A CpAPIError from that call is caught and an
# "Error changing ownership" message is prepared for it.
# user: account whose owner changes.
# old: current owner; used only in the log message.
# new: owner to assign.
# noop: presumably skips the API call -- confirm.
# NOTE(review): the noop guard and the log level of the error message are
# assumed from context -- confirm.
def set_owner(user: str, old: str, new: str, noop: bool):
"""Change user owner and log"""
logging.info("Changing ownership of %s from %s to %s", user, old, new)
whmapi1.set_owner(user, new)
except CpAPIError as exc:
"Error changing ownership of %s to %s: %s", user, new, exc