#!/opt/imh-python/bin/python3
"""Grand Unified Disk Scanner.
For More Information and Usage:
http://wiki.inmotionhosting.com/index.php?title=RADS#disk_cleanup.py"""
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from cpapis import whmapi1, CpAPIError
from guds_modules.change import run_disk_change
from guds_modules.aux import run_aux
from guds_modules.base import ModuleBase
from guds_modules.cli_args import get_args
__version__ = 'v1.0 Grand Unified Disk Scanner'
__author__ = 'SeanC, MadeleineF'
TOPDIR = Path(__file__).parent.resolve()
DELETER_PATH = TOPDIR / "guds_modules/deleters"
NOTIFIER_PATH = TOPDIR / "guds_modules/notifiers"
USER_TIMEOUT = int(timedelta(days=30).total_seconds()) # 30 days in seconds
SPAM_TIMER_LIST = "/var/log/guds_timer"
"""Automates a combination of techniques used to reclaim disk space
delete: dict[str, type[ModuleBase]],
note: dict[str, type[ModuleBase]],
"""Initialize the DiskCleaner Object"""
self.logger = logging.getLogger('disk_cleanup.py')
loglevel=args['loglevel'],
print_out=args['output'],
# Flag to toggle deletion/notification of users on run
self.dry_run: bool = args['dry_run']
# Command to run {delete,note,aux,change}
self.command: str = args['command']
# Modules to run on command guds_modules{deleters,notifiers}
# List of cPanel users to run cleaners on
self.users = rads.all_cpusers()
# Number of days to look back for 'change' command
# parallel for delete and note
self.threads = 1 # changed later
# establish command specific cleaner object attriutes
if self.command == 'change':
elif self.command in ('delete', 'note'):
self.threads = args['threads']
if not len(modules) == 0:
# initialize Module objects
for name, mod in delete.items():
delete[name] = mod(self.dry_run, self.logger)
for name, mod in note.items():
note[name] = mod(self.dry_run, self.logger)
self.modules = {'delete': delete, 'note': note}
self.logger.warning('action=main warning=no modules selected')
'Please select modules with ',
f'`disk_cleanup.py {self.command}` as shown above.',
# Timeout list containing users who have already been notified
def add_timeout_list(self, reason, user):
"""Format user information and timestamp for the timeout list"""
if user in self.timeout_list:
self.timeout_list[user].update({reason: int(time.time())})
self.timeout_list[user] = {reason: int(time.time())}
'user=%s action=add_timeout_list timeout=%s', user, reason
self.write_timeout_list()
def load_timeout_list(self, target_file):
"""Returns timeout list from specified file in dict format
:param target_file: - file to read timeout data from"""
# timeout list open (re-create if invalid or missing)
with open(target_file, encoding='ascii') as timeoutlist:
self.timeout_list: dict = yaml.load(
timeoutlist, yaml.SafeLoader
assert isinstance(self.timeout_list, dict)
self.logger.debug('timeout_list=%s', self.timeout_list)
except (AssertionError, OSError):
self.logger.error('error=invalid timeout list')
with open(target_file, 'w', encoding='ascii') as outfile:
yaml.dump({}, outfile, indent=4)
self.logger.info('new empty timeout list created')
self.logger.debug('timeout_list=%s', self.timeout_list)
# timeout list refresh (remove people who are on longer on timeout)
for user, data in list(self.timeout_list.items()):
self.timeout_list[user] = {
for cleaner, timer in data.items()
if int(time.time()) - timer < USER_TIMEOUT
if self.timeout_list[user] == {}:
del self.timeout_list[user]
# write refreshed timeout list to target_file
with open(target_file, 'w', encoding='ascii') as outfile:
yaml.dump(self.timeout_list, outfile, indent=4)
'action=load_timeout_list status=/var/log/guds_timer '
def iter_mods(path: Path):
"""Yield names of modules in a directory"""
for entry in path.iterdir():
if entry.name.endswith('.py') and not entry.name.startswith('_'):
def load_submodules() -> tuple[
dict[str, type[ModuleBase]], dict[str, type[ModuleBase]]
"""Import submodules. Submodules are added to available arguments"""
# Gather and Import Deleter Mods
deleter_mod_names = list(DiskCleaner.iter_mods(DELETER_PATH))
guds_d_modules = __import__(
'guds_modules.deleters', globals(), locals(), deleter_mod_names, 0
for mod_name in deleter_mod_names:
deleters[mod_name] = getattr(guds_d_modules, mod_name).Module
# Gather and Import Notifier Mods
notifier_mod_names = list(DiskCleaner.iter_mods(NOTIFIER_PATH))
guds_n_modules = __import__(
'guds_modules.notifiers', globals(), locals(), notifier_mod_names, 0
for mod_name in notifier_mod_names:
notifiers[mod_name] = getattr(guds_n_modules, mod_name).Module
return deleters, notifiers
def notify_user(self, msgpack: dict, user: str):
"""Unpack the message and stuff it into the pp_api to notify the user"""
if rads.IMH_CLASS == 'reseller':
resellers = whmapi1.listresellers()
except CpAPIError as exc:
if user not in resellers:
user = rads.get_owner(user)
if user not in resellers:
'user=%s action=notify_user status=unable to '
sys.exit('Unable to determine owner of that user')
pp_connect = pp_api.PowerPanel()
# Unpack message into the power panel email data
results = pp_connect.call(
"notification.send", cpanelUser=user, **msgpack
if not hasattr(results, 'status') or results.status != 0:
'user=%s action=notify_user status=pp api failed unexpectedly'
self.logger.info('user=%s action=notify_user status=OK', user)
def run(self, modules: list[str]):
"""DiskCleaner object main flow control function"""
'action=run command=%s modules=%s', self.command, modules
if self.command == 'aux':
elif self.command == 'change':
self.load_timeout_list(SPAM_TIMER_LIST)
modules = self.modules[self.command]
print(f"{self.command} running in {self.threads} threads", end=': ')
print(*modules, sep=', ')
with ThreadPoolExecutor(self.threads) as pool:
if not rads.cpuser_safe(user):
'user=%s action=cpuser_safe status=restricted user',
homedir = rads.get_homedir(user)
except rads.CpuserError as exc:
'user=%s action=get_homedir status=%s', user, exc
mod = self.modules[self.command][cleaner]
self.mod_thread, mod, cleaner, user, homedir
futures[future] = (user, cleaner)
for future in as_completed(futures):
user, cleaner = futures[future]
notify = future.result() # module exceptions are raised here
if not notify: # empty dict means no email
if user in self.timeout_list:
if cleaner not in self.timeout_list[user]:
self.notify_user(notify, user)
self.add_timeout_list(cleaner, user)
self.notify_user(notify, user)
self.add_timeout_list(cleaner, user)
self, mod: ModuleBase, cleaner: str, user: str, homedir: str
self.logger.debug('user=%s action=module:%s status=run', user, cleaner)
notify = mod.run_module(homedir)
# if this assertion fails, this is a bug. A subclass of ModuleBase
# returned the wrong data in run_module
assert isinstance(notify, dict)
def write_timeout_list(self):
"""Write contents of self.timeout_list() to SPAM_TIMER_LIST"""
'action=write_timeout_list update data=%s', self.timeout_list
with open(SPAM_TIMER_LIST, 'w', encoding='ascii') as outfile:
yaml.dump(self.timeout_list, outfile, indent=4)
self.logger.info('action=write_timeout_list update status=ok')
self.logger.error('action=write_timeout_list update error=%s', e)
"""Main function: get args"""
delete, note = DiskCleaner.load_submodules()
args, modules = get_args(delete, note)
cleaner = DiskCleaner(args, modules, delete, note)
if __name__ == "__main__":
assert rads.IMH_ROLE == 'shared'
except KeyboardInterrupt: