#! /usr/bin/env python3
"""Command-line drivers for exporting and mutating scrap shellcode."""
from argparse import ArgumentParser
from datetime import datetime, timezone
from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty
from tempfile import TemporaryDirectory
import logging

from sqlalchemy import exists, desc

from .mutation import generation, mutate, seed_shell, growth
from .orm import db_config, ScrapNode
from .disassemble import objdump


def _utc_timestamp():
    """Return the current UTC time formatted as ``YYYYmmddTHHMMSS``."""
    return '{0:%Y%m%dT%H%M%S}'.format(datetime.now(timezone.utc))


def _build_logger(verbose):
    """Return the ``sins`` logger with a stream handler attached.

    Args:
        verbose: truthy selects DEBUG level and a format that also shows
            the level name, file name and line number; falsy selects INFO
            level and a message-only format.
    """
    if verbose:
        log_level = logging.DEBUG
        log_format = logging.Formatter(
            '%(levelname)s %(filename)s:%(lineno)d\n%(message)s\n')
    else:
        log_level = logging.INFO
        log_format = logging.Formatter('%(message)s')

    logger = logging.getLogger('sins')
    logger.setLevel(log_level)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(log_level)
    stream_handler.setFormatter(log_format)
    logger.addHandler(stream_handler)
    return logger


def _select_seed(session, seed_path, logger):
    """Pick the starting ScrapNode and make sure it is stored in the database.

    Preference order: the image file at *seed_path* (when given), then the
    stored node with the greatest ``ctime``, then the built-in ``seed_shell``.
    If a node with the same checksum is already stored, that stored node is
    returned instead; otherwise the chosen node is added and committed.

    Args:
        session: database session returned by ``db_config``.
        seed_path: path to a seed image file, or a falsy value for none.
        logger: logger used for debug output.

    Returns:
        The ScrapNode to use as the first parent.
    """
    recent = session.query(ScrapNode).order_by(desc('ctime')).first()
    if seed_path:
        seed = ScrapNode(child=Path(seed_path).read_bytes())
        logger.debug(f'args.seed:\n{seed}')
    elif recent is not None:
        seed = recent
        logger.debug(f'recent:\n{seed}')
    else:
        seed = ScrapNode(child=seed_shell)
        logger.debug(f'seed_shell:\n{seed}')

    # Reuse the stored row when this checksum is already known.
    existing = session.query(ScrapNode).filter(
        ScrapNode.checksum == seed.checksum).first()
    if existing is not None:
        return existing
    session.add(seed)
    session.commit()
    return seed


def _run_generation(scrap, timeout=1):
    """Run ``generation`` on *scrap* in a child process and collect its result.

    A fresh queue is created for every call: a child that answers late, or
    that is terminated while writing, can then never pollute or corrupt the
    result of a later generation.

    Args:
        scrap: mutated image handed to ``generation``.
        timeout: seconds to wait for the child's answer, and again for the
            child to exit.

    Returns:
        The object the child put on the queue, or None when nothing arrived
        within *timeout* seconds.
    """
    queue = Queue()
    proc = Process(target=generation, args=(queue, scrap))
    proc.start()
    result = None
    try:
        result = queue.get(timeout=timeout)
        # The child answered: give it a moment to exit on its own.
        proc.join(timeout)
    except Empty:
        pass
    finally:
        # A child that is still running is hung; do not let it outlive
        # its generation.
        if proc.is_alive():
            proc.terminate()
            proc.join(timeout)
    return result


def export():
    """Write the most recent scrap image from the database to a file.

    Parses the command line, opens the scrap database, takes the ScrapNode
    that sorts first by descending ``ctime`` and writes its ``image`` to
    ``<out_path>/scrap-<first 8 characters of checksum>.bin``.

    Raises:
        SystemExit: on invalid arguments (``-o`` is required) or when the
            database contains no scrap.
    """
    now = _utc_timestamp()
    parser = ArgumentParser(
        description='export recent scrap shellcode.')
    parser.add_argument('-v', '--verbose', action='count')
    parser.add_argument('-db', '--database', default='/out/sins.sqlite',
                        help='path to scrap database.')
    # Required: without it the file used to be written below a literal
    # "None" directory.
    parser.add_argument('-o', '--out_path', required=True,
                        help='path to export scrap shellcode.')
    args = parser.parse_args()

    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=level, format='%(message)s')
    logging.info(now)

    db_path = Path(args.database)
    session = db_config(db_path)
    logging.info(f'db_path: {db_path}')

    recent = session.query(ScrapNode).order_by(desc('ctime')).first()
    if recent is None:
        # Empty database: nothing to export.
        raise SystemExit(f'no scrap found in {db_path}')
    logging.info(f'recent: {recent}')

    out_path = Path(args.out_path) / f'scrap-{recent.checksum[:8]}.bin'
    with out_path.open('wb') as file:
        file.write(recent.image)


def sins():
    """Run the position independent code mutation experiment.

    Parses the command line, opens (or creates in a temporary directory)
    the scrap database, selects a seed node and then loops forever:
    the current parent image is mutated, the mutation is run in a child
    process, and when the child reports a result equal to the length of
    the mutated image the mutation is disassembled, passed through
    ``growth`` and stored as the new parent.

    Every failed attempt increments a counter that is reset on success;
    once it reaches ``--lineage`` it is reset and mutation continues from
    the same parent.

    Raises:
        SystemExit: on invalid command-line arguments.
    """
    now = _utc_timestamp()
    parser = ArgumentParser(
        description='position independent code (PIC) mutation experiment.')
    parser.add_argument('-v', '--verbose', action='count')
    parser.add_argument('-s', '--seed', help='path to PIC image.')
    parser.add_argument('-o', '--out_path',
                        help='path to results directory.')
    # type=int: a value given on the command line must compare with the
    # integer failure counter below.
    parser.add_argument('-l', '--lineage', type=int, default=10,
                        help='max count of unsuccessful generation.')
    args = parser.parse_args()
    if args.lineage < 1:
        # Otherwise the loop below would spin without ever mutating.
        parser.error('--lineage must be at least 1')

    logger = _build_logger(args.verbose)
    logger.info(now)

    if args.out_path:
        db_path = Path(f'{args.out_path}/sins.sqlite')
    else:
        # Keep the reference: the directory is removed once the
        # TemporaryDirectory object is cleaned up.
        temp_dir = TemporaryDirectory()
        db_path = Path(f'{temp_dir.name}/sins.sqlite')
    session = db_config(db_path)
    logger.info(f'db_path: {db_path}')

    parent = _select_seed(session, args.seed, logger)

    while True:
        lineage = 0
        while lineage < args.lineage:
            scrap = mutate(parent.image)
            logger.debug(f'lineage: {lineage}')

            result = _run_generation(scrap)
            # A timeout yields None, which never equals a length, so it is
            # counted as a failed attempt just like a wrong result.
            if result != len(scrap):
                lineage += 1
                continue

            opcodes = objdump(scrap)
            ops_count = opcodes.count('\n')
            logger.debug({'result': result, 'ops': ops_count})

            scrap = growth(shellcode=scrap, objdump=opcodes)
            parent = ScrapNode(child=scrap, parent_id=parent.id)
            parent.objdump = opcodes
            session.add(parent)
            session.commit()
            logger.info(parent)
            lineage = 0