moth/points.py

197 lines
4.1 KiB
Python
Executable File

#! /usr/bin/env python3
import socket
import hmac
import struct
import io
##
## Authentication
##
key = b'mollusks peck my galloping genitals'
def digest(data):
return hmac.new(key, data).digest()
def sign(data):
return data + digest(data)
def check_sig(data):
base, mac = data[:-16], data[-16:]
if mac == digest(base):
return base
else:
raise ValueError('Invalid message digest')
##
## Marshalling
##
def unpack(fmt, buf):
"""Unpack buf based on fmt, return the rest as a buffer."""
size = struct.calcsize(fmt)
vals = struct.unpack(fmt, buf[:size])
return vals + (buf[size:],)
def packstr(s):
b = bytes(s, 'utf-8')
return struct.pack('!H', len(b)) + b
def unpackstr(b):
l, b = unpack('!H', b)
s, b = b[:l], b[l:]
return str(s, 'utf-8'), b
##
## Request
##
def encode_request(when, cat, team, score):
base = (struct.pack('!I', when) +
packstr(cat) +
packstr(team) +
struct.pack('!i', score))
return sign(base)
def decode_request(b):
base = check_sig(b)
when, base = unpack('!I', base)
cat, base = unpackstr(base)
team, base = unpackstr(base)
score, base = unpack('!i', base)
assert not base
return (when, cat, team, score)
##
## Response
##
def encode_response(when, txt):
base = (struct.pack('!I', when) +
packstr(txt))
return sign(base)
def decode_response(b):
base = check_sig(b)
when, base = unpack('!I', base)
txt, base = unpackstr(base)
assert not base
return (when, txt)
##
## Storage
##
def incdict(dict, key, amt=1):
dict.setdefault(key, 0)
dict[key] += amt
class Storage:
def __init__(self, fn):
self.points_by_team = {}
self.points_by_cat = {}
self.points_by_cat_team = {}
self.log = []
self.events = set()
self.f = io.BytesIO()
# Read stored scores
try:
f = open(fn, 'rb')
while True:
l = f.read(4)
if not l:
break
(l,) = struct.unpack('!I', l)
b = f.read(l)
req = decode_request(b)
self.add(req)
f.close()
except IOError:
pass
self.f = open(fn, 'ab')
def __contains__(self, req):
return req in self.events
def add(self, req):
if req in self.events:
return
when, cat, team, score = req
incdict(self.points_by_team, team, score)
incdict(self.points_by_cat, cat, score)
incdict(self.points_by_cat_team, (cat, team), score)
self.log.append(req)
self.events.add(req)
b = encode_request(*req)
lb = struct.pack('!I', len(b)) + b
self.f.write(lb)
self.f.flush()
def categories(self):
return sorted(self.points_by_cat)
def teams(self):
return sorted(self.points_by_team)
def cat_points(self, cat):
return self.points_by_cat.get(cat, 0)
def team_points(self, team):
return self.points_by_team.get(team, 0)
def team_points_in_cat(self, cat, team):
return self.points_by_cat_team.get((cat, team), 0)
##
## Testing
##
def test():
import time
import os
now = int(time.time())
req = (now, 'category 5', 'foobers in heat', 43)
assert decode_request(encode_request(*req)) == req
rsp = (now, 'hello world')
assert decode_response(encode_response(*rsp)) == rsp
try:
os.unlink('test.dat')
except OSError:
pass
s = Storage('test.dat')
s.add((now, 'cat1', 'zebras', 20))
s.add((now, 'cat1', 'aardvarks', 10))
s.add((now, 'merf', 'aardvarks', 50))
assert s.teams() == ['aardvarks', 'zebras']
assert s.categories() == ['cat1', 'merf']
assert s.team_points('aardvarks') == 60
assert s.cat_points('cat1') == 30
assert s.team_points_in_cat('cat1', 'aardvarks') == 10
assert s.team_points_in_cat('merf', 'zebras') == 0
del s
s = Storage('test.dat')
assert s.teams() == ['aardvarks', 'zebras']
print('all tests pass; output file is test.dat')
if __name__ == '__main__':
test()