from argparse import ArgumentParser
from hashlib import sha1
from pathlib import Path
import logging

from sqlalchemy import desc

from . import capstone_wrapper
from . import rizin_wrapper
from .schema import db_config, Disassembly


def _engine_name(arch):
    """Return the engine label stored for *arch*: the string form of its base classes."""
    return str(arch.__bases__)


def _already_stored(session, checksum, offset, arch):
    """Return True when a row for this checksum/offset/arch/engine already exists."""
    row = (
        session.query(Disassembly)
        .filter(Disassembly.checksum == checksum)
        .filter(Disassembly.offset == offset)
        .filter(Disassembly.arch == arch.__name__)
        .filter(Disassembly.engine == _engine_name(arch))
        .first()
    )
    # Compare against None explicitly so the result does not depend on the
    # truthiness of the ORM object itself.
    return row is not None


def _store_disassembly(
    session, disassembler, arch, *, checksum, offset, bin_path, payload_size, print_count
):
    """Persist one disassembly result as a ``Disassembly`` row and commit it."""
    row = Disassembly()
    row.arch = disassembler.__class__.__name__
    row.checksum = checksum
    row.count = len(disassembler)
    row.engine = _engine_name(arch)
    row.mnemonic_rates = str(disassembler.mnemonic_rates[:print_count])
    row.offset = offset
    row.opcodes = disassembler.objdump
    row.path = str(bin_path.absolute())
    row.ret_rates = str(disassembler.ret_rates[:print_count])
    row.size = payload_size - offset
    session.add(row)
    # Commit per row so finished results survive an interrupted run.
    session.commit()


def _disassemble_archs(
    session, archs, make_disassembler, *, checksum, fuzz, bin_path, payload_size, print_count
):
    """Run every arch in *archs* at offsets ``0 .. fuzz - 1``, skipping stored results.

    *make_disassembler* is called as ``make_disassembler(arch, offset)`` and
    must return the disassembler instance to record.
    """
    for arch in archs:
        for offset in range(fuzz):
            if _already_stored(session, checksum, offset, arch):
                logging.debug(
                    f"subdiassembly_exists: {[arch.__name__, checksum, offset]}"
                )
                continue
            _store_disassembly(
                session,
                make_disassembler(arch, offset),
                arch,
                checksum=checksum,
                offset=offset,
                bin_path=bin_path,
                payload_size=payload_size,
                print_count=print_count,
            )


def _top_rows(session, archs, checksum):
    """Return, for each arch, its highest-``count`` row for *checksum* (if any)."""
    tops = []
    for arch in archs:
        top = (
            session.query(Disassembly)
            # Restrict to the binary being analysed and to this engine: arch
            # names such as x86_16 exist in both the capstone and rizin lists,
            # and the database may hold rows for other checksums.
            .filter(Disassembly.checksum == checksum)
            .filter(Disassembly.arch == arch.__name__)
            .filter(Disassembly.engine == _engine_name(arch))
            .order_by(desc("count"))
            .first()
        )
        # No row exists when nothing was disassembled for this arch
        # (for example when --fuzz is 0).
        if top is not None:
            tops.append(top)
    return tops


def subdisassem_script():
    """Command-line entry point: disassemble a binary under many architectures.

    Parses the command line, configures logging, opens (or creates) a sqlite
    database next to the binary, disassembles the binary with every capstone
    and rizin architecture wrapper at each offset in ``range(--fuzz)``, stores
    one ``Disassembly`` row per (checksum, offset, arch, engine), and finally
    logs the best row of each architecture ordered by instruction count.

    Without ``-v`` only the first five rate entries are stored per row and
    only the five best rows are logged; with ``-v`` nothing is truncated.
    """
    parser = ArgumentParser(description="")
    parser.add_argument("-v", "--verbose", action="count", help="verbose logging")
    parser.add_argument("-b", "--bin-path", required=True)
    parser.add_argument("-l", "--log", action="store_true", help="log to file")
    parser.add_argument(
        "-f", "--fuzz", type=int, default=1, help="offset bruteforce max"
    )
    args = parser.parse_args()
    args.bin_path = Path(args.bin_path)

    if args.verbose:
        # None as a slice bound means "no limit"; -1 would silently drop the
        # last element of every sliced sequence.
        print_count = None
        level = logging.DEBUG
        log_format = "%(asctime)s %(filename)s:%(lineno)d %(message)s"
    else:
        print_count = 5
        level = logging.INFO
        log_format = "%(asctime)s %(message)s"

    log_kwargs = {"level": level, "format": log_format}
    if args.log:
        # The log file is written next to the binary as "<name>.log".
        log_kwargs["filename"] = args.bin_path.parent.joinpath(
            f"{args.bin_path.name}.log"
        )
    logging.basicConfig(**log_kwargs)

    logging.info(args)

    db_path = args.bin_path.parent.joinpath(f"{args.bin_path.name}.sqlite").absolute()
    session = db_config(db_path)
    logging.info(f"results sqlite database created at {db_path}")

    # reading the whole file into memory until I get an idea for paginating
    with args.bin_path.open("rb") as file_open:
        raw_bytes = file_open.read()

    checksum = sha1(raw_bytes).hexdigest()
    logging.info(f"sha1sum: {checksum}")

    capstone_archs = [
        capstone_wrapper.x86_16,
        capstone_wrapper.x86_32,
        capstone_wrapper.x86_64,
        capstone_wrapper.armv7,
        capstone_wrapper.thumb2,
        capstone_wrapper.aarch64,
        capstone_wrapper.mips32,
        capstone_wrapper.mips64_el,
        capstone_wrapper.ppc64,
        capstone_wrapper.sparc,
        capstone_wrapper.sparcv9,
        capstone_wrapper.systemz,
        capstone_wrapper.xcore,
    ]

    rizin_archs = [
        rizin_wrapper._6502_8,
        rizin_wrapper._6502_16,
        rizin_wrapper._8051,
        rizin_wrapper.amd29k,
        rizin_wrapper.arc_16,
        rizin_wrapper.arc_32,
        rizin_wrapper.arm_as_16,
        rizin_wrapper.arm_as_32,
        rizin_wrapper.arm_as_64,
        rizin_wrapper.arm_16,
        rizin_wrapper.arm_32,
        rizin_wrapper.arm_64,
        rizin_wrapper.arm_gnu_16,
        rizin_wrapper.arm_gnu_32,
        rizin_wrapper.arm_gnu_64,
        rizin_wrapper.arm_wine_16,
        rizin_wrapper.arm_wine_32,
        rizin_wrapper.avr_8,
        rizin_wrapper.avr_16,
        rizin_wrapper.bf_16,
        rizin_wrapper.bf_32,
        rizin_wrapper.bf_64,
        rizin_wrapper.chip8,
        rizin_wrapper.cr_16,
        rizin_wrapper.cris,
        rizin_wrapper.dalvik_32,
        rizin_wrapper.dalvik_64,
        rizin_wrapper.dcpu16,
        rizin_wrapper.ebc_32,
        rizin_wrapper.ebc_64,
        rizin_wrapper.gb,
        rizin_wrapper.h8300,
        rizin_wrapper.hexagon,
        rizin_wrapper.hppa,
        rizin_wrapper.i4004,
        rizin_wrapper.i8080,
        rizin_wrapper.java,
        rizin_wrapper.lanai,
        rizin_wrapper.lh5801,
        rizin_wrapper.lm32,
        rizin_wrapper.luac,
        rizin_wrapper.m68k,
        rizin_wrapper.m680x_8,
        rizin_wrapper.m680x_32,
        rizin_wrapper.malbolge,
        rizin_wrapper.mcore,
        rizin_wrapper.mcs96,
        rizin_wrapper.mips_16,
        rizin_wrapper.mips_32,
        rizin_wrapper.mips_64,
        rizin_wrapper.mips_gnu_32,
        rizin_wrapper.mips_gnu_64,
        rizin_wrapper.msp430,
        rizin_wrapper.nios2,
        rizin_wrapper.or1k,
        rizin_wrapper.pic,
        rizin_wrapper.ppc_as_32,
        rizin_wrapper.ppc_as_64,
        rizin_wrapper.ppc_32,
        rizin_wrapper.ppc_64,
        rizin_wrapper.ppc_gnu_32,
        rizin_wrapper.ppc_gnu_64,
        rizin_wrapper.propeller,
        rizin_wrapper.pyc_8,
        rizin_wrapper.pyc_16,
        rizin_wrapper.riscv_32,
        rizin_wrapper.riscv_64,
        rizin_wrapper.rsp,
        rizin_wrapper.sh,
        rizin_wrapper.snes_8,
        rizin_wrapper.snes_16,
        rizin_wrapper.sparc_32,
        rizin_wrapper.sparc_64,
        rizin_wrapper.sparc_gnu_32,
        rizin_wrapper.sparc_gnu_64,
        rizin_wrapper.spc700,
        rizin_wrapper.sysz_32,
        rizin_wrapper.sysz_64,
        rizin_wrapper.tms320,
        rizin_wrapper.tms320c64x,
        rizin_wrapper.tricore,
        rizin_wrapper.v810_32,
        rizin_wrapper.v850,
        rizin_wrapper.vax_8,
        rizin_wrapper.vax_32,
        rizin_wrapper.wasm_32,
        rizin_wrapper.x86_as_16,
        rizin_wrapper.x86_as_32,
        rizin_wrapper.x86_as_64,
        rizin_wrapper.x86_16,
        rizin_wrapper.x86_32,
        rizin_wrapper.x86_64,
        rizin_wrapper.x86_nasm_16,
        rizin_wrapper.x86_nasm_32,
        rizin_wrapper.x86_nasm_64,
        rizin_wrapper.x86_nz_16,
        rizin_wrapper.x86_nz_32,
        rizin_wrapper.x86_nz_64,
        rizin_wrapper.xap,
        rizin_wrapper.xcore,
        rizin_wrapper.xtensa,
        rizin_wrapper.z80,
    ]

    shared = {
        "checksum": checksum,
        "fuzz": args.fuzz,
        "bin_path": args.bin_path,
        "payload_size": len(raw_bytes),
        "print_count": print_count,
    }

    # Capstone wrappers work on the in-memory bytes and take the start offset.
    _disassemble_archs(
        session,
        capstone_archs,
        lambda arch, offset: arch(payload=raw_bytes, offset=offset),
        **shared,
    )

    # NOTE(review): the rizin wrappers are constructed from the path only, so
    # the offset is not forwarded; presumably every offset records the same
    # disassembly for a given arch. Confirm against rizin_wrapper before
    # relying on --fuzz > 1 for rizin results.
    _disassemble_archs(
        session,
        rizin_archs,
        lambda arch, offset: arch(path=args.bin_path),
        **shared,
    )

    tops = _top_rows(session, capstone_archs, checksum)
    tops.extend(_top_rows(session, rizin_archs, checksum))
    # Rank on the same column the per-arch query orders by; list.sort is
    # stable, so ties keep the capstone-then-rizin listing order.
    tops.sort(key=lambda row: row.count or 0, reverse=True)

    for top in tops[:print_count]:
        logging.info(top)