Neale Pickett
·
2015-02-22
network.go
1package main
2
3import (
4 "bufio"
5 "fmt"
6 "github.com/go-fsnotify/fsnotify"
7 "io/ioutil"
8 "os"
9 "path"
10 "strconv"
11 "strings"
12 "time"
13)
14
15const eventIdSep = "/"
16
17type Network struct {
18 running bool
19
20 Name string
21 currentLog string
22 lineno int64
23
24 basePath string
25 seq int
26}
27
28func NewNetwork(basePath string) (*Network) {
29 return &Network{
30 running: true,
31 Name: path.Base(basePath),
32 basePath: basePath,
33 }
34}
35
36func (nw *Network) Close() {
37 nw.running = false
38}
39
40func (nw *Network) ReadLastEventId(lastEventId string) {
41 for _, eventId := range strings.Split(lastEventId, " ") {
42 parts := strings.Split(eventId, eventIdSep)
43 if len(parts) != 3 {
44 continue
45 }
46
47 if parts[0] != nw.Name {
48 continue
49 }
50 nw.currentLog = parts[1]
51 nw.lineno, _ = strconv.ParseInt(parts[2], 10, 64)
52 return
53 }
54}
55
56func (nw *Network) LastEventId() string {
57 parts := []string{nw.Name, nw.currentLog, strconv.FormatInt(nw.lineno, 10)}
58 return strings.Join(parts, eventIdSep)
59}
60
61func (nw *Network) errmsg(err error) []string {
62 s := fmt.Sprintf("ERROR: %s", err.Error())
63 return []string{s}
64}
65
66func (nw *Network) Tail(out chan<- []string) {
67 if nw.currentLog == "" {
68 var err error
69
70 currentfn := path.Join(nw.basePath, "log", "current")
71 nw.currentLog, err = os.Readlink(currentfn)
72 if err != nil {
73 out <- nw.errmsg(err)
74 return
75 }
76 }
77
78 filepath := path.Join(nw.basePath, "log", nw.currentLog)
79 f, err := os.Open(filepath)
80 if err != nil {
81 out <- nw.errmsg(err)
82 return
83 }
84 defer f.Close()
85
86 watcher, err := fsnotify.NewWatcher()
87 if err != nil {
88 out <- nw.errmsg(err)
89 return
90 }
91 defer watcher.Close()
92
93 watcher.Add(filepath)
94 lineno := int64(0)
95
96 // XXX: some way to stop this?
97 for nw.running {
98 bf := bufio.NewScanner(f)
99 lines := make([]string, 0)
100 for bf.Scan() {
101 lineno += 1
102 if lineno <= nw.lineno {
103 continue
104 } else {
105 nw.lineno = lineno
106 }
107
108 t := bf.Text()
109
110 // XXX: Consider omitting PING and PONG
111 parts := strings.Split(t, " ")
112 if (len(parts) >= 4) && (parts[2] == "NEXTLOG") {
113 watcher.Remove(filepath)
114 filename := parts[3]
115 filepath = path.Join(nw.basePath, "log", filename)
116 f.Close()
117 f, err = os.Open(filepath)
118 if err != nil {
119 out <- nw.errmsg(err)
120 return
121 }
122 watcher.Add(filepath)
123 lineno = 0
124 nw.lineno = 0
125 }
126 lines = append(lines, t)
127 }
128 if len(lines) > 0 {
129 out <- lines
130 }
131
132 select {
133 case _ = <-watcher.Events:
134 // Somethin' happened!
135 case err := <-watcher.Errors:
136 out <- nw.errmsg(err)
137 return
138 }
139 }
140}
141
142func (nw *Network) Write(data []byte) {
143 epoch := time.Now().Unix()
144 pid := os.Getpid()
145 filename := fmt.Sprintf("%d-%d-%d.txt", epoch, pid, nw.seq)
146
147 filepath := path.Join(nw.basePath, "outq", filename)
148 ioutil.WriteFile(filepath, data, 0750)
149 nw.seq += 1
150}
151
152
153func Networks(basePath string) (found []*Network) {
154
155 dir, err := os.Open(basePath)
156 if err != nil {
157 return
158 }
159 defer dir.Close()
160
161
162 entities, _ := dir.Readdirnames(0)
163 for _, fn := range entities {
164 netdir := path.Join(basePath, fn)
165
166 _, err = os.Stat(path.Join(netdir, "nick"))
167 if err != nil {
168 continue
169 }
170
171 nw := NewNetwork(netdir)
172 found = append(found, nw)
173 }
174
175 return
176}
177