mirror of https://github.com/dirtbags/netarch.git
commit
ad8d412f73
|
@ -0,0 +1,16 @@
|
||||||
|
Dirtbags Netarch Library
|
||||||
|
========================
|
||||||
|
|
||||||
|
This is a library for advanced
|
||||||
|
[network archaeology](https://sites.google.com/view/cyberfire/foundry/classes/network-archaeology).
|
||||||
|
|
||||||
|
It provides a heavily field-tested framework for
|
||||||
|
exploring unknown TCP-based protocols,
|
||||||
|
and room to grow these explorations into full-blown decoders.
|
||||||
|
|
||||||
|
Get going
|
||||||
|
=========
|
||||||
|
|
||||||
|
Documentation sucks, sorry.
|
||||||
|
The way we go about things is to copy `dumbdecode.py` to a new file,
|
||||||
|
and start hacking onto it.
|
|
@ -0,0 +1,7 @@
|
||||||
|
Things We Need To Do
|
||||||
|
====================
|
||||||
|
|
||||||
|
* documentation
|
||||||
|
* remove lingering py2-isms
|
||||||
|
* more logical way to chain together dispatcher, tcp resequencer
|
||||||
|
* some way to parallelize work
|
|
@ -0,0 +1,13 @@
|
||||||
|
#! /usr/bin/python3
|
||||||
|
|
||||||
|
import sys
|
||||||
|
from netarch import ip
|
||||||
|
from netarch import *
|
||||||
|
|
||||||
|
s = None
|
||||||
|
reseq = ip.Dispatch(*sys.argv[1:])
|
||||||
|
for h, d in reseq:
|
||||||
|
srv, first, chunk = d
|
||||||
|
if not s:
|
||||||
|
s = ip.Session(first)
|
||||||
|
s.handle(srv, first, chunk, reseq.last)
|
246
gapstr.py
246
gapstr.py
|
@ -1,246 +0,0 @@
|
||||||
#! /usr/bin/python
|
|
||||||
|
|
||||||
## 2008 Massive Blowout
|
|
||||||
|
|
||||||
"""Functions to treat a list as a byte array with gaps.
|
|
||||||
|
|
||||||
Lists should have only byte and numeric items.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
import __init__
|
|
||||||
import sys
|
|
||||||
|
|
||||||
class GapString:
|
|
||||||
def __init__(self, init=None, drop='?'):
|
|
||||||
self.contents = []
|
|
||||||
self.length = 0
|
|
||||||
self.drop = drop
|
|
||||||
|
|
||||||
if init:
|
|
||||||
self.append(init)
|
|
||||||
|
|
||||||
def __len__(self):
|
|
||||||
return int(self.length)
|
|
||||||
|
|
||||||
def loss(self):
|
|
||||||
ret = 0
|
|
||||||
for i in self.contents:
|
|
||||||
try:
|
|
||||||
ret += i
|
|
||||||
except TypeError:
|
|
||||||
pass
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return '<GapString of length %d>' % self.length
|
|
||||||
|
|
||||||
def append(self, i):
|
|
||||||
try:
|
|
||||||
self.length += len(i)
|
|
||||||
self.contents.append(i)
|
|
||||||
except TypeError:
|
|
||||||
self.length += i
|
|
||||||
self.contents.append(i)
|
|
||||||
|
|
||||||
def pop(self, idx=-1):
|
|
||||||
item = self.contents.pop(idx)
|
|
||||||
try:
|
|
||||||
self.length -= item
|
|
||||||
except TypeError:
|
|
||||||
self.length -= len(item)
|
|
||||||
return GapString(item)
|
|
||||||
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
ret = []
|
|
||||||
for i in self.contents:
|
|
||||||
try:
|
|
||||||
ret.append(self.drop * i)
|
|
||||||
except TypeError:
|
|
||||||
ret.append(i)
|
|
||||||
return ''.join(ret)
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
for i in self.contents:
|
|
||||||
try:
|
|
||||||
for c in i:
|
|
||||||
yield c
|
|
||||||
except TypeError:
|
|
||||||
for j in range(i):
|
|
||||||
yield self.drop
|
|
||||||
|
|
||||||
def __nonzero__(self):
|
|
||||||
return self.length > 0
|
|
||||||
|
|
||||||
def hasgaps(self):
|
|
||||||
for i in self.contents:
|
|
||||||
try:
|
|
||||||
len(i)
|
|
||||||
except TypeError:
|
|
||||||
return True
|
|
||||||
return False
|
|
||||||
|
|
||||||
def hexdump(self, fd=sys.stdout):
|
|
||||||
offset = 0
|
|
||||||
|
|
||||||
d = __init__.HexDumper(fd)
|
|
||||||
for i in self.contents:
|
|
||||||
try:
|
|
||||||
for j in xrange(i):
|
|
||||||
d.dump_drop()
|
|
||||||
except TypeError:
|
|
||||||
for c in i:
|
|
||||||
d.dump_chr(c)
|
|
||||||
d.finish()
|
|
||||||
|
|
||||||
def extend(self, other):
|
|
||||||
self.contents += other.contents
|
|
||||||
self.length += other.length
|
|
||||||
|
|
||||||
def __getslice__(self, start, end):
|
|
||||||
end = min(self.length, end)
|
|
||||||
start = min(self.length, start)
|
|
||||||
|
|
||||||
new = self.__class__(drop=self.drop)
|
|
||||||
new.length = max(end - start, 0)
|
|
||||||
if new.length == 0:
|
|
||||||
new.contents = []
|
|
||||||
return new
|
|
||||||
new.contents = self.contents[:]
|
|
||||||
|
|
||||||
l = self.length - new.length - start
|
|
||||||
|
|
||||||
# Trim off the beginning
|
|
||||||
while start >= 0:
|
|
||||||
i = new.contents.pop(0)
|
|
||||||
try:
|
|
||||||
start -= i
|
|
||||||
if start < 0:
|
|
||||||
new.contents.insert(0, -start)
|
|
||||||
except TypeError:
|
|
||||||
start -= len(i)
|
|
||||||
if start < 0:
|
|
||||||
new.contents.insert(0, i[start:])
|
|
||||||
|
|
||||||
# Trim off the end
|
|
||||||
while l >= 0:
|
|
||||||
i = new.contents.pop()
|
|
||||||
try:
|
|
||||||
l -= i
|
|
||||||
if l < 0:
|
|
||||||
new.contents.append(-l)
|
|
||||||
except TypeError:
|
|
||||||
l -= len(i)
|
|
||||||
if l < 0:
|
|
||||||
new.contents.append(i[:-l])
|
|
||||||
|
|
||||||
return new
|
|
||||||
|
|
||||||
def __getitem__(self, idx):
|
|
||||||
if False:
|
|
||||||
c = self[idx:idx+1]
|
|
||||||
if c.hasgaps():
|
|
||||||
return self.drop[0]
|
|
||||||
else:
|
|
||||||
return c.contents[0][0]
|
|
||||||
else:
|
|
||||||
l = 0
|
|
||||||
for i in self.contents:
|
|
||||||
try:
|
|
||||||
l += len(i)
|
|
||||||
except TypeError:
|
|
||||||
l += i
|
|
||||||
if l > idx:
|
|
||||||
offs = idx - l
|
|
||||||
try:
|
|
||||||
return i[offs]
|
|
||||||
except:
|
|
||||||
return self.drop[0]
|
|
||||||
raise IndexError('Out of bounds')
|
|
||||||
|
|
||||||
def __add__(self, other):
|
|
||||||
if isinstance(other, str):
|
|
||||||
self.append(other)
|
|
||||||
else:
|
|
||||||
new = self.__class__(drop=self.drop)
|
|
||||||
new.extend(self)
|
|
||||||
new.extend(other)
|
|
||||||
return new
|
|
||||||
|
|
||||||
def __xor__(self, mask):
|
|
||||||
try:
|
|
||||||
mask = [ord(c) for c in mask]
|
|
||||||
except TypeError:
|
|
||||||
pass
|
|
||||||
try:
|
|
||||||
masklen = len(mask)
|
|
||||||
except TypeError:
|
|
||||||
masklen = 1
|
|
||||||
mask = [mask]
|
|
||||||
|
|
||||||
new = self.__class__(drop=self.drop)
|
|
||||||
for i in self.contents:
|
|
||||||
try:
|
|
||||||
r = []
|
|
||||||
offset = len(new) % masklen
|
|
||||||
for c in i:
|
|
||||||
o = ord(c)
|
|
||||||
r.append(chr(o ^ mask[offset]))
|
|
||||||
offset = (offset + 1) % masklen
|
|
||||||
new.append(''.join(r))
|
|
||||||
except TypeError:
|
|
||||||
new.append(i)
|
|
||||||
return new
|
|
||||||
|
|
||||||
def index(self, needle):
|
|
||||||
pos = 0
|
|
||||||
for i in self.contents:
|
|
||||||
try:
|
|
||||||
return pos + i.index(needle)
|
|
||||||
except AttributeError:
|
|
||||||
pos += i
|
|
||||||
except ValueError:
|
|
||||||
pos += len(i)
|
|
||||||
raise ValueError('substring not found')
|
|
||||||
|
|
||||||
def split(self, pivot=' ', times=None):
|
|
||||||
ret = []
|
|
||||||
cur = self
|
|
||||||
while (not times) or (len(ret) < times):
|
|
||||||
try:
|
|
||||||
pos = cur.index(pivot)
|
|
||||||
except ValueError:
|
|
||||||
break
|
|
||||||
ret.append(cur[:pos])
|
|
||||||
cur = cur[pos+len(pivot):]
|
|
||||||
ret.append(cur)
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def startswith(self, what):
|
|
||||||
return (what == str(self[:len(what)]))
|
|
||||||
|
|
||||||
def endswith(self, what):
|
|
||||||
return (what == str(self[-len(what):]))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
gs = GapString()
|
|
||||||
gs.append('hi')
|
|
||||||
assert str(gs) == 'hi'
|
|
||||||
assert str(gs[:40]) == 'hi'
|
|
||||||
gs.append(3)
|
|
||||||
assert str(gs) == 'hi???'
|
|
||||||
assert str(gs[:40]) == 'hi???'
|
|
||||||
assert str(gs[:3]) == 'hi?'
|
|
||||||
assert str(gs[-4:]) == 'i???'
|
|
||||||
assert str(gs + gs) == 'hi???hi???'
|
|
||||||
assert str(gs ^ 1) == 'ih???'
|
|
||||||
|
|
||||||
gs = GapString()
|
|
||||||
gs.append('123456789A')
|
|
||||||
assert str(gs[:4]) == '1234'
|
|
||||||
assert len(gs[:4]) == 4
|
|
||||||
assert len(gs[6:]) == 4
|
|
||||||
assert str(gs[:0]) == ''
|
|
||||||
|
|
|
@ -1,156 +1,156 @@
|
||||||
#! /usr/bin/python
|
#! /usr/bin/python3
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
## 2008 Massive Blowout
|
|
||||||
|
|
||||||
|
import binascii
|
||||||
import sys
|
import sys
|
||||||
import struct
|
import struct
|
||||||
|
|
||||||
stdch = (u'␀·········␊··␍··'
|
stdch = (
|
||||||
u'················'
|
'␀·········␊··␍··'
|
||||||
u' !"#$%&\'()*+,-./'
|
'················'
|
||||||
u'0123456789:;<=>?'
|
' !"#$%&\'()*+,-./'
|
||||||
u'@ABCDEFGHIJKLMNO'
|
'0123456789:;<=>?'
|
||||||
u'PQRSTUVWXYZ[\]^_'
|
'@ABCDEFGHIJKLMNO'
|
||||||
u'`abcdefghijklmno'
|
'PQRSTUVWXYZ[\]^_'
|
||||||
u'pqrstuvwxyz{|}~·'
|
'`abcdefghijklmno'
|
||||||
u'················'
|
'pqrstuvwxyz{|}~·'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················')
|
'················'
|
||||||
|
'················'
|
||||||
|
)
|
||||||
|
|
||||||
decch = (u'␀␁␂␃␄␅␆␇␈␉␊␋␌␍␎␏'
|
decch = (
|
||||||
u'␐␑␒␓␔␕␖␗␘␙␚·····'
|
'␀␁␂␃␄␅␆␇␈␉␊␋␌␍␎␏'
|
||||||
u'␠!"#$%&\'()*+,-./'
|
'␐␑␒␓␔␕␖␗␘␙␚·····'
|
||||||
u'0123456789:;<=>?'
|
'␠!"#$%&\'()*+,-./'
|
||||||
u'@ABCDEFGHIJKLMNO'
|
'0123456789:;<=>?'
|
||||||
u'PQRSTUVWXYZ[\]^_'
|
'@ABCDEFGHIJKLMNO'
|
||||||
u'`abcdefghijklmno'
|
'PQRSTUVWXYZ[\]^_'
|
||||||
u'pqrstuvwxyz{|}~␡'
|
'`abcdefghijklmno'
|
||||||
u'················'
|
'pqrstuvwxyz{|}~␡'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················'
|
'················'
|
||||||
u'················')
|
'················'
|
||||||
|
'················'
|
||||||
|
)
|
||||||
|
|
||||||
cgach = (u'·☺☻♥♦♣♠•◘○◙♂♀♪♫☼'
|
cgach = (
|
||||||
u'►◄↕‼¶§▬↨↑↓→←∟↔▲▼'
|
'□☺☻♥♦♣♠•◘○◙♂♀♪♫☼'
|
||||||
u' !"#$%&\'()*+,-./'
|
'►◄↕‼¶§▬↨↑↓→←∟↔▲▼'
|
||||||
u'0123456789:;<=>?'
|
' !"#$%&\'()*+,-./'
|
||||||
u'@ABCDEFGHIJKLMNO'
|
'0123456789:;<=>?'
|
||||||
u'PQRSTUVWXYZ[\]^_'
|
'@ABCDEFGHIJKLMNO'
|
||||||
u'`abcdefghijklmno'
|
'PQRSTUVWXYZ[\]^_'
|
||||||
u'pqrstuvwxyz{|}~⌂'
|
'`abcdefghijklmno'
|
||||||
u'ÇüéâäàåçêëèïîìÄÅ'
|
'pqrstuvwxyz{|}~⌂'
|
||||||
u'ÉæÆôöòûùÿÖÜ¢£¥₧ƒ'
|
'ÇüéâäàåçêëèïîìÄÅ'
|
||||||
u'áíóúñѪº¿⌐¬½¼¡«»'
|
'ÉæÆôöòûùÿÖÜ¢£¥₧ƒ'
|
||||||
u'░▒▓│┤╡╢╖╕╣║╗╝╜╛┐'
|
'áíóúñѪº¿⌐¬½¼¡«»'
|
||||||
u'└┴┬├─┼╞╟╚╔╩╦╠═╬╧'
|
'░▒▓│┤╡╢╖╕╣║╗╝╜╛┐'
|
||||||
u'╨╤╥╙╘╒╓╫╪┘┌█▄▌▐▀'
|
'└┴┬├─┼╞╟╚╔╩╦╠═╬╧'
|
||||||
u'αßΓπΣσµτΦΘΩδ∞φε∩'
|
'╨╤╥╙╘╒╓╫╪┘┌█▄▌▐▀'
|
||||||
u'≡±≥≤⌠⌡÷≈°∙·√ⁿ²■¤')
|
'αßΓπΣσµτΦΘΩδ∞φε∩'
|
||||||
|
'≡±≥≤⌠⌡÷≈°∙·√ⁿ²■¤'
|
||||||
shpch = (u'␀☺☻♥♦♣♠•◘○◙♂♀♪♫☼'
|
)
|
||||||
u'►◄↕‼¶§▬↨↑↓→←∟↔▲▼'
|
|
||||||
u'␣!"#$%&\'()*+,-./'
|
|
||||||
u'0123456789:;<=>?'
|
|
||||||
u'@ABCDEFGHIJKLMNO'
|
|
||||||
u'PQRSTUVWXYZ[\]^_'
|
|
||||||
u'`abcdefghijklmno'
|
|
||||||
u'pqrstuvwxyz{|}~⌂'
|
|
||||||
u'ÇüéâäàåçêëèïîìÄÅ'
|
|
||||||
u'ÉæÆôöòûùÿÖÜ¢£¥₧ƒ'
|
|
||||||
u'áíóúñѪº¿⌐¬½¼¡«»'
|
|
||||||
u'░▒▓│┤╡╢╖╕╣║╗╝╜╛┐'
|
|
||||||
u'└┴┬├─┼╞╟╚╔╩╦╠═╬╧'
|
|
||||||
u'╨╤╥╙╘╒╓╫╪┘┌█▄▌▐▀'
|
|
||||||
u'αßΓπΣσµτΦΘΩδ∞φε∩'
|
|
||||||
u'≡±≥≤⌠⌡÷≈°∙·√ⁿ²■¤')
|
|
||||||
|
|
||||||
|
fluffych = (
|
||||||
|
'·☺☻♥♦♣♠•◘○◙♂♀♪♫☼'
|
||||||
|
'►◄↕‼¶§▬↨↑↓→←∟↔▲▼'
|
||||||
|
' !"#$%&\'()*+,-./'
|
||||||
|
'0123456789:;<=>?'
|
||||||
|
'@ABCDEFGHIJKLMNO'
|
||||||
|
'PQRSTUVWXYZ[\]^_'
|
||||||
|
'`abcdefghijklmno'
|
||||||
|
'pqrstuvwxyz{|}~⌂'
|
||||||
|
'ÇüéâäàåçêëèïîìÄÅ'
|
||||||
|
'ÉæÆôöòûùÿÖÜ¢£¥₧ƒ'
|
||||||
|
'áíóúñѪº¿⌐¬½¼¡«»'
|
||||||
|
'░▒▓│┤╡╢╖╕╣║╗╝╜╛┐'
|
||||||
|
'└┴┬├─┼╞╟╚╔╩╦╠═╬╧'
|
||||||
|
'╨╤╥╙╘╒╓╫╪┘┌█▄▌▐▀'
|
||||||
|
'αßΓπΣσµτΦΘΩδ∞φε∩'
|
||||||
|
'≡±≥≤⌠⌡÷≈°∀∃√ⁿ²■¤'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def unpack(fmt, buf):
|
def unpack(fmt, buf):
|
||||||
"""Unpack buf based on fmt, return the rest as a string."""
|
"""Unpack buf based on fmt, return the remainder."""
|
||||||
|
|
||||||
size = struct.calcsize(fmt)
|
size = struct.calcsize(fmt)
|
||||||
vals = struct.unpack(fmt, str(buf[:size]))
|
vals = struct.unpack(fmt, bytes(buf[:size]))
|
||||||
return vals + (buf[size:],)
|
return vals + (buf[size:],)
|
||||||
|
|
||||||
|
|
||||||
class HexDumper:
|
class HexDumper:
|
||||||
def __init__(self, fd=sys.stdout):
|
def __init__(self, output, charset=fluffych):
|
||||||
self.fd = fd
|
|
||||||
self.offset = 0
|
self.offset = 0
|
||||||
self.buf = []
|
self.last = None
|
||||||
|
self.elided = False
|
||||||
|
self.hexes = []
|
||||||
|
self.chars = []
|
||||||
|
self.charset = charset
|
||||||
|
self.output = output
|
||||||
|
|
||||||
def _to_printable(self, c):
|
def _spit(self):
|
||||||
if not c:
|
if self.chars == self.last:
|
||||||
return u'◌'
|
if not self.elided:
|
||||||
else:
|
self.output.write('*\n')
|
||||||
return cgach[ord(c)]
|
self.elided = True
|
||||||
|
self.hexes = []
|
||||||
|
self.chars = []
|
||||||
def write(self, what):
|
|
||||||
self.fd.write(what.encode('utf-8'))
|
|
||||||
|
|
||||||
def _flush(self):
|
|
||||||
if not self.buf:
|
|
||||||
return
|
return
|
||||||
|
self.last = self.chars[:]
|
||||||
|
self.elided = False
|
||||||
|
|
||||||
o = []
|
pad = 16 - len(self.chars)
|
||||||
for c in self.buf:
|
self.hexes += [' '] * pad
|
||||||
if c:
|
|
||||||
o.append(u'%02x' % ord(c))
|
|
||||||
else:
|
|
||||||
o.append(u'--')
|
|
||||||
o += ([u' '] * (16 - len(self.buf)))
|
|
||||||
p = [self._to_printable(c) for c in self.buf]
|
|
||||||
|
|
||||||
self.write(u'%08x ' % self.offset)
|
self.output.write('{:08x} '.format(self.offset - len(self.chars)))
|
||||||
|
self.output.write(' '.join(self.hexes[:8]))
|
||||||
|
self.output.write(' ')
|
||||||
|
self.output.write(' '.join(self.hexes[8:]))
|
||||||
|
self.output.write(' ')
|
||||||
|
self.output.write(''.join(self.chars))
|
||||||
|
self.output.write('\n')
|
||||||
|
|
||||||
self.write(u' '.join(o[:8]))
|
self.hexes = []
|
||||||
self.write(u' ')
|
self.chars = []
|
||||||
self.write(u' '.join(o[8:]))
|
|
||||||
|
|
||||||
self.write(u' ┆')
|
def add(self, b):
|
||||||
|
if self.offset and self.offset % 16 == 0:
|
||||||
|
self._spit()
|
||||||
|
|
||||||
self.write(u''.join(p))
|
if b is None:
|
||||||
|
h = '⬜'
|
||||||
|
c = '<EFBFBD>'
|
||||||
|
else:
|
||||||
|
h = '{:02x}'.format(b)
|
||||||
|
c = self.charset[b]
|
||||||
|
self.chars.append(c)
|
||||||
|
self.hexes.append(h)
|
||||||
|
|
||||||
self.write(u'┆\n')
|
self.offset += 1
|
||||||
|
|
||||||
self.offset += len(self.buf)
|
def done(self):
|
||||||
self.buf = []
|
self._spit()
|
||||||
|
self.output.write('{:08x}\n'.format(self.offset))
|
||||||
def dump_chr(self, c):
|
|
||||||
self.buf.append(c)
|
|
||||||
if len(self.buf) == 16:
|
|
||||||
self._flush()
|
|
||||||
|
|
||||||
def dump_drop(self):
|
|
||||||
self.buf.append(None)
|
|
||||||
if len(self.buf) == 16:
|
|
||||||
self._flush()
|
|
||||||
|
|
||||||
def finish(self):
|
|
||||||
self._flush()
|
|
||||||
self.write('%08x\n' % self.offset)
|
|
||||||
|
|
||||||
|
|
||||||
def hexdump(buf, f=sys.stdout):
|
def hexdump(buf, f=sys.stdout, charset=fluffych):
|
||||||
"Print a hex dump of buf"
|
"Print a hex dump of buf"
|
||||||
|
|
||||||
d = HexDumper()
|
h = HexDumper(output=f, charset=charset)
|
||||||
|
for b in buf:
|
||||||
for c in buf:
|
h.add(b)
|
||||||
d.dump_chr(c)
|
h.done()
|
||||||
d.finish()
|
|
||||||
|
|
||||||
|
|
||||||
def cstring(buf):
|
def cstring(buf):
|
||||||
|
@ -266,7 +266,7 @@ def bin(i, bits=None):
|
||||||
def unhex(s):
|
def unhex(s):
|
||||||
"""Decode a string as hex, stripping whitespace first"""
|
"""Decode a string as hex, stripping whitespace first"""
|
||||||
|
|
||||||
return [ord(i) for i in s.replace(' ', '').decode('hex')]
|
return binascii.unhexlify(s.replace(' ', ''))
|
||||||
|
|
||||||
|
|
||||||
def pp(value, bits=16):
|
def pp(value, bits=16):
|
|
@ -1,29 +1,28 @@
|
||||||
#! /usr/bin/python
|
#! /usr/bin/python3
|
||||||
|
|
||||||
## IP resequencing + protocol reversing skeleton
|
## IP resequencing + protocol reversing skeleton
|
||||||
## 2008 Massive Blowout
|
## 2008, 2018 Neale Pickett
|
||||||
|
|
||||||
import StringIO
|
|
||||||
import struct
|
import struct
|
||||||
import socket
|
import socket
|
||||||
import warnings
|
import warnings
|
||||||
import heapq
|
import heapq
|
||||||
import gapstr
|
|
||||||
import time
|
import time
|
||||||
|
import io
|
||||||
try:
|
try:
|
||||||
import pcap
|
import pcap
|
||||||
except ImportError:
|
except ImportError:
|
||||||
import py_pcap as pcap
|
warnings.warn("Using slow pure-python pcap library")
|
||||||
|
import netarch.py_pcap as pcap
|
||||||
import os
|
import os
|
||||||
import cgi
|
import cgi
|
||||||
import urllib
|
import urllib.parse
|
||||||
import UserDict
|
from netarch import unpack, hexdump
|
||||||
from __init__ import *
|
from netarch.trilobytes import TriloBytes
|
||||||
|
|
||||||
def unpack_nybbles(byte):
|
def unpack_nybbles(byte):
|
||||||
return (byte >> 4, byte & 0x0F)
|
return (byte >> 4, byte & 0x0F)
|
||||||
|
|
||||||
|
|
||||||
transfers = os.environ.get('TRANSFERS', 'transfers')
|
transfers = os.environ.get('TRANSFERS', 'transfers')
|
||||||
|
|
||||||
IP = 0x0800
|
IP = 0x0800
|
||||||
|
@ -43,6 +42,7 @@ class Frame:
|
||||||
def __init__(self, pkt):
|
def __init__(self, pkt):
|
||||||
((self.time, self.time_usec, _), frame) = pkt
|
((self.time, self.time_usec, _), frame) = pkt
|
||||||
|
|
||||||
|
|
||||||
# Ethernet
|
# Ethernet
|
||||||
(self.eth_dhost,
|
(self.eth_dhost,
|
||||||
self.eth_shost,
|
self.eth_shost,
|
||||||
|
@ -132,15 +132,17 @@ class Frame:
|
||||||
|
|
||||||
|
|
||||||
def get_src_addr(self):
|
def get_src_addr(self):
|
||||||
saddr = struct.pack('!i', self.saddr)
|
if not hasattr(self, "_src_addr"):
|
||||||
self.src_addr = socket.inet_ntoa(saddr)
|
saddr = struct.pack('!i', self.saddr)
|
||||||
return self.src_addr
|
self._src_addr = socket.inet_ntoa(saddr)
|
||||||
|
return self._src_addr
|
||||||
src_addr = property(get_src_addr)
|
src_addr = property(get_src_addr)
|
||||||
|
|
||||||
def get_dst_addr(self):
|
def get_dst_addr(self):
|
||||||
daddr = struct.pack('!i', self.daddr)
|
if not hasattr(self, "_dst_addr"):
|
||||||
self.dst_addr = socket.inet_ntoa(daddr)
|
daddr = struct.pack('!i', self.daddr)
|
||||||
return self.dst_addr
|
self._dst_addr = socket.inet_ntoa(daddr)
|
||||||
|
return self._dst_addr
|
||||||
dst_addr = property(get_dst_addr)
|
dst_addr = property(get_dst_addr)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
|
@ -231,7 +233,7 @@ class TCP_Recreate:
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
return ethhdr + iphdr + tcphdr + str(payload)
|
return ethhdr + iphdr + tcphdr + bytes(payload)
|
||||||
|
|
||||||
def write_pkt(self, timestamp, cli, payload, flags=0):
|
def write_pkt(self, timestamp, cli, payload, flags=0):
|
||||||
p = self.packet(cli, payload, flags)
|
p = self.packet(cli, payload, flags)
|
||||||
|
@ -309,16 +311,14 @@ class TCP_Resequence:
|
||||||
|
|
||||||
pending = self.pending[xdi]
|
pending = self.pending[xdi]
|
||||||
# Get a sorted list of sequence numbers
|
# Get a sorted list of sequence numbers
|
||||||
keys = pending.keys()
|
keys = sorted(pending)
|
||||||
keys.sort()
|
|
||||||
|
|
||||||
# Build up return value
|
# Build up return value
|
||||||
gs = gapstr.GapString()
|
gs = TriloBytes()
|
||||||
if keys:
|
if keys:
|
||||||
f = pending[keys[0]]
|
first = pending[keys[0]]
|
||||||
ret = (xdi, f, gs)
|
|
||||||
else:
|
else:
|
||||||
ret = (xdi, None, gs)
|
first = None
|
||||||
|
|
||||||
# Fill in gs with our frames
|
# Fill in gs with our frames
|
||||||
for key in keys:
|
for key in keys:
|
||||||
|
@ -328,13 +328,14 @@ class TCP_Resequence:
|
||||||
frame = pending[key]
|
frame = pending[key]
|
||||||
if key > seq:
|
if key > seq:
|
||||||
# Dropped frame(s)
|
# Dropped frame(s)
|
||||||
if key - seq > 6000:
|
dropped = key - seq
|
||||||
print "Gosh, bob, %d dropped octets sure is a lot!" % (key - seq)
|
if dropped > 6000:
|
||||||
gs.append(key - seq)
|
print("Gosh, %d dropped octets sure is a lot!" % (dropped))
|
||||||
|
gs += [None] * dropped
|
||||||
seq = key
|
seq = key
|
||||||
if key == seq:
|
if key == seq:
|
||||||
# Default
|
# Default
|
||||||
gs.append(frame.payload)
|
gs += frame.payload
|
||||||
seq += len(frame.payload)
|
seq += len(frame.payload)
|
||||||
del pending[key]
|
del pending[key]
|
||||||
elif key < seq:
|
elif key < seq:
|
||||||
|
@ -349,13 +350,14 @@ class TCP_Resequence:
|
||||||
self.handle = self.handle_drop
|
self.handle = self.handle_drop
|
||||||
if seq != pkt.ack:
|
if seq != pkt.ack:
|
||||||
# Drop at the end
|
# Drop at the end
|
||||||
if pkt.ack - seq > 6000:
|
dropped = pkt.ack - seq
|
||||||
print 'Large drop at end of session!'
|
if dropped > 6000:
|
||||||
print ' %s' % ((pkt, pkt.time),)
|
print('Large drop at end of session!')
|
||||||
print ' %x %x' % (pkt.ack, seq)
|
print(' %s' % ((pkt, pkt.time),))
|
||||||
gs.append(pkt.ack - seq)
|
print(' %x %x' % (pkt.ack, seq))
|
||||||
|
gs += [None] * dropped
|
||||||
|
|
||||||
return ret
|
return (xdi, first, gs)
|
||||||
|
|
||||||
|
|
||||||
def handle(self, pkt):
|
def handle(self, pkt):
|
||||||
|
@ -447,14 +449,14 @@ class Dispatch:
|
||||||
if not literal:
|
if not literal:
|
||||||
parts = filename.split(':::')
|
parts = filename.split(':::')
|
||||||
fn = parts[0]
|
fn = parts[0]
|
||||||
fd = file(fn)
|
fd = open(fn, "rb")
|
||||||
pc = pcap.open(fd)
|
pc = pcap.open(fd)
|
||||||
if len(parts) > 1:
|
if len(parts) > 1:
|
||||||
pos = int(parts[1])
|
pos = int(parts[1])
|
||||||
fd.seek(pos)
|
fd.seek(pos)
|
||||||
self._read(pc, fn, fd)
|
self._read(pc, fn, fd)
|
||||||
else:
|
else:
|
||||||
fd = file(filename)
|
fd = open(filename, "rb")
|
||||||
pc = pcap.open(fd)
|
pc = pcap.open(fd)
|
||||||
self._read(pc, filename, fd)
|
self._read(pc, filename, fd)
|
||||||
|
|
||||||
|
@ -490,7 +492,7 @@ class Dispatch:
|
||||||
class NeedMoreData(Exception):
|
class NeedMoreData(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class Packet(UserDict.DictMixin):
|
class Packet:
|
||||||
"""Base class for a packet from a binary protocol.
|
"""Base class for a packet from a binary protocol.
|
||||||
|
|
||||||
This is a base class for making protocol reverse-engineering easier.
|
This is a base class for making protocol reverse-engineering easier.
|
||||||
|
@ -549,43 +551,46 @@ class Packet(UserDict.DictMixin):
|
||||||
assert a in b, ('%r not in %r' % (a, b))
|
assert a in b, ('%r not in %r' % (a, b))
|
||||||
|
|
||||||
def show(self):
|
def show(self):
|
||||||
print '%s %3s: %s' % (self.__class__.__name__,
|
print('%s %3s: %s' % (self.__class__.__name__,
|
||||||
self.opcode,
|
self.opcode,
|
||||||
self.opcode_desc)
|
self.opcode_desc))
|
||||||
if self.firstframe:
|
if self.firstframe:
|
||||||
print ' %s:%d -> %s:%d (%s.%06dZ)' % (self.firstframe.src_addr,
|
print(' %s:%d -> %s:%d (%s.%06dZ)' % (self.firstframe.src_addr,
|
||||||
self.firstframe.sport,
|
self.firstframe.sport,
|
||||||
self.firstframe.dst_addr,
|
self.firstframe.dst_addr,
|
||||||
self.firstframe.dport,
|
self.firstframe.dport,
|
||||||
time.strftime('%Y-%m-%dT%T', time.gmtime(self.firstframe.time)),
|
time.strftime('%Y-%m-%dT%T', time.gmtime(self.firstframe.time)),
|
||||||
self.firstframe.time_usec)
|
self.firstframe.time_usec))
|
||||||
|
|
||||||
if self.parts:
|
if self.parts:
|
||||||
dl = len(self.parts[-1])
|
dl = len(self.parts[-1])
|
||||||
p = []
|
p = []
|
||||||
|
xp = []
|
||||||
for x in self.parts[:-1]:
|
for x in self.parts[:-1]:
|
||||||
if x == dl:
|
if x == dl:
|
||||||
p.append('%3d!' % x)
|
p.append('%3d!' % x)
|
||||||
|
xp.append('%3x!' % x)
|
||||||
else:
|
else:
|
||||||
p.append('%3d' % x)
|
p.append('%3d' % x)
|
||||||
print ' parts: (%s) +%d bytes' % (','.join(p), dl)
|
xp.append('%3x' % x)
|
||||||
|
print(' parts: (%s) +%d octets' % (','.join(p), dl))
|
||||||
|
print(' 0xparts: (%s) +%x octets' % (','.join(xp), dl))
|
||||||
|
|
||||||
keys = self.params.keys()
|
keys = sorted(self.params)
|
||||||
keys.sort()
|
|
||||||
for k in keys:
|
for k in keys:
|
||||||
print ' %12s: %s' % (k, self.params[k])
|
print(' %12s: %s' % (k, self.params[k]))
|
||||||
|
|
||||||
if self.subpackets:
|
if self.subpackets:
|
||||||
for p in self.subpackets:
|
for p in self.subpackets:
|
||||||
p.show()
|
p.show()
|
||||||
elif self.payload:
|
elif self.payload:
|
||||||
try:
|
try:
|
||||||
self.payload.hexdump()
|
hexdump(self.payload)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
print ' payload: %r' % self.payload
|
print(' payload: %r' % self.payload)
|
||||||
|
|
||||||
def parse(self, data):
|
def parse(self, data):
|
||||||
"""Parse a chunk of data (possibly a GapString).
|
"""Parse a chunk of data (possibly a TriloBytes).
|
||||||
|
|
||||||
Anything returned is not part of this packet and will be passed
|
Anything returned is not part of this packet and will be passed
|
||||||
in to a subsequent packet.
|
in to a subsequent packet.
|
||||||
|
@ -600,7 +605,7 @@ class Packet(UserDict.DictMixin):
|
||||||
"""Handle data from a Session class."""
|
"""Handle data from a Session class."""
|
||||||
|
|
||||||
data = self.parse(data)
|
data = self.parse(data)
|
||||||
if self.opcode <> None:
|
if self.opcode != None:
|
||||||
try:
|
try:
|
||||||
f = getattr(self, 'opcode_%s' % self.opcode)
|
f = getattr(self, 'opcode_%s' % self.opcode)
|
||||||
except AttributeError:
|
except AttributeError:
|
||||||
|
@ -642,12 +647,12 @@ class Session:
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def handle(self, is_srv, frame, gs, lastpos):
|
def handle(self, is_srv, frame, data, lastpos):
|
||||||
"""Handle a data burst.
|
"""Handle a data burst.
|
||||||
|
|
||||||
@param is_srv Is this from the server?
|
@param is_srv Is this from the server?
|
||||||
@param frame A frame associated with this packet, or None if it's all drops
|
@param frame A frame associated with this packet, or None if it's all drops
|
||||||
@param gs A gapstring of the data
|
@param data A TriloBytes of the data
|
||||||
@param lastpos Last position in the source file, for debugging
|
@param lastpos Last position in the source file, for debugging
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -659,21 +664,21 @@ class Session:
|
||||||
try:
|
try:
|
||||||
saddr = frame.saddr
|
saddr = frame.saddr
|
||||||
try:
|
try:
|
||||||
(f, data) = self.pending.pop(saddr)
|
(f, buf) = self.pending.pop(saddr)
|
||||||
except KeyError:
|
except KeyError:
|
||||||
f = frame
|
f = frame
|
||||||
data = gapstr.GapString()
|
buf = TriloBytes()
|
||||||
data.extend(gs)
|
buf += data
|
||||||
try:
|
try:
|
||||||
while data:
|
while buf:
|
||||||
p = self.Packet(self, f)
|
p = self.Packet(self, f)
|
||||||
data = p.handle(data)
|
buf = p.handle(buf)
|
||||||
self.process(p)
|
self.process(p)
|
||||||
except NeedMoreData:
|
except NeedMoreData:
|
||||||
self.pending[saddr] = (f, data)
|
self.pending[saddr] = (f, buf)
|
||||||
self.count += 1
|
self.count += 1
|
||||||
except:
|
except:
|
||||||
print ('Lastpos: %r' % (lastpos,))
|
print('Lastpos: %r' % (lastpos,))
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def process(self, packet):
|
def process(self, packet):
|
||||||
|
@ -698,11 +703,11 @@ class Session:
|
||||||
fn = '%d-%s~%d-%s~%d---%s' % (frame.time,
|
fn = '%d-%s~%d-%s~%d---%s' % (frame.time,
|
||||||
frame.src_addr, frame.sport,
|
frame.src_addr, frame.sport,
|
||||||
frame.dst_addr, frame.dport,
|
frame.dst_addr, frame.dport,
|
||||||
urllib.quote(fn, ''))
|
urllib.parse.quote(fn, ''))
|
||||||
fullfn = os.path.join(self.basename, fn)
|
fullfn = os.path.join(self.basename, fn)
|
||||||
fullfn2 = os.path.join(self.basename2, fn)
|
fullfn2 = os.path.join(self.basename2, fn)
|
||||||
print ' writing %s' % (fn,)
|
print(' writing %s' % (fn,))
|
||||||
fd = file(fullfn, 'w')
|
fd = open(fullfn, 'wb')
|
||||||
try:
|
try:
|
||||||
os.unlink(fullfn2)
|
os.unlink(fullfn2)
|
||||||
except OSError:
|
except OSError:
|
||||||
|
@ -721,7 +726,9 @@ class Session:
|
||||||
class HtmlSession(Session):
|
class HtmlSession(Session):
|
||||||
def __init__(self, frame):
|
def __init__(self, frame):
|
||||||
Session.__init__(self, frame)
|
Session.__init__(self, frame)
|
||||||
self.sessfd = self.open_out('session.html')
|
fd = self.open_out('session.html')
|
||||||
|
fbuf = io.BufferedWriter(fd, 1024)
|
||||||
|
self.sessfd = io.TextIOWrapper(fbuf, encoding="utf-8")
|
||||||
self.sessfd.write('''<?xml version="1.0" encoding="UTF-8"?>
|
self.sessfd.write('''<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<!DOCTYPE html
|
<!DOCTYPE html
|
||||||
PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
|
PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN"
|
|
@ -1,13 +1,16 @@
|
||||||
#! /usr/bin/python
|
#! /usr/bin/python3
|
||||||
|
|
||||||
import struct
|
import struct
|
||||||
|
import builtins
|
||||||
|
|
||||||
_MAGIC = 0xA1B2C3D4
|
_MAGIC = 0xA1B2C3D4
|
||||||
|
|
||||||
class pcap:
|
class PcapFile:
|
||||||
def __init__(self, stream, mode='rb', snaplen=65535, linktype=1):
|
def __init__(self, stream, mode='r', snaplen=65535, linktype=1):
|
||||||
|
if 'b' not in mode:
|
||||||
|
mode += 'b'
|
||||||
try:
|
try:
|
||||||
self.stream = file(stream, mode)
|
self.stream = builtins.open(stream, mode)
|
||||||
except TypeError:
|
except TypeError:
|
||||||
self.stream = stream
|
self.stream = stream
|
||||||
try:
|
try:
|
||||||
|
@ -16,7 +19,7 @@ class pcap:
|
||||||
except IOError:
|
except IOError:
|
||||||
hdr = None
|
hdr = None
|
||||||
|
|
||||||
if hdr:
|
if 'r' in mode:
|
||||||
# We're in read mode
|
# We're in read mode
|
||||||
self._endian = None
|
self._endian = None
|
||||||
for endian in '<>':
|
for endian in '<>':
|
||||||
|
@ -71,17 +74,22 @@ class pcap:
|
||||||
break
|
break
|
||||||
yield r
|
yield r
|
||||||
|
|
||||||
|
open = PcapFile
|
||||||
open = pcap
|
pcap = PcapFile
|
||||||
open_offline = pcap
|
open_offline = PcapFile
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
p = open('test.pcap', 'w') # Create a new file
|
import io
|
||||||
p.write(((0, 0, 3), 'foo')) # Add a packet
|
|
||||||
p.write(((0, 0, 3), 'bar'))
|
f = io.BytesIO()
|
||||||
|
p = PcapFile(f, 'w')
|
||||||
|
p.write(((0, 0, 3), b'foo')) # Add a packet
|
||||||
|
p.write(((0, 0, 3), b'bar'))
|
||||||
del p
|
del p
|
||||||
p = open(file('test.pcap')) # Also takes file objects
|
|
||||||
|
f.seek(0)
|
||||||
|
p = PcapFile(f)
|
||||||
assert ((p.version, p.thiszone, p.sigfigs, p.snaplen, p.linktype) ==
|
assert ((p.version, p.thiszone, p.sigfigs, p.snaplen, p.linktype) ==
|
||||||
((2, 4), 0, 0, 65535, 1))
|
((2, 4), 0, 0, 65535, 1))
|
||||||
assert ([i for i in p] == [((0, 0, 3), 'foo'), ((0, 0, 3), 'bar')])
|
assert ([i for i in p] == [((0, 0, 3), b'foo'), ((0, 0, 3), b'bar')])
|
|
@ -0,0 +1,146 @@
|
||||||
|
#! /usr/bin/python3
|
||||||
|
|
||||||
|
## 2008, 2018 Neale Pickett
|
||||||
|
|
||||||
|
import itertools
|
||||||
|
|
||||||
|
class TriloBytes:
|
||||||
|
"""Three-level byte array (0, 1, Missing).
|
||||||
|
|
||||||
|
This allows you to represent on-wire transactions with holes in the middle,
|
||||||
|
due to eg. dropped packets.
|
||||||
|
|
||||||
|
>>> tb = TriloBytes(b'hi')
|
||||||
|
>>> bytes(tb)
|
||||||
|
b'hi'
|
||||||
|
>>> bytes(tb[:40])
|
||||||
|
b'hi'
|
||||||
|
|
||||||
|
>>> tb = TriloBytes(b'hi') + [None] * 3
|
||||||
|
>>> bytes(tb)
|
||||||
|
b'hi???'
|
||||||
|
>>> bytes(tb[:40])
|
||||||
|
b'hi???'
|
||||||
|
>>> bytes(tb[:3])
|
||||||
|
b'hi?'
|
||||||
|
>>> bytes(tb[-4:])
|
||||||
|
b'i???'
|
||||||
|
>>> bytes(tb + tb)
|
||||||
|
b'hi???hi???'
|
||||||
|
>>> bytes(tb ^ 1)
|
||||||
|
b'ih???'
|
||||||
|
>>> bytes(tb ^ [32, 1])
|
||||||
|
b'Hh???'
|
||||||
|
|
||||||
|
>>> tb = TriloBytes(b'hi', drop=b'DROP')
|
||||||
|
>>> bytes(tb)
|
||||||
|
b'hi'
|
||||||
|
>>> tb += [None] * 7
|
||||||
|
>>> bytes(tb)
|
||||||
|
b'hiOPDROPD'
|
||||||
|
|
||||||
|
>>> tb = TriloBytes(b'00')^1
|
||||||
|
>>> tb[0]
|
||||||
|
1
|
||||||
|
|
||||||
|
>>> bytes(TriloBytes(b'00'))
|
||||||
|
b'\x00'
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, initializer=(), drop=b'?'):
|
||||||
|
self._drop = drop
|
||||||
|
self._contents = tuple(initializer)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def fromhex(cls, string):
|
||||||
|
"""
|
||||||
|
>>> bytes(TriloBytes.fromhex("616263"))
|
||||||
|
b'abc'
|
||||||
|
"""
|
||||||
|
|
||||||
|
return cls(bytes.fromhex(string))
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
"""
|
||||||
|
>>> len(TriloBytes(b'abc'))
|
||||||
|
3
|
||||||
|
"""
|
||||||
|
|
||||||
|
return len(self._contents)
|
||||||
|
|
||||||
|
def __nonzero__(self):
|
||||||
|
"""
|
||||||
|
>>> 10 if TriloBytes() else -10
|
||||||
|
-10
|
||||||
|
>>> 10 if TriloBytes(b'a') else -10
|
||||||
|
10
|
||||||
|
"""
|
||||||
|
|
||||||
|
return len(self) > 0
|
||||||
|
|
||||||
|
def __getitem__(self, key):
|
||||||
|
ret = self._contents[key]
|
||||||
|
try:
|
||||||
|
return TriloBytes(ret, self._drop)
|
||||||
|
except:
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
for val in self._contents:
|
||||||
|
yield val
|
||||||
|
|
||||||
|
def __bytes__(self):
|
||||||
|
return bytes((d if v is None else v for v,d in zip(self,itertools.cycle(self._drop))))
|
||||||
|
|
||||||
|
def __add__(self, other):
|
||||||
|
try:
|
||||||
|
contents = self._contents + other._contents
|
||||||
|
except AttributeError:
|
||||||
|
contents = self._contents + tuple(other)
|
||||||
|
return TriloBytes(contents, self._drop)
|
||||||
|
|
||||||
|
def __eq__(self, other):
|
||||||
|
try:
|
||||||
|
return self._contents == other._contents
|
||||||
|
except:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def __hash__(self):
|
||||||
|
return hash(self._contents)
|
||||||
|
|
||||||
|
def __xor__(self, mask):
|
||||||
|
try:
|
||||||
|
mask[0]
|
||||||
|
except TypeError:
|
||||||
|
mask = [mask]
|
||||||
|
return TriloBytes(((None if x is None or y is None else x^y) for x,y in zip(self._contents, itertools.cycle(mask))), drop=self._drop)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
"""
|
||||||
|
>>> TriloBytes(b'abc')
|
||||||
|
<TriloBytes missing 0 of 3>
|
||||||
|
>>> TriloBytes(b'abc') + [None]
|
||||||
|
<TriloBytes missing 1 of 4>
|
||||||
|
"""
|
||||||
|
|
||||||
|
return '<TriloBytes missing %d of %d>' % (self.missing(), len(self))
|
||||||
|
|
||||||
|
def decode(self, codec):
|
||||||
|
return bytes(self).decode(codec)
|
||||||
|
|
||||||
|
def missing(self):
|
||||||
|
"""
|
||||||
|
>>> TriloBytes(b'abc').missing()
|
||||||
|
0
|
||||||
|
>>> (TriloBytes(b'abc') + [None, None]).missing()
|
||||||
|
2
|
||||||
|
"""
|
||||||
|
return self._contents.count(None)
|
||||||
|
|
||||||
|
def map(self, func, *args):
|
||||||
|
return (v if v is not None else func(v, *args) for v in self)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
import doctest
|
||||||
|
doctest.testmod()
|
|
@ -0,0 +1,73 @@
|
||||||
|
#! /usr/bin/python3
|
||||||
|
|
||||||
|
ENDIAN_LITTLE = 1
|
||||||
|
ENDIAN_BIG = 2
|
||||||
|
ENDIAN_MIDDLE = 3
|
||||||
|
ENDIAN_NETWORK = ENDIAN_BIG
|
||||||
|
|
||||||
|
class Unpacker:
|
||||||
|
"""Class that lets you peel values off
|
||||||
|
|
||||||
|
>>> u = Unpacker(bytes((1, 0,2, 0,0,0,3, 0,0,0,0,0,0,0,4)))
|
||||||
|
>>> u.uint8()
|
||||||
|
1
|
||||||
|
>>> u.uint16()
|
||||||
|
2
|
||||||
|
>>> u.uint32()
|
||||||
|
3
|
||||||
|
>>> u.uint64()
|
||||||
|
4
|
||||||
|
|
||||||
|
>>> u = Unpacker(bytes((1,0, 104,105)), ENDIAN_LITTLE)
|
||||||
|
>>> u.uint16()
|
||||||
|
1
|
||||||
|
>>> u.buf
|
||||||
|
b'hi'
|
||||||
|
|
||||||
|
>>> u = Unpacker(bytes((1,0, 0,2)))
|
||||||
|
>>> u.uint16(ENDIAN_LITTLE)
|
||||||
|
1
|
||||||
|
>>> u.uint(16, ENDIAN_BIG)
|
||||||
|
2
|
||||||
|
|
||||||
|
>>> u = Unpacker(bytes((0,1,2,3)), ENDIAN_MIDDLE)
|
||||||
|
>>> '%08x' % u.uint32()
|
||||||
|
'01000302'
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, buf, endian=ENDIAN_NETWORK):
|
||||||
|
self.endian = endian
|
||||||
|
self.buf = buf
|
||||||
|
|
||||||
|
def uint(self, size, endian=None):
|
||||||
|
endian = endian or self.endian
|
||||||
|
if size not in (8, 16, 32, 64):
|
||||||
|
# XXX: I'm pretty sure this can be done, but I don't want to code it up right now.
|
||||||
|
raise ValueError("Can't do weird sizes")
|
||||||
|
noctets = size // 8
|
||||||
|
if endian == ENDIAN_BIG:
|
||||||
|
r = range(0, noctets)
|
||||||
|
elif endian == ENDIAN_LITTLE:
|
||||||
|
r = range(noctets-1, -1, -1)
|
||||||
|
elif endian == ENDIAN_MIDDLE:
|
||||||
|
r = (1, 0, 3, 2, 5, 4, 7, 6)[:noctets]
|
||||||
|
else:
|
||||||
|
raise ValueError("Unsupported byte order")
|
||||||
|
pull, self.buf = self.buf[:noctets], self.buf[noctets:]
|
||||||
|
acc = 0
|
||||||
|
for i in r:
|
||||||
|
acc = (acc << 8) | pull[i]
|
||||||
|
return acc
|
||||||
|
|
||||||
|
def uint8(self):
|
||||||
|
return self.uint(8)
|
||||||
|
def uint16(self, endian=None):
|
||||||
|
return self.uint(16, endian)
|
||||||
|
def uint32(self, endian=None):
|
||||||
|
return self.uint(32, endian)
|
||||||
|
def uint64(self, endian=None):
|
||||||
|
return self.uint(64, endian)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import doctest
|
||||||
|
doctest.testmod()
|
Loading…
Reference in New Issue