diff --git a/cmd/vail/book.go b/cmd/vail/book.go index 15c0823..6f8bafa 100644 --- a/cmd/vail/book.go +++ b/cmd/vail/book.go @@ -29,7 +29,7 @@ type bookEvent struct { eventType bookEventType name string w io.Writer - p []byte + m Message } func (b Book) Join(name string, w io.Writer) { @@ -48,11 +48,11 @@ func (b Book) Part(name string, w io.Writer) { } } -func (b Book) Send(name string, p []byte) { +func (b Book) Send(name string, m Message) { b.events <- bookEvent{ eventType: sendEvent, name: name, - p: p, + m: m, } } @@ -87,6 +87,6 @@ func (b Book) loop() { log.Println("WARN: Sending to an empty channel:", event.name) break } - repeater.Send(event.p) + repeater.Send(event.m) } } diff --git a/cmd/vail/book_test.go b/cmd/vail/book_test.go index 0eabb9f..fd59482 100644 --- a/cmd/vail/book_test.go +++ b/cmd/vail/book_test.go @@ -7,53 +7,63 @@ import ( func TestBook(t *testing.T) { b := NewBook() + m := TestMessage{Message{1, 2, []uint8{3, 4}}} buf1 := bytes.NewBufferString("buf1") + buf1Expect := bytes.NewBufferString("buf1") b.Join("moo", buf1) + m.Clients = 1 b.loop() if len(b.entries) != 1 { t.Error("Wrong number of entries") } // Send to an empty channel - b.Send("merf", []byte("goober")) + b.Send("merf", m.Message) b.loop() - if buf1.String() != "buf1" { + if buf1.String() != buf1Expect.String() { t.Error("Sending to empty channel sent to non-empty channel") } // Send to a non-empty channel! - b.Send("moo", []byte("goober")) + b.Send("moo", m.Message) b.loop() - if buf1.String() != "buf1goober" { + buf1Expect.Write(m.bytes()) + if buf1.String() != buf1Expect.String() { t.Error("Sending didn't work") } // Join another client buf2 := bytes.NewBufferString("buf2") + buf2Expect := bytes.NewBufferString("buf2") b.Join("moo", buf2) + m.Clients = 2 b.loop() // Send to both - b.Send("moo", []byte("snerk")) + b.Send("moo", m.Message) b.loop() - if buf1.String() != "buf1goobersnerk" { + buf1Expect.Write(m.bytes()) + buf2Expect.Write(m.bytes()) + if buf1.String() != buf1Expect.String() { t.Error("Send to 2-member channel busted", buf1) } - if buf2.String() != "buf2snerk" { + if buf2.String() != buf2Expect.String() { t.Error("Send to 2-member channel busted", buf2) } // Part a client b.Part("moo", buf1) b.loop() + m.Clients = 1 - b.Send("moo", []byte("peanut")) + b.Send("moo", m.Message) b.loop() - if buf1.String() != "buf1goobersnerk" { + buf2Expect.Write(m.bytes()) + if buf1.String() != buf1Expect.String() { t.Error("Parted channel but still getting messages", buf1) } - if buf2.String() != "buf2snerkpeanut" { + if buf2.String() != buf2Expect.String() { t.Error("Someone else parting somehow messed up sends", buf2) } } diff --git a/cmd/vail/main.go b/cmd/vail/main.go index b602fa7..8e305ca 100644 --- a/cmd/vail/main.go +++ b/cmd/vail/main.go @@ -2,10 +2,11 @@ package main import ( "fmt" - "time" - "os" "log" "net/http" + "os" + "time" + "golang.org/x/net/websocket" ) @@ -16,12 +17,13 @@ type Client struct { } func (c Client) Handle(ws *websocket.Conn) { - ws.MaxPayloadBytes = 500 + nowMilli := time.Now().UnixMilli() + ws.MaxPayloadBytes = 50 book.Join(c.repeaterName, ws) defer book.Part(c.repeaterName, ws) - + // Tell the client what time we think it is - fmt.Fprintf(ws, "[%d]", time.Now().UnixNano() / time.Millisecond.Nanoseconds()) + fmt.Fprintf(ws, "[%d]", time.Now().UnixNano()/time.Millisecond.Nanoseconds()) for { buf := make([]byte, ws.MaxPayloadBytes) @@ -29,15 +31,34 @@ func (c Client) Handle(ws *websocket.Conn) { if n, err := ws.Read(buf); err != nil { break } else { - buf = buf[:n] + buf = buf[:n] } - - book.Send(c.repeaterName, buf) + + // Decode into a Message + var m Message + if err := m.UnmarshalBinary(buf); err != nil { + fmt.Fprintln(ws, err) + ws.Close() + return + } + + // If it's wildly out of time, reject it + timeDelta := (nowMilli - m.Timestamp) + if timeDelta < 0 { + timeDelta = -timeDelta + } + if timeDelta > 9999 { + fmt.Fprintln(ws, "Bad timestamp") + ws.Close() + return + } + + book.Send(c.repeaterName, m) } } func ChatHandler(w http.ResponseWriter, r *http.Request) { - c := Client { + c := Client{ repeaterName: r.FormValue("repeater"), } @@ -57,7 +78,7 @@ func main() { port = "8080" } log.Println("Listening on port", port) - err := http.ListenAndServe(":" + port, nil) + err := http.ListenAndServe(":"+port, nil) if err != nil { log.Fatal(err.Error()) } diff --git a/cmd/vail/message.go b/cmd/vail/message.go index c9d80db..5cadb7f 100644 --- a/cmd/vail/message.go +++ b/cmd/vail/message.go @@ -8,11 +8,12 @@ import ( // VailMessage is a single Vail message. type Message struct { - // Relative time in ms of this message. - // These timestamps need to be consistent, but the offset can be anything. - // ECMAScript `performance.now()` is ideal. + // Timestamp of this message. Milliseconds since epoch. Timestamp int64 - + + // Number of connected clients + Clients uint16 + // Message timing in ms. // Timings alternate between tone and silence. // For example, `A` could be sent as [80, 80, 240] @@ -22,13 +23,13 @@ type Message struct { func NewMessage(ts time.Time, durations []time.Duration) Message { msg := Message{ Timestamp: ts.UnixNano() / time.Millisecond.Nanoseconds(), - Duration: make([]uint8, len(durations)), + Duration: make([]uint8, len(durations)), } for i, dns := range durations { ms := dns.Milliseconds() - if (ms > 255) { + if ms > 255 { ms = 255 - } else if (ms < 0) { + } else if ms < 0 { ms = 0 } msg.Duration[i] = uint8(ms) @@ -42,6 +43,9 @@ func (m Message) MarshalBinary() ([]byte, error) { if err := binary.Write(&w, binary.BigEndian, m.Timestamp); err != nil { return nil, err } + if err := binary.Write(&w, binary.BigEndian, m.Clients); err != nil { + return nil, err + } if err := binary.Write(&w, binary.BigEndian, m.Duration); err != nil { return nil, err } @@ -54,6 +58,9 @@ func (m *Message) UnmarshalBinary(data []byte) error { if err := binary.Read(r, binary.BigEndian, &m.Timestamp); err != nil { return err } + if err := binary.Read(r, binary.BigEndian, &m.Clients); err != nil { + return err + } dlen := r.Len() m.Duration = make([]uint8, dlen) if err := binary.Read(r, binary.BigEndian, &m.Duration); err != nil { @@ -66,16 +73,16 @@ func (m Message) Equal(m2 Message) bool { if m.Timestamp != m2.Timestamp { return false } - + if len(m.Duration) != len(m2.Duration) { return false } - + for i := range m.Duration { if m.Duration[i] != m2.Duration[i] { return false } } - + return true } diff --git a/cmd/vail/message_test.go b/cmd/vail/message_test.go index 94354c6..70ff0ec 100644 --- a/cmd/vail/message_test.go +++ b/cmd/vail/message_test.go @@ -6,9 +6,9 @@ import ( "time" ) -func TestMessage(t *testing.T) { - m := Message{0x1122334455, []uint8{0xaa, 0xbb, 0xcc}} - m2 := Message{12, []uint8{1}} +func TestMessageStruct(t *testing.T) { + m := Message{0x1122334455, 0, []uint8{0xaa, 0xbb, 0xcc}} + m2 := Message{12, 0, []uint8{1}} if !m.Equal(m) { t.Error("Equal messages did not compare equal") @@ -16,7 +16,7 @@ func TestMessage(t *testing.T) { if m.Equal(m2) { t.Error("Unequal messages compared equal") } - if m.Equal(Message{m.Timestamp, []uint8{1, 2, 3}}) { + if m.Equal(Message{m.Timestamp, 0, []uint8{1, 2, 3}}) { t.Error("Messages with different payloads compared equal") } @@ -24,7 +24,7 @@ func TestMessage(t *testing.T) { if err != nil { t.Error(err) } - if !bytes.Equal(bm, []byte("\x00\x00\x00\x11\x22\x33\x44\x55\xaa\xbb\xcc")) { + if !bytes.Equal(bm, []byte("\x00\x00\x00\x11\x22\x33\x44\x55\x00\x00\xaa\xbb\xcc")) { t.Error("Encoded wrong:", bm) } diff --git a/cmd/vail/repeater.go b/cmd/vail/repeater.go index c0366f5..1fbd0b9 100644 --- a/cmd/vail/repeater.go +++ b/cmd/vail/repeater.go @@ -2,6 +2,7 @@ package main import ( "io" + "log" ) // A Repeater is just a list of Writers. @@ -29,9 +30,14 @@ func (r *Repeater) Part(w io.Writer) { } } -func (r *Repeater) Send(p []byte) { +func (r *Repeater) Send(m Message) { + m.Clients = uint16(r.Listeners()) + buf, err := m.MarshalBinary() + if err != nil { + log.Fatal(err) + } for _, s := range r.writers { - s.Write(p) + s.Write(buf) } } diff --git a/cmd/vail/repeater_test.go b/cmd/vail/repeater_test.go index c09d5f0..6d52b0c 100644 --- a/cmd/vail/repeater_test.go +++ b/cmd/vail/repeater_test.go @@ -5,35 +5,54 @@ import ( "testing" ) +type TestMessage struct { + Message +} + +func (m TestMessage) bytes() []byte { + b, _ := m.MarshalBinary() + return b +} + func TestRepeater(t *testing.T) { r := NewRepeater() + m := TestMessage{Message{1, 3, []uint8{3, 4}}} buf1 := bytes.NewBufferString("buf1") + buf1Expect := bytes.NewBufferString("buf1") r.Join(buf1) if r.Listeners() != 1 { t.Error("Joining did nothing") } - r.Send([]byte("moo")) - if buf1.String() != "buf1moo" { + r.Send(m.Message) + m.Clients = 1 + buf1Expect.Write(m.bytes()) + if buf1.String() != buf1Expect.String() { t.Error("Client 1 not repeating", buf1) } buf2 := bytes.NewBufferString("buf2") + buf2Expect := bytes.NewBufferString("buf2") r.Join(buf2) - r.Send([]byte("bar")) - if buf1.String() != "buf1moobar" { - t.Error("Client 1 not repeating", buf1) + r.Send(m.Message) + m.Clients = 2 + buf1Expect.Write(m.bytes()) + buf2Expect.Write(m.bytes()) + if buf1.String() != buf1Expect.String() { + t.Errorf("Client 1 not repeating %#v %#v", buf1, buf1Expect) } - if buf2.String() != "buf2bar" { + if buf2.String() != buf2Expect.String() { t.Error("Client 2 not repeating", buf2) } r.Part(buf1) - r.Send([]byte("baz")) - if buf1.String() != "buf1moobar" { + r.Send(m.Message) + m.Clients = 1 + buf2Expect.Write(m.bytes()) + if buf1.String() != buf1Expect.String() { t.Error("Client 1 still getting data after part", buf1) } - if buf2.String() != "buf2barbaz" { + if buf2.String() != buf2Expect.String() { t.Error("Client 2 not getting data after part", buf2) } }