#!/opt/imh-python/bin/python3
"""Wrapper for /usr/local/cpanel/scripts/restorepkg"""
import argparse
import subprocess
import sys
from dataclasses import dataclass
from argparse import ArgumentTypeError as BadArg
from pathlib import Path
from typing import IO, Generator, Union
from cpapis import whmapi1, CpAPIError
from netaddr import IPAddress
sys.path.insert(0, '/opt/support/lib')
import rads
import arg_types
from arg_types import CPMOVE_RE
from server import MAIN_RESELLER, ROLE
# Path of the first ``hostsfilemods`` script that exists on this server.
# It is bound to None when neither candidate is present, so code that reads
# HOSTFILEMODS can test it instead of hitting a NameError.
HOSTFILEMODS = None
for _candidate in (
    '/opt/sharedrads/hostsfilemods',
    '/opt/dedrads/hostsfilemods',
):
    if Path(_candidate).is_file():
        HOSTFILEMODS = _candidate
        break
"""Type hint for get_args"""
# NOTE(review): these annotations read as fields of the ``Args`` type that
# ``restore_user()`` takes as its ``args`` parameter; the ``class`` header is
# not present on the lines above — confirm.
# username to restore as; when falsy the username from the backup is used
newuser: Union[str, None]
# IP address handed to ``set_ip()`` after the restore, or None
ipaddr: Union[IPAddress, None]
# cPanel package handed to ``set_package()`` after the restore, or None
package: Union[str, None]
# NOTE(review): argument-parsing logic that builds ``args``. The enclosing
# ``def`` line and the opening ``parser.add_argument(`` lines are not present
# here, so the keyword arguments below cannot be tied to specific flag names.
parser = argparse.ArgumentParser(description=__doc__)
# new-username option: validated by arg_types.valid_username
type=arg_types.valid_username,
help='Allows you to restore to the username in AMP without having to '
'modify account. Will be ignored if restoring a directory',
# the ownership option is only offered when ROLE is unset or 'shared:reseller'
if not ROLE or ROLE == 'shared:reseller':
help=f'Set Ownership to a reseller. Defaults to {MAIN_RESELLER}',
help='Do not run fixperms after restoring',
help='Silence restorepkg output (it still gets logged)',
help='Print host file mod entries at the end for all restored users',
help='No Confirmation Prompts',
# IP option: validated by arg_types.ipaddress
type=arg_types.ipaddress,
help='Set an IP address',
# package names are the regular files found under /var/cpanel/packages
x.name for x in Path('/var/cpanel/packages').iterdir() if x.is_file()
help=f"Set a package type {packages!r}",
help='Path to the backup file or directory of backup files',
args = parser.parse_args()
# presumably only reached when --newuser is combined with a directory path;
# the guarding condition is not visible — confirm
sys.exit('\n--newuser invalid when restoring from a directory')
# every role other than 'shared:reseller' always restores under MAIN_RESELLER
if ROLE and ROLE != 'shared:reseller':
args.owner = MAIN_RESELLER
# NOTE(review): two log directories are assigned; the condition selecting
# between them is not visible — confirm
args.log_dir = Path('/home/t1bin')
args.log_dir = Path('/var/log/t1bin')
def existing_reseller(user: str) -> str:
    """Argparse type: validate a user as existing with reseller permissions

    Args:
        user: username supplied on the command line

    Returns:
        ``user`` unchanged, once a line starting with ``"<user>:"`` is found
        in ``/var/cpanel/resellers``

    Raises:
        argparse.ArgumentTypeError: if ``user`` is blank, is not a cPanel
            user, or is not listed in ``/var/cpanel/resellers``
    """
    if not user:
        raise BadArg('cannot be blank')
    if not rads.is_cpuser(user):
        raise BadArg(f'reseller {user} does not exist')
    try:
        with open('/var/cpanel/resellers', encoding='ascii') as handle:
            for line in handle:
                # each entry starts with the reseller name followed by a colon
                if line.startswith(f"{user}:"):
                    return user
    except FileNotFoundError:
        # warn, then fall through to the "not a reseller" error below
        print('/var/cpanel/resellers does not exist', file=sys.stderr)
    raise BadArg(f"{user} not setup as a reseller")
# Argparse type for a restorable path. The visible statements first return
# the result of arg_types.cpmove_file_type() and otherwise fall back to
# arg_types.path_in_home(); /home itself is rejected for directory restores.
# NOTE(review): the end of the docstring and the try/except/raise lines that
# connect these statements are not present here — confirm the control flow
# and which exception types the arg_types helpers raise.
def restorable_path(str_path: str) -> Path:
"""Argparse type: validates a path as either a cpmove file or a
return arg_types.cpmove_file_type(str_path)
path = arg_types.path_in_home(str_path)
"not a cPanel backup or directory in /home containing them"
if path == Path('/home'):
# it would work, but it's generally a bad idea
"invalid path; when restoring from a directory, "
"it must be a subdirectory of /home"
def log_print(handle: IO, msg: str, show: bool = True):
    """Writes to a log and prints to stdout

    Args:
        handle: open, writable text file object for the log
        msg: message to record; a trailing newline is added if missing
        show: when ``False`` the message is only written to the log
    """
    if not msg.endswith('\n'):
        msg = f"{msg}\n"
    handle.write(msg)
    if show:
        # msg already ends with a newline, so suppress print()'s own
        print(msg, end='')
def set_owner(log_file: IO, user: str, owner: str) -> bool:
    """Change a user's owner

    Args:
        log_file: open log handle passed to ``log_print()``
        user: cPanel username to modify
        owner: reseller that should own ``user``

    Returns:
        ``True`` if ``whmapi1.set_owner`` succeeded, ``False`` if it raised
        ``CpAPIError`` (the error is logged)
    """
    log_print(log_file, f'setting owner of {user} to {owner}')
    try:
        whmapi1.set_owner(user, owner)
    except CpAPIError as exc:
        log_print(log_file, f"modifyacct failed: {exc}")
        return False
    return True
def set_ip(log_file: IO, user: str, ipaddr: Union[IPAddress, None]) -> bool:
    """Set a user's IP address

    Args:
        log_file: open log handle passed to ``log_print()``
        user: cPanel username to modify
        ipaddr: address to assign, or ``None`` to leave the IP unchanged

    Returns:
        ``True`` if nothing needed changing or ``whmapi1.setsiteip``
        succeeded, ``False`` if it raised ``CpAPIError`` (the error is logged)
    """
    if ipaddr is None:
        # no IP requested; never send the literal string "None" to the API
        return True
    log_print(log_file, f"setting IP of {user} to {ipaddr}")
    try:
        whmapi1.setsiteip(user, str(ipaddr))
    except CpAPIError as exc:
        log_print(log_file, f"setsiteip failed: {exc}")
        return False
    return True
def set_package(log_file: IO, user: str, pkg: Union[str, None]) -> bool:
    """Set a user's cPanel package

    Args:
        log_file: open log handle passed to ``log_print()``
        user: cPanel username to modify
        pkg: package name, or ``None``/empty to leave the package unchanged

    Returns:
        ``True`` if nothing needed changing or ``whmapi1.changepackage``
        succeeded, ``False`` if it raised ``CpAPIError`` (the error is logged)
    """
    if not pkg:
        # no package requested, so there is nothing to change
        return True
    log_print(log_file, f"setting package of {user} to {pkg}")
    try:
        whmapi1.changepackage(user, pkg)
    except CpAPIError as exc:
        log_print(log_file, f"changepackage failed: {exc}")
        return False
    return True
# NOTE(review): parameter list and body fragments of the ``restorepkg()``
# helper that ``restore_user()`` calls. The ``def`` line, the call that spawns
# the process (``proc``) and the return statements are not present here —
# confirm before relying on these notes.
log_file: IO, cpmove: Path, newuser: Union[str, None], print_logs: bool
# base command line for cPanel's restorepkg script
cmd = ['/usr/local/cpanel/scripts/restorepkg', '--skipres']
# presumably only added when ``newuser`` is set — guard not visible, confirm
cmd.extend(['--newuser', newuser])
# every output line is written to the log; ``print_logs`` decides whether it
# is also echoed to stdout
log_print(log_file, line, print_logs)
# presumably marks the restore as failed based on the output text — confirm
if 'Account Restore Failed' in line:
log_print(log_file, f'restorepkg exit code was {proc.returncode}')
# NOTE(review): the guard conditions, the initialisation of ``failed`` and
# the return statements of this function are not present here — confirm the
# control flow before relying on the inline notes.
def restore_user(args: Args, cpmove: Path, user: str, log: Path) -> list[str]:
"""Restore a user (restorepkg + set owner/ip/package) and return a list of
task(s) that failed, if any"""
# a supplied --newuser takes precedence over the username from the backup
user = args.newuser or user
# presumably printed when ``user`` equals ``args.owner`` — guard not visible
print(f'{args.owner}: You cannot set a reseller to own themselves')
# presumably printed when the account already exists — guard not visible
print(user, 'already exists', file=sys.stderr)
print('Logging to:', log)
# opened in append mode, so earlier content of the same log file is kept
with log.open(mode='a', encoding='utf-8') as log_file:
# NOTE(review): what happens when restorepkg() fails is not visible here
if not restorepkg(log_file, cpmove, args.newuser, args.print_logs):
# each follow-up task adds a description to ``failed`` when it returns False
if not set_owner(log_file, user, args.owner):
failed.append(f'set owner to {args.owner}')
if not set_ip(log_file, user, args.ipaddr):
failed.append(f'set ip to {args.ipaddr}')
if not set_package(log_file, user, args.package):
failed.append(f'set package to {args.package}')
def iter_backups(path: Path) -> Generator[tuple[str, Path], None, None]:
    """Iterate over backups found in a directory

    Args:
        path: directory to scan (not recursive)

    Yields:
        ``(username, backup_path)`` for every entry whose name matches
        ``CPMOVE_RE``; the username is the regex's first capture group
    """
    # sorted so the confirmation listing and restore order are deterministic
    for entry in sorted(path.iterdir()):
        if match := CPMOVE_RE.match(entry.name):
            yield match.group(1), entry
"""Wrapper around cPanel's restorepkg"""
user_fails: dict[str, list[str]] = {}  # user: list of any tasks that failed
# The mode must be an octal literal: decimal 770 is 0o1402, which would set
# the sticky bit and leave the directory unusable for its owner and group.
args.log_dir.mkdir(mode=0o770, exist_ok=True)
# NOTE(review): the branch conditions separating the directory case from the
# single-file case, and the checks on the command-line options, are not
# present here — confirm the control flow before relying on the inline notes.
# restoring a folder of backups
# (username, backup path) pairs found directly inside the directory
backups: list[tuple[str, Path]] = list(iter_backups(args.path))
# presumably only reached when ``backups`` is empty — guard not visible
sys.exit(f'No backups in {args.path}')
print('The following backups will be restored:')
for user, path in backups:
print(user, path, sep=': ')
# presumably skipped when the 'No Confirmation Prompts' option is given
if not rads.prompt_y_n('Would you like to proceed?'):
# NOTE(review): this loop header appears more than once in this section;
# confirm which statements belong to which loop
for user, path in backups:
# one log file per restored user inside the log directory
log = args.log_dir.joinpath(f"{user}.restore.log")
failed = restore_user(args, path, user, log)
for user, path in backups:
user_fails[user] = failed
# restoring from a single file
# it was already validated to pass this regex in get_args()
orig_user = CPMOVE_RE.match(args.path.name).group(1)
# the log file and the results are keyed by the final username
user = args.newuser if args.newuser else orig_user
log = args.log_dir.joinpath(f"{user}.restore.log")
user_fails[user] = restore_user(args, args.path, orig_user, log)
print_results(user_fails)
# users whose failure list is exactly ['restorepkg'] are left out; presumably
# those are the accounts that were never created — confirm
restored = [k for k, v in user_fails.items() if v != ['restorepkg']]
# presumably gated by the host-file-mods option — guard not visible
print_host_mods(restored)
def print_results(user_fails: dict[str, list[str]]):
    """Print results from each ``restore_user()``

    Args:
        user_fails: maps each username to the list of tasks that failed for
            it; an empty list means every task succeeded
    """
    print('== Restore Results ==')
    for user, fails in user_fails.items():
        if fails:
            # "<user>: failed: <task>, <task>, ..." on a single line
            print(user, 'failed', sep=': ', end=': ')
            print(*fails, sep=', ')
        else:
            print(user, 'success', sep=': ')
def fixperms(restored: list[str]):
    """Runs fixperms on restored users

    Args:
        restored: usernames to fix; nothing is run when the list is empty
    """
    if not restored:
        # avoid invoking fixperms with no usernames at all
        return
    # fixperms all users in one run and only print errors
    subprocess.call(['/usr/bin/fixperms', '--quiet'] + restored)
def print_host_mods(restored: list[str]):
    """Runs the command at ``HOSTFILEMODS``

    Args:
        restored: usernames to run the command for, one invocation per user;
            nothing is printed when the list is empty
    """
    if not restored:
        # no restored users, so a header with nothing beneath it is pointless
        return
    # flush so the header is emitted before the child processes' output
    print('Host file mod entries for all restored cPanel users:', flush=True)
    for user in restored:
        subprocess.call([HOSTFILEMODS, user])
if __name__ == "__main__":