diff --git a/cmd/vail/book.go b/cmd/vail/book.go index f8573b9..95b9606 100644 --- a/cmd/vail/book.go +++ b/cmd/vail/book.go @@ -1,7 +1,6 @@ package main import ( - "io" "log" ) @@ -33,25 +32,25 @@ const ( type bookEvent struct { eventType bookEventType name string - w io.Writer + sender MessageSender m Message } // Join adds a writer to a named repeater -func (b Book) Join(name string, w io.Writer) { +func (b Book) Join(name string, sender MessageSender) { b.events <- bookEvent{ eventType: joinEvent, name: name, - w: w, + sender: sender, } } // Part removes a writer from a named repeater -func (b Book) Part(name string, w io.Writer) { +func (b Book) Part(name string, sender MessageSender) { b.events <- bookEvent{ eventType: partEvent, name: name, - w: w, + sender: sender, } } @@ -81,13 +80,13 @@ func (b Book) loop() { repeater = b.makeRepeater() b.entries[event.name] = repeater } - repeater.Join(event.w) + repeater.Join(event.sender) case partEvent: if !ok { log.Println("WARN: Parting an empty channel:", event.name) break } - repeater.Part(event.w) + repeater.Part(event.sender) if repeater.Listeners() == 0 { delete(b.entries, event.name) } diff --git a/cmd/vail/main.go b/cmd/vail/main.go index 65194ee..4d679b3 100644 --- a/cmd/vail/main.go +++ b/cmd/vail/main.go @@ -27,31 +27,41 @@ func (WallClock) Now() time.Time { return time.Now() } +// VailWebSocketConnection reads and writes Message structs +type VailWebSocketConnection struct { + *websocket.Conn +} + +func (c *VailWebSocketConnection) Receive() (Message, error) { + var m Message + err := websocket.JSON.Receive(c.Conn, &m) + return m, err +} + +func (c *VailWebSocketConnection) Send(m Message) error { + return websocket.JSON.Send(c.Conn, m) +} + type Client struct { repeaterName string } func (c Client) Handle(ws *websocket.Conn) { + sock := &VailWebSocketConnection{ws} nowMilli := time.Now().UnixMilli() ws.MaxPayloadBytes = 50 - book.Join(c.repeaterName, ws) - defer book.Part(c.repeaterName, ws) + book.Join(c.repeaterName, sock) + defer book.Part(c.repeaterName, sock) for { - buf := make([]byte, ws.MaxPayloadBytes) - - if n, err := ws.Read(buf); err != nil { + m, err := sock.Receive() + if err != nil { break - } else { - buf = buf[:n] } - // Decode into a Message - var m Message - if err := m.UnmarshalBinary(buf); err != nil { - fmt.Fprintln(ws, err) - ws.Close() - return + // If it's empty, skip it + if len(m.Duration) == 0 { + continue } // If it's wildly out of time, reject it diff --git a/cmd/vail/message.go b/cmd/vail/message.go index e35e7be..0b448b1 100644 --- a/cmd/vail/message.go +++ b/cmd/vail/message.go @@ -3,9 +3,26 @@ package main import ( "bytes" "encoding/binary" + "fmt" "time" ) +// MessageSender can send Messages +type MessageSender interface { + Send(m Message) error +} + +// MessageReceiver can receive Messages +type MessageReceiver interface { + Receive() (Message, error) +} + +// MessageSocket can send and receive Messages +type MessageSocket interface { + MessageSender + MessageReceiver +} + // VailMessage is a single Vail message. type Message struct { // Timestamp of this message. Milliseconds since epoch. @@ -69,6 +86,24 @@ func (m *Message) UnmarshalBinary(data []byte) error { return nil } +func (m Message) MarshalJSON() ([]byte, error) { + buf := new(bytes.Buffer) + fmt.Fprint(buf, "{") + fmt.Fprintf(buf, "\"Timestamp\":%d,", m.Timestamp) + fmt.Fprintf(buf, "\"Clients\":%d,", m.Clients) + fmt.Fprint(buf, "\"Duration\":[") + for i := 0; i < len(m.Duration); i++ { + fmt.Fprint(buf, m.Duration[i]) + if i <= len(m.Duration)-1 { + fmt.Fprint(buf, ",") + } + } + fmt.Fprint(buf) + fmt.Fprint(buf, "]") + fmt.Fprint(buf, "}") + return buf.Bytes(), nil +} + func (m Message) Equal(m2 Message) bool { if m.Timestamp != m2.Timestamp { return false diff --git a/cmd/vail/repeater.go b/cmd/vail/repeater.go index a2d00b8..34fad55 100644 --- a/cmd/vail/repeater.go +++ b/cmd/vail/repeater.go @@ -1,38 +1,36 @@ package main import ( - "io" - "log" "time" ) -// A Repeater is just a list of Writers. +// A Repeater is just a list of senders. type Repeater struct { clock Clock - writers []io.Writer + senders []MessageSender } // NewRepeater returns a newly-created repeater func NewRepeater() *Repeater { return &Repeater{ clock: WallClock{}, - writers: make([]io.Writer, 0, 20), + senders: make([]MessageSender, 0, 20), } } // Join joins a writer to this repeater -func (r *Repeater) Join(w io.Writer) { - r.writers = append(r.writers, w) +func (r *Repeater) Join(sender MessageSender) { + r.senders = append(r.senders, sender) r.SendMessage() } // Part removes a writer from this repeater -func (r *Repeater) Part(w io.Writer) { - for i, s := range r.writers { - if s == w { - nsubs := len(r.writers) - r.writers[i] = r.writers[nsubs-1] - r.writers = r.writers[:nsubs-1] +func (r *Repeater) Part(sender MessageSender) { + for i, s := range r.senders { + if s == sender { + nsubs := len(r.senders) + r.senders[i] = r.senders[nsubs-1] + r.senders = r.senders[:nsubs-1] } } r.SendMessage() @@ -41,12 +39,8 @@ func (r *Repeater) Part(w io.Writer) { // Send send a message to all connected clients func (r *Repeater) Send(m Message) { m.Clients = uint16(r.Listeners()) - buf, err := m.MarshalBinary() - if err != nil { - log.Fatal(err) - } - for _, s := range r.writers { - s.Write(buf) + for _, s := range r.senders { + s.Send(m) } } @@ -58,5 +52,5 @@ func (r *Repeater) SendMessage(durations ...time.Duration) { // Listeners returns the number of connected clients func (r *Repeater) Listeners() int { - return len(r.writers) + return len(r.senders) } diff --git a/cmd/vail/repeater_test.go b/cmd/vail/repeater_test.go index 66d2a32..16136fc 100644 --- a/cmd/vail/repeater_test.go +++ b/cmd/vail/repeater_test.go @@ -1,8 +1,6 @@ package main import ( - "bytes" - "io" "testing" "time" ) @@ -14,35 +12,46 @@ func (f FakeClock) Now() time.Time { } type TestingClient struct { - bytes.Buffer - expected bytes.Buffer - repeater *Repeater + buf []Message + expected []Message t *testing.T } func NewTestingClient(t *testing.T) *TestingClient { return &TestingClient{ - Buffer: bytes.Buffer{}, - expected: bytes.Buffer{}, - t: t, + t: t, } } +func (tc *TestingClient) Send(m Message) error { + tc.buf = append(tc.buf, m) + return nil +} + +func (tc *TestingClient) Len() int { + return len(tc.buf) +} + func (tc *TestingClient) Expect(clients uint16, payload ...uint8) { m := Message{0, clients, payload} - buf, _ := m.MarshalBinary() - tc.expected.Write(buf) - if tc.String() != tc.expected.String() { - tc.t.Errorf("Client buffer mismatch. Wanted %#v, got %#v", tc.expected.String(), tc.String()) + tc.expected = append(tc.expected, m) + if len(tc.buf) != len(tc.expected) { + tc.t.Errorf("Client buffer mismatch. Wanted length %d, got length %d", len(tc.expected), len(tc.buf)) } - tc.Reset() - tc.expected.Reset() + for i := 0; i < len(tc.buf); i++ { + if !tc.buf[i].Equal(tc.expected[i]) { + tc.t.Errorf("Client buffer mismatch at entry %d. Wanted %#v, got %#v", i, tc.expected[i], tc.buf[i]) + } + } + + tc.buf = []Message{} + tc.expected = []Message{} } func NewTestingRepeater() *Repeater { return &Repeater{ clock: FakeClock{}, - writers: make([]io.Writer, 0, 2), + senders: make([]MessageSender, 0, 2), } } diff --git a/static/repeaters.mjs b/static/repeaters.mjs index ac6bfeb..3717b2e 100644 --- a/static/repeaters.mjs +++ b/static/repeaters.mjs @@ -4,6 +4,25 @@ const Millisecond = 1 const Second = 1000 * Millisecond const Minute = 60 * Second +/** + * Compare two messages + * + * @param {Object} m1 First message + * @param {Object} m2 Second message + * @returns {Boolean} true if messages are equal + */ +function MessageEqual(m1, m2) { + if ((m1.Timestamp != m2.Timestamp) || (m1.Duration.length != m2.Duration.length)) { + return false + } + for (let i=0; i < m1.Duration.length; i++) { + if (m1.Duration[i] != m2.Duration[i]) { + return false + } + } + return true +} + export class Vail { constructor(rx, name) { this.rx = rx @@ -28,14 +47,13 @@ export class Vail { this.clockOffset = 0 this.socket = new WebSocket(this.wsUrl) this.socket.addEventListener("message", e => this.wsMessage(e)) - this.socket.addEventListener("close", () => this.reopen()) - } - - stats() { - return { - averageLag: this.lagDurations.reduce((a,b) => (a+b), 0) / this.lagDurations.length, - clockOffset: this.clockOffset, - } + this.socket.addEventListener( + "close", + () => { + console.info("Repeater connection dropped.") + setTimeout(() => this.reopen(), 5*Second) + } + ) } wsMessage(event) { @@ -46,48 +64,51 @@ export class Vail { msg = JSON.parse(jmsg) } catch (err) { - console.error(err, jmsg) + console.error(jmsg) return } + let stats = { + averageLag: this.lagDurations.reduce((a,b) => (a+b), 0) / this.lagDurations.length, + clockOffset: this.clockOffset, + clients: msg.Clients, + } - let beginTxTime = msg[0] - let durations = msg.slice(1) - - // Why is this happening? - if (beginTxTime == 0) { + // XXX: Why is this happening? + if (msg.Timestamp == 0) { return } - let sent = this.sent.filter(e => e != jmsg) + let sent = this.sent.filter(m => !MessageEqual(msg, m)) if (sent.length < this.sent.length) { // We're getting our own message back, which tells us our lag. // We shouldn't emit a tone, though. - let totalDuration = durations.reduce((a, b) => a + b) + let totalDuration = msg.Duration.reduce((a, b) => a + b) this.sent = sent - this.lagDurations.unshift(now - this.clockOffset - beginTxTime - totalDuration) + this.lagDurations.unshift(now - this.clockOffset - msg.Timestamp - totalDuration) this.lagDurations.splice(20, 2) - this.rx(0, 0, this.stats()) + this.rx(0, 0, stats) return } - // The very first packet is the server telling us the current time - if (durations.length == 0) { + // Packets with 0 length tell us what time the server thinks it is, + // and how many clients are connected + if (msg.Duration.length == 0) { if (this.clockOffset == 0) { - this.clockOffset = now - beginTxTime - this.rx(0, 0, this.stats()) + this.clockOffset = now - msg.Timestamp + this.rx(0, 0, stats) } return } // Adjust playback time to clock offset - let adjustedTxTime = beginTxTime + this.clockOffset + let adjustedTxTime = msg.Timestamp + this.clockOffset // Every second value is a silence duration let tx = true - for (let duration of durations) { + for (let duration of msg.Duration) { duration = Number(duration) if (tx && (duration > 0)) { - this.rx(adjustedTxTime, duration, this.stats()) + this.rx(adjustedTxTime, duration, stats) } adjustedTxTime = Number(adjustedTxTime) + duration tx = !tx @@ -97,13 +118,17 @@ export class Vail { /** * Send a transmission * - * @param {number} time When to play this transmission + * @param {number} timestamp When to play this transmission * @param {number} duration How long the transmission is * @param {boolean} squelch True to mute this tone when we get it back from the repeater */ - Transmit(time, duration, squelch=true) { - let msg = [time - this.clockOffset, duration] + Transmit(timestamp, duration, squelch=true) { + let msg = { + Timestamp: timestamp, + Duration: [duration], + } let jmsg = JSON.stringify(msg) + if (this.socket.readyState != 1) { // If we aren't connected, complain. console.error("Not connected, dropping", jmsg) @@ -111,7 +136,7 @@ export class Vail { } this.socket.send(jmsg) if (squelch) { - this.sent.push(jmsg) + this.sent.push(msg) } } diff --git a/static/vail.mjs b/static/vail.mjs index 470c707..6473e4a 100644 --- a/static/vail.mjs +++ b/static/vail.mjs @@ -53,7 +53,7 @@ class VailClient { e.addEventListener("click", e => this.maximize(e)) } for (let e of document.querySelectorAll("#ck")) { - e.addEventListener("click", e => this.test()) + e.addEventListener("click", e => this.check()) } for (let e of document.querySelectorAll("#reset")) { e.addEventListener("click", e => this.reset()) @@ -416,7 +416,7 @@ class VailClient { /** * Send "CK" to server, and don't squelch the echo */ - test() { + check() { let when = Date.now() let dit = this.ditDuration let dah = dit * 3