more py3 fixes

This commit is contained in:
Neale Pickett 2020-09-21 14:00:59 -06:00
parent 93498f17a9
commit c3ced6f1c4
5 changed files with 186 additions and 194 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__

View File

@ -3,154 +3,8 @@
import binascii import binascii
import sys import sys
import struct import struct
from . import ip
stdch = (
'␀·········␊··␍··'
'················'
' !"#$%&\'()*+,-./'
'0123456789:;<=>?'
'@ABCDEFGHIJKLMNO'
'PQRSTUVWXYZ[\]^_'
'`abcdefghijklmno'
'pqrstuvwxyz{|}~·'
'················'
'················'
'················'
'················'
'················'
'················'
'················'
'················'
)
decch = (
'␀␁␂␃␄␅␆␇␈␉␊␋␌␍␎␏'
'␐␑␒␓␔␕␖␗␘␙␚·····'
'␠!"#$%&\'()*+,-./'
'0123456789:;<=>?'
'@ABCDEFGHIJKLMNO'
'PQRSTUVWXYZ[\]^_'
'`abcdefghijklmno'
'pqrstuvwxyz{|}~␡'
'················'
'················'
'················'
'················'
'················'
'················'
'················'
'················'
)
cgach = (
'□☺☻♥♦♣♠•◘○◙♂♀♪♫☼'
'►◄↕‼¶§▬↨↑↓→←∟↔▲▼'
' !"#$%&\'()*+,-./'
'0123456789:;<=>?'
'@ABCDEFGHIJKLMNO'
'PQRSTUVWXYZ[\]^_'
'`abcdefghijklmno'
'pqrstuvwxyz{|}~⌂'
'ÇüéâäàåçêëèïîìÄÅ'
'ÉæÆôöòûùÿÖÜ¢£¥₧ƒ'
'áíóúñѪº¿⌐¬½¼¡«»'
'░▒▓│┤╡╢╖╕╣║╗╝╜╛┐'
'└┴┬├─┼╞╟╚╔╩╦╠═╬╧'
'╨╤╥╙╘╒╓╫╪┘┌█▄▌▐▀'
'αßΓπΣσµτΦΘΩδ∞φε∩'
'≡±≥≤⌠⌡÷≈°∙·√ⁿ²■¤'
)
fluffych = (
'·☺☻♥♦♣♠•◘○◙♂♀♪♫☼'
'►◄↕‼¶§▬↨↑↓→←∟↔▲▼'
' !"#$%&\'()*+,-./'
'0123456789:;<=>?'
'@ABCDEFGHIJKLMNO'
'PQRSTUVWXYZ[\]^_'
'`abcdefghijklmno'
'pqrstuvwxyz{|}~⌂'
'ÇüéâäàåçêëèïîìÄÅ'
'ÉæÆôöòûùÿÖÜ¢£¥₧ƒ'
'áíóúñѪº¿⌐¬½¼¡«»'
'░▒▓│┤╡╢╖╕╣║╗╝╜╛┐'
'└┴┬├─┼╞╟╚╔╩╦╠═╬╧'
'╨╤╥╙╘╒╓╫╪┘┌█▄▌▐▀'
'αßΓπΣσµτΦΘΩδ∞φε∩'
'≡±≥≤⌠⌡÷≈°∀∃√ⁿ²■¤'
)
def unpack(fmt, buf):
"""Unpack buf based on fmt, return the remainder."""
size = struct.calcsize(fmt)
vals = struct.unpack(fmt, bytes(buf[:size]))
return vals + (buf[size:],)
class HexDumper:
def __init__(self, output, charset=fluffych):
self.offset = 0
self.last = None
self.elided = False
self.hexes = []
self.chars = []
self.charset = charset
self.output = output
def _spit(self):
if self.chars == self.last:
if not self.elided:
self.output.write('*\n')
self.elided = True
self.hexes = []
self.chars = []
return
self.last = self.chars[:]
self.elided = False
pad = 16 - len(self.chars)
self.hexes += [' '] * pad
self.output.write('{:08x} '.format(self.offset - len(self.chars)))
self.output.write(' '.join(self.hexes[:8]))
self.output.write(' ')
self.output.write(' '.join(self.hexes[8:]))
self.output.write(' ')
self.output.write(''.join(self.chars))
self.output.write('\n')
self.hexes = []
self.chars = []
def add(self, b):
if self.offset and self.offset % 16 == 0:
self._spit()
if b is None:
h = ''
c = '<EFBFBD>'
else:
h = '{:02x}'.format(b)
c = self.charset[b]
self.chars.append(c)
self.hexes.append(h)
self.offset += 1
def done(self):
self._spit()
self.output.write('{:08x}\n'.format(self.offset))
def hexdump(buf, f=sys.stdout, charset=fluffych):
"Print a hex dump of buf"
h = HexDumper(output=f, charset=charset)
for b in buf:
h.add(b)
h.done()
def cstring(buf): def cstring(buf):
@ -160,10 +14,6 @@ def cstring(buf):
return buf[:i] return buf[:i]
def md5sum(txt):
return md5.new(txt).hexdigest()
def assert_equal(a, b): def assert_equal(a, b):
assert a == b, ('%r != %r' % (a, b)) assert a == b, ('%r != %r' % (a, b))
@ -218,7 +68,7 @@ class BitVector:
"""Iterate from LSB to MSB""" """Iterate from LSB to MSB"""
v = self._val v = self._val
for i in xrange(self._len): for _ in range(self._len):
yield int(v & 1) yield int(v & 1)
v >>= 1 v >>= 1
@ -282,7 +132,7 @@ import string
b64alpha = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/' b64alpha = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'
def from_b64(s, alphabet, codec='base64'): def from_b64(s, alphabet, codec='base64'):
tr = string.maketrans(alphabet, b64alpha) tr = alphabet.maketrans(b64alpha)
t = s.translate(tr) t = s.translate(tr)
return t.decode(codec) return t.decode(codec)
@ -330,3 +180,15 @@ def _registry(encoding):
Esab64StreamReader, Esab64StreamWriter) Esab64StreamReader, Esab64StreamWriter)
codecs.register(_registry) codecs.register(_registry)
def main(session):
s = None
reseq = ip.Dispatch(*sys.argv[1:])
for _, d in reseq:
srv, first, chunk = d
if not s:
s = session(first)
s.handle(srv, first, chunk, reseq.last)
Session = ip.Session
Packet = ip.Packet

View File

@ -1,20 +1,12 @@
#! /usr/bin/python3 #! /usr/bin/python3
import sys import netarch
from netarch import ip
from netarch import *
class DumbPacket(ip.Packet): class DumbPacket(netarch.Packet):
def parse(self, data): def parse(self, data):
self.payload = data self.payload = data
class DumbSession(ip.Session): class DumbSession(netarch.Session):
Packet = DumbPacket Packet = DumbPacket
s = None netarch.main(DumbSession)
reseq = ip.Dispatch(*sys.argv[1:])
for h, d in reseq:
srv, first, chunk = d
if not s:
s = DumbSession(first)
s.handle(srv, first, chunk, reseq.last)

140
hexdump.py Normal file
View File

@ -0,0 +1,140 @@
import sys
stdch = (
'␀·········␊··␍··'
'················'
' !"#$%&\'()*+,-./'
'0123456789:;<=>?'
'@ABCDEFGHIJKLMNO'
'PQRSTUVWXYZ[\\]^_'
'`abcdefghijklmno'
'pqrstuvwxyz{|}~·'
'················'
'················'
'················'
'················'
'················'
'················'
'················'
'················'
)
decch = (
'␀␁␂␃␄␅␆␇␈␉␊␋␌␍␎␏'
'␐␑␒␓␔␕␖␗␘␙␚·····'
'␠!"#$%&\'()*+,-./'
'0123456789:;<=>?'
'@ABCDEFGHIJKLMNO'
'PQRSTUVWXYZ[\\]^_'
'`abcdefghijklmno'
'pqrstuvwxyz{|}~␡'
'················'
'················'
'················'
'················'
'················'
'················'
'················'
'················'
)
cgach = (
'□☺☻♥♦♣♠•◘○◙♂♀♪♫☼'
'►◄↕‼¶§▬↨↑↓→←∟↔▲▼'
' !"#$%&\'()*+,-./'
'0123456789:;<=>?'
'@ABCDEFGHIJKLMNO'
'PQRSTUVWXYZ[\\]^_'
'`abcdefghijklmno'
'pqrstuvwxyz{|}~⌂'
'ÇüéâäàåçêëèïîìÄÅ'
'ÉæÆôöòûùÿÖÜ¢£¥₧ƒ'
'áíóúñѪº¿⌐¬½¼¡«»'
'░▒▓│┤╡╢╖╕╣║╗╝╜╛┐'
'└┴┬├─┼╞╟╚╔╩╦╠═╬╧'
'╨╤╥╙╘╒╓╫╪┘┌█▄▌▐▀'
'αßΓπΣσµτΦΘΩδ∞φε∩'
'≡±≥≤⌠⌡÷≈°∙·√ⁿ²■¤'
)
fluffych = (
'·☺☻♥♦♣♠•◘○◙♂♀♪♫☼'
'►◄↕‼¶§▬↨↑↓→←∟↔▲▼'
' !"#$%&\'()*+,-./'
'0123456789:;<=>?'
'@ABCDEFGHIJKLMNO'
'PQRSTUVWXYZ[\\]^_'
'`abcdefghijklmno'
'pqrstuvwxyz{|}~⌂'
'ÇüéâäàåçêëèïîìÄÅ'
'ÉæÆôöòûùÿÖÜ¢£¥₧ƒ'
'áíóúñѪº¿⌐¬½¼¡«»'
'░▒▓│┤╡╢╖╕╣║╗╝╜╛┐'
'└┴┬├─┼╞╟╚╔╩╦╠═╬╧'
'╨╤╥╙╘╒╓╫╪┘┌█▄▌▐▀'
'αßΓπΣσµτΦΘΩδ∞φε∩'
'≡±≥≤⌠⌡÷≈°∀∃√ⁿ²■¤'
)
class HexDumper:
def __init__(self, output, charset=fluffych):
self.offset = 0
self.last = None
self.elided = False
self.hexes = []
self.chars = []
self.charset = charset
self.output = output
def _spit(self):
if self.chars == self.last:
if not self.elided:
self.output.write('*\n')
self.elided = True
self.hexes = []
self.chars = []
return
self.last = self.chars[:]
self.elided = False
pad = 16 - len(self.chars)
self.hexes += [' '] * pad
self.output.write('{:08x} '.format(self.offset - len(self.chars)))
self.output.write(' '.join(self.hexes[:8]))
self.output.write(' ')
self.output.write(' '.join(self.hexes[8:]))
self.output.write(' ')
self.output.write(''.join(self.chars))
self.output.write('\n')
self.hexes = []
self.chars = []
def add(self, b):
if self.offset and self.offset % 16 == 0:
self._spit()
if b is None:
h = ''
c = '<EFBFBD>'
else:
h = '{:02x}'.format(b)
c = self.charset[b]
self.chars.append(c)
self.hexes.append(h)
self.offset += 1
def done(self):
self._spit()
self.output.write('{:08x}\n'.format(self.offset))
def hexdump(buf, f=sys.stdout, charset=fluffych):
"Print a hex dump of buf"
h = HexDumper(output=f, charset=charset)
for b in buf:
h.add(b)
h.done()

39
ip.py
View File

@ -13,12 +13,20 @@ try:
import pcap import pcap
except ImportError: except ImportError:
warnings.warn("Using slow pure-python pcap library") warnings.warn("Using slow pure-python pcap library")
import netarch.py_pcap as pcap from . import py_pcap as pcap
import os import os
import cgi import cgi
import urllib.parse import urllib.parse
from netarch import unpack, hexdump from .hexdump import hexdump
from netarch.trilobytes import TriloBytes from .trilobytes import TriloBytes
def unpack(fmt, buf):
"""Unpack buf based on fmt, return the remainder."""
size = struct.calcsize(fmt)
vals = struct.unpack(fmt, bytes(buf[:size]))
return vals + (buf[size:],)
def unpack_nybbles(byte): def unpack_nybbles(byte):
return (byte >> 4, byte & 0x0F) return (byte >> 4, byte & 0x0F)
@ -65,7 +73,6 @@ class Frame:
p) = unpack('!HHBBH6si6si', p) p) = unpack('!HHBBH6si6si', p)
self.saddr = self.ar_sip self.saddr = self.ar_sip
self.daddr = self.ar_tip self.daddr = self.ar_tip
self.__repr__ = self.__arp_repr__
elif self.eth_type == IP: elif self.eth_type == IP:
# IP # IP
(self.ihlvers, (self.ihlvers,
@ -146,18 +153,18 @@ class Frame:
dst_addr = property(get_dst_addr) dst_addr = property(get_dst_addr)
def __repr__(self): def __repr__(self):
return ('<Frame %s %s:%r(%08x) -> %s:%r(%08x) length %d>' % if self.eth_type == ARP:
(self.name,
self.src_addr, self.sport, self.seq,
self.dst_addr, self.dport, self.ack,
len(self.payload)))
def __arp_repr__(self):
return '<Frame %s %s(%s) -> %s(%s)>' % (self.name, return '<Frame %s %s(%s) -> %s(%s)>' % (self.name,
str_of_eth(self.ar_sha), str_of_eth(self.ar_sha),
self.src_addr, self.src_addr,
str_of_eth(self.ar_tha), str_of_eth(self.ar_tha),
self.dst_addr) self.dst_addr)
else:
return ('<Frame %s %s:%r(%08x) -> %s:%r(%08x) length %d>' %
(self.name,
self.src_addr, self.sport, self.seq,
self.dst_addr, self.dport, self.ack,
len(self.payload)))
class TCP_Recreate: class TCP_Recreate:
closed = True closed = True
@ -360,16 +367,6 @@ class TCP_Resequence:
return (xdi, first, gs) return (xdi, first, gs)
def handle(self, pkt):
"""Stub.
This function will never be called, it is immediately overridden
by __init__. The current value of self.handle is the state.
"""
raise NotImplementedError()
def handle_handshake(self, pkt): def handle_handshake(self, pkt):
if not self.first: if not self.first:
self.first = pkt self.first = pkt