Tailing properly, not yet resuming

This commit is contained in:
Neale Pickett 2015-02-21 21:45:56 -07:00
parent 40a3d1e755
commit c9a90d8ae0
3 changed files with 63 additions and 25 deletions

View File

@ -12,6 +12,8 @@ import (
) )
type Network struct { type Network struct {
running bool
name string name string
currentLog string currentLog string
lineno int64 lineno int64
@ -20,19 +22,20 @@ type Network struct {
seq int seq int
} }
type Update struct {
Lines []string
LastEventId string
}
func NewNetwork(basePath string) (*Network) { func NewNetwork(basePath string) (*Network) {
return &Network{ return &Network{
running: true,
name: path.Base(basePath),
basePath: basePath, basePath: basePath,
} }
} }
func (nw *Network) Close() {
nw.running = false
}
func (nw *Network) LastEventId() string { func (nw *Network) LastEventId() string {
return fmt.Sprintf("%s:%s:%d", nw.name, nw.currentLog, nw.lineno) return fmt.Sprintf("%s/%s/%d", nw.name, nw.currentLog, nw.lineno)
} }
func (nw *Network) SetPosition(filename string, lineno int64) { func (nw *Network) SetPosition(filename string, lineno int64) {
@ -40,40 +43,56 @@ func (nw *Network) SetPosition(filename string, lineno int64) {
nw.lineno = lineno nw.lineno = lineno
} }
func (nw *Network) Tail(out chan<- *Update) error { func (nw *Network) errmsg(err error) []string {
s := fmt.Sprintf("ERROR: %s", err.Error())
return []string{s}
}
func (nw *Network) Tail(out chan<- []string) {
if nw.currentLog == "" { if nw.currentLog == "" {
var err error var err error
currentfn := path.Join(nw.basePath, "log", "current") currentfn := path.Join(nw.basePath, "log", "current")
nw.currentLog, err = os.Readlink(currentfn) nw.currentLog, err = os.Readlink(currentfn)
if err != nil { if err != nil {
return err out <- nw.errmsg(err)
return
} }
} }
filepath := path.Join(nw.basePath, "log", nw.currentLog) filepath := path.Join(nw.basePath, "log", nw.currentLog)
f, err := os.Open(filepath) f, err := os.Open(filepath)
if err != nil { if err != nil {
return err out <- nw.errmsg(err)
return
} }
defer f.Close() defer f.Close()
watcher, err := fsnotify.NewWatcher() watcher, err := fsnotify.NewWatcher()
if err != nil { if err != nil {
return err out <- nw.errmsg(err)
return
} }
defer watcher.Close() defer watcher.Close()
watcher.Add(filepath) watcher.Add(filepath)
lineno := int64(0)
// XXX: some way to stop this? // XXX: some way to stop this?
for { for nw.running {
lines := make([]string, 0)
bf := bufio.NewScanner(f) bf := bufio.NewScanner(f)
lines := make([]string, 0)
for bf.Scan() { for bf.Scan() {
t := bf.Text() lineno += 1
nw.lineno += 1 if lineno <= nw.lineno {
continue
} else {
nw.lineno = lineno
}
t := bf.Text()
// XXX: Consider omitting PING and PONG
parts := strings.Split(t, " ") parts := strings.Split(t, " ")
if (len(parts) >= 4) && (parts[2] == "NEXTLOG") { if (len(parts) >= 4) && (parts[2] == "NEXTLOG") {
watcher.Remove(filepath) watcher.Remove(filepath)
@ -82,30 +101,27 @@ func (nw *Network) Tail(out chan<- *Update) error {
f.Close() f.Close()
f, err = os.Open(filepath) f, err = os.Open(filepath)
if err != nil { if err != nil {
return err out <- nw.errmsg(err)
return
} }
watcher.Add(filepath) watcher.Add(filepath)
lineno = 0
nw.lineno = 0 nw.lineno = 0
} }
lines = append(lines, t) lines = append(lines, t)
} }
if len(lines) > 0 { if len(lines) > 0 {
update := Update{ out <- lines
Lines: lines,
LastEventId: nw.LastEventId(),
}
out <- &update
} }
select { select {
case _ = <-watcher.Events: case _ = <-watcher.Events:
// Somethin' happened! // Somethin' happened!
case err := <-watcher.Errors: case err := <-watcher.Errors:
return err out <- nw.errmsg(err)
return
} }
} }
return nil
} }
func (nw *Network) Write(data []byte) { func (nw *Network) Write(data []byte) {

Binary file not shown.

View File

@ -35,10 +35,32 @@ func (h Handler) handleCommand(cfg *Config, w http.ResponseWriter, r *http.Reque
} }
func (h Handler) handleTail(cfg *Config, w http.ResponseWriter, r *http.Request) { func (h Handler) handleTail(cfg *Config, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain") w.Header().Set("Content-Type", "text/event-stream")
nws := Networks(cfg.BaseDir) nws := Networks(cfg.BaseDir)
updates := make(chan []string, 100)
for _, nw := range nws { for _, nw := range nws {
fmt.Fprintf(w, "%v\n", nw) go nw.Tail(updates)
defer nw.Close()
}
for lines := range updates {
for _, line := range lines {
fmt.Fprintf(w, "data: %s\n", line)
}
ids := make([]string, 0)
for _, nw := range nws {
ids = append(ids, nw.LastEventId())
}
idstring := strings.Join(ids, " ")
_, err := fmt.Fprintf(w, "id: %s\n\n", idstring)
if err != nil {
// Can't write anymore, guess they hung up.
return
}
w.(http.Flusher).Flush()
} }
} }