2018-07-09 17:14:19 -06:00
|
|
|
#! /usr/bin/python3
|
|
|
|
|
|
|
|
## 2008, 2018 Neale Pickett
|
|
|
|
|
|
|
|
import itertools
|
|
|
|
|
|
|
|
class TriloBytes:
    """Three-level byte array (0, 1, Missing).

    This allows you to represent on-wire transactions with holes in the middle,
    due to e.g. dropped packets.

    Each element is either an integer byte value or None; None marks a
    missing byte.  The contents are stored as a tuple and every operator
    returns a new TriloBytes.  When converted with bytes(), each missing
    position is filled from the `drop` pattern, which repeats and is
    aligned to the start of the array (so a slice restarts the pattern).

    >>> tb = TriloBytes(b'hi')
    >>> bytes(tb)
    b'hi'
    >>> bytes(tb[:40])
    b'hi'

    >>> tb = TriloBytes(b'hi') + [None] * 3
    >>> bytes(tb)
    b'hi???'
    >>> bytes(tb[:40])
    b'hi???'
    >>> bytes(tb[:3])
    b'hi?'
    >>> bytes(tb[-4:])
    b'i???'
    >>> bytes(tb + tb)
    b'hi???hi???'
    >>> bytes(tb ^ 1)
    b'ih???'
    >>> bytes(tb ^ [32, 1])
    b'Hh???'

    >>> tb = TriloBytes(b'hi', drop=b'DROP')
    >>> bytes(tb)
    b'hi'
    >>> tb += [None] * 7
    >>> bytes(tb)
    b'hiOPDROPD'

    A zero byte is real data, not a hole:

    >>> list(bytes(TriloBytes([0, 104, None])))
    [0, 104, 63]
    >>> list(TriloBytes([0, None]) ^ 1)
    [1, None]
    """

    def __init__(self, initializer=(), drop=b'?'):
        """Create a TriloBytes from an iterable of byte values and Nones.

        initializer -- iterable whose items are ints or None (missing);
                       it is copied into a tuple.
        drop        -- bytes pattern cycled to fill missing positions when
                       the object is converted with bytes().
        """
        self._drop = drop
        self._contents = tuple(initializer)

    @classmethod
    def fromhex(cls, string):
        """Build an instance from a string of hex digits, like bytes.fromhex.

        >>> bytes(TriloBytes.fromhex("616263"))
        b'abc'
        """
        return cls(bytes.fromhex(string))

    def __len__(self):
        """Return the number of elements, counting missing ones.

        >>> len(TriloBytes(b'abc'))
        3
        """
        return len(self._contents)

    def __bool__(self):
        """Return True when there is at least one element.

        >>> 10 if TriloBytes() else -10
        -10
        >>> 10 if TriloBytes(b'a') else -10
        10
        """
        return len(self) > 0

    # Python 2 spelled the truthiness hook __nonzero__; keep the old name
    # so any code that calls it explicitly continues to work.
    __nonzero__ = __bool__

    def __getitem__(self, key):
        """Return one element for an index, or a TriloBytes for a slice.

        A sliced result keeps this object's drop pattern.

        >>> tb = TriloBytes(b'hi') + [None]
        >>> tb[0]
        104
        >>> tb[2] is None
        True
        >>> tb[1:]
        <TriloBytes missing 1 of 2>
        """
        ret = self._contents[key]
        # Only slices yield a sub-sequence; a plain index yields an int or
        # None, which is returned as-is.
        if isinstance(key, slice):
            return TriloBytes(ret, self._drop)
        return ret

    def __iter__(self):
        """Iterate over the elements (ints, or None for missing bytes)."""
        return iter(self._contents)

    def __bytes__(self):
        """Return the contents as bytes, filling holes from the drop pattern.

        The drop pattern is consumed in lockstep with the contents, so the
        filler byte used for a hole depends on the hole's position.
        """
        # Compare against None explicitly: a 0x00 byte is valid data and
        # must not be replaced by the filler.
        return bytes(
            d if v is None else v
            for v, d in zip(self, itertools.cycle(self._drop))
        )

    def __add__(self, other):
        """Return a new TriloBytes with `other` appended.

        `other` may be another TriloBytes or any iterable of ints/None.
        The result keeps this object's drop pattern.
        """
        try:
            tail = other._contents
        except AttributeError:
            # Not a TriloBytes: treat it as a plain iterable of elements.
            tail = tuple(other)
        return TriloBytes(self._contents + tail, self._drop)

    def __eq__(self, other):
        """Compare contents; the drop pattern does not take part.

        >>> TriloBytes(b'ab') == TriloBytes(b'ab', drop=b'!')
        True
        >>> TriloBytes(b'ab') == b'ab'
        False
        """
        try:
            theirs = other._contents
        except AttributeError:
            # Let Python try the reflected comparison, then fall back to
            # "not equal".
            return NotImplemented
        return self._contents == theirs

    def __hash__(self):
        """Hash the contents only, consistent with __eq__."""
        return hash(self._contents)

    def __xor__(self, mask):
        """Return a new TriloBytes with every present byte XORed with mask.

        mask -- a single int, or an indexable sequence of ints that is
                repeated as needed.  Missing bytes stay missing.
        """
        try:
            mask[0]
        except TypeError:
            # A scalar mask: wrap it so it can be cycled like a sequence.
            mask = [mask]
        # Test for None rather than truthiness so that 0x00 bytes are
        # XORed instead of being turned into holes.
        return TriloBytes(
            (None if x is None else x ^ y
             for x, y in zip(self._contents, itertools.cycle(mask))),
            drop=self._drop,
        )

    def __repr__(self):
        """Summarize how many elements are missing.

        >>> TriloBytes(b'abc')
        <TriloBytes missing 0 of 3>
        >>> TriloBytes(b'abc') + [None]
        <TriloBytes missing 1 of 4>
        """
        return '<TriloBytes missing %d of %d>' % (self.missing(), len(self))

    def missing(self):
        """Return the number of missing (None) elements.

        >>> TriloBytes(b'abc').missing()
        0
        >>> (TriloBytes(b'abc') + [None, None]).missing()
        2
        """
        return self._contents.count(None)

    def map(self, func, *args):
        """Return a generator that substitutes func's result for each hole.

        Present values are yielded unchanged; for every missing element the
        generator yields ``func(None, *args)`` instead.

        NOTE(review): func is only ever invoked for missing elements, with
        None as its first argument -- confirm this is intended rather than
        applying func to the bytes that are present.

        >>> list(TriloBytes([1, None]).map(lambda v, fill: fill, 0))
        [1, 0]
        """
        return (v if v is not None else func(v, *args) for v in self)
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Executed as a script: run the doctests embedded in this module's
    # docstrings.  testmod() with no arguments checks the __main__ module.
    import doctest

    doctest.testmod()
|