from argparse import ArgumentParser
from hashlib import sha1
import logging
from pathlib import Path

from sqlalchemy import desc

from . import capstone_wrapper
from . import rizin_wrapper
from .schema import db_config, Disassembly


def _find_existing(session, arch, checksum, offset):
    """Return the stored Disassembly row for (arch, checksum, offset), or None."""
    return (
        session.query(Disassembly)
        .filter(Disassembly.checksum == checksum)
        .filter(Disassembly.offset == offset)
        .filter(Disassembly.arch == arch.__name__)
        .filter(Disassembly.engine == str(arch.__bases__))
        .first()
    )


def _build_row(disasembler, arch, checksum, offset, bin_path, size, print_count):
    """Populate a Disassembly ORM row from a finished disassembler run.

    print_count limits how many rate entries are stored; None means "all".
    """
    row = Disassembly()
    row.arch = disasembler.__class__.__name__
    row.checksum = checksum
    row.count = len(disasembler)
    row.engine = str(arch.__bases__)
    row.mnemonic_rates = str(disasembler.mnemonic_rates[:print_count])
    row.offset = offset
    row.opcodes = disasembler.objdump
    row.path = str(bin_path.absolute())
    row.ret_rates = str(disasembler.ret_rates[:print_count])
    row.size = size
    return row


def subdisassem_script():
    """CLI entry point: disassemble one binary under every supported arch.

    For each architecture wrapper (capstone and rizin) and each byte offset
    up to --fuzz, disassemble the input file, store the result in a sqlite
    database next to the binary, then log the highest-instruction-count
    result per architecture.
    """
    parser = ArgumentParser(description="")
    parser.add_argument("-v", "--verbose", action="count", help="verbose logging")
    parser.add_argument("-b", "--bin-path", required=True)
    parser.add_argument("-l", "--log", action="store_true", help="log to file")
    parser.add_argument(
        "-f", "--fuzz", type=int, default=1, help="offset bruteforce max"
    )
    args = parser.parse_args()
    args.bin_path = Path(args.bin_path)

    if args.verbose:
        # None as a slice bound means "no limit": show every entry.
        # (A value of -1 here would silently drop the LAST element instead.)
        print_count = None
        level = logging.DEBUG
        log_format = "%(asctime)s %(filename)s:%(lineno)d %(message)s"
    else:
        print_count = 5
        level = logging.INFO
        log_format = "%(asctime)s %(message)s"

    if args.log:
        filename = args.bin_path.parent.joinpath(f"{args.bin_path.name}.log")
        logging.basicConfig(
            level=level,
            format=log_format,
            filename=filename,
        )
    else:
        logging.basicConfig(
            level=level,
            format=log_format,
        )
    logging.info(args)

    db_path = args.bin_path.parent.joinpath(f"{args.bin_path.name}.sqlite").absolute()
    session = db_config(db_path)
    logging.info(f"results sqlite database created at {db_path}")

    # reading the whole file into memory until I get an idea for pagnating
    with args.bin_path.open("rb") as file_open:
        raw_bytes = file_open.read()

    sha1sum = sha1()
    sha1sum.update(raw_bytes)
    checksum = sha1sum.hexdigest()
    logging.info(f"sha1sum: {checksum}")

    capstone_archs = [
        capstone_wrapper.x86_16,
        capstone_wrapper.x86_32,
        capstone_wrapper.x86_64,
        capstone_wrapper.armv7,
        capstone_wrapper.thumb2,
        capstone_wrapper.aarch64,
        capstone_wrapper.mips32,
        capstone_wrapper.mips64_el,
        capstone_wrapper.ppc64,
        capstone_wrapper.sparc,
        capstone_wrapper.sparcv9,
        capstone_wrapper.systemz,
        capstone_wrapper.xcore,
    ]
    for arch in capstone_archs:
        for offset in range(args.fuzz):
            if _find_existing(session, arch, checksum, offset):
                logging.debug(
                    f"subdiassembly_exists: {[arch.__name__, checksum, offset]}"
                )
                continue
            disasembler = arch(payload=raw_bytes, offset=offset)
            row = _build_row(
                disasembler,
                arch,
                checksum,
                offset,
                args.bin_path,
                len(raw_bytes) - offset,
                print_count,
            )
            session.add(row)
            session.commit()

    rizin_archs = [
        rizin_wrapper.x86_16,
    ]
    for arch in rizin_archs:
        for offset in range(args.fuzz):
            if _find_existing(session, arch, checksum, offset):
                logging.debug(
                    f"subdiassembly_exists: {[arch.__name__, checksum, offset]}"
                )
                continue
            # NOTE(review): the rizin wrapper takes no offset, so every fuzz
            # offset stores the same disassembly under a different offset
            # value — confirm this is intended.
            disasembler = arch(path=args.bin_path)
            row = _build_row(
                disasembler,
                arch,
                checksum,
                offset,
                args.bin_path,
                len(raw_bytes) - offset,
                print_count,
            )
            session.add(row)
            session.commit()

    # Collect the highest-count row per architecture; .first() returns None
    # for archs that produced no rows, so drop those before sorting.
    tops = list()
    for arch in capstone_archs + rizin_archs:
        top = (
            session.query(Disassembly)
            .filter(Disassembly.arch == arch.__name__)
            .order_by(desc("count"))
            .first()
        )
        tops.append(top)
    # Sort by instruction count (row.count), not len(row): Disassembly rows
    # are ORM objects and len() on them raises TypeError.
    tops = [t for t in tops if t is not None]
    tops.sort(key=lambda t: t.count, reverse=True)
    for top in tops[:print_count]:
        logging.info(top)