#!/opt/imh-python/bin/python3
"""Forensic Attack Tool -- This program will dump forensic data from ss,
tcpdump, netstat, and ps for use in analyzing attack vectors, surfaces,
and impact as part of the post-mortem process."""
# Copyright 2012 -- InMotion Hosting Inc.
# Author: Kevin D (kevind@inmotionhosting.com, ext 1116)
# Dependencies: Python 3.6+ (uses f-strings and concurrent.futures)
# 2012071201 -- updated to fix binary paths for subprocesses
# 2012061803 -- updated to give more feedback to the user
# 2012061802 -- misc syntax and header info updates
# -- added in venet and CE6 interface input validation
# 2012061801 -- fixed log init time (moved later to prevent blank logs)
# -- added input sanity checking
# -- removed log tagging. may be added back in later release
# 2012061501 -- Initial release
# Reference for backgrounding:
# https://www.oreilly.com/library/view/python-cookbook/0596001673/ch06s08.html
import re
import subprocess
import sys
from argparse import ArgumentParser
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
# Timestamp (YYYYmmdd_HHMMSS) embedded in every output path this run writes,
# i.e. /var/log/attack.<DATESTAMP>.*
# NOTE(review): NOW is not assigned anywhere in the visible source; presumably
# a datetime captured at start-up -- confirm it is defined before this line.
DATESTAMP = NOW.strftime("%Y%m%d_%H%M%S")
# Command-line interface; the module docstring doubles as the --help text.
parser = ArgumentParser(description=__doc__)
# NOTE(review): the three option specs below have no visible enclosing
# parser.add_argument( ... ) call -- confirm each spec is wrapped in one.
# Optional capture interface, stored as args.eth_interface.
"-i", "--interface", dest="eth_interface", metavar="INTERFACE",
help="Define a capture interface for tcpdump",
# Optional source IP for the capture filter, stored as args.sip.
"-s", "--sip", "--source-ip", dest="sip", metavar="SIP",
help="Define a source IP for tcpdump",
# Optional destination IP for the capture filter, stored as args.dip.
"-d", "--dip", "--destination-ip", dest="dip", metavar="DIP",
help="Define a destination IP for tcpdump",
# No default= is given above, so argparse leaves an omitted option as None.
args = parser.parse_args()
# Interface names accepted: eth/venet/em + 1-3 digits, optionally followed by
# an alias suffix ":" + 1-3 digits or ":cp" + 1-3 digits
# (e.g. eth0, venet0:0, eth0:cp1).
# NOTE(review): the closing parenthesis of this re.compile( call is not
# visible in this excerpt -- confirm.
valid_interface = re.compile(
r"^(eth|venet|em)[0-9]{1,3}(\:(cp)?[0-9]{1,3})?$"
# Shape check only: four dot-separated groups of 1-3 digits. Out-of-range
# octets such as 999 still match.
valid_ip = re.compile(r"^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$")
# Reject malformed input up front; sys.exit() with a string prints it to
# stderr and exits with status 1.
if args.eth_interface is not None:
if not valid_interface.match(args.eth_interface):
sys.exit("Invalid ethernet interface! Exiting")
# NOTE(review): unlike the interface check, no `is not None` guard is visible
# for args.sip / args.dip; valid_ip.match(None) raises TypeError, so confirm
# the guards exist.
if not valid_ip.match(args.sip):
sys.exit("Invalid source IP! Exiting")
if not valid_ip.match(args.dip):
sys.exit("Invalid destination IP! Exiting")
# Notice shown to the operator before the work is backgrounded. The last line
# must be an f-string: without the prefix the literal text "{DATESTAMP}" is
# shown instead of the timestamp used in the real log file names, and the
# path pasted into the post-mortem report would match nothing on disk.
"Backgrounding forensic proccesses.",
"Please include the following line in your post-mortem report.",
f"FAT logs: /var/log/attack.{DATESTAMP}.*.log",
# Exit with the error text if the first fork fails.
# NOTE(review): the fork call and the except clause that binds `exc` are not
# visible in this excerpt -- confirm they precede this line.
sys.exit(f"fork #1 failed: {exc}")
# Decouple from parent environment
# NOTE(review): no decoupling statements are visible under the comment above,
# and the second fork / except clause binding `exc` is not visible either --
# confirm.
sys.exit(f"fork #2 failed: {exc}")
# Base tcpdump arguments: stop after 5000 packets (-c) and write the raw
# capture to a timestamped .cap file (-w).
# NOTE(review): the `tcpdump = [` opener and closing `]` for these arguments
# are not visible in this excerpt -- confirm; `tcpdump` is used as a list below.
'/usr/sbin/tcpdump', '-Un', '-c', '5000', '-Z', 'root',
'-w', f'/var/log/attack.{DATESTAMP}.tcpdump.cap',
# Socket and process listings collected alongside the packet capture.
netstat = ['/bin/netstat', '-pnee', '-a']
ss_cmd = ['/usr/sbin/ss', '-pain']
ps_cmd = ['/bin/ps', 'aufx']
# If an ethernet interface is defined, add it to the command.
if args.eth_interface is not None:
tcpdump.extend(['-i', args.eth_interface])
# If a source IP is defined, add it to the command as a "src <ip>" filter.
# NOTE(review): no `if args.sip is not None:` guard is visible for the next
# two extends, yet the `elif` below needs a matching `if` -- confirm.
tcpdump.extend(['src', args.sip])
# If source AND destination IPs are defined, add them both to tcpdump
# (the filter becomes "src <sip> and dst <dip>").
tcpdump.extend(['and', 'dst', args.dip])
# If only a destination IP is defined, add it to the command.
elif args.dip is not None:
tcpdump.extend(['dst', args.dip])
# Per-tool log path template; the %s slot is filled with the tool's tag.
log = f'/var/log/attack.{DATESTAMP}.%s.log'
# (tag, command) pairs, listed in the order they are handed to the pool.
jobs = (
    ('tcpdump', tcpdump),
    ('netstat', netstat),
    ('ss', ss_cmd),
    ('psaufx', ps_cmd),
)
# One worker per command so all four collectors run at the same time; leaving
# the with-block waits for every submitted job to finish.
with ThreadPoolExecutor(max_workers=4) as pool:
    for tag, command in jobs:
        pool.submit(run_cmd, command, log % tag)
def run_cmd(cmd, log_path):
    """Run *cmd* and capture its stdout and stderr in the file at *log_path*.

    Args:
        cmd: Argument list handed to ``subprocess.run`` (no shell is used).
        log_path: File to create or truncate; it is opened in binary mode.

    The command's exit status is ignored (``check=False``). If the command
    cannot be started at all, the error text is written to the log instead
    of propagating, so a missing binary still leaves a trace in its log.
    """
    with open(log_path, 'wb') as log_file:
        try:
            subprocess.run(cmd, stdout=log_file, stderr=log_file, check=False)
        except OSError as exc:
            # e.g. the binary is missing or not executable; record the reason.
            log_file.write(bytes(str(exc), 'utf-8', 'replace'))
if __name__ == '__main__':