Add xmodem example

This commit is contained in:
Neale Pickett 2021-07-02 17:18:59 -06:00
parent 8cd7cb0bdc
commit 6a1cbaec73
7 changed files with 69 additions and 17 deletions

View File

@ -11,6 +11,9 @@ and room to grow these explorations into full-blown decoders.
Get going Get going
========= =========
Documentation sucks, sorry. Documentation doesn't exist. Sorry.
The way we go about things is to copy `dumbdecode.py` to a new file, The way we go about things is to copy [dumbdecode](examples/dumbdecode.py) to a new file,
and start hacking onto it. and start hacking onto it.
You may find the [example xmodem decoder](examples/xmodem.py) to be helpful!
It illustrates a fair amount of what the library provides.

View File

@ -1,7 +1,6 @@
#! /usr/bin/python3 #! /usr/bin/python3
import netarch import netarch
import sys
class DumbPacket(netarch.Packet): class DumbPacket(netarch.Packet):
def parse(self, data): def parse(self, data):
@ -10,4 +9,4 @@ class DumbPacket(netarch.Packet):
class DumbSession(netarch.Session): class DumbSession(netarch.Session):
Packet = DumbPacket Packet = DumbPacket
netarch.main(DumbSession, sys.argv[1:]) netarch.main(DumbSession)

BIN
examples/xmodem.pcap Normal file

Binary file not shown.

45
examples/xmodem.py Executable file
View File

@ -0,0 +1,45 @@
#! /usr/bin/python3
import netarch
class XmodemPacket(netarch.Packet):
def parse(self, data):
datastream = netarch.Unpacker(data)
self.opcode = datastream.uint8()
self.payload = datastream.buf
def opcode_1(self):
"Xfer"
datastream = netarch.Unpacker(self.payload)
self["seq"] = datastream.uint8()
self["~seq"] = datastream.uint8()
assert self["seq"] == 255 - self["~seq"]
assert len(datastream.buf) == 0x81
self["checksum"] = datastream.uint8(pos=-1)
self.payload = datastream.buf
def opcode_4(self):
"EOT"
def opcode_6(self):
"ACK"
def opcode_21(self):
"Begin transmission?"
class XmodemSession(netarch.Session):
Packet = XmodemPacket
def process(self, packet):
packet.show()
if packet.opcode == 21:
# Open a new file for output
self.out = self.open_out("data.bin")
elif packet.opcode == 1:
self.out.write(bytes(packet.payload))
netarch.main(XmodemSession)

View File

@ -2,8 +2,9 @@
import binascii import binascii
import struct import struct
import sys
from . import ip from . import ip
from .unpack import Unpacker
def cstring(buf): def cstring(buf):
@ -180,8 +181,10 @@ def _registry(encoding):
codecs.register(_registry) codecs.register(_registry)
def main(session, pcaps): def main(session, pcaps=None):
s = None s = None
if not pcaps:
pcaps = sys.argv[1:]
reseq = ip.Dispatch(*pcaps) reseq = ip.Dispatch(*pcaps)
for _, d in reseq: for _, d in reseq:
srv, first, chunk = d srv, first, chunk = d

View File

@ -39,7 +39,7 @@ class Unpacker:
self.endian = endian self.endian = endian
self.buf = buf self.buf = buf
def uint(self, size, endian=None): def uint(self, size, pos=0, endian=None):
endian = endian or self.endian endian = endian or self.endian
if size not in (8, 16, 32, 64): if size not in (8, 16, 32, 64):
# XXX: I'm pretty sure this can be done, but I don't want to code it up right now. # XXX: I'm pretty sure this can be done, but I don't want to code it up right now.
@ -53,20 +53,22 @@ class Unpacker:
r = (1, 0, 3, 2, 5, 4, 7, 6)[:noctets] r = (1, 0, 3, 2, 5, 4, 7, 6)[:noctets]
else: else:
raise ValueError("Unsupported byte order") raise ValueError("Unsupported byte order")
pull, self.buf = self.buf[:noctets], self.buf[noctets:] if pos < 0:
pos += len(self.buf)
pull, self.buf = self.buf[pos:pos+noctets], self.buf[:pos] + self.buf[pos+noctets:]
acc = 0 acc = 0
for i in r: for i in r:
acc = (acc << 8) | pull[i] acc = (acc << 8) | pull[i]
return acc return acc
def uint8(self): def uint8(self, pos=0):
return self.uint(8) return self.uint(8, pos)
def uint16(self, endian=None): def uint16(self, pos=0, endian=None):
return self.uint(16, endian) return self.uint(16, pos, endian)
def uint32(self, endian=None): def uint32(self, pos=0, endian=None):
return self.uint(32, endian) return self.uint(32, pos, endian)
def uint64(self, endian=None): def uint64(self, pos=0, endian=None):
return self.uint(64, endian) return self.uint(64, pos, endian)
if __name__ == "__main__": if __name__ == "__main__":
import doctest import doctest

View File

@ -38,7 +38,7 @@ setup(
# For a discussion on single-sourcing the version across setup.py and the # For a discussion on single-sourcing the version across setup.py and the
# project code, see # project code, see
# https://packaging.python.org/en/latest/single_source_version.html # https://packaging.python.org/en/latest/single_source_version.html
version='1.0.1', # Required version='2.1.0', # Required
# This is a one-line description or tagline of what your project does. This # This is a one-line description or tagline of what your project does. This
# corresponds to the "Summary" metadata field: # corresponds to the "Summary" metadata field: