sunburst-decoder/sunburst.py

247 lines
6.9 KiB
Python
Executable File

#! /usr/bin/python3
# Neale Pickett <neale@lanl.gov>
# Unclassified/FOUO
#
# Created: 2020-12-14 16:49:51
# Last-modified: 2020-12-22 21:42:40
#
# Based on work by @RedDrip7 (twitter),
# who should be getting more credit in the English-speaking world.
import argparse
import base64
import codecs
import csv
import itertools
import re
import sys
from typing import Optional, Tuple
# Domain suffixes recognised by DecodeDomain.  DecodeDomain walks this list in
# order and stops at the first suffix that matches, so the bare
# "avsvmcloud.com" entry must stay last, after the more specific
# "appsync-api.*" suffixes.
knownDomains = [
    "appsync-api.us-east-1.avsvmcloud.com",
    "appsync-api.us-east-2.avsvmcloud.com",
    "appsync-api.us-west-2.avsvmcloud.com",
    "appsync-api.eu-west-1.avsvmcloud.com",
    "avsvmcloud.com",
]
def xor(key, buf):
    """XOR each byte of ``buf`` with ``key``, repeating ``key`` as needed.

    Both arguments are iterables of ints in the range 0-255 (for example
    ``bytes``).  The result has one byte per item of ``buf``; an empty
    ``key`` produces an empty result.
    """
    mixed = bytearray()
    keystream = itertools.cycle(key)
    for data_byte, key_byte in zip(buf, keystream):
        mixed.append(data_byte ^ key_byte)
    return bytes(mixed)
# 32-symbol alphabet used by DecodeEsab32: a character's position in this
# string is the 5-bit value it stands for.
Esab32Alphabet = "ph2eifo3n5utg1j8d94qrvbmk0sal76c"
# 35-symbol alphabet of the substitution cipher undone by DecodeSubst, which
# maps each character to the one 4 positions earlier (wrapping around).
SubstitutionAlphabet = 'rq3gsalt6u1iyfzop572d49bnx8cvmkewhj'
# Characters DecodeSubst emits for the symbol that follows a '0' escape.
SubstitutionAlphabet0 = '0_-.'
def DecodeBase32(s: str):
    """Decode ``s`` as standard Base32 after translating out of Esab32Alphabet.

    Not used by sunburst.
    If Sunburst actually used Base32, this would work to decode things.
    It is kept for experimentation only; see DecodeEsab32 for the decoder
    the rest of this file relies on.

    Returns:
        The decoded bytes.

    Raises:
        binascii.Error: if the translated, padded text is not valid Base32
            (characters outside Esab32Alphabet are passed through untranslated
            and rejected by ``base64.b32decode``).
    """
    # The target has to be the 32-character RFC 4648 alphabet (A-Z, 2-7):
    # str.maketrans() raises ValueError unless both strings have equal length,
    # and Esab32Alphabet has exactly 32 symbols.
    trans = str.maketrans(Esab32Alphabet,
                          "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567")
    t = s.translate(trans)
    # b32decode wants the input padded with '=' to a multiple of 8 characters.
    t += '=' * (-len(t) % 8)
    return base64.b32decode(t)
def DecodeEsab32(s: str) -> Tuple[int, int]:
    """Decode a string of Esab32 characters into a single integer.

    Every character contributes the 5 bits given by its position in
    ``Esab32Alphabet``.  Characters are packed least-significant first: the
    first character fills bits 0-4, the second bits 5-9, and so on.  Callers
    therefore turn the result into bytes with ``int.to_bytes(..., 'little')``.

    Returns:
        ``(value, bits)``: the accumulated integer and the number of bits it
        holds (5 per input character).

    Raises:
        RuntimeError: if ``s`` contains a character that is not in
            ``Esab32Alphabet``.
    """
    acc = bits = 0
    for c in s:
        try:
            p = Esab32Alphabet.index(c)
        except ValueError as err:
            raise RuntimeError(
                "Not an Esab32 encoded character: %c in %s" % (c, s)) from err
        acc |= p << bits
        bits += 5
    return acc, bits
def DecodeSubst(s: str) -> str:
    """Undo the substitution cipher applied to ``s``.

    A '0' is an escape: it produces no output itself and makes the next
    character decode through ``SubstitutionAlphabet0`` instead of
    ``SubstitutionAlphabet``.  Every other character is looked up in
    ``SubstitutionAlphabet`` and shifted back 4 positions, wrapping around the
    length of whichever alphabet is active.

    Raises:
        RuntimeError: if a character other than '0' is not in
            ``SubstitutionAlphabet``.
    """
    decoded = []
    escaped = False
    for ch in s:
        if ch == '0':
            # Escape marker: only affects how the following character decodes.
            escaped = True
            continue
        table = SubstitutionAlphabet0 if escaped else SubstitutionAlphabet
        try:
            where = SubstitutionAlphabet.index(ch)
        except ValueError:
            raise RuntimeError(
                "Not a subst encoded character: %c in %s" % (ch, s))
        decoded.append(table[(where - 4) % len(table)])
        escaped = False
    return "".join(decoded)
# Alias used in annotations for the integer GUID that DecodeDomain recovers
# from the start of each name.
Guid = int
def isprintable(c: int) -> bool:
    """Return True if byte value ``c`` is a printable ASCII character.

    Printable ASCII runs from 0x20 (space) through 0x7e ('~').  0x7f is the
    DEL control character and is deliberately excluded.
    """
    return 0x20 <= c <= 0x7e
def quopri(buf: bytes) -> str:
    """Return ``buf`` rendered as quoted-printable text.

    The bytes are run through the ``quopri`` codec and the result is
    decoded as UTF-8 to give a ``str``.
    """
    encoded = codecs.encode(buf, "quopri")
    return encoded.decode("utf-8")
class DGADecoder:
    """Collect the payload fragments seen for one GUID and decode them.

    DecodeDomain keeps one instance per GUID and feeds it the payload part of
    every name carrying that GUID.  The decoder starts in substitution mode
    and switches to Esab32 mode for good once a fragment prefixed with "00"
    is seen.
    """

    def __init__(self, guid: Guid):
        self.guid = guid
        # Payload fragments received so far, in arrival order, except that a
        # "00"-prefixed fragment is placed at the front (see decode()).
        self.history = []
        # Bound method used to turn the history into text.
        self._decoder = self.DecodeSubst

    def decode(self, s: str):
        """Record payload fragment ``s`` and return the decode of all fragments.

        A fragment starting with "00" switches this decoder to Esab32 mode
        for this and every later call; the prefix is removed and the fragment
        goes to the front of the history.  Any other fragment is appended.
        """
        if s.startswith("00"):
            self._decoder = self.DecodeEsab32
            # We'll throw away the information about which is first,
            # since we do a computationally intensive trick later to determine ordering
            s = s[2:]
            self.history.insert(0, s)
        else:
            self.history.append(s)
        return self._decoder()

    def DecodeSubst(self) -> str:
        """Decode every fragment with DecodeSubst and join them, longest first.

        Identical decodes are collapsed into one.  ``dict.fromkeys`` is used
        rather than a set so that fragments of equal length keep their
        arrival order and the result is the same on every run.
        """
        decodes = dict.fromkeys(DecodeSubst(x) for x in self.history)
        return ''.join(sorted(decodes, key=len, reverse=True))

    def DecodeEsab32(self) -> str:
        """Try every ordering of the fragments and return the plausible decodes.

        Each permutation of the distinct fragments is concatenated, decoded
        with DecodeEsab32 and cut down to whole bytes.  Orderings whose bytes
        are all printable are returned quoted-printable, joined by " | ".  If
        no ordering qualifies, the last ordering tried is returned instead.

        The number of orderings grows factorially with the number of distinct
        fragments.
        """
        # De-duplicate while keeping arrival order, so that the permutation
        # order (and therefore the order of the candidates in the output and
        # the choice of fallback) does not depend on string hash randomisation.
        # NOTE(review): '0' is itself a member of Esab32Alphabet, so rstrip("0")
        # could drop real data; presumably trailing '0's are padding -- confirm.
        history = list(dict.fromkeys(x.rstrip("0") for x in self.history))

        # "Why don't we just mix up absolutely everything and see what happens?"
        # -- Ridcully, in Terry Pratchett's "The Hogfather"
        possibilities = []
        for attempt in itertools.permutations(history):
            acc, abits = DecodeEsab32(''.join(attempt))
            length = abits // 8
            # Serialise with room for a trailing partial byte, then drop it.
            buf = acc.to_bytes((abits + 7) // 8, 'little')[:length]
            if all(isprintable(b) for b in buf):
                # Yay it's probably okay
                possibilities.append(buf)

        # Well, we tried.  permutations() always yields at least one ordering,
        # so ``buf`` is bound here and holds the last attempt.
        if not possibilities:
            return quopri(buf)
        return " | ".join(quopri(candidate) for candidate in possibilities)
# One DGADecoder per GUID.  DecodeDomain creates an entry the first time it
# sees a GUID and reuses it afterwards, so the fragments belonging to a GUID
# accumulate across calls.
DecodersByGuid = {}
def DecodeDomain(domain: str) -> Tuple[Optional[Guid], Optional[str], str]:
    """Split one queried name into GUID, unknown marker and decoded payload.

    The name (surrounding whitespace ignored) must be one of ``knownDomains``
    or a subdomain of one.  The labels in front of the known domain are
    interpreted as: 15 Esab32 characters holding the XOR-obscured GUID, one
    character of unknown meaning, then the payload, which is handed to the
    DGADecoder registered for that GUID in ``DecodersByGuid``.

    Returns:
        ``(guid, unknown_a, decoded)``.  ``guid`` and ``unknown_a`` are None
        and ``decoded`` is a bracketed explanation when the name carries
        nothing to decode.

    Raises:
        RuntimeError: if the name is not under any entry of ``knownDomains``,
            or (propagated from the decoders) if it contains characters
            outside the expected alphabets.
    """
    name = domain.strip()
    foundDomain = None
    s = ""
    for d in knownDomains:
        if name == d:
            return (None, None, "[no data transmitted]")
        # Match on a label boundary, so that e.g. "notavsvmcloud.com" is
        # rejected instead of being treated as data in front of a known domain.
        if name.endswith("." + d):
            foundDomain = d
            s = name[:-(len(d) + 1)]
            break
    if not foundDomain:
        raise RuntimeError("Can't find domain for %s" % name)

    if foundDomain == "avsvmcloud.com":
        return (None, None, "[Probably not a Sunburst domain]")
    if len(s) < 16:
        return (None, None, "[too short]")

    # 15 characters * 5 bits = 75 bits, which fit in 10 bytes; only the first
    # 9 are used: byte 0 is the XOR key for the 8 GUID bytes that follow.
    dec, _ = DecodeEsab32(s[:15])
    eguid = dec.to_bytes(10, 'little')[:9]
    guid = int.from_bytes(xor(eguid[0:1], eguid[1:]), 'big')
    unknown_a = s[15]
    payload = s[16:]

    decoder = DecodersByGuid.get(guid)
    if not decoder:
        decoder = DGADecoder(guid)
        DecodersByGuid[guid] = decoder
    decoded = decoder.decode(payload)
    return (guid, unknown_a, decoded)
class TextReader:
    """Present a plain list of names, one per line, as a stream of records.

    ``fieldnames`` is always ``["name"]``.  Iterating yields one
    ``{"name": ...}`` dict per line of ``infile``, with surrounding
    whitespace removed.
    """

    def __init__(self, infile):
        self.fieldnames = ["name"]
        self.infile = infile

    def __iter__(self):
        yield from ({"name": line.strip()} for line in self.infile)
class CsvReader:
    """Present a CSV file with a header row as a stream of record dicts.

    ``fieldnames`` holds the column names of the input only, mirroring
    TextReader; the output columns are added by the caller.  An input with
    no header row (an empty file) gives an empty list.
    """

    def __init__(self, infile):
        self.reader = csv.DictReader(infile)
        # DictReader.fieldnames is None when the input has no header row.
        # Copy it so callers can extend the list without touching the reader.
        self.fieldnames = list(self.reader.fieldnames or [])

    def __iter__(self):
        yield from self.reader
def main():
    """Command-line entry point: decode every name in the input to CSV.

    The input format is chosen by ``--text`` / ``--csv`` or, failing that,
    by a ``.txt`` / ``.csv`` file-name extension; if neither applies the help
    text is printed and nothing is decoded.  Each input record is written
    back out with three additional columns: "guid", "unknown a" and "decode".

    Raises:
        RuntimeError: if a record has no usable "name"/"fqdn" value, or if
            DecodeDomain rejects a name.
    """
    parser = argparse.ArgumentParser(
        description="Decode sunburst Domain Generation Algorithm (DGA) names")
    parser.add_argument("--text", dest="input", action="store_const", const="text",
                        help="Parse bambenek-style: list of fqdns, one per line")
    parser.add_argument("--csv", dest="input", action="store_const", const="csv",
                        help="Parse CSV: records must be in a 'name' or 'fqdn' field")
    parser.add_argument("infile", nargs="?",
                        type=argparse.FileType("r"), default=sys.stdin)
    # const covers a bare "--outfile" with no value, which would otherwise
    # leave args.outfile set to None.
    parser.add_argument("--outfile", nargs="?",
                        type=argparse.FileType("w"), default=sys.stdout,
                        const=sys.stdout,
                        help="CSV file to write (default stdout)")
    args = parser.parse_args()

    reader = None
    if args.input == "text":
        reader = TextReader(args.infile)
    elif args.input == "csv":
        reader = CsvReader(args.infile)
    elif args.infile.name.endswith(".txt"):
        reader = TextReader(args.infile)
    elif args.infile.name.endswith(".csv"):
        reader = CsvReader(args.infile)
    else:
        parser.print_help()
        return

    # Add the output columns only where the reader does not list them
    # already, so that no column appears twice in the header.
    fieldnames = list(reader.fieldnames)
    fieldnames += [column for column in ("guid", "unknown a", "decode")
                   if column not in fieldnames]
    writer = csv.DictWriter(args.outfile, fieldnames)
    writer.writeheader()

    for record in reader:
        name = record.get("name") or record.get("fqdn")
        if not name:
            raise RuntimeError(
                "Record has no 'name' or 'fqdn' value: %r" % (record,))
        guid, unknown_a, ptext = DecodeDomain(name)
        record["guid"] = guid
        record["unknown a"] = unknown_a
        record["decode"] = ptext
        writer.writerow(record)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()