netshovel/stream.go

132 lines
2.8 KiB
Go
Raw Normal View History

package netshovel
2018-07-22 22:25:34 -06:00
import (
"fmt"
"io"
2018-07-23 17:48:14 -06:00
"os"
"net/url"
2018-07-22 22:25:34 -06:00
"strings"
"time"
"github.com/dirtbags/netshovel/gapstring"
"github.com/google/gopacket"
"github.com/google/gopacket/tcpassembly"
)
2018-07-23 17:48:14 -06:00
type NamedFile struct {
*os.File
Name string
2018-07-22 22:25:34 -06:00
}
type Utterance struct {
When time.Time
Data gapstring.GapString
}
type Stream struct {
Net, Transport gopacket.Flow
conversation chan Utterance
pending Utterance
}
2018-07-23 15:34:22 -06:00
func NewStream(net, transport gopacket.Flow) Stream {
return Stream{
2018-07-22 22:25:34 -06:00
Net: net,
Transport: transport,
conversation: make(chan Utterance, 100),
}
}
func (stream *Stream) Reassembled(rs []tcpassembly.Reassembly) {
// XXX: How do we send timestamps?
ret := Utterance{
When: rs[0].Seen,
}
for _, r := range rs {
if r.Skip > 0 {
ret.Data = ret.Data.AppendGap(r.Skip)
}
ret.Data = ret.Data.AppendBytes(r.Bytes)
}
// Throw away utterances with no data (SYN, ACK, FIN, &c)
if ret.Data.Length() > 0 {
stream.conversation <- ret
}
}
func (stream *Stream) ReassemblyComplete() {
close(stream.conversation)
}
func (stream *Stream) Read(length int) (Utterance, error) {
// Special case: length=-1 means "give me the next utterance"
if length == -1 {
if stream.pending.Data.Length() > 0 {
ret := stream.pending
stream.pending.Data = gapstring.GapString{}
return ret, nil
} else {
ret, more := <- stream.conversation
if ! more {
return ret, io.EOF
}
return ret, nil
}
}
// Pull in utterances until we have enough data.
// .When will always be the timestamp on the last received utterance
for stream.pending.Data.Length() < length {
u, more := <- stream.conversation
if ! more {
break
}
stream.pending.Data = stream.pending.Data.Append(u.Data)
stream.pending.When = u.When
}
2018-07-23 21:17:41 -06:00
pendingLen := stream.pending.Data.Length()
2018-07-22 22:25:34 -06:00
// If we got nothing, it's the end of the stream
2018-07-23 21:17:41 -06:00
if pendingLen == 0 {
2018-07-22 22:25:34 -06:00
return Utterance{}, io.EOF
}
2018-07-23 21:17:41 -06:00
sliceLen := length
if sliceLen > pendingLen {
sliceLen = pendingLen
}
2018-07-22 22:25:34 -06:00
ret := Utterance{
2018-07-23 21:17:41 -06:00
Data: stream.pending.Data.Slice(0, sliceLen),
2018-07-22 22:25:34 -06:00
When: stream.pending.When,
}
2018-07-23 21:17:41 -06:00
stream.pending.Data = stream.pending.Data.Slice(sliceLen, pendingLen)
2018-07-22 22:25:34 -06:00
return ret, nil
}
func (stream *Stream) Describe(pkt Packet) string {
out := new(strings.Builder)
2018-07-23 09:58:31 -06:00
fmt.Fprintf(out, "%v:%v → %v:%v\n",
stream.Net.Src().String(), stream.Transport.Src().String(),
stream.Net.Dst().String(), stream.Transport.Dst().String(),
2018-07-23 09:58:31 -06:00
)
out.WriteString(pkt.Describe())
2018-07-22 22:25:34 -06:00
return out.String()
}
2018-07-23 17:48:14 -06:00
func (stream *Stream) CreateFile(when time.Time, path string) (NamedFile, error) {
name := fmt.Sprintf(
"xfer/%s,%sp%s,%sp%s,%s",
when.UTC().Format(time.RFC3339Nano),
stream.Net.Src().String(), stream.Transport.Src().String(),
stream.Net.Dst().String(), stream.Transport.Dst().String(),
url.PathEscape(path),
)
f, err := os.Create(name)
outf := NamedFile{
File: f,
Name: name,
}
return outf, err
}