Edit File by line
/home/barbar84/public_h.../wp-conte.../plugins/sujqvwi/AnonR/anonr.TX.../opt/support/lib
File: firewall_tools.py
from typing import Union, Literal
[0] Fix | Delete
from pathlib import Path
[1] Fix | Delete
import re
[2] Fix | Delete
import subprocess
[3] Fix | Delete
import netaddr
[4] Fix | Delete
from output import err_exit, print_listed, warn
[5] Fix | Delete
from run_cmd import is_exe, which
[6] Fix | Delete
from rads.color import yellow
[7] Fix | Delete
[8] Fix | Delete
[9] Fix | Delete
def fw_info() -> tuple[
[10] Fix | Delete
Literal['APF', 'CSF', 'ipset+fail2ban'],
[11] Fix | Delete
Literal['/usr/local/sbin/apf', '/usr/sbin/csf', None],
[12] Fix | Delete
Union[list[dict[str, str]], str],
[13] Fix | Delete
]:
[14] Fix | Delete
"""Yields a tuple of fw_name, fw_command, fw_data.
[15] Fix | Delete
fw_name will be "APF", "CSF", or "ipset+fail2ban".
[16] Fix | Delete
[17] Fix | Delete
If fw_name was "APF" or "CSF", fw_command will be the path to its exe.
[18] Fix | Delete
[19] Fix | Delete
If fw_name was "APF" or "CSF", fw_data will be the contents of its deny
[20] Fix | Delete
file. Otherwise, fw_data will be a list of dicts containing "listname"
[21] Fix | Delete
and "ip".
[22] Fix | Delete
[23] Fix | Delete
[24] Fix | Delete
Returns:
[25] Fix | Delete
tuple[str, str | None, list[dict[str, str] | None]]: see above
[26] Fix | Delete
"""
[27] Fix | Delete
if is_exe('/usr/local/sbin/apf'):
[28] Fix | Delete
fw_cmd = '/usr/local/sbin/apf'
[29] Fix | Delete
deny_file = Path('/etc/apf/deny_hosts.rules')
[30] Fix | Delete
name = 'APF'
[31] Fix | Delete
elif is_exe('/usr/sbin/csf'):
[32] Fix | Delete
fw_cmd = '/usr/sbin/csf'
[33] Fix | Delete
deny_file = Path('/etc/csf/csf.deny')
[34] Fix | Delete
name = 'CSF'
[35] Fix | Delete
elif is_exe('/opt/imh-python/bin/fail2ban-client') and which('ipset'):
[36] Fix | Delete
name = 'ipset+fail2ban'
[37] Fix | Delete
deny_file = None
[38] Fix | Delete
fw_cmd = None
[39] Fix | Delete
else:
[40] Fix | Delete
err_exit('Cannot identify firewall')
[41] Fix | Delete
if deny_file is None:
[42] Fix | Delete
deny_data = list(read_ipset_save())
[43] Fix | Delete
else:
[44] Fix | Delete
try:
[45] Fix | Delete
deny_data = deny_file.read_text(encoding='utf-8')
[46] Fix | Delete
except FileNotFoundError:
[47] Fix | Delete
err_exit(f'Cannot read {deny_file}. Firewall is misconfigured.')
[48] Fix | Delete
return name, fw_cmd, deny_data
[49] Fix | Delete
[50] Fix | Delete
[51] Fix | Delete
def read_ipset_save():
[52] Fix | Delete
irgx = re.compile(r'add (?P<listname>[a-zA-Z0-9\-_]+) (?P<ip>[0-9\./]+)$')
[53] Fix | Delete
with subprocess.Popen(
[54] Fix | Delete
['ipset', 'save'],
[55] Fix | Delete
encoding='utf-8',
[56] Fix | Delete
stdout=subprocess.PIPE,
[57] Fix | Delete
universal_newlines=True,
[58] Fix | Delete
) as proc:
[59] Fix | Delete
for line in proc.stdout:
[60] Fix | Delete
if match := irgx.match(line.rstrip()):
[61] Fix | Delete
yield match.groupdict()
[62] Fix | Delete
[63] Fix | Delete
[64] Fix | Delete
def ipset_list_action(
[65] Fix | Delete
listname: str,
[66] Fix | Delete
) -> Literal['ACCEPT', 'DROP', 'DENY', 'UNKNOWN']:
[67] Fix | Delete
"""Check whether an ipset list is set to ACCEPT, DROP, or DENY"""
[68] Fix | Delete
try:
[69] Fix | Delete
iptables = subprocess.check_output(
[70] Fix | Delete
['iptables', '-nL'], encoding='utf-8'
[71] Fix | Delete
)
[72] Fix | Delete
except (OSError, subprocess.CalledProcessError):
[73] Fix | Delete
err_exit('Failed to execute iptables to determine list type')
[74] Fix | Delete
ipt_data = [x for x in iptables.splitlines() if x.find('match-set') > 0]
[75] Fix | Delete
for tline in ipt_data:
[76] Fix | Delete
if listname == tline.split()[6]:
[77] Fix | Delete
return tline.split()[0]
[78] Fix | Delete
return 'UNKNOWN'
[79] Fix | Delete
[80] Fix | Delete
[81] Fix | Delete
def ipset_fail2ban_check(
[82] Fix | Delete
fw_data: list[dict[str, str]], ipaddr: netaddr.IPAddress
[83] Fix | Delete
) -> tuple[bool, Union[str, None]]:
[84] Fix | Delete
"""Check deny_data ``fw_info()`` for an IP address. If found, return whether
[85] Fix | Delete
it's blocked and in what fail2ban list if it was automatically blocked
[86] Fix | Delete
[87] Fix | Delete
Args:
[88] Fix | Delete
fw_data (list[dict[str, str]]): third arg returned by ``fw_info()``
[89] Fix | Delete
ipaddr (netaddr.IPAddress): IP address to check
[90] Fix | Delete
[91] Fix | Delete
Returns:
[92] Fix | Delete
tuple[bool, str | None]]: if blocked and in what fail2ban list if any
[93] Fix | Delete
"""
[94] Fix | Delete
list_name = None
[95] Fix | Delete
for tnet in fw_data:
[96] Fix | Delete
try:
[97] Fix | Delete
listed = ipaddr in netaddr.IPNetwork(tnet['ip'])
[98] Fix | Delete
if listed:
[99] Fix | Delete
list_name = tnet['listname']
[100] Fix | Delete
break
[101] Fix | Delete
except netaddr.AddrFormatError:
[102] Fix | Delete
continue
[103] Fix | Delete
list_action = ipset_list_action(list_name)
[104] Fix | Delete
if not listed:
[105] Fix | Delete
print_listed(ipaddr, False, 'any ipset or fail2ban list')
[106] Fix | Delete
return False, None
[107] Fix | Delete
print_listed(ipaddr, True, f'the {list_name} {list_action} list')
[108] Fix | Delete
if list_action == 'ACCEPT':
[109] Fix | Delete
warn(f'{ipaddr} is NOT BLOCKED. It is whitelisted.', color=yellow)
[110] Fix | Delete
return False, None
[111] Fix | Delete
if list_name.startswith('f2b-'):
[112] Fix | Delete
warn(
[113] Fix | Delete
'Automatically blocked by fail2ban in jail:',
[114] Fix | Delete
list_name.replace('f2b-', ''),
[115] Fix | Delete
color=yellow,
[116] Fix | Delete
)
[117] Fix | Delete
return listed, list_name.replace('f2b-', '')
[118] Fix | Delete
return listed, None
[119] Fix | Delete
[120] Fix | Delete
[121] Fix | Delete
def check_iptables(ipaddr: netaddr.IPAddress) -> bool:
[122] Fix | Delete
"""Search iptables -nL for a line containing an IP which does not start with
[123] Fix | Delete
ACCEPT"""
[124] Fix | Delete
try:
[125] Fix | Delete
fw_data = subprocess.check_output(['iptables', '-nL'], encoding='utf-8')
[126] Fix | Delete
except (OSError, subprocess.CalledProcessError):
[127] Fix | Delete
# stderr will print to tty
[128] Fix | Delete
err_exit('could not run iptables -nL')
[129] Fix | Delete
for line in fw_data.splitlines():
[130] Fix | Delete
if not line.startswith('ACCEPT') and str(ipaddr) in line:
[131] Fix | Delete
return True
[132] Fix | Delete
return False
[133] Fix | Delete
[134] Fix | Delete
It is recommended that you Edit text format, this type of Fix handles quite a lot in one request
Function