diff --git a/sunburst.py b/sunburst.py index cf27bf4..c8e7df6 100755 --- a/sunburst.py +++ b/sunburst.py @@ -4,7 +4,7 @@ # Unclassified/FOUO # # Created: 2020-12-14 16:49:51 -# Last-modified: 2020-12-22 17:57:54 +# Last-modified: 2020-12-22 21:42:40 # # Based on work by @RedDrip7 (twitter), # who should be getting more credit in the English-speaking world. @@ -26,10 +26,14 @@ knownDomains = [ "avsvmcloud.com", ] -def xor(key, buf): - return bytes(b^k for b,k in zip(buf, itertools.cycle(key))) -Bsae32Alphabet = "ph2eifo3n5utg1j8d94qrvbmk0sal76c" +def xor(key, buf): + return bytes(b ^ k for b, k in zip(buf, itertools.cycle(key))) + + +Esab32Alphabet = "ph2eifo3n5utg1j8d94qrvbmk0sal76c" +SubstitutionAlphabet = 'rq3gsalt6u1iyfzop572d49bnx8cvmkewhj' +SubstitutionAlphabet0 = '0_-.' def DecodeBase32(s: str): @@ -39,63 +43,109 @@ def DecodeBase32(s: str): It doesn't work. """ - t = s.translate(Bsae32Alphabet, - "ABCDEFGHIJKLMNOPQRSTUVWXYZ2345678") + trans = str.maketrans(Esab32Alphabet, + "ABCDEFGHIJKLMNOPQRSTUVWXYZ2345678") + t = s.translate(trans) while len(t) % 8 > 0: t += '=' return base64.b32decode(t) -def DecodeBsae32(s: str): - """Decode using zany base32-like algorithm. +def DecodeEsab32(s: str) -> (int, int): + """Decode using big-endian base32 algorithm. - The following opinion has been formed hastily and could be misinformed: - - This is not proper Base32. It's more like somebody read about Base32, - implemented an encoder and decoder incorrectly, and went on to the next task. + Returns a bigint, and the number of bits contained therein """ - bits = 0 - acc = 0 + acc = bits = 0 for c in s: - acc |= Bsae32Alphabet.find(c) << bits + try: + p = Esab32Alphabet.index(c) + except ValueError: + raise RuntimeError( + "Not an Esab32 encoded character: %c in %s" % (c, s)) + acc |= p << bits bits += 5 - - out8 = [] - while bits > 0: - out8.append(acc & 255) - acc >>= 8 - bits -= 8 - - if bits: - del out8[-1] - return bytes(out8) + return acc, bits -SubstitutionAlphabet = 'rq3gsalt6u1iyfzop572d49bnx8cvmkewhj' -SubstitutionXlat = str.maketrans( - SubstitutionAlphabet[4:] + SubstitutionAlphabet[:4], SubstitutionAlphabet) -SubstitutionXlat0 = str.maketrans( - SubstitutionAlphabet, ('0_-.' * 9)[:len(SubstitutionAlphabet)]) - - -def DecodeSubst(s: str): - zeroBaby = False +def DecodeSubst(s: str) -> str: + alphabet = SubstitutionAlphabet out = [] for c in s: if c == '0': - zeroBaby = True - continue - if zeroBaby: - out.append(c.translate(SubstitutionXlat0)) + alphabet = SubstitutionAlphabet0 else: - out.append(c.translate(SubstitutionXlat)) - zeroBaby = False - return ''.join(out) + try: + pos = (SubstitutionAlphabet.index(c) - 4) % len(alphabet) + except ValueError: + raise RuntimeError( + "Not a subst encoded character: %c in %s" % (c, s)) + out.append(alphabet[pos]) + alphabet = SubstitutionAlphabet + return "".join(out) -PayloadsByGuid = {} -def DecodeDomain(domain: str): +Guid = int + + +def isprintable(c: int) -> bool: + return c >= 0x20 and c <= 0x7f + + +def quopri(buf: bytes) -> str: + return codecs.encode(buf, "quopri").decode("utf-8") + + +class DGADecoder: + def __init__(self, guid: Guid): + self.guid = guid + self.history = [] + self._decoder = self.DecodeSubst + + def decode(self, s: str): + if s.startswith("00"): + self._decoder = self.DecodeEsab32 + # We'll throw away the information about which is first, + # since we do a computationally intensive trick later to determine ordering + s = s[2:] + self.history.insert(0, s) + else: + self.history.append(s) + return self._decoder() + + def DecodeSubst(self) -> str: + decodes = {DecodeSubst(x) for x in self.history} + return ''.join(sorted(decodes, key=len, reverse=True)) + + def DecodeEsab32(self) -> str: + history = {x.rstrip("0") for x in self.history} + + # "Why don't we just mix up absolutely everything and see what happens?" + # -- Ridcully, in Terry Pratchett's "The Hogfather" + possibilities = [] + for attempt in itertools.permutations(history): + acc, abits = DecodeEsab32(''.join(attempt)) + length = abits // 8 + if abits % 8: + buf = acc.to_bytes(length+1, 'little') + else: + buf = acc.to_bytes(length, 'little') + buf = buf[:length] + if sum(isprintable(b) for b in buf) == length: + # Yay it's probably okay + possibilities.append(buf) + # Well, we tried. + if not possibilities: + return quopri(buf) + else: + return " | ".join(quopri(buf) for buf in possibilities) + + +DecodersByGuid = {} + + +def DecodeDomain(domain: str) -> (Guid, int, str): s = domain.strip() foundDomain = None for d in knownDomains: @@ -110,54 +160,49 @@ def DecodeDomain(domain: str): assert(s[-1] == '.') s = s[:-1] + if foundDomain == "avsvmcloud.com": + return (None, None, "[Probably not a Sunburst domain]") if len(s) < 16: - return (None, None, "[unable to decode: too short for any known decoding rules]") - - eguid = DecodeBsae32(s[:15]) + return (None, None, "[too short]") + + dec, _ = DecodeEsab32(s[:15]) + eguid = dec.to_bytes(10, 'little')[:9] + guid = int.from_bytes(xor(eguid[0:1], eguid[1:]), 'big') + unknown_a = s[15] - guid = xor(eguid[0:1], eguid[1:]) - s = s[16:] + payload = s[16:] - payloads = PayloadsByGuid.setdefault(guid, []) - if s not in payloads: - if s.startswith("00"): - payloads.insert(0, s) - else: - payloads.append(s) - # People: friggin' preserve metadata, ugh. - # If I gave you every line of The Empire Strikes Back, - # sorted alphabetically, without timestamps, - # could you reconstruct the movie? - payloads.sort(key=len, reverse=True) + decoder = DecodersByGuid.get(guid) + if not decoder: + decoder = DGADecoder(guid) + DecodersByGuid[guid] = decoder - payload = ''.join(payloads) + decoded = decoder.decode(payload) - if payload.startswith("00"): - buf = DecodeBsae32(payload[2:]) - decoded = codecs.encode(buf, "quopri").decode("utf-8") - else: - decoded = DecodeSubst(payload) return (guid, unknown_a, decoded) + class TextReader: def __init__(self, infile): self.infile = infile self.fieldnames = ["name"] - + def __iter__(self): for s in self.infile: yield {"name": s.strip()} - + class CsvReader: def __init__(self, infile): self.reader = csv.DictReader(infile) - self.fieldnames = self.reader.fieldnames + ["guid", "unknown a", "decode"] + self.fieldnames = self.reader.fieldnames + \ + ["guid", "unknown a", "decode"] def __iter__(self): for record in self.reader: yield record + def main(): parser = argparse.ArgumentParser( description="Decode sunburst Domain Generation Algorithm (DGA) names") @@ -188,13 +233,14 @@ def main(): fieldnames = reader.fieldnames + ["guid", "unknown a", "decode"] writer = csv.DictWriter(args.outfile, fieldnames) writer.writeheader() - for record in reader: + for record in reader: name = record.get("name") or record.get("fqdn") guid, unknown_a, ptext = DecodeDomain(name) - record["guid"] = int.from_bytes(guid or b"", "big") + record["guid"] = guid record["unknown a"] = unknown_a record["decode"] = ptext writer.writerow(record) - + + if __name__ == '__main__': main()