310 lines
10 KiB
Python
Executable file
310 lines
10 KiB
Python
Executable file
#! /usr/bin/python3
|
|
|
|
# Neale Pickett <neale@lanl.gov>
|
|
#
|
|
# Created: 2020-12-14 16:49:51
|
|
# Last-modified: 2021-05-06 11:09:55
|
|
#
|
|
# Based on work by @RedDrip7 (twitter),
|
|
# and Prevasio (https://blog.prevasio.com/2020/12/sunburst-backdoor-deeper-look-into.html)
|
|
|
|
# This is public domain software. The public may copy, distribute, prepare derivative works and
|
|
# publicly display this software without charge, provided that this Notice, the statement
|
|
# of reserved government right, and any statement of authorship are reproduced on all copies.
|
|
# If software is modified to produce derivative works, such modified software should
|
|
# be clearly marked, so as not to confuse it with the version available from LANL.
|
|
|
|
# NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY THIS LICENSE.
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
|
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
|
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
|
|
# NEITHER THE UNITED STATES NOR THE UNITED STATES DEPARTMENT OF ENERGY/NATIONAL
|
|
# NUCLEAR SECURITY ADMINSTRATION, NOR Triad National Security, LLC. NOR ANY OF THEIR
|
|
# EMPLOYEES, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR ASSUMES ANY LEGAL LIABILITY
|
|
# OR RESPONSIBILITY FOR THE ACCURACY, COMPLETENESS, OR USEFULNESS OF ANY INFORMATION,
|
|
# APPARATUS, PRODUCT, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT
|
|
# INFRINGE PRIVATELY OWNED RIGHTS.
|
|
|
|
# IN NO EVENT SHALL THE U.S. GOVERNMENT OR ITS CONTRACTORS BE LIABLE FOR ANY DIRECT,
|
|
# INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
|
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
|
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
|
|
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
|
|
# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
|
|
# OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
import argparse
|
|
import base64
|
|
import codecs
|
|
import csv
|
|
import itertools
|
|
import re
|
|
import sys
|
|
import time
|
|
|
|
|
|
knownDomains = [
|
|
"appsync-api.us-east-1.avsvmcloud.com",
|
|
"appsync-api.us-east-2.avsvmcloud.com",
|
|
"appsync-api.us-west-2.avsvmcloud.com",
|
|
"appsync-api.eu-west-1.avsvmcloud.com",
|
|
]
|
|
|
|
|
|
def xor(key, buf):
|
|
return bytes(b ^ k for b, k in zip(buf, itertools.cycle(key)))
|
|
|
|
|
|
Esab32Alphabet = "ph2eifo3n5utg1j8d94qrvbmk0sal76c"
|
|
SubstitutionAlphabet = 'rq3gsalt6u1iyfzop572d49bnx8cvmkewhj'
|
|
SubstitutionAlphabet0 = '0_-.'
|
|
SequenceAlphabet = '0123456789abcdefghijklmnopqrstuvwxyz'
|
|
|
|
Apps = [
|
|
"Windows Live OneCare / Windows Defender",
|
|
"Windows Defender Advanced Threat Protection",
|
|
"Microsoft Defender for Identity",
|
|
"Carbon Black",
|
|
"CrowdStrike",
|
|
"FireEye",
|
|
"ESET",
|
|
"F-Secure",
|
|
]
|
|
|
|
|
|
def DecodeBase32(s: str):
|
|
"""Not used by sunburst.
|
|
|
|
If Sunburst actually used Base32, this would work to decode things.
|
|
It doesn't work.
|
|
"""
|
|
|
|
trans = str.maketrans(Esab32Alphabet,
|
|
"ABCDEFGHIJKLMNOPQRSTUVWXYZ2345678")
|
|
t = s.translate(trans)
|
|
while len(t) % 8 > 0:
|
|
t += '='
|
|
return base64.b32decode(t)
|
|
|
|
|
|
def DecodeEsab32(s: str) -> (int, int):
|
|
"""Decode using big-endian base32 algorithm.
|
|
|
|
Returns a bigint, and the number of bits contained therein
|
|
"""
|
|
|
|
acc = bits = 0
|
|
for c in s:
|
|
try:
|
|
p = Esab32Alphabet.index(c)
|
|
except ValueError:
|
|
raise RuntimeError(
|
|
"Not an Esab32 encoded character: %c in %s" % (c, s))
|
|
acc |= p << bits
|
|
bits += 5
|
|
return acc, bits
|
|
|
|
|
|
def DecodeSubst(s: str) -> str:
|
|
alphabet = SubstitutionAlphabet
|
|
out = []
|
|
for c in s:
|
|
if c == '0':
|
|
# Use alternate alphabet next time
|
|
alphabet = SubstitutionAlphabet0
|
|
continue
|
|
try:
|
|
pos = (SubstitutionAlphabet.index(c) - 4) % len(alphabet)
|
|
except ValueError:
|
|
raise RuntimeError(
|
|
"Not a subst encoded character: %c in %s" % (c, s))
|
|
out.append(alphabet[pos])
|
|
alphabet = SubstitutionAlphabet
|
|
return "".join(out)
|
|
|
|
|
|
Guid = int
|
|
|
|
|
|
def isprintable(c: int) -> bool:
|
|
return c >= 0x20 and c <= 0x7f
|
|
|
|
|
|
def quopri(buf: bytes) -> str:
|
|
return codecs.encode(buf, "quopri").decode("utf-8")
|
|
|
|
|
|
class DGADecoder:
|
|
def __init__(self, guid: Guid):
|
|
self.guid = guid
|
|
self.history = []
|
|
self._decoder = self.DecodeSubst
|
|
|
|
def decode(self, s: str):
|
|
if s.startswith("00"):
|
|
self._decoder = self.DecodeEsab32
|
|
# We'll throw away the information about which is first,
|
|
# since we do a computationally intensive trick later to determine ordering
|
|
s = s[2:]
|
|
self.history.insert(0, s)
|
|
else:
|
|
self.history.append(s)
|
|
return self._decoder()
|
|
|
|
def DecodeSubst(self) -> str:
|
|
decodes = {DecodeSubst(x) for x in self.history}
|
|
return ''.join(sorted(decodes, key=len, reverse=True))
|
|
|
|
def DecodeEsab32(self) -> str:
|
|
history = {x.rstrip("0") for x in self.history}
|
|
|
|
# "Why don't we just mix up absolutely everything and see what happens?"
|
|
# -- Ridcully, in Terry Pratchett's "The Hogfather"
|
|
possibilities = []
|
|
for attempt in itertools.permutations(history):
|
|
acc, abits = DecodeEsab32(''.join(attempt))
|
|
length = abits // 8
|
|
if abits % 8:
|
|
buf = acc.to_bytes(length+1, 'little')
|
|
else:
|
|
buf = acc.to_bytes(length, 'little')
|
|
buf = buf[:length]
|
|
if sum(isprintable(b) for b in buf) == length:
|
|
# Yay it's probably okay
|
|
possibilities.append(buf)
|
|
# Well, we tried.
|
|
if not possibilities:
|
|
return quopri(buf)
|
|
else:
|
|
return " | ".join(quopri(buf) for buf in possibilities)
|
|
|
|
|
|
DecodersByGuid = {}
|
|
|
|
|
|
def DecodeDomain(domain: str) -> (Guid, int, str):
|
|
s = domain.strip()
|
|
foundDomain = None
|
|
for d in knownDomains:
|
|
if s.endswith(d):
|
|
foundDomain = d
|
|
break
|
|
if not foundDomain:
|
|
return (None, None, "[Probably not a Sunburst domain]")
|
|
s = s[:-len(foundDomain)]
|
|
if not s:
|
|
return (None, None, "[no data transmitted]")
|
|
assert(s[-1] == '.')
|
|
s = s[:-1]
|
|
|
|
if len(s) < 16:
|
|
return (None, None, "[too short]")
|
|
|
|
c0 = s.encode("ASCII")[0]
|
|
# https://blog.prevasio.com/2020/12/sunburst-backdoor-part-iii-dga-security.html
|
|
sequence = (c0 % 36) - SequenceAlphabet.index(s[15])
|
|
|
|
if 0 <= sequence < 3:
|
|
dec, _ = DecodeEsab32(s[:15])
|
|
eguid = dec.to_bytes(10, 'little')[:9]
|
|
guid = xor(eguid[0:1], eguid[1:])
|
|
payload = s[16:]
|
|
decoder = DecodersByGuid.get(guid)
|
|
if not decoder:
|
|
decoder = DGADecoder(guid)
|
|
DecodersByGuid[guid] = decoder
|
|
decoded = decoder.decode(payload)
|
|
else:
|
|
# https://blog.prevasio.com/2020/12/sunburst-backdoor-part-iii-dga-security.html
|
|
print(s, c0, sequence)
|
|
dec1num, bits = DecodeEsab32(s)
|
|
dec1len = bits // 8
|
|
dec1 = dec1num.to_bytes(dec1len+1, "little")[:dec1len]
|
|
|
|
dec2 = xor(dec1[:1], dec1[1:])
|
|
|
|
guid = xor(dec2[9:11], dec2[0:8])
|
|
tslen = int.from_bytes(dec2[8:11], "big")
|
|
length = tslen >> (8+8+4)
|
|
epoch = 1262329200 + ((tslen & 0xfffff) << 2) # 4s intervals since 2010-01-01
|
|
timestamp = time.gmtime(epoch)
|
|
|
|
print(dec2[8:], tslen, length, timestamp)
|
|
state = int.from_bytes(dec2[15:17], "big")
|
|
decodedStrings = []
|
|
for i in range(len(Apps)):
|
|
appState = state >> (i * 2)
|
|
app = Apps[i]
|
|
appStateString = []
|
|
if appState & 0b01:
|
|
appStateString.append("running")
|
|
if appState & 0b10:
|
|
appStateString.append("stopped")
|
|
if appStateString:
|
|
decodedStrings.append("%s [%s]" % (app, ",".join(appStateString)))
|
|
decoded = "\n".join(decodedStrings)
|
|
|
|
return (guid, sequence, decoded)
|
|
|
|
|
|
class TextReader:
|
|
def __init__(self, infile):
|
|
self.infile = infile
|
|
self.fieldnames = ["name"]
|
|
|
|
def __iter__(self):
|
|
for s in self.infile:
|
|
yield {"name": s.strip()}
|
|
|
|
|
|
class CsvReader:
|
|
def __init__(self, infile):
|
|
self.reader = csv.DictReader(infile)
|
|
self.fieldnames = self.reader.fieldnames
|
|
|
|
def __iter__(self):
|
|
for record in self.reader:
|
|
yield record
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Decode sunburst Domain Generation Algorithm (DGA) names")
|
|
parser.add_argument("--text", dest="input", action="store_const", const="text",
|
|
help="Parse bambenek-style: list of fqdns, one per line")
|
|
parser.add_argument("--csv", dest="input", action="store_const", const="csv",
|
|
help="Parse CSV: records must be in a 'name' or 'fqdn' field")
|
|
parser.add_argument("infile", nargs="?",
|
|
type=argparse.FileType("r"), default=sys.stdin)
|
|
parser.add_argument("--outfile", nargs="?",
|
|
type=argparse.FileType("w"), default=sys.stdout,
|
|
help="CSV file to write (default stdout)")
|
|
args = parser.parse_args()
|
|
|
|
reader = None
|
|
if args.input == "text":
|
|
reader = TextReader(args.infile)
|
|
elif args.input == "csv":
|
|
reader = CsvReader(args.infile)
|
|
elif args.infile.name.endswith(".txt"):
|
|
reader = TextReader(args.infile)
|
|
elif args.infile.name.endswith(".csv"):
|
|
reader = CsvReader(args.infile)
|
|
else:
|
|
parser.print_help()
|
|
return
|
|
|
|
fieldnames = reader.fieldnames + ["guid", "sequence", "decode"]
|
|
writer = csv.DictWriter(args.outfile, fieldnames)
|
|
writer.writeheader()
|
|
for record in reader:
|
|
name = record.get("name") or record.get("fqdn")
|
|
guid, sequence, ptext = DecodeDomain(name)
|
|
record["guid"] = int.from_bytes(guid, "big")
|
|
record["sequence"] = sequence
|
|
record["decode"] = ptext
|
|
writer.writerow(record)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|