scrap-is-not-scrap/sins/run.py

109 lines
3.1 KiB
Python
Executable File

#! /usr/bin/env python3
from argparse import ArgumentParser
from datetime import datetime
from multiprocessing import Process, Queue
from pathlib import Path
from queue import Empty
from sqlalchemy import exists, desc
from tempfile import TemporaryDirectory
import logging
from .mutation import generation, mutate, seed_shell, growth
from .orm import db_config, ScrapNode
def _run_generation(scrap, timeout=1):
    """Run ``generation(queue, scrap)`` in a child process and fetch its result.

    A fresh queue is created for every call so that a child which answers
    late (or is killed mid-write) can never leak a stale or corrupted item
    into a later generation.

    Args:
        scrap: the mutated image handed to ``generation``.
        timeout: seconds to wait for the child to report on the queue.

    Returns:
        Whatever the child put on the queue, or ``None`` when nothing
        arrived within *timeout*.
    """
    queue = Queue()
    proc = Process(target=generation, args=(queue, scrap))
    proc.start()
    try:
        result = queue.get(timeout=timeout)
        # The child has reported; give it a chance to exit on its own.
        proc.join(timeout=timeout)
        return result
    except Empty:
        return None
    finally:
        # Never leave a child behind: a hung child would otherwise keep
        # running (and pile up) for the lifetime of the experiment.
        if proc.is_alive():
            proc.terminate()
            proc.join(timeout=timeout)
        queue.close()


def sins():
    """Command-line entry point of the PIC mutation experiment.

    Parses the command line, configures the ``sins`` logger, opens the
    sqlite database (inside ``--output`` or a temporary directory), picks a
    seed node (``--seed`` file, else the most recent stored node, else the
    built-in ``seed_shell``) and then loops forever: mutate the current
    parent's image, run the mutant in a child process and, when the child
    reports a truthy result, store the grown mutant as the new parent.

    The loop has no exit condition; the function only ends through an
    exception (e.g. ``KeyboardInterrupt``).
    """
    now = '{0:%Y%m%dT%H%M%S}'.format(datetime.utcnow())

    parser = ArgumentParser(
        description='position independent code (PIC) mutation experiment.')
    parser.add_argument('-v', '--verbose', action='count')
    parser.add_argument('-s', '--seed', help='path to PIC image.')
    parser.add_argument('-o', '--output', help='path to results directory.')
    # type=int: without it a user-supplied value is a str and the
    # ``lineage < args.lineage`` comparison below raises TypeError.
    parser.add_argument('-l', '--lineage', type=int, default=10,
                        help='max count of unsuccessful generation.')
    args = parser.parse_args()

    if args.verbose:
        log_level = logging.DEBUG
        log_format = logging.Formatter(
            '%(levelname)s %(filename)s:%(lineno)d\n%(message)s\n')
    else:
        log_level = logging.INFO
        log_format = logging.Formatter('%(message)s')
    logger = logging.getLogger('sins')
    logger.setLevel(log_level)
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(log_level)
    stream_handler.setFormatter(log_format)
    logger.addHandler(stream_handler)
    logger.info(now)

    if args.output:
        output_dir = Path(args.output)
        # Create the results directory up front so opening the database
        # does not fail on a path that does not exist yet.
        output_dir.mkdir(parents=True, exist_ok=True)
        db_path = output_dir / 'sins.sqlite'
    else:
        # Keep the TemporaryDirectory object referenced for the whole run;
        # the directory is removed once the object is finalized.
        temp_dir = TemporaryDirectory()
        db_path = Path(temp_dir.name) / 'sins.sqlite'
    session = db_config(db_path)
    logger.info(f'db_path: {db_path}')

    recent = session.query(ScrapNode).order_by(desc('ctime')).first()
    if args.seed:
        seed = ScrapNode(child=Path(args.seed).read_bytes())
        logger.debug(f'args.seed:\n{seed}')
    elif recent:
        seed = recent
        logger.debug(f'recent:\n{seed}')
    else:
        seed = ScrapNode(child=seed_shell)
        logger.debug(f'seed_shell:\n{seed}')

    # Reuse the stored node when one with the same checksum already exists
    # instead of inserting a duplicate. (Named ``stored`` so it does not
    # shadow the ``exists`` imported from sqlalchemy.)
    stored = (session.query(ScrapNode)
              .filter(ScrapNode.checksum == seed.checksum)
              .first())
    if stored:
        seed = stored
    else:
        session.add(seed)
        session.commit()

    parent = seed
    while True:
        # NOTE(review): when the inner loop gives up after ``args.lineage``
        # consecutive failures, this outer loop only resets the counter and
        # retries from the same parent -- confirm whether backtracking to an
        # earlier node was intended.
        lineage = 0
        while lineage < args.lineage:
            scrap = mutate(parent.image)
            logger.debug(f'lineage: {lineage}')
            result = _run_generation(scrap)
            if not result:
                # Timed out or reported a falsy result: count the failure.
                lineage += 1
                continue
            scrap = growth(shellcode=scrap, length=result)
            parent = ScrapNode(child=scrap, parent_id=parent.id)
            session.add(parent)
            session.commit()
            logger.info(f'scrap:\n{parent}')
            lineage = 0