netarch/__init__.py

198 lines
6.5 KiB
Python

import typing
import io
from . import binary
class Error(Exception):
"""Base class for netshovel exceptions"""
pass
class ShortError(Error):
"""Exception raised when not enough data is available.
Attributes:
wanted -- how much data we wanted
available -- how much data we had
"""
def __init__(self, wanted:int, available:int):
self.wanted = wanted
self.available = available
def __str__(self):
return "Not enough data available: wanted %d, got %d" % (self.wanted, self.got)
class MissingError(Error):
"""Exception raised when gaps were present for code that can't handle gaps.
"""
def __init__(self):
pass
def __str__(self):
return "Operation on missing bytes"
class namedField(typing.NamedTuple):
key: str
value: str
class headerField(typing.NamedTuple):
name: str
bits: int
value: typing.Any
order: binary.ByteOrder
class Packet:
def __init__(self, when, payload):
self.opcode = -1
self.description = "Undefined"
self.when = when
self.payload = payload
self.header = []
self.fields = []
def describeType(self) -> str:
"""Returns a string with timestamp, opcode, and description of this packet"""
return "%s Opcode %d: %s" % (self.when, self.opcode, self.description)
def describeFields(self) -> str:
"""Returns a multi-line string describing fields in this packet"""
lines = []
for k, v in self.fields:
lines.append(" %s: %s\n", k, v)
return "".join(lines)
def describeHeader(self) -> str:
"""Returns a multi-line string describing this packet's header structure"""
out = io.StringIO()
out.write(" 0 1 \n")
out.write(" 0 1 2 3 4 5 6 7 8 9 a b c d e f 0 1 2 3 4 5 6 7 8 9 a b c d e f\n")
bitOffset = 0
for f in self.header:
bits = f.bits
while bits > 0:
if bitOffset == 0:
out.write("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n")
linebits = bits
if linebits+bitOffset > 0x20:
linebits = 0x20 - bitOffset
nameval = "%s (0x%x)" % (f.name, f.value)
out.write("|" + nameval.center(linebits*2-1))
bitOffset += linebits
bits -= linebits
if linebits == 0x20:
out.write("|\n")
bitOffset = 0
out.write("+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+\n")
return out.getvalue()
def describe(self) -> str:
"""Return a multi-line string describing this packet
This shows the timestamp, opcode, description, and hex dump.
If you set any values, those are displayed in the order they were set.
This will quickly get unweildy, especially for large conversations.
You are encouraged to implement your own describe() method.
"""
out = io.StringIO()
out.write(self.describeType())
out.write("\n")
out.write(self.describeFields())
out.write(self.describeHeader())
out.write(self.payload.hexdump())
return out.getvalue()
def setValue(self, key:str, value:str):
"""Set a value
This is intended to be used to note debugging information
that you'd like to see on each packet.
"""
self.fields.append(namedField(key, value))
def setString(self, key:str, value:str):
"""Set a string value, displaying its Python string representation"""
self.setValue(key, repr(value))
def setInt(self, key:str, value:int):
"""Set an int value, displaying its decimal and hexadecimal representations"""
self.setValue(key, "%d == 0x%x" % (value, value))
setUInt = setInt
def setUInt32(self, key:str, value:int):
"""Set a Uint32 value, dispalying its decimal and 0-padded hexadecimal representations"""
self.setValue(key, "%d == %04x" % (value, value))
def setBytes(self, key:str, value:str):
"""Set a bytes value, displaying the hex encoding of the bytes"""
self.setValue(key, binascii.hexlify(value).encode("ascii"))
def peel(self, octets:int) -> bytes:
"""Peel octets bytes off the Payload, returning those bytes"""
pllen = len(self.payload)
if octets > pllen:
raise ShortError(octets, pllen)
buf = self.payload[:octets]
if buf.missing() > 0:
raise MissingError()
self.payload = self.payload[octets:]
return buf.bytes()
def addHeaderField(self, order:binary.ByteOrder, name:str, bits:int, value:typing.Any):
"""Add a field to the header field description."""
h = headerField(name, bits, value, order)
self.header.append(h)
def readUint(self, order:binary.ByteOrder, bits:int, name:str):
"""Peel an unsigned integer of size bits, adding it to the header field"""
if bits not in (8, 16, 32, 64):
raise RuntimeError("Weird number of bits: %d" % bits)
octets = bits >> 3
b = self.peel(octets)
if bits == 8:
value = b[0]
elif bits == 16:
value = order.Uint16(b)
elif bits == 32:
value = order.Uint32(b)
elif bits == 64:
value = order.Uint64(b)
self.addHeaderField(order, name, bits, value)
return value
def uint8(self, name:str) -> int:
"Peel off a uint8 (aka byte)"
return self.readUint(binary.LittleEndian, 8, name)
def uint16le(self, name:str) -> int:
"Peel off a uint16, little-endian"
return self.readUint(binary.LittleEndian, 16, name)
def uint32le(self, name:str) -> int:
"Peel off a uint32, little-endian"
return self.readUint(binary.LittleEndian, 32, name)
def uint64le(self, name:str) -> int:
"Peel off a uint64, little-endian"
return self.readUint(binary.LittleEndian, 64, name)
def uint16be(self, name:str) -> int:
"Peel off a uint64, big-endian"
return self.readUint(binary.BigEndian, 16, name)
def uint32be(self, name:str) -> int:
"Peel off a uint32, big-endian"
return self.readUint(binary.BigEndian, 32, name)
def uint64be(self, name:str) -> int:
"Peel off a uint44, big-endian"
return self.readUint(binary.BigEndian, 64, name)