#! /usr/bin/python3

# Neale Pickett
#
# Created: 2020-12-14 16:49:51
# Last-modified: 2021-05-06 11:09:55
#
# Based on work by @RedDrip7 (twitter),
# and Prevasio (https://blog.prevasio.com/2020/12/sunburst-backdoor-deeper-look-into.html)

# This is public domain software. The public may copy, distribute, prepare derivative works and
# publicly display this software without charge, provided that this Notice, the statement
# of reserved government right, and any statement of authorship are reproduced on all copies.
# If software is modified to produce derivative works, such modified software should
# be clearly marked, so as not to confuse it with the version available from LANL.
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY THIS LICENSE.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# NEITHER THE UNITED STATES NOR THE UNITED STATES DEPARTMENT OF ENERGY/NATIONAL
# NUCLEAR SECURITY ADMINSTRATION, NOR Triad National Security, LLC. NOR ANY OF THEIR
# EMPLOYEES, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY
# OR RESPONSIBILITY FOR THE ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION,
# APPARATUS, PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT
# INFRINGE PRIVATELY OWNED RIGHTS.
# IN NO EVENT SHALL THE U.S. GOVERNMENT OR ITS CONTRACTORS BE LIABLE FOR ANY DIRECT,
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
# OF THE POSSIBILITY OF SUCH DAMAGE.

"""Decode sunburst Domain Generation Algorithm (DGA) names.

Reads fully-qualified domain names from a text or CSV file and writes a CSV
with three extra columns: ``guid``, ``sequence`` and ``decode``.
"""

import argparse
import base64
import codecs
import csv
import itertools
import re
import sys
import time
from typing import Optional, Tuple

# Parent domains under which encoded names are recognised.
knownDomains = [
    "appsync-api.us-east-1.avsvmcloud.com",
    "appsync-api.us-east-2.avsvmcloud.com",
    "appsync-api.us-west-2.avsvmcloud.com",
    "appsync-api.eu-west-1.avsvmcloud.com",
]


def xor(key, buf):
    """Return ``buf`` XORed with ``key``, repeating ``key`` as needed.

    An empty ``key`` yields an empty result, because the repeating key
    iterator is then empty and ``zip`` stops immediately.
    """
    return bytes(b ^ k for b, k in zip(buf, itertools.cycle(key)))


Esab32Alphabet = "ph2eifo3n5utg1j8d94qrvbmk0sal76c"
SubstitutionAlphabet = 'rq3gsalt6u1iyfzop572d49bnx8cvmkewhj'
SubstitutionAlphabet0 = '0_-.'
SequenceAlphabet = '0123456789abcdefghijklmnopqrstuvwxyz'

# Products reported in the status bitfield; index i uses bits 2*i and 2*i+1.
Apps = [
    "Windows Live OneCare / Windows Defender",
    "Windows Defender Advanced Threat Protection",
    "Microsoft Defender for Identity",
    "Carbon Black",
    "CrowdStrike",
    "FireEye",
    "ESET",
    "F-Secure",
]

# Standard Base32 alphabet: 26 letters plus the digits 2-7 (32 symbols).
_StandardBase32Alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"


def DecodeBase32(s: str):
    """Not used by sunburst.

    If Sunburst actually used Base32, this would work to decode things.
    It doesn't: the malware's encoding is handled by DecodeEsab32 instead.

    The input is translated from Esab32Alphabet to the standard Base32
    alphabet, padded with '=' to a multiple of 8 characters and handed to
    base64.b32decode.

    Returns:
        The decoded bytes.
    """
    # Both maketrans arguments must have the same length (32 symbols each).
    trans = str.maketrans(Esab32Alphabet, _StandardBase32Alphabet)
    t = s.translate(trans)
    t += '=' * (-len(t) % 8)
    return base64.b32decode(t)


def DecodeEsab32(s: str) -> Tuple[int, int]:
    """Decode a string written in Esab32Alphabet into an integer.

    Each character contributes 5 bits; the first character lands in the
    least-significant bits of the result.

    Returns:
        A bigint, and the number of bits contained therein (5 per character).

    Raises:
        RuntimeError: if a character is not in Esab32Alphabet.
    """
    acc = bits = 0
    for c in s:
        try:
            p = Esab32Alphabet.index(c)
        except ValueError as err:
            raise RuntimeError(
                "Not an Esab32 encoded character: %c in %s" % (c, s)) from err
        acc |= p << bits
        bits += 5
    return acc, bits


def DecodeSubst(s: str) -> str:
    """Decode a string written with the substitution cipher.

    Every character is looked up in SubstitutionAlphabet and shifted back by
    four positions. A '0' emits nothing itself; it makes the following
    character select from SubstitutionAlphabet0 instead.

    Raises:
        RuntimeError: if a character is neither '0' nor in
            SubstitutionAlphabet.
    """
    alphabet = SubstitutionAlphabet
    out = []
    for c in s:
        if c == '0':
            # Use alternate alphabet next time
            alphabet = SubstitutionAlphabet0
            continue
        try:
            pos = (SubstitutionAlphabet.index(c) - 4) % len(alphabet)
        except ValueError as err:
            raise RuntimeError(
                "Not a subst encoded character: %c in %s" % (c, s)) from err
        out.append(alphabet[pos])
        # The alternate alphabet only applies to a single character.
        alphabet = SubstitutionAlphabet
    return "".join(out)


# Guids are the byte strings produced by xor() in DecodeDomain.
Guid = bytes


def GuidToInt(guid: Optional[Guid]) -> Optional[int]:
    """Return ``guid`` as a big-endian integer, or None if there is no guid."""
    if guid is None:
        return None
    return int.from_bytes(guid, "big")


def isprintable(c: int) -> bool:
    """Return True if byte value ``c`` is printable ASCII (0x20 through 0x7e).

    0x7f (DEL) is a control character and is not counted as printable.
    """
    return 0x20 <= c <= 0x7e


def quopri(buf: bytes) -> str:
    """Return ``buf`` quoted-printable encoded, as a str."""
    return codecs.encode(buf, "quopri").decode("utf-8")


class DGADecoder:
    """Accumulates the payload fragments seen for one guid and decodes them.

    Fragments are decoded with the substitution cipher until a fragment
    starting with "00" arrives; from then on every call decodes the whole
    history with the Esab32 scheme.
    """

    def __init__(self, guid: Guid):
        self.guid = guid
        self.history = []
        self._decoder = self.DecodeSubst

    def decode(self, s: str):
        """Record fragment ``s`` and return the decode of everything so far."""
        if s.startswith("00"):
            self._decoder = self.DecodeEsab32
            # We'll throw away the information about which is first,
            # since we do a computationally intensive trick later to
            # determine ordering
            s = s[2:]
            self.history.insert(0, s)
        else:
            self.history.append(s)
        return self._decoder()

    def DecodeSubst(self) -> str:
        """Decode each distinct fragment and join them, longest first."""
        decodes = {DecodeSubst(x) for x in self.history}
        return ''.join(sorted(decodes, key=len, reverse=True))

    def DecodeEsab32(self) -> str:
        """Try every ordering of the fragments and keep the printable ones.

        Returns the quoted-printable form of each all-printable candidate,
        joined with " | ". If no ordering is fully printable, returns the
        last ordering attempted.
        """
        history = {x.rstrip("0") for x in self.history}
        # "Why don't we just mix up absolutely everything and see what happens?"
        # -- Ridcully, in Terry Pratchett's "The Hogfather"
        possibilities = []
        buf = b""
        for attempt in itertools.permutations(history):
            acc, abits = DecodeEsab32(''.join(attempt))
            length = abits // 8
            # Round up so the value always fits, then drop the partial byte.
            buf = acc.to_bytes((abits + 7) // 8, 'little')[:length]
            if all(isprintable(b) for b in buf):
                # Yay it's probably okay
                possibilities.append(buf)
        # Well, we tried.
        if not possibilities:
            return quopri(buf)
        return " | ".join(quopri(candidate) for candidate in possibilities)


# One DGADecoder per guid, shared across DecodeDomain calls.
DecodersByGuid = {}


def DecodeDomain(domain: str) -> Tuple[Optional[Guid], Optional[int], str]:
    """Decode one fully-qualified domain name.

    Returns:
        ``(guid, sequence, decoded)``. ``guid`` and ``sequence`` are None and
        ``decoded`` is a bracketed explanation when the name does not end in
        one of knownDomains, carries no data, or is shorter than 16
        characters.

    Raises:
        RuntimeError: if the encoded part contains a character outside the
            expected alphabet.
        ValueError: if the 16th character is not in SequenceAlphabet.
    """
    s = domain.strip()
    foundDomain = None
    for d in knownDomains:
        if s.endswith(d):
            foundDomain = d
            break
    if not foundDomain:
        return (None, None, "[Probably not a Sunburst domain]")
    s = s[:-len(foundDomain)]
    if not s:
        return (None, None, "[no data transmitted]")
    if s[-1] != '.':
        # The known suffix is only the tail of a longer label
        # (e.g. "xappsync-api..."), so this is not a subdomain of it.
        return (None, None, "[Probably not a Sunburst domain]")
    s = s[:-1]
    if len(s) < 16:
        return (None, None, "[too short]")
    c0 = s.encode("ASCII")[0]

    # https://blog.prevasio.com/2020/12/sunburst-backdoor-part-iii-dga-security.html
    sequence = (c0 % 36) - SequenceAlphabet.index(s[15])
    if 0 <= sequence < 3:
        dec, _ = DecodeEsab32(s[:15])
        eguid = dec.to_bytes(10, 'little')[:9]
        # The first byte is the XOR key for the remaining eight.
        guid = xor(eguid[0:1], eguid[1:])
        payload = s[16:]
        decoder = DecodersByGuid.get(guid)
        if decoder is None:
            decoder = DGADecoder(guid)
            DecodersByGuid[guid] = decoder
        decoded = decoder.decode(payload)
    else:
        # https://blog.prevasio.com/2020/12/sunburst-backdoor-part-iii-dga-security.html
        # Diagnostics go to stderr so they cannot corrupt CSV on stdout.
        print(s, c0, sequence, file=sys.stderr)
        dec1num, bits = DecodeEsab32(s)
        dec1len = bits // 8
        dec1 = dec1num.to_bytes(dec1len + 1, "little")[:dec1len]
        dec2 = xor(dec1[:1], dec1[1:])
        guid = xor(dec2[9:11], dec2[0:8])
        tslen = int.from_bytes(dec2[8:11], "big")
        length = tslen >> (8 + 8 + 4)
        # 4s intervals since 2010-01-01
        # NOTE(review): 1262329200 is 2010-01-01T07:00:00Z, i.e. midnight at
        # UTC-7 rather than UTC -- confirm this is the intended epoch.
        epoch = 1262329200 + ((tslen & 0xfffff) << 2)
        timestamp = time.gmtime(epoch)
        print(dec2[8:], tslen, length, timestamp, file=sys.stderr)
        state = int.from_bytes(dec2[15:17], "big")
        decodedStrings = []
        for i, app in enumerate(Apps):
            appState = state >> (i * 2)
            appStateString = []
            if appState & 0b01:
                appStateString.append("running")
            if appState & 0b10:
                appStateString.append("stopped")
            if appStateString:
                decodedStrings.append(
                    "%s [%s]" % (app, ",".join(appStateString)))
        decoded = "\n".join(decodedStrings)
    return (guid, sequence, decoded)


class TextReader:
    """Yield one ``{"name": line}`` record per line of a text file."""

    def __init__(self, infile):
        self.infile = infile
        self.fieldnames = ["name"]

    def __iter__(self):
        for s in self.infile:
            yield {"name": s.strip()}


class CsvReader:
    """Yield the records of a CSV file as dicts keyed by its header row."""

    def __init__(self, infile):
        self.reader = csv.DictReader(infile)
        self.fieldnames = self.reader.fieldnames

    def __iter__(self):
        for record in self.reader:
            yield record


def main(argv=None):
    """Command-line entry point.

    Args:
        argv: argument list to parse; None means use sys.argv[1:].
    """
    parser = argparse.ArgumentParser(
        description="Decode sunburst Domain Generation Algorithm (DGA) names")
    parser.add_argument("--text", dest="input", action="store_const",
                        const="text",
                        help="Parse bambenek-style: list of fqdns, one per line")
    parser.add_argument("--csv", dest="input", action="store_const",
                        const="csv",
                        help="Parse CSV: records must be in a 'name' or 'fqdn' field")
    parser.add_argument("infile", nargs="?",
                        type=argparse.FileType("r"), default=sys.stdin)
    parser.add_argument("--outfile", nargs="?",
                        type=argparse.FileType("w"), default=sys.stdout,
                        help="CSV file to write (default stdout)")
    args = parser.parse_args(argv)

    # An explicit --text/--csv flag wins; otherwise go by file extension.
    reader = None
    if args.input == "text":
        reader = TextReader(args.infile)
    elif args.input == "csv":
        reader = CsvReader(args.infile)
    elif args.infile.name.endswith(".txt"):
        reader = TextReader(args.infile)
    elif args.infile.name.endswith(".csv"):
        reader = CsvReader(args.infile)
    else:
        parser.print_help()
        return

    # DictReader.fieldnames is None for an empty file.
    fieldnames = list(reader.fieldnames or []) + ["guid", "sequence", "decode"]
    writer = csv.DictWriter(args.outfile, fieldnames)
    writer.writeheader()
    for record in reader:
        # A record with neither field decodes as "not a Sunburst domain".
        name = record.get("name") or record.get("fqdn") or ""
        guid, sequence, ptext = DecodeDomain(name)
        # guid is None for names that could not be decoded.
        record["guid"] = GuidToInt(guid)
        record["sequence"] = sequence
        record["decode"] = ptext
        writer.writerow(record)


if __name__ == '__main__':
    main()