#!/usr/bin/env python3 from datetime import datetime from hashlib import sha1 from pathlib import Path from sqlalchemy import LargeBinary, Column, ForeignKey, Integer, String, DateTime, create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import Session, relationship, backref from sqlalchemy.orm.collections import attribute_mapped_collection import json from .disassemble import disasm now = '{0:%Y%m%dT%H%M%S}'.format(datetime.utcnow()) Base = declarative_base() def db_config(path: Path) -> Session: engine = create_engine(f'sqlite:///{path.resolve()}', native_datetime=True) Base.metadata.create_all(engine) session = Session(engine) return session class ScrapNode(Base): __tablename__ = 'scrap_node' ctime = Column(DateTime, default=datetime.utcnow) id = Column(Integer, primary_key=True) length = Column(Integer, default=0) mtime = Column(DateTime, onupdate=datetime.utcnow) parent_id = Column(Integer, ForeignKey(id)) checksum = Column(String) disasm = Column(String) image = Column(LargeBinary) children = relationship( "ScrapNode", cascade="all, delete-orphan", backref=backref("parent", remote_side=id), collection_class=attribute_mapped_collection('name')) def __init__(self, *, child: bytes, parent_id: int = None): self.parent_id = parent_id self.image = child self.length = len(child) self.sha1sum self.disasm = disasm(child) def __repr__(self): values = { 'checksum': self.checksum, 'length': self.length, 'disasm': self.disasm, 'parent_id': self.parent_id, 'id': self.id, } return json.dumps(values, indent=1) @property def sha1sum(self): if self.checksum: return self.checksum checksum = sha1() checksum.update(self.image) self.checksum = checksum.hexdigest() return self.checksum def disasm(shellcode: bytes) -> str: opcodes = list() for opcode in capstone.disasm(shellcode, 0): opcodes += f'{opcode.mnemonic} {opcode.op_str}\n' return opcodes