#! /usr/bin/python3

# Neale Pickett
# Unclassified/FOUO
#
# Created: 2020-12-14 16:49:51
# Last-modified: 2020-12-22 17:57:54
#
# Based on work by @RedDrip7 (twitter),
# who should be getting more credit in the English-speaking world.

"""Decode Sunburst Domain Generation Algorithm (DGA) names into CSV rows."""

import argparse
import base64
import codecs
import csv
import itertools
import re
import sys

# Parent domains recognised by DecodeDomain.  The first match wins, so the
# bare "avsvmcloud.com" must stay last: every regional name ends with it too.
knownDomains = [
    "appsync-api.us-east-1.avsvmcloud.com",
    "appsync-api.us-east-2.avsvmcloud.com",
    "appsync-api.us-west-2.avsvmcloud.com",
    "appsync-api.eu-west-1.avsvmcloud.com",
    "avsvmcloud.com",
]


def xor(key, buf):
    """Return buf XORed byte-by-byte with key, repeating key as needed.

    An empty key yields an empty result, because zip() stops immediately.
    """
    return bytes(b ^ k for b, k in zip(buf, itertools.cycle(key)))


Bsae32Alphabet = "ph2eifo3n5utg1j8d94qrvbmk0sal76c"

# Maps each symbol of the custom alphabet to the RFC 4648 symbol that has
# the same 5-bit value.
_StandardBase32Xlat = str.maketrans(
    Bsae32Alphabet, "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567")


def DecodeBase32(s: str) -> bytes:
    """Decode s as real Base32 written in the custom alphabet.

    Not used by sunburst.  If Sunburst actually used Base32, this would
    decode its data; it does not, so use DecodeBsae32 for real traffic.

    Raises binascii.Error when s is not valid Base32 once translated and
    padded (for example when its length cannot be padded legally).
    """
    t = s.translate(_StandardBase32Xlat)
    # b32decode insists on input padded to a multiple of 8 symbols.
    t += '=' * (-len(t) % 8)
    return base64.b32decode(t)


def DecodeBsae32(s: str) -> bytes:
    """Decode using zany base32-like algorithm.

    The following opinion has been formed hastily and could be misinformed:
    This is not proper Base32. It's more like somebody read about Base32,
    implemented an encoder and decoder incorrectly, and went on to the next
    task.

    Each character contributes 5 bits, least-significant group first; the
    accumulated integer is then emitted low byte first.  A trailing partial
    byte is discarded.

    NOTE: characters outside Bsae32Alphabet are not rejected.  str.find
    returns -1 for them and that value is OR-ed into the accumulator as-is.
    """
    bits = 0
    acc = 0
    for c in s:
        acc |= Bsae32Alphabet.find(c) << bits
        bits += 5

    out8 = []
    while bits > 0:
        out8.append(acc & 255)
        acc >>= 8
        bits -= 8
    # bits is negative here when the last byte was only partly filled.
    if bits:
        del out8[-1]
    return bytes(out8)


SubstitutionAlphabet = 'rq3gsalt6u1iyfzop572d49bnx8cvmkewhj'
# Ordinary characters: rotate four places back through the alphabet.
SubstitutionXlat = str.maketrans(
    SubstitutionAlphabet[4:] + SubstitutionAlphabet[:4],
    SubstitutionAlphabet)
# Characters following a '0' escape: alphabet position modulo 4 selects one
# of '0', '_', '-', '.'.
SubstitutionXlat0 = str.maketrans(
    SubstitutionAlphabet,
    ('0_-.' * 9)[:len(SubstitutionAlphabet)])


def DecodeSubst(s: str) -> str:
    """Decode a substitution-cipher payload.

    A '0' is an escape marker: it emits nothing itself and makes the next
    character go through SubstitutionXlat0 instead of SubstitutionXlat.
    Characters found in neither table pass through unchanged.  A trailing
    '0' with nothing after it is dropped.
    """
    zeroBaby = False
    out = []
    for c in s:
        if c == '0':
            zeroBaby = True
            continue
        table = SubstitutionXlat0 if zeroBaby else SubstitutionXlat
        out.append(c.translate(table))
        zeroBaby = False
    return ''.join(out)


# Payload fragments seen so far, keyed by decoded guid (bytes).
PayloadsByGuid = {}


def DecodeDomain(domain: str):
    """Decode one DGA name, accumulating payload fragments per guid.

    Returns a (guid, unknown_a, decoded) tuple:
      guid      -- 8 decoded bytes identifying the sender, or None
      unknown_a -- the single character following the encoded guid, or None
      decoded   -- decoded text of every fragment seen so far for this
                   guid, or a bracketed explanation when nothing could be
                   decoded

    Side effect: the fragment is remembered in PayloadsByGuid, so the
    decoded text for a guid depends on the names decoded before it.

    Raises RuntimeError when the name is not one of knownDomains or a
    subdomain of one.
    """
    s = domain.strip()

    # Match whole labels only, so "fooavsvmcloud.com" is rejected here
    # rather than tripping over a missing '.' separator further down.
    for d in knownDomains:
        if s == d:
            return (None, None, "[no data transmitted]")
        if s.endswith("." + d):
            s = s[:-(len(d) + 1)]
            break
    else:
        raise RuntimeError("Can't find domain for %s" % s)

    if len(s) < 16:
        return (None, None, "[unable to decode: too short for any known decoding rules]")

    # 15 characters carry 9 bytes: a one-byte XOR key, then the 8-byte guid.
    eguid = DecodeBsae32(s[:15])
    unknown_a = s[15]
    guid = xor(eguid[0:1], eguid[1:])
    s = s[16:]

    payloads = PayloadsByGuid.setdefault(guid, [])
    if s not in payloads:
        if s.startswith("00"):
            payloads.insert(0, s)
        else:
            payloads.append(s)
    # People: friggin' preserve metadata, ugh.
    # If I gave you every line of The Empire Strikes Back,
    # sorted alphabetically, without timestamps,
    # could you reconstruct the movie?
    #
    # Without ordering information, longest-first is the heuristic used to
    # put fragments back together.  The sort is stable, so among fragments
    # of equal length a "00" fragment inserted at the front stays first.
    payloads.sort(key=len, reverse=True)
    payload = ''.join(payloads)

    if payload.startswith("00"):
        buf = DecodeBsae32(payload[2:])
        # Quoted-printable keeps arbitrary bytes readable in a CSV cell.
        decoded = codecs.encode(buf, "quopri").decode("utf-8")
    else:
        decoded = DecodeSubst(payload)
    return (guid, unknown_a, decoded)


class TextReader:
    """Yield one {"name": fqdn} record per line of a plain-text file."""

    def __init__(self, infile):
        self.infile = infile
        # Input columns only; main() appends the decoded columns.
        self.fieldnames = ["name"]

    def __iter__(self):
        for s in self.infile:
            yield {"name": s.strip()}


class CsvReader:
    """Yield the rows of a CSV file as dicts keyed by its header."""

    def __init__(self, infile):
        self.reader = csv.DictReader(infile)
        # Input columns only, like TextReader: main() appends the decoded
        # columns, so adding them here as well would duplicate them in the
        # output header.  DictReader reports None for an empty file.
        self.fieldnames = list(self.reader.fieldnames or [])

    def __iter__(self):
        yield from self.reader


# Columns main() adds to every output row.
_DecodedFields = ["guid", "unknown a", "decode"]


def main():
    """Read DGA names from a text or CSV file and write decoded CSV."""
    parser = argparse.ArgumentParser(
        description="Decode sunburst Domain Generation Algorithm (DGA) names")
    parser.add_argument("--text", dest="input", action="store_const", const="text",
        help="Parse bambenek-style: list of fqdns, one per line")
    parser.add_argument("--csv", dest="input", action="store_const", const="csv",
        help="Parse CSV: records must be in a 'name' or 'fqdn' field")
    parser.add_argument("infile", nargs="?",
        type=argparse.FileType("r"), default=sys.stdin)
    parser.add_argument("--outfile", nargs="?",
        type=argparse.FileType("w"), default=sys.stdout,
        help="CSV file to write (default stdout)")
    args = parser.parse_args()

    # An explicit flag wins; otherwise guess the format from the extension.
    reader = None
    if args.input == "text":
        reader = TextReader(args.infile)
    elif args.input == "csv":
        reader = CsvReader(args.infile)
    elif args.infile.name.endswith(".txt"):
        reader = TextReader(args.infile)
    elif args.infile.name.endswith(".csv"):
        reader = CsvReader(args.infile)
    else:
        parser.print_help()
        return

    # Skip decoded columns the input already has (e.g. when re-processing
    # an earlier output file) so the header never repeats a name.
    fieldnames = reader.fieldnames + [
        f for f in _DecodedFields if f not in reader.fieldnames]
    writer = csv.DictWriter(args.outfile, fieldnames)
    writer.writeheader()
    for record in reader:
        name = record.get("name") or record.get("fqdn")
        if name:
            guid, unknown_a, ptext = DecodeDomain(name)
        else:
            # Blank line or missing column: report it in the row instead
            # of crashing on a None name.
            guid, unknown_a, ptext = None, None, "[no name or fqdn in record]"
        record["guid"] = int.from_bytes(guid or b"", "big")
        record["unknown a"] = unknown_a
        record["decode"] = ptext
        writer.writerow(record)


if __name__ == '__main__':
    main()