#!/opt/imh-python/bin/python3
"""Disk Move Generator - generates disk move tickets
according to arguments and exclusions"""
from operator import itemgetter
from platform import node
from concurrent.futures import ThreadPoolExecutor, as_completed
from tabulate import tabulate
# YAML file holding the exclusion list: each username maps to the integer
# timestamp at which it was added
EXCLUSION_LIST = Path('/var/log/disk_exclude')
TIMER = 30 # days before a user is removed from the exclusion list
# Build the command-line interface; the module docstring doubles as the
# description shown by --help.
# NOTE(review): the "Add user to exclusion list" help text below hard-codes
# "30 days" -- keep it in sync with the TIMER constant.
parser = argparse.ArgumentParser(description=__doc__)
group = parser.add_mutually_exclusive_group()
help="Add user to exclusion list. Entries survive 30 days",
help="Minimum size of account to migrate in GB, default 8",
'-x', '--max', type=int, help="Maximum size of account to migrate in GB"
help="Lists several eligible accounts whose size totals up to X GB",
help="List of users to exclude alongside exclusion list",
help="Do not use exclusion list",
help="Print list of eligible accounts",
help="Email eligible accounts to the disk moves queue",
args = parser.parse_args()
# at least one action is required: --listaccount (-l), --ticket (-d)
# or --add (-a)
if args.ticket is False and args.listaccount is False and not args.add_user:
"--ticket (-d), --listaccount (-l),",
"or --add (-a) [user] is required",
"""Parse /etc/trueuserowners"""
# The file is loaded as YAML with rads.DumbYamlLoader.
# presumably the result maps each cPanel user to its owner (callers iterate
# it with .items() as user/owner pairs) -- confirm against the file format
with open('/etc/trueuserowners', encoding='utf-8') as handle:
user_owners = yaml.load(handle, rads.DumbYamlLoader)
# Users were given to add to the exclusion list; when listing or ticketing
# was requested as well, the additions are handled first
if args.listaccount or args.ticket:
print("Adding to exclusion list first...")
# NOTE(review): this loop removes entries from args.add_user while iterating
# over it, so the element that follows each removed user is skipped and never
# validated. Iterate over a copy (list(args.add_user)) or build a filtered
# list instead.
for user in args.add_user:
if not rads.is_cpuser(user):
print(f"{user} is not a valid cpanel user")
args.add_user.remove(user)
add_excluded_users(args.add_user)
if args.listaccount or args.ticket:
# collect the (user, owner, size) tuples of the eligible accounts
accounts = collect_accounts(
args.min, args.max, args.total, args.noexclude, args.exc_user
def get_exclusion_list() -> dict:
    '''Read from the exclusion list and return it as a dict

    Returns the mapping stored in EXCLUSION_LIST. If the file cannot be
    read, decoded or parsed, or if it does not contain a mapping (an empty
    file loads as None), the file is rewritten as an empty mapping and an
    empty dict is returned.
    '''
    try:
        with open(EXCLUSION_LIST, encoding='ascii') as exclusionlist:
            # safe_load instead of a bare yaml.load(): PyYAML >= 6 requires
            # a Loader argument and raises TypeError without one, and the
            # file only ever holds plain strings and integers
            data = yaml.safe_load(exclusionlist)
    except (yaml.YAMLError, OSError, UnicodeError) as exc:
        # UnicodeError covers a file containing non-ASCII bytes
        print(type(exc).__name__, exc, sep=': ')
        print('Recreating', EXCLUSION_LIST)
        data = {}
        write_exclusion_list(data)
        return data
    if not isinstance(data, dict):
        print("Error in exclusion list, rebuilding")
        # start over from an empty mapping rather than writing the
        # malformed content back to disk
        data = {}
        write_exclusion_list(data)
    return data
def add_excluded_users(users: list[str]):
    '''Format user information and timestamp for the exclusion list

    Every name in ``users`` is stored in the exclusion list with the
    current timestamp, replacing any earlier entry for that user. The
    file is written once for the whole batch instead of once per user.
    '''
    exclusion_list = get_exclusion_list()
    if not users:
        # nothing to add, so leave the file untouched
        return
    # one timestamp for the whole batch
    added_at = arrow.now().int_timestamp
    for user in users:
        exclusion_list[user] = added_at
    write_exclusion_list(exclusion_list)
    # report only after the write has succeeded
    for user in users:
        print(f"{user} added to exclusion list")
def write_exclusion_list(exclusion_list: dict[str, int]) -> None:
    '''Replace the contents of EXCLUSION_LIST with the given mapping

    The mapping is serialized as YAML with a 4-space indent.
    '''
    with EXCLUSION_LIST.open('w', encoding='ascii') as stream:
        yaml.dump(exclusion_list, stream=stream, indent=4)
def refresh_exclusion_list() -> None:
    '''If a timeout has expired, remove the user

    Entries added less than TIMER days ago are kept; everything else is
    dropped, and the pruned mapping is written back to the exclusion list.
    '''
    timeouts = get_exclusion_list()
    # both values are the same for every entry, so compute them once
    now = arrow.now().int_timestamp
    max_age = int(datetime.timedelta(days=TIMER).total_seconds())
    new_dict = {
        user: added_at
        for user, added_at in timeouts.items()
        if now - added_at < max_age
    }
    write_exclusion_list(new_dict)
exclusion_list: list[str],
) -> tuple[str, Union[float, None]]:
'''Run the user through the first gamut to determine if
# knock out ineligible accounts: the checks below look at
# rads.cpuser_safe(), a suspension marker file and the exclusion list
if rads.cpuser_safe(user):
if Path('/var/cpanel/suspended', user).is_file():
if not noexclude and user in exclusion_list:
# quota usage is divided by 2**30 -- presumably bytes to GiB; confirm the
# unit that getquota() returns
size_gb: float = rads.QuotaCtl().getquota(user) / 2**30
# check for eligibility based on size; a falsy max_size (None or 0)
# means there is no upper limit
if max_size and size_gb > max_size:
# whatever's left after that, add to accounts list
except KeyboardInterrupt:
# NOTE(review): the original comment said "drop child proc if killed", but
# this function is submitted to a ThreadPoolExecutor. Python raises
# KeyboardInterrupt in the main thread only, so this handler is unlikely
# to fire inside a worker thread -- confirm whether it is still needed.
) -> list[tuple[str, str, float]]:
'''Get a list of users, and then eliminate them based on suspension
status, size, and eligibility based on options provided'''
# initializing everything
# only the usernames of the stored exclusions are needed, not the timestamps
exclusion_list = list(get_exclusion_list().keys())
print(f"Skipping exception file - {type(exc).__name__}: {exc}")
# add the caller-supplied users on top of the stored exclusions
exclusion_list += exclude
# run the eligibility checks concurrently; the workers are threads
# (ThreadPoolExecutor), not child processes
exclusion_list=exclusion_list,
user_owners = get_user_owners()
with ThreadPoolExecutor(max_workers=4) as pool:
for user, owner in user_owners.items():
jobs.append(pool.submit(initial_disqualify, user, **kwargs))
# results are consumed in completion order, not submission order
for future in as_completed(jobs):
user, size_gb = future.result()
owner = user_owners[user]
accounts.append((user, owner, size_gb))
except KeyboardInterrupt:
print("Caught KeyboardInterrupt.")
# cancel the jobs that have not started and do not wait for running ones
pool.shutdown(wait=False, cancel_futures=True)
# if anything survived those criteria...
accounts.sort(key=itemgetter(2), reverse=True) # sort by size, descending
# a total was requested: keep adding accounts while fewer than 3 are
# selected or the size selected so far is below total_gb
# (presumably size_total is the running sum of the selected sizes -- confirm)
if len(eligible_accounts) < 3 or size_total < total_gb:
eligible_accounts.append(account)
accounts = eligible_accounts
# cap the result at 25 accounts
final_list = accounts[:25]
def list_accounts(accounts):
'''Print the list of eligible accounts in a pretty table

accounts: rows whose columns are shown under the headers User, Owner
and Size (GB). Index 2 of every row is summed for the closing total
line, which is printed only when that sum is non-zero.
'''
print("No accounts match criteria.")
headers=["User", "Owner", "Size (GB)"],
# the walrus assignment doubles as the check: a total of 0 prints nothing
if total := sum(x[2] for x in accounts):
print(f"Total size of matching accounts: {total:.2f} GB")
def email_accounts(accounts: list[tuple[str, str, float]]):
    '''Mail one disk move ticket per account, then exclude those users

    Each entry is a (user, owner, size) tuple; the owner is not used.
    Once every ticket has been handed to mail_disk_move(), all of the
    users are added to the exclusion list in a single call.
    '''
    # short hostname: everything before the first dot
    short_host = node().partition(".")[0]
    for username, _owner, size in accounts:
        mail_disk_move(username, short_host, size)
    add_excluded_users([entry[0] for entry in accounts])
def mail_disk_move(username: str, server: str, size_gb: float):
'''Sends email to the disk moves queue

username: account the ticket is about; used in the subject and the body
server: server name; used in the subject and the body
size_gb: account size, placed in the body as "Account Size"

A confirmation is printed when rads.send_email() returns a truthy
value; the "Sending of disk_move ticket failed" text is presumably
printed otherwise.
'''
# NOTE(review): {size_gb} is placed in the message without rounding and is
# labelled GiB -- consider a format spec such as :.2f
to_addr = "moves@imhadmin.net"
subject = f"DISK MOVE: {username} @ {server}"
A new server disk move is required for {username} @ {server}
Move Username: {username}
Account Size: {size_gb} GiB
Please review the account to determine if they are eligible for a migration:
* hasn't been moved recently (e.g. in the past year/no current move scheduled)
* is not storing ToS content
* is not large to the point of absurdity
* other reasons left to the discretion of the administrator
If the account is not eligible for a migration, please add them to the
exception list to prevent further tickets being generated for their account:
move_generator.py -a {username}
If the account is not eligible for reasons of ToS content, please respond
to this ticket with the relevant information and leave it open to address
further. For convenience, you may also update the subject line with the
date on which this should be addressed again and/or notice count.
if rads.send_email(to_addr, subject, body):
print(f"Disk move tickets sent for {username}.")
"Sending of disk_move ticket failed. You can use the -l option to",
"view eligible accounts to move",
# Script entry point: runs only when the file is executed directly,
# not when it is imported
if __name__ == "__main__":