Neale Pickett
·
2016-01-18
network.go
1package main
2
3import (
4 "bufio"
5 "fmt"
6 "github.com/go-fsnotify/fsnotify"
7 "io/ioutil"
8 "os"
9 "path"
10 "strconv"
11 "strings"
12 "time"
13)
14
15const eventIdSep = "/"
16
17type Network struct {
18 running bool
19
20 Name string
21 currentLog string
22 lineno int64
23
24 basePath string
25 seq int
26}
27
28func NewNetwork(basePath string) (*Network) {
29 return &Network{
30 running: true,
31 Name: path.Base(basePath),
32 basePath: basePath,
33 }
34}
35
36func (nw *Network) Close() {
37 nw.running = false
38}
39
40func (nw *Network) ReadLastEventId(lastEventId string) {
41 for _, eventId := range strings.Split(lastEventId, " ") {
42 parts := strings.Split(eventId, eventIdSep)
43 if len(parts) != 3 {
44 continue
45 }
46
47 if parts[0] != nw.Name {
48 continue
49 }
50 nw.currentLog = parts[1]
51 nw.lineno, _ = strconv.ParseInt(parts[2], 10, 64)
52 return
53 }
54}
55
56func (nw *Network) LastEventId() string {
57 parts := []string{nw.Name, nw.currentLog, strconv.FormatInt(nw.lineno, 10)}
58 return strings.Join(parts, eventIdSep)
59}
60
61func (nw *Network) errmsg(err error) string {
62 return fmt.Sprintf("ERROR: %s", err.Error())
63}
64
65func (nw *Network) Tail(out chan<- string) {
66 if nw.currentLog == "" {
67 var err error
68
69 currentfn := path.Join(nw.basePath, "log", "current")
70 nw.currentLog, err = os.Readlink(currentfn)
71 if err != nil {
72 out <- nw.errmsg(err)
73 return
74 }
75 }
76
77 filepath := path.Join(nw.basePath, "log", nw.currentLog)
78 f, err := os.Open(filepath)
79 if err != nil {
80 out <- nw.errmsg(err)
81 return
82 }
83 defer f.Close()
84
85 watcher, err := fsnotify.NewWatcher()
86 if err != nil {
87 out <- nw.errmsg(err)
88 return
89 }
90 defer watcher.Close()
91
92 watcher.Add(filepath)
93 lineno := int64(0)
94
95 // XXX: some way to stop this?
96 for nw.running {
97 bf := bufio.NewScanner(f)
98 for bf.Scan() {
99 lineno += 1
100 if lineno <= nw.lineno {
101 continue
102 } else {
103 nw.lineno = lineno
104 }
105
106 t := bf.Text()
107
108 parts := strings.Split(t, " ")
109 if (len(parts) >= 3) && (parts[1] == "NEXTLOG") {
110 watcher.Remove(filepath)
111 filename := parts[2]
112 filepath = path.Join(nw.basePath, "log", filename)
113 f.Close()
114 f, err = os.Open(filepath)
115 if err != nil {
116 out <- nw.errmsg(err)
117 return
118 }
119 watcher.Add(filepath)
120 lineno = 0
121 nw.lineno = 0
122 }
123 out <- t
124 }
125
126 select {
127 case _ = <-watcher.Events:
128 // Somethin' happened!
129 case err := <-watcher.Errors:
130 out <- nw.errmsg(err)
131 return
132 }
133 }
134}
135
136func (nw *Network) Write(data []byte) {
137 epoch := time.Now().Unix()
138 pid := os.Getpid()
139 filename := fmt.Sprintf("%d-%d-%d.txt", epoch, pid, nw.seq)
140
141 filepath := path.Join(nw.basePath, "outq", filename)
142 ioutil.WriteFile(filepath, data, 0750)
143 nw.seq += 1
144}
145
146
147func Networks(basePath string) (found []*Network) {
148
149 dir, err := os.Open(basePath)
150 if err != nil {
151 return
152 }
153 defer dir.Close()
154
155
156 entities, _ := dir.Readdirnames(0)
157 for _, fn := range entities {
158 netdir := path.Join(basePath, fn)
159
160 _, err = os.Stat(path.Join(netdir, "nick"))
161 if err != nil {
162 continue
163 }
164
165 nw := NewNetwork(netdir)
166 found = append(found, nw)
167 }
168
169 return
170}
171