Edit File by line
/home/barbar84/public_h.../wp-conte.../plugins/sujqvwi/AnonR/smanonr..../opt/tier2c
File: safe_restorepkg.py
#!/opt/imh-python/bin/python3
[0] Fix | Delete
"""Wrapper for /usr/local/cpanel/scripts/restorepkg"""
[1] Fix | Delete
import os
[2] Fix | Delete
import argparse
[3] Fix | Delete
from dataclasses import dataclass
[4] Fix | Delete
from argparse import ArgumentTypeError as BadArg
[5] Fix | Delete
from pathlib import Path
[6] Fix | Delete
import subprocess
[7] Fix | Delete
import sys
[8] Fix | Delete
import time
[9] Fix | Delete
from typing import IO, Generator, Union
[10] Fix | Delete
from cpapis import whmapi1, CpAPIError
[11] Fix | Delete
from cproc import Proc
[12] Fix | Delete
from netaddr import IPAddress
[13] Fix | Delete
import rads
[14] Fix | Delete
[15] Fix | Delete
sys.path.insert(0, '/opt/support/lib')
[16] Fix | Delete
import arg_types
[17] Fix | Delete
from arg_types import CPMOVE_RE
[18] Fix | Delete
from server import MAIN_RESELLER, ROLE
[19] Fix | Delete
[20] Fix | Delete
[21] Fix | Delete
# Path of the hostsfilemods script: the first candidate below that exists as a
# regular file (the sharedrads path is checked before the dedrads one), or
# None when neither is present.
HOSTFILEMODS = next(
    (
        candidate
        for candidate in (
            '/opt/sharedrads/hostsfilemods',
            '/opt/dedrads/hostsfilemods',
        )
        if Path(candidate).is_file()
    ),
    None,
)
[27] Fix | Delete
[28] Fix | Delete
[29] Fix | Delete
@dataclass
class Args:
    """Type hint for get_args

    Lists the attributes that get_args() leaves on the parsed namespace.
    """

    newuser: Union[str, None]
    owner: str
    # Stored by --quiet/-q under dest='print_logs' with action='store_false',
    # so this is False when --quiet is given. (Previously declared as
    # ``quiet``, an attribute that get_args() never sets.)
    print_logs: bool
    yes: bool
    host_mods: bool
    ipaddr: Union[IPAddress, None]
    package: Union[str, None]
    fixperms: bool
    path: Path
    log_dir: Path
[43] Fix | Delete
[44] Fix | Delete
[45] Fix | Delete
def get_args() -> Args:
    """Parse sys.argv

    Builds the command line parser, parses the arguments, then fills in the
    attributes that are not always supplied by an option (``owner``,
    ``host_mods`` and ``log_dir``).

    Returns:
        The parsed argument namespace.

    Exits (via ``sys.exit``) when ``--newuser`` is combined with a directory
    path.
    """
    cli = argparse.ArgumentParser(description=__doc__)
    cli.add_argument(
        '--newuser',
        dest='newuser',
        type=arg_types.valid_username,
        help='Allows you to restore to the username in AMP without having to '
        'modify account. Will be ignored if restoring a directory',
    )
    # --owner is only offered when ROLE is unset or is 'shared:reseller'
    owner_selectable = not ROLE or ROLE == 'shared:reseller'
    if owner_selectable:
        cli.add_argument(
            '--owner',
            '-o',
            dest='owner',
            type=existing_reseller,
            default=MAIN_RESELLER,
            help=f'Set Ownership to a reseller. Defaults to {MAIN_RESELLER}',
        )
    cli.add_argument(
        '--no-fixperms',
        dest='fixperms',
        action='store_false',
        help='Do not run fixperms after restoring',
    )
    cli.add_argument(
        '--quiet',
        '-q',
        dest='print_logs',
        action='store_false',
        help='Silence restorepkg output (it still gets logged)',
    )
    if HOSTFILEMODS:
        cli.add_argument(
            '--host-mods',
            '-m',
            dest='host_mods',
            action='store_true',
            help='Print host file mod entries at the end for all restored users',
        )
    cli.add_argument(
        '--yes',
        '-y',
        dest='yes',
        action='store_true',
        help='No Confirmation Prompts',
    )
    cli.add_argument(
        '--ip',
        dest='ipaddr',
        type=arg_types.ipaddress,
        help='Set an IP address',
    )
    # valid --pkg choices are the names of the files in /var/cpanel/packages
    package_dir = Path('/var/cpanel/packages')
    packages = {
        entry.name for entry in package_dir.iterdir() if entry.is_file()
    }
    cli.add_argument(
        '--pkg',
        '-p',
        '-P',
        dest='package',
        metavar='PKG',
        choices=packages,
        help=f"Set a package type {packages!r}",
    )
    cli.add_argument(
        'path',
        type=restorable_path,
        help='Path to the backup file or directory of backup files',
    )
    args = cli.parse_args()
    if args.path.is_dir() and args.newuser:
        cli.print_help()
        sys.exit('\n--newuser invalid when restoring from a directory')
    # fill in the attributes whose options were not registered above
    if not owner_selectable:
        args.owner = MAIN_RESELLER
    if not HOSTFILEMODS:
        args.host_mods = False
    # a falsy ROLE is the v/ded case
    args.log_dir = Path('/home/t1bin') if ROLE else Path('/var/log/t1bin')
    return args
[129] Fix | Delete
[130] Fix | Delete
[131] Fix | Delete
def existing_reseller(user: str) -> str:
    """Argparse type: validate a user as existing with reseller permissions

    Args:
        user: username supplied on the command line

    Returns:
        ``user`` unchanged when it is ``root`` or has a ``user:`` line in
        /var/cpanel/resellers.

    Raises:
        BadArg: if blank, not a cPanel user, or not listed as a reseller.
    """
    if not user:
        raise BadArg('cannot be blank')
    if user == 'root':
        # root is accepted without any further checks
        return user
    if not rads.is_cpuser(user):
        raise BadArg(f'reseller {user} does not exist')
    prefix = f"{user}:"
    try:
        with open('/var/cpanel/resellers', encoding='ascii') as resellers:
            if any(entry.startswith(prefix) for entry in resellers):
                return user
    except FileNotFoundError:
        # warn, then fall through to the "not a reseller" error below
        print('/var/cpanel/resellers does not exist', file=sys.stderr)
    raise BadArg(f"{user} not setup as a reseller")
[147] Fix | Delete
[148] Fix | Delete
[149] Fix | Delete
def restorable_path(str_path: str) -> Path:
    """Argparse type: validates a path as either a cpmove file or a
    directory in /home

    Args:
        str_path: path string supplied on the command line

    Returns:
        Whatever ``arg_types.cpmove_file_type`` returns when it accepts the
        path, otherwise the result of ``arg_types.path_in_home``.

    Raises:
        BadArg: if neither validator accepts the path, or it is /home itself.
    """
    try:
        cpmove = arg_types.cpmove_file_type(str_path)
    except BadArg:
        # not a cpmove file; try it as a directory instead
        pass
    else:
        return cpmove
    try:
        directory = arg_types.path_in_home(str_path)
    except BadArg as exc:
        raise BadArg(
            "not a cPanel backup or directory in /home containing them"
        ) from exc
    if directory != Path('/home'):
        return directory
    # it would work, but it's generally a bad idea
    raise BadArg(
        "invalid path; when restoring from a directory, "
        "it must be a subdirectory of /home"
    )
[169] Fix | Delete
[170] Fix | Delete
[171] Fix | Delete
def log_print(handle: IO, msg: str, show: bool = True):
    """Writes to a log and prints to stdout

    Args:
        handle: open, writable log file
        msg: text to record; a trailing newline is appended if missing
        show: when False the message is only written to ``handle``
    """
    line = msg if msg.endswith('\n') else f"{msg}\n"
    handle.write(line)
    if show:
        # the line already ends in a newline, so print must not add another
        print(line, end='')
[178] Fix | Delete
[179] Fix | Delete
[180] Fix | Delete
def set_owner(log_file: IO, user: str, owner: str) -> bool:
    """Change a user's owner

    Args:
        log_file: open log file that progress and errors are written to
        user: account to modify
        owner: new owner; ``root`` means nothing is changed

    Returns:
        False if the API call raised ``CpAPIError``, otherwise True.
    """
    if owner != 'root':
        log_print(log_file, f'setting owner of {user} to {owner}')
        try:
            whmapi1.set_owner(user, owner)
        except CpAPIError as exc:
            log_print(log_file, f"modifyacct failed: {exc}")
            return False
    return True
[191] Fix | Delete
[192] Fix | Delete
[193] Fix | Delete
def set_ip(log_file: IO, user: str, ipaddr: Union[IPAddress, None]) -> bool:
    """Set a user's IP

    Args:
        log_file: open log file that progress and errors are written to
        user: account to modify
        ipaddr: address to assign; a falsy value means nothing is changed

    Returns:
        False if the API call raised ``CpAPIError``, otherwise True.
    """
    if ipaddr:
        log_print(log_file, f"setting IP of {user} to {ipaddr}")
        try:
            # the API is given the address in its string form
            whmapi1.setsiteip(user, str(ipaddr))
        except CpAPIError as exc:
            log_print(log_file, f"setsiteip failed: {exc}")
            return False
    return True
[204] Fix | Delete
[205] Fix | Delete
[206] Fix | Delete
def set_package(log_file: IO, user: str, pkg: Union[str, None]) -> bool:
    """Set a user's cPanel package

    Args:
        log_file: open log file that progress and errors are written to
        user: account to modify
        pkg: package name; a falsy value means nothing is changed

    Returns:
        False if the API call raised ``CpAPIError``, otherwise True.
    """
    if pkg:
        log_print(log_file, f"setting package of {user} to {pkg}")
        try:
            whmapi1.changepackage(user, pkg)
        except CpAPIError as exc:
            log_print(log_file, f"changepackage failed: {exc}")
            return False
    return True
[217] Fix | Delete
[218] Fix | Delete
[219] Fix | Delete
def restorepkg(
    log_file: IO, cpmove: Path, newuser: Union[str, None], print_logs: bool
):
    """Execute restorepkg

    Args:
        log_file: open log file that receives every line of output
        cpmove: backup file handed to restorepkg
        newuser: if set, passed through as ``--newuser``
        print_logs: whether the output is also printed to stdout

    Returns:
        True only if the exit code was 0 and no output line contained
        'Account Restore Failed'.
    """
    argv = ['/usr/local/cpanel/scripts/restorepkg', '--skipres']
    if newuser:
        argv += ['--newuser', newuser]
    argv.append(cpmove)
    saw_failure = False
    with Proc(
        argv,
        lim=os.cpu_count(),
        encoding='utf-8',
        errors='replace',
        stdout=Proc.PIPE,
        stderr=Proc.STDOUT,  # stderr is merged into the stdout stream
    ) as proc:
        for line in proc.stdout:
            log_print(log_file, line, print_logs)
            saw_failure = saw_failure or 'Account Restore Failed' in line
    log_file.write('\n')
    if proc.returncode == 0:
        return not saw_failure
    log_print(log_file, f'restorepkg exit code was {proc.returncode}')
    return False
[245] Fix | Delete
[246] Fix | Delete
[247] Fix | Delete
def restore_user(args: Args, cpmove: Path, user: str, log: Path) -> list[str]:
    """Restore a user (restorepkg + set owner/ip/package) and return a list of
    task(s) that failed, if any

    Args:
        args: parsed command line arguments
        cpmove: backup file to restore
        user: username from the backup; ``args.newuser`` overrides it
        log: file the restore output is appended to

    Returns:
        ``['restorepkg']`` when the restore was refused or failed, otherwise
        a description of each follow-up step that failed (empty on success).
    """
    user = args.newuser or user
    if args.owner == user:
        print(f'{args.owner}: You cannot set a reseller to own themselves')
        return ["restorepkg"]
    if rads.is_cpuser(user):
        print(user, 'already exists', file=sys.stderr)
        return ["restorepkg"]
    print('Logging to:', log)
    with log.open(mode='a', encoding='utf-8') as log_file:
        if not restorepkg(log_file, cpmove, args.newuser, args.print_logs):
            return ['restorepkg']
        # follow-up steps run in this order; each returns False on failure
        steps = (
            (set_owner, 'owner', args.owner),
            (set_ip, 'ip', args.ipaddr),
            (set_package, 'package', args.package),
        )
        return [
            f'set {label} to {value}'
            for step, label, value in steps
            if not step(log_file, user, value)
        ]
[269] Fix | Delete
[270] Fix | Delete
[271] Fix | Delete
def iter_backups(path: Path) -> Generator[tuple[str, Path], None, None]:
    """Iterate over backups found in a directory

    Yields a ``(group 1 of the CPMOVE_RE match, entry)`` pair for every
    directory entry whose name matches ``CPMOVE_RE``.
    """
    for candidate in path.iterdir():
        found = CPMOVE_RE.match(candidate.name)
        if not found:
            continue
        yield found.group(1), candidate
[276] Fix | Delete
[277] Fix | Delete
[278] Fix | Delete
def main():
    """Wrapper around cPanel's restorepkg

    Restores either a single backup file or every backup found in a
    directory, prints a per-user result summary, then optionally runs
    fixperms and prints host file mod entries for the users whose restore
    did not fail at the restorepkg step.
    """
    args = get_args()
    user_fails: dict[str, list[str]] = {}  # user: list of any tasks that failed
    # The mode must be an octal literal; decimal 770 is 0o1402.
    args.log_dir.mkdir(mode=0o770, exist_ok=True)
    if args.path.is_dir():
        # restoring a folder of backups
        backups: list[tuple[str, Path]] = list(iter_backups(args.path))
        if not backups:
            sys.exit(f'No backups in {args.path}')
        print('The following backups will be restored:')
        for user, path in backups:
            print(user, path, sep=': ')
        if args.yes:
            # no prompt with --yes; pause for 3 seconds before starting
            time.sleep(3)
        elif not rads.prompt_y_n('Would you like to proceed?'):
            sys.exit(0)
        for user, path in backups:
            log = args.log_dir.joinpath(f"{user}.restore.log")
            # Record each user's own result. Previously a second loop over
            # ``backups`` assigned one shared ``failed`` list to every user.
            user_fails[user] = restore_user(args, path, user, log)
    else:
        # restoring from a single file
        # it was already validated to pass this regex in get_args()
        orig_user = CPMOVE_RE.match(args.path.name).group(1)
        user = args.newuser if args.newuser else orig_user
        log = args.log_dir.joinpath(f"{user}.restore.log")
        user_fails[user] = restore_user(args, args.path, orig_user, log)
    print_results(user_fails)
    # a user counts as restored unless restorepkg itself was what failed
    restored = [k for k, v in user_fails.items() if v != ['restorepkg']]
    if args.fixperms:
        fixperms(restored)
    if args.host_mods:
        print_host_mods(restored)
[314] Fix | Delete
[315] Fix | Delete
[316] Fix | Delete
def print_results(user_fails: dict[str, list[str]]):
    """Print results from each ``restore_user()``

    Args:
        user_fails: maps each user to the list of tasks that failed for
            them; an empty list is reported as success
    """
    print('== Restore Results ==')
    for user, fails in user_fails.items():
        if not fails:
            print(f"{user}: success")
            continue
        failed_tasks = ', '.join(map(str, fails))
        print(f"{user}: failed: {failed_tasks}")
[325] Fix | Delete
[326] Fix | Delete
[327] Fix | Delete
def fixperms(restored: list[str]):
    """Runs fixperms on restored users

    Args:
        restored: usernames to fix; nothing is run when the list is empty
    """
    if restored:
        # fixperms all users in one run and only print errors
        subprocess.call(['/usr/bin/fixperms', '--quiet', *restored])
[333] Fix | Delete
[334] Fix | Delete
[335] Fix | Delete
def print_host_mods(restored: list[str]):
    """Runs the command at ``HOSTFILEMODS``

    Prints a heading, then invokes the command once per user in
    ``restored`` with the username as its only argument.
    """
    print('Host file mod entries for all restored cPanel users:')
    for account in restored:
        subprocess.call([HOSTFILEMODS, account])
[340] Fix | Delete
[341] Fix | Delete
[342] Fix | Delete
# Run the restore only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
[344] Fix | Delete
[345] Fix | Delete
It is recommended that you Edit text format, this type of Fix handles quite a lot in one request
Function