netshovel/stream.go

177 lines
4.6 KiB
Go
Raw Normal View History

package netshovel
2018-07-22 22:25:34 -06:00
import (
"fmt"
"io"
2018-07-23 17:48:14 -06:00
"os"
"net/url"
2018-07-22 22:25:34 -06:00
"strings"
"time"
"github.com/dirtbags/netshovel/gapstring"
"github.com/google/gopacket"
"github.com/google/gopacket/tcpassembly"
)
2018-07-24 17:32:08 -06:00
// A File and the path where it lives
2018-07-23 17:48:14 -06:00
type NamedFile struct {
*os.File
Name string
2018-07-22 22:25:34 -06:00
}
2018-07-24 17:32:08 -06:00
// An atomic communication within a Stream
//
// Streams consist of a string of Utterances.
// Each utterance has associated data, and a time stamp.
//
// Typically these line up with what crosses the network,
// but bear in mind that TCP is a streaming protocol,
// so don't rely on Utterances alone to separate Application-layer packets.
2018-07-22 22:25:34 -06:00
type Utterance struct {
When time.Time
Data gapstring.GapString
}
2018-07-24 17:32:08 -06:00
// A Stream is one half of a two-way conversation
2018-07-22 22:25:34 -06:00
type Stream struct {
Net, Transport gopacket.Flow
conversation chan Utterance
pending Utterance
}
2018-07-24 17:32:08 -06:00
// Return a newly-built Stream
//
// You should embed Stream into your own Application protocol stream struct.
// Use this to initialize the internal stuff netshovel needs.
2018-07-23 15:34:22 -06:00
func NewStream(net, transport gopacket.Flow) Stream {
return Stream{
2018-07-22 22:25:34 -06:00
Net: net,
Transport: transport,
conversation: make(chan Utterance, 100),
}
}
2018-07-24 17:32:08 -06:00
// Called by the TCP assembler when an Utterance can be built
2018-07-22 22:25:34 -06:00
func (stream *Stream) Reassembled(rs []tcpassembly.Reassembly) {
ret := Utterance{
When: rs[0].Seen,
}
for _, r := range rs {
if r.Skip > 0 {
ret.Data = ret.Data.AppendGap(r.Skip)
}
ret.Data = ret.Data.AppendBytes(r.Bytes)
}
// Throw away utterances with no data (SYN, ACK, FIN, &c)
if ret.Data.Length() > 0 {
stream.conversation <- ret
}
}
2018-07-24 17:32:08 -06:00
// Called by the TCP assemble when the Stream is closed
2018-07-22 22:25:34 -06:00
func (stream *Stream) ReassemblyComplete() {
close(stream.conversation)
}
2018-07-24 17:32:08 -06:00
// Read an utterance of a particular size
//
// If you pass in a length of -1,
// this returns utterances as they appear in the conversation.
//
// At first, your decoder will probably want to use a length of -1:
// this will give you a sense of how the conversation works.
// When you begin to understand the structure of your protocol,
// change this to a positive integer,
// so that if you have a large application-layer packet,
// or multiple application-layer packets in a single transport-layer packet,
// your decoder handles it properly.
2018-07-22 22:25:34 -06:00
func (stream *Stream) Read(length int) (Utterance, error) {
2018-07-24 14:11:50 -06:00
// This probably indicates a problem, but we assume you know what you're doing
if length == 0 {
return Utterance{}, nil
}
2018-07-22 22:25:34 -06:00
// Special case: length=-1 means "give me the next utterance"
if length == -1 {
2018-07-24 14:11:50 -06:00
var ret Utterance
var err error = nil
2018-07-22 22:25:34 -06:00
if stream.pending.Data.Length() > 0 {
2018-07-24 14:11:50 -06:00
ret = stream.pending
2018-07-22 22:25:34 -06:00
stream.pending.Data = gapstring.GapString{}
} else {
2018-07-24 14:11:50 -06:00
r, more := <- stream.conversation
2018-07-22 22:25:34 -06:00
if ! more {
2018-07-24 14:11:50 -06:00
err = io.EOF
2018-07-22 22:25:34 -06:00
}
2018-07-24 14:11:50 -06:00
ret = r
2018-07-22 22:25:34 -06:00
}
2018-07-24 14:11:50 -06:00
return ret, err
2018-07-22 22:25:34 -06:00
}
// Pull in utterances until we have enough data.
// .When will always be the timestamp on the last received utterance
for stream.pending.Data.Length() < length {
u, more := <- stream.conversation
if ! more {
break
}
stream.pending.Data = stream.pending.Data.Append(u.Data)
stream.pending.When = u.When
}
2018-07-23 21:17:41 -06:00
pendingLen := stream.pending.Data.Length()
2018-07-22 22:25:34 -06:00
// If we got nothing, it's the end of the stream
2018-07-23 21:17:41 -06:00
if pendingLen == 0 {
2018-07-22 22:25:34 -06:00
return Utterance{}, io.EOF
}
2018-07-23 21:17:41 -06:00
sliceLen := length
if sliceLen > pendingLen {
sliceLen = pendingLen
}
2018-07-22 22:25:34 -06:00
ret := Utterance{
2018-07-23 21:17:41 -06:00
Data: stream.pending.Data.Slice(0, sliceLen),
2018-07-22 22:25:34 -06:00
When: stream.pending.When,
}
2018-07-23 21:17:41 -06:00
stream.pending.Data = stream.pending.Data.Slice(sliceLen, pendingLen)
2018-07-22 22:25:34 -06:00
return ret, nil
}
2018-07-24 17:32:08 -06:00
// Return a string description of a packet
//
// This just prefixes our source and dest IP:Port to pkt.Describe()
2018-07-22 22:25:34 -06:00
func (stream *Stream) Describe(pkt Packet) string {
out := new(strings.Builder)
2018-07-23 09:58:31 -06:00
fmt.Fprintf(out, "%v:%v → %v:%v\n",
stream.Net.Src().String(), stream.Transport.Src().String(),
stream.Net.Dst().String(), stream.Transport.Dst().String(),
2018-07-23 09:58:31 -06:00
)
out.WriteString(pkt.Describe())
2018-07-22 22:25:34 -06:00
return out.String()
}
2018-07-23 17:48:14 -06:00
2018-07-24 17:32:08 -06:00
// Return a newly-created, truncated file
//
// This function creates consistently-named files,
// which include a timestamp,
// and URL-escaped full path to the file.
//
// Best practice is to pass in as full a path as you can find,
// including drive letters and all parent directories.
2018-07-23 17:48:14 -06:00
func (stream *Stream) CreateFile(when time.Time, path string) (NamedFile, error) {
name := fmt.Sprintf(
"xfer/%s,%sp%s,%sp%s,%s",
when.UTC().Format(time.RFC3339Nano),
stream.Net.Src().String(), stream.Transport.Src().String(),
stream.Net.Dst().String(), stream.Transport.Dst().String(),
url.PathEscape(path),
)
f, err := os.Create(name)
outf := NamedFile{
File: f,
Name: name,
}
return outf, err
}