netarch/__init__.py

198 lines
6.5 KiB
Python
Raw Permalink Normal View History

2008-07-08 17:52:28 -06:00
import typing
import io
from . import binary
2007-12-14 16:26:30 -07:00
class Error(Exception):
    """Common ancestor of every exception this package raises.

    Catch this type to handle any package-specific failure in one place.
    """
2007-12-14 16:26:30 -07:00
class ShortError(Error):
    """Exception raised when not enough data is available.

    Attributes:
        wanted -- how much data we wanted
        available -- how much data we had
    """

    def __init__(self, wanted: int, available: int):
        # Forward the values so that .args (and therefore repr and pickling)
        # carry the same information as the named attributes.
        super().__init__(wanted, available)
        self.wanted = wanted
        self.available = available

    def __str__(self) -> str:
        """Return a human-readable message with both byte counts."""
        # The attribute is named `available`; there is no `got` attribute.
        return "Not enough data available: wanted %d, got %d" % (
            self.wanted,
            self.available,
        )
2008-07-08 17:52:28 -06:00
class MissingError(Error):
    """Exception raised when data containing gaps reaches code that cannot
    cope with gaps.
    """

    def __init__(self) -> None:
        # Deliberately takes no arguments: the message is always the same.
        super().__init__()

    def __str__(self) -> str:
        """Return the fixed description of this error."""
        message = "Operation on missing bytes"
        return message
class namedField(typing.NamedTuple):
    """A (key, value) annotation recorded on a packet for display."""

    # Label shown in front of the value.
    key: str
    # Text to display for this annotation.
    value: str
class headerField(typing.NamedTuple):
    """Description of one field in a packet's header layout."""

    # Label of the field.
    name: str
    # Width of the field, in bits.
    bits: int
    # Value associated with the field.
    value: typing.Any
    # Byte order associated with the field.
    order: binary.ByteOrder
class Packet:
    """A timestamped payload plus the annotations decoded from it.

    Attributes:
        opcode -- numeric opcode; -1 until a subclass or caller sets it
        description -- short text describing the packet; "Undefined" by default
        when -- timestamp given to the constructor
        payload -- remaining undecoded data; it must support len(), slicing,
            and the missing(), bytes() and hexdump() methods
        header -- headerField entries, in the order they were read
        fields -- namedField entries, in the order they were set
    """

    def __init__(self, when, payload):
        self.opcode = -1
        self.description = "Undefined"
        self.when = when
        self.payload = payload
        self.header = []
        self.fields = []

    def describeType(self) -> str:
        """Returns a string with timestamp, opcode, and description of this packet"""
        return "%s Opcode %d: %s" % (self.when, self.opcode, self.description)

    def describeFields(self) -> str:
        """Returns a multi-line string describing fields in this packet"""
        # Each entry unpacks as (key, value); format one line per entry.
        return "".join(" %s: %s\n" % (k, v) for k, v in self.fields)

    def describeHeader(self) -> str:
        """Returns a multi-line string describing this packet's header structure

        Fields are laid out on rows of 32 bits, two characters per bit.
        A field that does not fit in the current row continues on the next
        one, with its label repeated in each part.
        """
        rule = "+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n"
        out = io.StringIO()
        out.write(" 0 1 \n")
        out.write(" 0 1 2 3 4 5 6 7 8 9 a b c d e f 0 1 2 3 4 5 6 7 8 9 a b c d e f\n")
        bitOffset = 0
        for f in self.header:
            nameval = "%s (0x%x)" % (f.name, f.value)
            bits = f.bits
            while bits > 0:
                if bitOffset == 0:
                    out.write(rule)
                # Take as many bits as still fit on the current row.
                linebits = min(bits, 0x20 - bitOffset)
                out.write("|" + nameval.center(linebits*2-1))
                bitOffset += linebits
                bits -= linebits
                # Wrap once the row is full, however many fields filled it.
                # Testing the width of the last chunk instead would leave
                # the row open and stall on the next field.
                if bitOffset == 0x20:
                    out.write("|\n")
                    bitOffset = 0
        if bitOffset > 0:
            # Close a final row that was only partly filled.
            out.write("|\n")
        out.write(rule)
        return out.getvalue()

    def describe(self) -> str:
        """Return a multi-line string describing this packet

        This shows the timestamp, opcode, description, and hex dump.
        If you set any values, those are displayed in the order they were set.

        This will quickly get unwieldy, especially for large conversations.
        You are encouraged to implement your own describe() method.
        """
        out = io.StringIO()
        out.write(self.describeType())
        out.write("\n")
        out.write(self.describeFields())
        out.write(self.describeHeader())
        out.write(self.payload.hexdump())
        return out.getvalue()

    def setValue(self, key:str, value:str):
        """Set a value

        This is intended to be used to note debugging information
        that you'd like to see on each packet.
        """
        self.fields.append(namedField(key, value))

    def setString(self, key:str, value:str):
        """Set a string value, displaying its Python string representation"""
        self.setValue(key, repr(value))

    def setInt(self, key:str, value:int):
        """Set an int value, displaying its decimal and hexadecimal representations"""
        self.setValue(key, "%d == 0x%x" % (value, value))

    setUInt = setInt

    def setUInt32(self, key:str, value:int):
        """Set a Uint32 value, displaying its decimal and 0-padded hexadecimal representations"""
        # 32 bits are 8 hex digits; the 0x prefix matches setInt's output.
        self.setValue(key, "%d == 0x%08x" % (value, value))

    def setBytes(self, key:str, value:bytes):
        """Set a bytes value, displaying the hex encoding of the bytes"""
        # bytes() accepts any bytes-like object; hex() returns a str directly.
        self.setValue(key, bytes(value).hex())

    def peel(self, octets:int) -> bytes:
        """Peel octets bytes off the Payload, returning those bytes

        Raises ShortError if the payload holds fewer than octets bytes, and
        MissingError if the requested span reports missing data.  The payload
        is left untouched when either error is raised.
        """
        pllen = len(self.payload)
        if octets > pllen:
            raise ShortError(octets, pllen)
        buf = self.payload[:octets]
        if buf.missing() > 0:
            raise MissingError()
        self.payload = self.payload[octets:]
        return buf.bytes()

    def addHeaderField(self, order:binary.ByteOrder, name:str, bits:int, value:typing.Any):
        """Add a field to the header field description."""
        self.header.append(headerField(name, bits, value, order))

    def readUint(self, order:binary.ByteOrder, bits:int, name:str) -> int:
        """Peel an unsigned integer of size bits, adding it to the header field

        Raises RuntimeError if bits is not 8, 16, 32 or 64, and whatever
        peel() raises if the payload cannot supply the bytes.
        """
        if bits not in (8, 16, 32, 64):
            raise RuntimeError("Weird number of bits: %d" % bits)
        b = self.peel(bits >> 3)
        if bits == 8:
            # A single byte has no byte order to apply.
            value = b[0]
        elif bits == 16:
            value = order.Uint16(b)
        elif bits == 32:
            value = order.Uint32(b)
        else:
            value = order.Uint64(b)
        self.addHeaderField(order, name, bits, value)
        return value

    def uint8(self, name:str) -> int:
        "Peel off a uint8 (aka byte)"
        return self.readUint(binary.LittleEndian, 8, name)

    def uint16le(self, name:str) -> int:
        "Peel off a uint16, little-endian"
        return self.readUint(binary.LittleEndian, 16, name)

    def uint32le(self, name:str) -> int:
        "Peel off a uint32, little-endian"
        return self.readUint(binary.LittleEndian, 32, name)

    def uint64le(self, name:str) -> int:
        "Peel off a uint64, little-endian"
        return self.readUint(binary.LittleEndian, 64, name)

    def uint16be(self, name:str) -> int:
        "Peel off a uint16, big-endian"
        return self.readUint(binary.BigEndian, 16, name)

    def uint32be(self, name:str) -> int:
        "Peel off a uint32, big-endian"
        return self.readUint(binary.BigEndian, 32, name)

    def uint64be(self, name:str) -> int:
        "Peel off a uint64, big-endian"
        return self.readUint(binary.BigEndian, 64, name)