More work

This commit is contained in:
Neale Pickett 2022-05-15 21:12:36 -06:00
parent d6e6a268a3
commit 4ef1ff7517
7 changed files with 159 additions and 87 deletions

View File

@ -1,7 +1,6 @@
package main package main
import ( import (
"io"
"log" "log"
) )
@ -33,25 +32,25 @@ const (
type bookEvent struct { type bookEvent struct {
eventType bookEventType eventType bookEventType
name string name string
w io.Writer sender MessageSender
m Message m Message
} }
// Join adds a writer to a named repeater // Join adds a writer to a named repeater
func (b Book) Join(name string, w io.Writer) { func (b Book) Join(name string, sender MessageSender) {
b.events <- bookEvent{ b.events <- bookEvent{
eventType: joinEvent, eventType: joinEvent,
name: name, name: name,
w: w, sender: sender,
} }
} }
// Part removes a writer from a named repeater // Part removes a writer from a named repeater
func (b Book) Part(name string, w io.Writer) { func (b Book) Part(name string, sender MessageSender) {
b.events <- bookEvent{ b.events <- bookEvent{
eventType: partEvent, eventType: partEvent,
name: name, name: name,
w: w, sender: sender,
} }
} }
@ -81,13 +80,13 @@ func (b Book) loop() {
repeater = b.makeRepeater() repeater = b.makeRepeater()
b.entries[event.name] = repeater b.entries[event.name] = repeater
} }
repeater.Join(event.w) repeater.Join(event.sender)
case partEvent: case partEvent:
if !ok { if !ok {
log.Println("WARN: Parting an empty channel:", event.name) log.Println("WARN: Parting an empty channel:", event.name)
break break
} }
repeater.Part(event.w) repeater.Part(event.sender)
if repeater.Listeners() == 0 { if repeater.Listeners() == 0 {
delete(b.entries, event.name) delete(b.entries, event.name)
} }

View File

@ -27,31 +27,41 @@ func (WallClock) Now() time.Time {
return time.Now() return time.Now()
} }
// VailWebSocketConnection reads and writes Message structs
type VailWebSocketConnection struct {
*websocket.Conn
}
func (c *VailWebSocketConnection) Receive() (Message, error) {
var m Message
err := websocket.JSON.Receive(c.Conn, &m)
return m, err
}
func (c *VailWebSocketConnection) Send(m Message) error {
return websocket.JSON.Send(c.Conn, m)
}
type Client struct { type Client struct {
repeaterName string repeaterName string
} }
func (c Client) Handle(ws *websocket.Conn) { func (c Client) Handle(ws *websocket.Conn) {
sock := &VailWebSocketConnection{ws}
nowMilli := time.Now().UnixMilli() nowMilli := time.Now().UnixMilli()
ws.MaxPayloadBytes = 50 ws.MaxPayloadBytes = 50
book.Join(c.repeaterName, ws) book.Join(c.repeaterName, sock)
defer book.Part(c.repeaterName, ws) defer book.Part(c.repeaterName, sock)
for { for {
buf := make([]byte, ws.MaxPayloadBytes) m, err := sock.Receive()
if err != nil {
if n, err := ws.Read(buf); err != nil {
break break
} else {
buf = buf[:n]
} }
// Decode into a Message // If it's empty, skip it
var m Message if len(m.Duration) == 0 {
if err := m.UnmarshalBinary(buf); err != nil { continue
fmt.Fprintln(ws, err)
ws.Close()
return
} }
// If it's wildly out of time, reject it // If it's wildly out of time, reject it

View File

@ -3,9 +3,26 @@ package main
import ( import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt"
"time" "time"
) )
// MessageSender can send Messages
type MessageSender interface {
Send(m Message) error
}
// MessageReceiver can receive Messages
type MessageReceiver interface {
Receive() (Message, error)
}
// MessageSocket can send and receive Messages
type MessageSocket interface {
MessageSender
MessageReceiver
}
// VailMessage is a single Vail message. // VailMessage is a single Vail message.
type Message struct { type Message struct {
// Timestamp of this message. Milliseconds since epoch. // Timestamp of this message. Milliseconds since epoch.
@ -69,6 +86,24 @@ func (m *Message) UnmarshalBinary(data []byte) error {
return nil return nil
} }
func (m Message) MarshalJSON() ([]byte, error) {
buf := new(bytes.Buffer)
fmt.Fprint(buf, "{")
fmt.Fprintf(buf, "\"Timestamp\":%d,", m.Timestamp)
fmt.Fprintf(buf, "\"Clients\":%d,", m.Clients)
fmt.Fprint(buf, "\"Duration\":[")
for i := 0; i < len(m.Duration); i++ {
fmt.Fprint(buf, m.Duration[i])
if i <= len(m.Duration)-1 {
fmt.Fprint(buf, ",")
}
}
fmt.Fprint(buf)
fmt.Fprint(buf, "]")
fmt.Fprint(buf, "}")
return buf.Bytes(), nil
}
func (m Message) Equal(m2 Message) bool { func (m Message) Equal(m2 Message) bool {
if m.Timestamp != m2.Timestamp { if m.Timestamp != m2.Timestamp {
return false return false

View File

@ -1,38 +1,36 @@
package main package main
import ( import (
"io"
"log"
"time" "time"
) )
// A Repeater is just a list of Writers. // A Repeater is just a list of senders.
type Repeater struct { type Repeater struct {
clock Clock clock Clock
writers []io.Writer senders []MessageSender
} }
// NewRepeater returns a newly-created repeater // NewRepeater returns a newly-created repeater
func NewRepeater() *Repeater { func NewRepeater() *Repeater {
return &Repeater{ return &Repeater{
clock: WallClock{}, clock: WallClock{},
writers: make([]io.Writer, 0, 20), senders: make([]MessageSender, 0, 20),
} }
} }
// Join joins a writer to this repeater // Join joins a writer to this repeater
func (r *Repeater) Join(w io.Writer) { func (r *Repeater) Join(sender MessageSender) {
r.writers = append(r.writers, w) r.senders = append(r.senders, sender)
r.SendMessage() r.SendMessage()
} }
// Part removes a writer from this repeater // Part removes a writer from this repeater
func (r *Repeater) Part(w io.Writer) { func (r *Repeater) Part(sender MessageSender) {
for i, s := range r.writers { for i, s := range r.senders {
if s == w { if s == sender {
nsubs := len(r.writers) nsubs := len(r.senders)
r.writers[i] = r.writers[nsubs-1] r.senders[i] = r.senders[nsubs-1]
r.writers = r.writers[:nsubs-1] r.senders = r.senders[:nsubs-1]
} }
} }
r.SendMessage() r.SendMessage()
@ -41,12 +39,8 @@ func (r *Repeater) Part(w io.Writer) {
// Send send a message to all connected clients // Send send a message to all connected clients
func (r *Repeater) Send(m Message) { func (r *Repeater) Send(m Message) {
m.Clients = uint16(r.Listeners()) m.Clients = uint16(r.Listeners())
buf, err := m.MarshalBinary() for _, s := range r.senders {
if err != nil { s.Send(m)
log.Fatal(err)
}
for _, s := range r.writers {
s.Write(buf)
} }
} }
@ -58,5 +52,5 @@ func (r *Repeater) SendMessage(durations ...time.Duration) {
// Listeners returns the number of connected clients // Listeners returns the number of connected clients
func (r *Repeater) Listeners() int { func (r *Repeater) Listeners() int {
return len(r.writers) return len(r.senders)
} }

View File

@ -1,8 +1,6 @@
package main package main
import ( import (
"bytes"
"io"
"testing" "testing"
"time" "time"
) )
@ -14,35 +12,46 @@ func (f FakeClock) Now() time.Time {
} }
type TestingClient struct { type TestingClient struct {
bytes.Buffer buf []Message
expected bytes.Buffer expected []Message
repeater *Repeater
t *testing.T t *testing.T
} }
func NewTestingClient(t *testing.T) *TestingClient { func NewTestingClient(t *testing.T) *TestingClient {
return &TestingClient{ return &TestingClient{
Buffer: bytes.Buffer{}, t: t,
expected: bytes.Buffer{},
t: t,
} }
} }
func (tc *TestingClient) Send(m Message) error {
tc.buf = append(tc.buf, m)
return nil
}
func (tc *TestingClient) Len() int {
return len(tc.buf)
}
func (tc *TestingClient) Expect(clients uint16, payload ...uint8) { func (tc *TestingClient) Expect(clients uint16, payload ...uint8) {
m := Message{0, clients, payload} m := Message{0, clients, payload}
buf, _ := m.MarshalBinary() tc.expected = append(tc.expected, m)
tc.expected.Write(buf) if len(tc.buf) != len(tc.expected) {
if tc.String() != tc.expected.String() { tc.t.Errorf("Client buffer mismatch. Wanted length %d, got length %d", len(tc.expected), len(tc.buf))
tc.t.Errorf("Client buffer mismatch. Wanted %#v, got %#v", tc.expected.String(), tc.String())
} }
tc.Reset() for i := 0; i < len(tc.buf); i++ {
tc.expected.Reset() if !tc.buf[i].Equal(tc.expected[i]) {
tc.t.Errorf("Client buffer mismatch at entry %d. Wanted %#v, got %#v", i, tc.expected[i], tc.buf[i])
}
}
tc.buf = []Message{}
tc.expected = []Message{}
} }
func NewTestingRepeater() *Repeater { func NewTestingRepeater() *Repeater {
return &Repeater{ return &Repeater{
clock: FakeClock{}, clock: FakeClock{},
writers: make([]io.Writer, 0, 2), senders: make([]MessageSender, 0, 2),
} }
} }

View File

@ -4,6 +4,25 @@ const Millisecond = 1
const Second = 1000 * Millisecond const Second = 1000 * Millisecond
const Minute = 60 * Second const Minute = 60 * Second
/**
* Compare two messages
*
* @param {Object} m1 First message
* @param {Object} m2 Second message
* @returns {Boolean} true if messages are equal
*/
function MessageEqual(m1, m2) {
if ((m1.Timestamp != m2.Timestamp) || (m1.Duration.length != m2.Duration.length)) {
return false
}
for (let i=0; i < m1.Duration.length; i++) {
if (m1.Duration[i] != m2.Duration[i]) {
return false
}
}
return true
}
export class Vail { export class Vail {
constructor(rx, name) { constructor(rx, name) {
this.rx = rx this.rx = rx
@ -28,14 +47,13 @@ export class Vail {
this.clockOffset = 0 this.clockOffset = 0
this.socket = new WebSocket(this.wsUrl) this.socket = new WebSocket(this.wsUrl)
this.socket.addEventListener("message", e => this.wsMessage(e)) this.socket.addEventListener("message", e => this.wsMessage(e))
this.socket.addEventListener("close", () => this.reopen()) this.socket.addEventListener(
} "close",
() => {
stats() { console.info("Repeater connection dropped.")
return { setTimeout(() => this.reopen(), 5*Second)
averageLag: this.lagDurations.reduce((a,b) => (a+b), 0) / this.lagDurations.length, }
clockOffset: this.clockOffset, )
}
} }
wsMessage(event) { wsMessage(event) {
@ -46,48 +64,51 @@ export class Vail {
msg = JSON.parse(jmsg) msg = JSON.parse(jmsg)
} }
catch (err) { catch (err) {
console.error(err, jmsg) console.error(jmsg)
return return
} }
let stats = {
averageLag: this.lagDurations.reduce((a,b) => (a+b), 0) / this.lagDurations.length,
clockOffset: this.clockOffset,
clients: msg.Clients,
}
let beginTxTime = msg[0] // XXX: Why is this happening?
let durations = msg.slice(1) if (msg.Timestamp == 0) {
// Why is this happening?
if (beginTxTime == 0) {
return return
} }
let sent = this.sent.filter(e => e != jmsg) let sent = this.sent.filter(m => !MessageEqual(msg, m))
if (sent.length < this.sent.length) { if (sent.length < this.sent.length) {
// We're getting our own message back, which tells us our lag. // We're getting our own message back, which tells us our lag.
// We shouldn't emit a tone, though. // We shouldn't emit a tone, though.
let totalDuration = durations.reduce((a, b) => a + b) let totalDuration = msg.Duration.reduce((a, b) => a + b)
this.sent = sent this.sent = sent
this.lagDurations.unshift(now - this.clockOffset - beginTxTime - totalDuration) this.lagDurations.unshift(now - this.clockOffset - msg.Timestamp - totalDuration)
this.lagDurations.splice(20, 2) this.lagDurations.splice(20, 2)
this.rx(0, 0, this.stats()) this.rx(0, 0, stats)
return return
} }
// The very first packet is the server telling us the current time // Packets with 0 length tell us what time the server thinks it is,
if (durations.length == 0) { // and how many clients are connected
if (msg.Duration.length == 0) {
if (this.clockOffset == 0) { if (this.clockOffset == 0) {
this.clockOffset = now - beginTxTime this.clockOffset = now - msg.Timestamp
this.rx(0, 0, this.stats()) this.rx(0, 0, stats)
} }
return return
} }
// Adjust playback time to clock offset // Adjust playback time to clock offset
let adjustedTxTime = beginTxTime + this.clockOffset let adjustedTxTime = msg.Timestamp + this.clockOffset
// Every second value is a silence duration // Every second value is a silence duration
let tx = true let tx = true
for (let duration of durations) { for (let duration of msg.Duration) {
duration = Number(duration) duration = Number(duration)
if (tx && (duration > 0)) { if (tx && (duration > 0)) {
this.rx(adjustedTxTime, duration, this.stats()) this.rx(adjustedTxTime, duration, stats)
} }
adjustedTxTime = Number(adjustedTxTime) + duration adjustedTxTime = Number(adjustedTxTime) + duration
tx = !tx tx = !tx
@ -97,13 +118,17 @@ export class Vail {
/** /**
* Send a transmission * Send a transmission
* *
* @param {number} time When to play this transmission * @param {number} timestamp When to play this transmission
* @param {number} duration How long the transmission is * @param {number} duration How long the transmission is
* @param {boolean} squelch True to mute this tone when we get it back from the repeater * @param {boolean} squelch True to mute this tone when we get it back from the repeater
*/ */
Transmit(time, duration, squelch=true) { Transmit(timestamp, duration, squelch=true) {
let msg = [time - this.clockOffset, duration] let msg = {
Timestamp: timestamp,
Duration: [duration],
}
let jmsg = JSON.stringify(msg) let jmsg = JSON.stringify(msg)
if (this.socket.readyState != 1) { if (this.socket.readyState != 1) {
// If we aren't connected, complain. // If we aren't connected, complain.
console.error("Not connected, dropping", jmsg) console.error("Not connected, dropping", jmsg)
@ -111,7 +136,7 @@ export class Vail {
} }
this.socket.send(jmsg) this.socket.send(jmsg)
if (squelch) { if (squelch) {
this.sent.push(jmsg) this.sent.push(msg)
} }
} }

View File

@ -53,7 +53,7 @@ class VailClient {
e.addEventListener("click", e => this.maximize(e)) e.addEventListener("click", e => this.maximize(e))
} }
for (let e of document.querySelectorAll("#ck")) { for (let e of document.querySelectorAll("#ck")) {
e.addEventListener("click", e => this.test()) e.addEventListener("click", e => this.check())
} }
for (let e of document.querySelectorAll("#reset")) { for (let e of document.querySelectorAll("#reset")) {
e.addEventListener("click", e => this.reset()) e.addEventListener("click", e => this.reset())
@ -416,7 +416,7 @@ class VailClient {
/** /**
* Send "CK" to server, and don't squelch the echo * Send "CK" to server, and don't squelch the echo
*/ */
test() { check() {
let when = Date.now() let when = Date.now()
let dit = this.ditDuration let dit = this.ditDuration
let dah = dit * 3 let dah = dit * 3