Neale Pickett
·
2020-09-24
stream.go
1package netshovel
2
3import (
4 "fmt"
5 "io"
6 "net/url"
7 "os"
8 "strings"
9 "time"
10
11 "github.com/dirtbags/netshovel/gapstring"
12 "github.com/google/gopacket"
13 "github.com/google/gopacket/tcpassembly"
14)
15
// NamedFile pairs an open file with the path it lives at.
//
// The file is embedded,
// so everything you can do with an *os.File you can do with a NamedFile.
type NamedFile struct {
	// The underlying file.
	*os.File

	// Name is the path where the file lives.
	Name string
}
21
22// Utterance is an atomic communication within a Stream
23//
24// Streams consist of a string of Utterances.
25// Each utterance has associated data, and a time stamp.
26//
27// Typically these line up with what crosses the network,
28// but bear in mind that TCP is a streaming protocol,
29// so don't rely on Utterances alone to separate Application-layer packets.
30type Utterance struct {
31 When time.Time
32 Data gapstring.GapString
33}
34
35// A Stream is one half of a two-way conversation
36type Stream struct {
37 Net, Transport gopacket.Flow
38 conversation chan Utterance
39 pending Utterance
40}
41
42// NewStream returns a newly-built Stream
43//
44// You should embed Stream into your own Application protocol stream struct.
45// Use this to initialize the internal stuff netshovel needs.
46func NewStream(net, transport gopacket.Flow) *Stream {
47 return &Stream{
48 Net: net,
49 Transport: transport,
50 conversation: make(chan Utterance, 100),
51 }
52}
53
54// Reassembled is called by the TCP assembler when an Utterance can be built
55func (stream *Stream) Reassembled(rs []tcpassembly.Reassembly) {
56 ret := Utterance{
57 When: rs[0].Seen,
58 }
59 for _, r := range rs {
60 if r.Skip > 0 {
61 ret.Data = ret.Data.AppendGap(r.Skip)
62 }
63 ret.Data = ret.Data.AppendBytes(r.Bytes)
64 }
65
66 // Throw away utterances with no data (SYN, ACK, FIN, &c)
67 if ret.Data.Length() > 0 {
68 stream.conversation <- ret
69 }
70}
71
72// ReassemblyComplete is called by the TCP assemble when the Stream is closed
73func (stream *Stream) ReassemblyComplete() {
74 close(stream.conversation)
75}
76
77// Read an utterance of a particular size
78//
79// If you pass in a length of -1,
80// this returns utterances as they appear in the conversation.
81//
82// At first, your decoder will probably want to use a length of -1:
83// this will give you a sense of how the conversation works.
84// When you begin to understand the structure of your protocol,
85// change this to a positive integer,
86// so that if you have a large application-layer packet,
87// or multiple application-layer packets in a single transport-layer packet,
88// your decoder handles it properly.
89func (stream *Stream) Read(length int) (Utterance, error) {
90 // This probably indicates a problem, but we assume you know what you're doing
91 if length == 0 {
92 return Utterance{}, nil
93 }
94
95 // Special case: length=-1 means "give me the next utterance"
96 if length == -1 {
97 var ret Utterance
98 var err error = nil
99 if stream.pending.Data.Length() > 0 {
100 ret = stream.pending
101 stream.pending.Data = gapstring.GapString{}
102 } else {
103 r, more := <-stream.conversation
104 if !more {
105 err = io.EOF
106 }
107 ret = r
108 }
109 return ret, err
110 }
111
112 // Pull in utterances until we have enough data.
113 // .When will always be the timestamp on the last received utterance
114 for stream.pending.Data.Length() < length {
115 u, more := <-stream.conversation
116 if !more {
117 break
118 }
119 stream.pending.Data = stream.pending.Data.Append(u.Data)
120 stream.pending.When = u.When
121 }
122
123 pendingLen := stream.pending.Data.Length()
124 // If we got nothing, it's the end of the stream
125 if pendingLen == 0 {
126 return Utterance{}, io.EOF
127 }
128
129 sliceLen := length
130 if sliceLen > pendingLen {
131 sliceLen = pendingLen
132 }
133 ret := Utterance{
134 Data: stream.pending.Data.Slice(0, sliceLen),
135 When: stream.pending.When,
136 }
137 stream.pending.Data = stream.pending.Data.Slice(sliceLen, pendingLen)
138 return ret, nil
139}
140
141// Describe returns a string description of a packet
142//
143// This just prefixes our source and dest IP:Port to pkt.Describe()
144func (stream *Stream) Describe(pkt Packet) string {
145 out := new(strings.Builder)
146
147 fmt.Fprintf(out, "%v:%v → %v:%v\n",
148 stream.Net.Src().String(), stream.Transport.Src().String(),
149 stream.Net.Dst().String(), stream.Transport.Dst().String(),
150 )
151 out.WriteString(pkt.Describe())
152 return out.String()
153}
154
155// CreateFile returns a newly-created, truncated file
156//
157// This function creates consistently-named files,
158// which include a timestamp,
159// and URL-escaped full path to the file.
160//
161// Best practice is to pass in as full a path as you can find,
162// including drive letters and all parent directories.
163func (stream *Stream) CreateFile(when time.Time, path string) (NamedFile, error) {
164 name := fmt.Sprintf(
165 "xfer/%s,%sp%s,%sp%s,%s",
166 when.UTC().Format(time.RFC3339Nano),
167 stream.Net.Src().String(), stream.Transport.Src().String(),
168 stream.Net.Dst().String(), stream.Transport.Dst().String(),
169 url.PathEscape(path),
170 )
171 f, err := os.Create(name)
172 outf := NamedFile{
173 File: f,
174 Name: name,
175 }
176 return outf, err
177}