mirror of https://github.com/nealey/vail.git
Start enforcing message structure
This commit is contained in:
parent
f970376f17
commit
db9ca5dc83
|
@ -29,7 +29,7 @@ type bookEvent struct {
|
||||||
eventType bookEventType
|
eventType bookEventType
|
||||||
name string
|
name string
|
||||||
w io.Writer
|
w io.Writer
|
||||||
p []byte
|
m Message
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b Book) Join(name string, w io.Writer) {
|
func (b Book) Join(name string, w io.Writer) {
|
||||||
|
@ -48,11 +48,11 @@ func (b Book) Part(name string, w io.Writer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b Book) Send(name string, p []byte) {
|
func (b Book) Send(name string, m Message) {
|
||||||
b.events <- bookEvent{
|
b.events <- bookEvent{
|
||||||
eventType: sendEvent,
|
eventType: sendEvent,
|
||||||
name: name,
|
name: name,
|
||||||
p: p,
|
m: m,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,6 +87,6 @@ func (b Book) loop() {
|
||||||
log.Println("WARN: Sending to an empty channel:", event.name)
|
log.Println("WARN: Sending to an empty channel:", event.name)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
repeater.Send(event.p)
|
repeater.Send(event.m)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,53 +7,63 @@ import (
|
||||||
|
|
||||||
func TestBook(t *testing.T) {
|
func TestBook(t *testing.T) {
|
||||||
b := NewBook()
|
b := NewBook()
|
||||||
|
m := TestMessage{Message{1, 2, []uint8{3, 4}}}
|
||||||
|
|
||||||
buf1 := bytes.NewBufferString("buf1")
|
buf1 := bytes.NewBufferString("buf1")
|
||||||
|
buf1Expect := bytes.NewBufferString("buf1")
|
||||||
b.Join("moo", buf1)
|
b.Join("moo", buf1)
|
||||||
|
m.Clients = 1
|
||||||
b.loop()
|
b.loop()
|
||||||
if len(b.entries) != 1 {
|
if len(b.entries) != 1 {
|
||||||
t.Error("Wrong number of entries")
|
t.Error("Wrong number of entries")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to an empty channel
|
// Send to an empty channel
|
||||||
b.Send("merf", []byte("goober"))
|
b.Send("merf", m.Message)
|
||||||
b.loop()
|
b.loop()
|
||||||
if buf1.String() != "buf1" {
|
if buf1.String() != buf1Expect.String() {
|
||||||
t.Error("Sending to empty channel sent to non-empty channel")
|
t.Error("Sending to empty channel sent to non-empty channel")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send to a non-empty channel!
|
// Send to a non-empty channel!
|
||||||
b.Send("moo", []byte("goober"))
|
b.Send("moo", m.Message)
|
||||||
b.loop()
|
b.loop()
|
||||||
if buf1.String() != "buf1goober" {
|
buf1Expect.Write(m.bytes())
|
||||||
|
if buf1.String() != buf1Expect.String() {
|
||||||
t.Error("Sending didn't work")
|
t.Error("Sending didn't work")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Join another client
|
// Join another client
|
||||||
buf2 := bytes.NewBufferString("buf2")
|
buf2 := bytes.NewBufferString("buf2")
|
||||||
|
buf2Expect := bytes.NewBufferString("buf2")
|
||||||
b.Join("moo", buf2)
|
b.Join("moo", buf2)
|
||||||
|
m.Clients = 2
|
||||||
b.loop()
|
b.loop()
|
||||||
|
|
||||||
// Send to both
|
// Send to both
|
||||||
b.Send("moo", []byte("snerk"))
|
b.Send("moo", m.Message)
|
||||||
b.loop()
|
b.loop()
|
||||||
if buf1.String() != "buf1goobersnerk" {
|
buf1Expect.Write(m.bytes())
|
||||||
|
buf2Expect.Write(m.bytes())
|
||||||
|
if buf1.String() != buf1Expect.String() {
|
||||||
t.Error("Send to 2-member channel busted", buf1)
|
t.Error("Send to 2-member channel busted", buf1)
|
||||||
}
|
}
|
||||||
if buf2.String() != "buf2snerk" {
|
if buf2.String() != buf2Expect.String() {
|
||||||
t.Error("Send to 2-member channel busted", buf2)
|
t.Error("Send to 2-member channel busted", buf2)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Part a client
|
// Part a client
|
||||||
b.Part("moo", buf1)
|
b.Part("moo", buf1)
|
||||||
b.loop()
|
b.loop()
|
||||||
|
m.Clients = 1
|
||||||
|
|
||||||
b.Send("moo", []byte("peanut"))
|
b.Send("moo", m.Message)
|
||||||
b.loop()
|
b.loop()
|
||||||
if buf1.String() != "buf1goobersnerk" {
|
buf2Expect.Write(m.bytes())
|
||||||
|
if buf1.String() != buf1Expect.String() {
|
||||||
t.Error("Parted channel but still getting messages", buf1)
|
t.Error("Parted channel but still getting messages", buf1)
|
||||||
}
|
}
|
||||||
if buf2.String() != "buf2snerkpeanut" {
|
if buf2.String() != buf2Expect.String() {
|
||||||
t.Error("Someone else parting somehow messed up sends", buf2)
|
t.Error("Someone else parting somehow messed up sends", buf2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,10 +2,11 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
|
||||||
"os"
|
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/websocket"
|
"golang.org/x/net/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -16,12 +17,13 @@ type Client struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c Client) Handle(ws *websocket.Conn) {
|
func (c Client) Handle(ws *websocket.Conn) {
|
||||||
ws.MaxPayloadBytes = 500
|
nowMilli := time.Now().UnixMilli()
|
||||||
|
ws.MaxPayloadBytes = 50
|
||||||
book.Join(c.repeaterName, ws)
|
book.Join(c.repeaterName, ws)
|
||||||
defer book.Part(c.repeaterName, ws)
|
defer book.Part(c.repeaterName, ws)
|
||||||
|
|
||||||
// Tell the client what time we think it is
|
// Tell the client what time we think it is
|
||||||
fmt.Fprintf(ws, "[%d]", time.Now().UnixNano() / time.Millisecond.Nanoseconds())
|
fmt.Fprintf(ws, "[%d]", time.Now().UnixNano()/time.Millisecond.Nanoseconds())
|
||||||
|
|
||||||
for {
|
for {
|
||||||
buf := make([]byte, ws.MaxPayloadBytes)
|
buf := make([]byte, ws.MaxPayloadBytes)
|
||||||
|
@ -29,15 +31,34 @@ func (c Client) Handle(ws *websocket.Conn) {
|
||||||
if n, err := ws.Read(buf); err != nil {
|
if n, err := ws.Read(buf); err != nil {
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
buf = buf[:n]
|
buf = buf[:n]
|
||||||
}
|
}
|
||||||
|
|
||||||
book.Send(c.repeaterName, buf)
|
// Decode into a Message
|
||||||
|
var m Message
|
||||||
|
if err := m.UnmarshalBinary(buf); err != nil {
|
||||||
|
fmt.Fprintln(ws, err)
|
||||||
|
ws.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If it's wildly out of time, reject it
|
||||||
|
timeDelta := (nowMilli - m.Timestamp)
|
||||||
|
if timeDelta < 0 {
|
||||||
|
timeDelta = -timeDelta
|
||||||
|
}
|
||||||
|
if timeDelta > 9999 {
|
||||||
|
fmt.Fprintln(ws, "Bad timestamp")
|
||||||
|
ws.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
book.Send(c.repeaterName, m)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ChatHandler(w http.ResponseWriter, r *http.Request) {
|
func ChatHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
c := Client {
|
c := Client{
|
||||||
repeaterName: r.FormValue("repeater"),
|
repeaterName: r.FormValue("repeater"),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -57,7 +78,7 @@ func main() {
|
||||||
port = "8080"
|
port = "8080"
|
||||||
}
|
}
|
||||||
log.Println("Listening on port", port)
|
log.Println("Listening on port", port)
|
||||||
err := http.ListenAndServe(":" + port, nil)
|
err := http.ListenAndServe(":"+port, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err.Error())
|
log.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,11 +8,12 @@ import (
|
||||||
|
|
||||||
// VailMessage is a single Vail message.
|
// VailMessage is a single Vail message.
|
||||||
type Message struct {
|
type Message struct {
|
||||||
// Relative time in ms of this message.
|
// Timestamp of this message. Milliseconds since epoch.
|
||||||
// These timestamps need to be consistent, but the offset can be anything.
|
|
||||||
// ECMAScript `performance.now()` is ideal.
|
|
||||||
Timestamp int64
|
Timestamp int64
|
||||||
|
|
||||||
|
// Number of connected clients
|
||||||
|
Clients uint16
|
||||||
|
|
||||||
// Message timing in ms.
|
// Message timing in ms.
|
||||||
// Timings alternate between tone and silence.
|
// Timings alternate between tone and silence.
|
||||||
// For example, `A` could be sent as [80, 80, 240]
|
// For example, `A` could be sent as [80, 80, 240]
|
||||||
|
@ -22,13 +23,13 @@ type Message struct {
|
||||||
func NewMessage(ts time.Time, durations []time.Duration) Message {
|
func NewMessage(ts time.Time, durations []time.Duration) Message {
|
||||||
msg := Message{
|
msg := Message{
|
||||||
Timestamp: ts.UnixNano() / time.Millisecond.Nanoseconds(),
|
Timestamp: ts.UnixNano() / time.Millisecond.Nanoseconds(),
|
||||||
Duration: make([]uint8, len(durations)),
|
Duration: make([]uint8, len(durations)),
|
||||||
}
|
}
|
||||||
for i, dns := range durations {
|
for i, dns := range durations {
|
||||||
ms := dns.Milliseconds()
|
ms := dns.Milliseconds()
|
||||||
if (ms > 255) {
|
if ms > 255 {
|
||||||
ms = 255
|
ms = 255
|
||||||
} else if (ms < 0) {
|
} else if ms < 0 {
|
||||||
ms = 0
|
ms = 0
|
||||||
}
|
}
|
||||||
msg.Duration[i] = uint8(ms)
|
msg.Duration[i] = uint8(ms)
|
||||||
|
@ -42,6 +43,9 @@ func (m Message) MarshalBinary() ([]byte, error) {
|
||||||
if err := binary.Write(&w, binary.BigEndian, m.Timestamp); err != nil {
|
if err := binary.Write(&w, binary.BigEndian, m.Timestamp); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if err := binary.Write(&w, binary.BigEndian, m.Clients); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
if err := binary.Write(&w, binary.BigEndian, m.Duration); err != nil {
|
if err := binary.Write(&w, binary.BigEndian, m.Duration); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -54,6 +58,9 @@ func (m *Message) UnmarshalBinary(data []byte) error {
|
||||||
if err := binary.Read(r, binary.BigEndian, &m.Timestamp); err != nil {
|
if err := binary.Read(r, binary.BigEndian, &m.Timestamp); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if err := binary.Read(r, binary.BigEndian, &m.Clients); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
dlen := r.Len()
|
dlen := r.Len()
|
||||||
m.Duration = make([]uint8, dlen)
|
m.Duration = make([]uint8, dlen)
|
||||||
if err := binary.Read(r, binary.BigEndian, &m.Duration); err != nil {
|
if err := binary.Read(r, binary.BigEndian, &m.Duration); err != nil {
|
||||||
|
|
|
@ -6,9 +6,9 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestMessage(t *testing.T) {
|
func TestMessageStruct(t *testing.T) {
|
||||||
m := Message{0x1122334455, []uint8{0xaa, 0xbb, 0xcc}}
|
m := Message{0x1122334455, 0, []uint8{0xaa, 0xbb, 0xcc}}
|
||||||
m2 := Message{12, []uint8{1}}
|
m2 := Message{12, 0, []uint8{1}}
|
||||||
|
|
||||||
if !m.Equal(m) {
|
if !m.Equal(m) {
|
||||||
t.Error("Equal messages did not compare equal")
|
t.Error("Equal messages did not compare equal")
|
||||||
|
@ -16,7 +16,7 @@ func TestMessage(t *testing.T) {
|
||||||
if m.Equal(m2) {
|
if m.Equal(m2) {
|
||||||
t.Error("Unequal messages compared equal")
|
t.Error("Unequal messages compared equal")
|
||||||
}
|
}
|
||||||
if m.Equal(Message{m.Timestamp, []uint8{1, 2, 3}}) {
|
if m.Equal(Message{m.Timestamp, 0, []uint8{1, 2, 3}}) {
|
||||||
t.Error("Messages with different payloads compared equal")
|
t.Error("Messages with different payloads compared equal")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -24,7 +24,7 @@ func TestMessage(t *testing.T) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
if !bytes.Equal(bm, []byte("\x00\x00\x00\x11\x22\x33\x44\x55\xaa\xbb\xcc")) {
|
if !bytes.Equal(bm, []byte("\x00\x00\x00\x11\x22\x33\x44\x55\x00\x00\xaa\xbb\xcc")) {
|
||||||
t.Error("Encoded wrong:", bm)
|
t.Error("Encoded wrong:", bm)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A Repeater is just a list of Writers.
|
// A Repeater is just a list of Writers.
|
||||||
|
@ -29,9 +30,14 @@ func (r *Repeater) Part(w io.Writer) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Repeater) Send(p []byte) {
|
func (r *Repeater) Send(m Message) {
|
||||||
|
m.Clients = uint16(r.Listeners())
|
||||||
|
buf, err := m.MarshalBinary()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
for _, s := range r.writers {
|
for _, s := range r.writers {
|
||||||
s.Write(p)
|
s.Write(buf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,35 +5,54 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type TestMessage struct {
|
||||||
|
Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m TestMessage) bytes() []byte {
|
||||||
|
b, _ := m.MarshalBinary()
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
func TestRepeater(t *testing.T) {
|
func TestRepeater(t *testing.T) {
|
||||||
r := NewRepeater()
|
r := NewRepeater()
|
||||||
|
m := TestMessage{Message{1, 3, []uint8{3, 4}}}
|
||||||
|
|
||||||
buf1 := bytes.NewBufferString("buf1")
|
buf1 := bytes.NewBufferString("buf1")
|
||||||
|
buf1Expect := bytes.NewBufferString("buf1")
|
||||||
r.Join(buf1)
|
r.Join(buf1)
|
||||||
if r.Listeners() != 1 {
|
if r.Listeners() != 1 {
|
||||||
t.Error("Joining did nothing")
|
t.Error("Joining did nothing")
|
||||||
}
|
}
|
||||||
r.Send([]byte("moo"))
|
r.Send(m.Message)
|
||||||
if buf1.String() != "buf1moo" {
|
m.Clients = 1
|
||||||
|
buf1Expect.Write(m.bytes())
|
||||||
|
if buf1.String() != buf1Expect.String() {
|
||||||
t.Error("Client 1 not repeating", buf1)
|
t.Error("Client 1 not repeating", buf1)
|
||||||
}
|
}
|
||||||
|
|
||||||
buf2 := bytes.NewBufferString("buf2")
|
buf2 := bytes.NewBufferString("buf2")
|
||||||
|
buf2Expect := bytes.NewBufferString("buf2")
|
||||||
r.Join(buf2)
|
r.Join(buf2)
|
||||||
r.Send([]byte("bar"))
|
r.Send(m.Message)
|
||||||
if buf1.String() != "buf1moobar" {
|
m.Clients = 2
|
||||||
t.Error("Client 1 not repeating", buf1)
|
buf1Expect.Write(m.bytes())
|
||||||
|
buf2Expect.Write(m.bytes())
|
||||||
|
if buf1.String() != buf1Expect.String() {
|
||||||
|
t.Errorf("Client 1 not repeating %#v %#v", buf1, buf1Expect)
|
||||||
}
|
}
|
||||||
if buf2.String() != "buf2bar" {
|
if buf2.String() != buf2Expect.String() {
|
||||||
t.Error("Client 2 not repeating", buf2)
|
t.Error("Client 2 not repeating", buf2)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.Part(buf1)
|
r.Part(buf1)
|
||||||
r.Send([]byte("baz"))
|
r.Send(m.Message)
|
||||||
if buf1.String() != "buf1moobar" {
|
m.Clients = 1
|
||||||
|
buf2Expect.Write(m.bytes())
|
||||||
|
if buf1.String() != buf1Expect.String() {
|
||||||
t.Error("Client 1 still getting data after part", buf1)
|
t.Error("Client 1 still getting data after part", buf1)
|
||||||
}
|
}
|
||||||
if buf2.String() != "buf2barbaz" {
|
if buf2.String() != buf2Expect.String() {
|
||||||
t.Error("Client 2 not getting data after part", buf2)
|
t.Error("Client 2 not getting data after part", buf2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue