"""Clamscan and freshclam classes"""
from contextlib import ExitStack
from dataclasses import dataclass
from typing import IO, Union
from subprocess import Popen, CalledProcessError, TimeoutExpired
from subprocess import run, DEVNULL, PIPE
from datetime import timedelta
IS_ROOT = os.getuid() == 0
DEFS_SERVER = 'https://repo.imhadmin.net/open/shellscan/v3/'
DEF_FILES = ('imh.yara',)
HEUR_FILES = ('heuristic.yara',)
HEUR_DIR = '/opt/imh-scan/sigs/heuri/'
DEFS_DIR = '/opt/imh-scan/sigs/yara/'
FRESH_CACHE = '/opt/imh-scan/sigs/last_freshclam'
DUMMY = '/opt/imh-scan/sigs/new/new.yara'
COMMAND_TIMEOUT = int(timedelta(weeks=1).total_seconds())
FOUND_RE = re.compile(r'(.*)\: (.*) FOUND$', re.DOTALL)
HEURISTIC_RE = re.compile(r'(?:YARA.)?[Hh]euristic')
CUR_USER_HOME = Path(pwd.getpwnam(getpass.getuser()).pw_dir)
HOME_RE = re.compile(r'(/home[0-9]?/[a-zA-Z0-9]{1,16})/')
hits_found: dict[Path, str]
heur_found: dict[Path, str]
def __dict__(self) -> dict:
'hits_found': {str(k): v for k, v in self.hits_found.items()},
'heur_found': {str(k): v for k, v in self.heur_found.items()},
def all_found(self) -> dict[Path, str]:
return self.heur_found | self.hits_found
class ClamParser(threading.Thread):
"""Thread for parsing clamscan stdout"""
super().__init__(target=self.parse, daemon=True)
self._scanning_summary = False
self._print_items = print_items
self.heur_found: dict[Path, str] = {}
self.hits_found: dict[Path, str] = {}
self.summary_lines: list[str] = []
self.open_logfiles = open_logfiles
for line in self._iter_found():
if self._scanning_summary:
self.summary_lines.append(line)
for line in self.proc.stdout:
for log_file in self.open_logfiles:
if self._scanning_summary:
if line.endswith('SCAN SUMMARY -----------\n'):
self._scanning_summary = True
if line.endswith('FOUND\n'):
yield f"{prev}{line}".rstrip()
def _handle_found(self, data: str):
match = FOUND_RE.match(data)
self.scanner.log.error("imh-scan bug: Regex failed on %r", data)
path_str, rule = match.groups()
self.scanner.log.error('imh-scan bug? %r is not a file', path)
if is_heur := bool(HEURISTIC_RE.search(rule)):
self.heur_found[path] = rule
self.hits_found[path] = rule
self._print_found(path, rule, is_heur)
def _print_found(self, path: str, rule: str, is_heur: bool) -> None:
if not self._print_items:
color = c.yellow if is_heur else c.red
shlex.quote(str(path)), color(f"{rule} FOUND"), sep=': ', flush=True
"""Handles executing clamscan and freshclam"""
"""Initializes variables"""
self.log = rads.setup_logging(
loglevel=logging.DEBUG if verbose else logging.INFO,
self.cmd_paths = self._check_deps(
disable_maldetect=disable_maldetect,
disable_freshclam=disable_freshclam,
disable_default=disable_default,
self.command = self._make_command(
disable_media=disable_media,
disable_new_yara=disable_new_yara,
disable_excludes=disable_excludes,
disable_maldetect=disable_maldetect,
disable_default=disable_default,
"""error handling and installing of dependencies vps can use any clamav
packages installing only supports imh centos repo"""
shared_permit = os.getuid() == 0 and not rads.IMH_ROLE == 'shared'
clamdb_need = not disable_default
freshclam_need = not disable_default and not disable_freshclam
maldet_need = not disable_maldetect
'/opt/imh-clamav/usr/bin/clamscan',
'/usr/local/cpanel/3rdparty/bin/clamscan',
'paths': ['/opt/imh-clamav/var/clamav', '/var/clamav'],
'/opt/imh-clamav/usr/bin/freshclam',
'/usr/local/cpanel/3rdparty/bin/freshclam',
'/opt/imh-clamav/etc/freshclam.conf',
'paths': ['/opt/maldetect/sigs'],
for dep, opts in deps_d.items():
found_path = _find_binary(opts['paths'])
self.log.debug('found path %s', found_path)
self.log.error('missing dependancy %s', dep)
# checks if allowed to install
self.log.error('not permitted to install missing %s', dep)
if opts['pkg'] not in install_list:
install_list.append(opts['pkg'])
# adds imh's path expecting the install to work
paths[dep] = opts['paths'][0]
# fails if not allowed to install
self.log.error('missing pkgs: %s', failed_list)
self.log.error('allowed to install: %s', install_list)
req = self._install_deps(install_list)
self.log.error("Failed to install dependancies %s", install_list)
def _install_deps(self, dep):
f'Would you like to install {dep}? (y|n)',
self.log.warning('exiting')
['yum', '-y', 'install'] + dep,
stdout=None if self.verbose else DEVNULL,
except FileNotFoundError as exc:
except CalledProcessError:
self.log.fatal('error running yum, exiting')
cpu_limit = psutil.cpu_count() - 1
loadavg = os.getloadavg()[0]
'Load too high to start clamscan (%s/%s), sleeping 30s',
def update_defs(self, disable_freshclam: bool, disable_default: bool):
"""Updates the custom definitions"""
for dir_name in DEFS_DIR, HEUR_DIR:
Path(dir_name).mkdir(mode=0o755, parents=True, exist_ok=True)
self.log.debug('Definitions to get: %s', DEF_FILES + HEUR_FILES)
for def_file in DEF_FILES:
url = f'{DEFS_SERVER}{def_file}'
path = f'{DEFS_DIR}{def_file}'
self._download_file(url, path)
for def_file in HEUR_FILES:
url = f'{DEFS_SERVER}{def_file}'
path = f'{HEUR_DIR}{def_file}'
self._download_file(url, path)
self.log.warning('Just updating defintions')
if disable_default or disable_freshclam:
def _download_file(self, url: str, dest: str):
self.log.debug('Downloading %s to %s', url, dest)
with requests.get(url, stream=True, timeout=30) as req:
with open(str(dest) + '.tmp', 'wb') as file:
for chunk in req.iter_content(chunk_size=8192):
except requests.RequestException as exc:
self.log.error("Unable to retrieve %s, skipping\n%s", url, exc)
os.rename(dest + '.tmp', dest)
"""Runs freshclam for system"""
if os.path.exists(FRESH_CACHE) and not self.update:
with open(FRESH_CACHE, encoding='ascii') as file:
last_run = int(file.read().strip())
if last_run + 86400 > now:
'freshclam ran less than a day ago, skipping'
self.log.error('error reading %s\n%s', FRESH_CACHE, exc)
freshclam_conf = f"--config-file={self.cmd_paths['freshconf']}"
fresh_cmd = [self.cmd_paths['freshclam'], freshclam_conf]
self.log.debug('freshclam command: %s', fresh_cmd)
with Popen(fresh_cmd, stdout=PIPE, encoding='utf-8') as proc:
self._freshclam_print(line)
self.log.error("ERROR: freshclam failed: %s", exc)
'ERROR: freshclam failed, database.clamav.net '
with open(FRESH_CACHE, 'w', encoding='ascii') as file:
def write_ok(self, path):
testfile = tempfile.TemporaryFile(dir=path)
self.log.debug('%s is not writeable', path)
self.log.debug('%s is writeable', path)
log_tuples: list[tuple[Path, Union[pwd.struct_passwd, None]]],
for log_path, owner in log_tuples:
log_path.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
os.chown(log_path.parent, owner.pw_uid, owner.pw_gid)
self.log.fatal('%s\nerror in _init_logs', exc)
stack.enter_context(log_path.open('a', encoding='utf-8'))
os.chown(log_path, owner.pw_uid, owner.pw_gid)
log_path.chmod(mode=0o600)
self.cmd_paths['clamscan'],
cmd.append('--heuristic-alerts=yes')
cmd.append('--phishing-sigs=yes')
cmd.extend(['-d', self.cmd_paths['clamav-db']])
cmd.extend(['-d', DEFS_DIR]) # custom imh dbs and maldetect
if not disable_maldetect:
cmd.extend(['-d', self.cmd_paths['maldet']])
if not disable_new_yara and os.path.exists(DUMMY):
cmd.extend(['-d', f'{DUMMY}'])
cmd.extend(['-d', HEUR_DIR])
# excludes common false positive/time wasting dirs for cPanel
r'--exclude=\.(jpe?g|png|gif|mp(eg|4|g)|mov|avi|wmv|flv)$'
r'--exclude=/home[0-9]?/[^/]*/'
r'(quarantine*|mail/|etc/|logs/.*(\.tar)?\.gz'
r'|tmp/awstats/.*.txt|tmp/webalizer/'
r'(.*usage_.*.html|webalizer\.current))'
cmd.extend(['--exclude', path])
log_tuples: list[tuple[Path, Union[pwd.struct_passwd, None]]],
print_items: bool = False,
scan_path_strs = list(map(str, scan_paths))
cmd = self.command.copy()
cmd.extend(scan_path_strs)
self.log.warning('Scan command: %s', c.cyan(shlex.join(cmd)))
with ExitStack() as stack:
open_logfiles = self._init_logs(stack, log_tuples)
errors='surrogateescape',