From a13cd98e292c4ea1a63d455106b2e84e39ec63f7 Mon Sep 17 00:00:00 2001 From: esrrhs Date: Sat, 26 Oct 2019 12:01:30 +0800 Subject: [PATCH] add --- client.go | 118 +++++++++++++++++++++++++++++----------- cmd/main.go | 2 +- framemgr.go | 47 ++++++++++++++-- msg.pb.go | 57 +++++++++++--------- msg.proto | 1 + pingtunnel.go | 9 +--- pingtunnel_test.go | 3 +- server.go | 131 +++++++++++++++++++++++++++++++++------------ 8 files changed, 265 insertions(+), 103 deletions(-) diff --git a/client.go b/client.go index 379b78e..7d414c3 100644 --- a/client.go +++ b/client.go @@ -1,6 +1,7 @@ package pingtunnel import ( + "github.com/esrrhs/go-engine/src/common" "github.com/esrrhs/go-engine/src/loggo" "github.com/golang/protobuf/proto" "golang.org/x/net/icmp" @@ -189,7 +190,7 @@ func (p *Client) AcceptTcp() error { if err != nil { nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { - loggo.Error("Error accept tcp %s", err) + loggo.Info("Error accept tcp %s", err) continue } } @@ -217,15 +218,21 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) { bytes := make([]byte, 10240) + tcpActiveRecvTime := time.Now() + tcpActiveSendTime := time.Now() + for { - left := clientConn.fm.GetSendBufferLeft() - if left >= len(bytes) { + now := time.Now() + + left := common.MinOfInt(clientConn.fm.GetSendBufferLeft(), len(bytes)) + if left > 0 { conn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) - n, err := conn.Read(bytes) + n, err := conn.Read(bytes[0:left]) if err != nil { nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { - loggo.Error("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err) + loggo.Info("Error read tcp %s %s %s", uuid, tcpsrcaddr.String(), err) + clientConn.fm.Close() break } } @@ -237,38 +244,33 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) { clientConn.fm.Update() sendlist := clientConn.fm.getSendList() - - now := time.Now() - clientConn.activeSendTime = now - - for e := sendlist.Front(); e != nil; e = e.Next() { - - f := e.Value.(*Frame) - mb, err := proto.Marshal(f) - if err != nil { - loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err) - continue + if sendlist.Len() > 0 { + clientConn.activeSendTime = now + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) + mb, err := proto.Marshal(f) + if err != nil { + loggo.Error("Error tcp Marshal %s %s %s", uuid, tcpsrcaddr.String(), err) + continue + } + p.sequence++ + sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb, + SEND_PROTO, RECV_PROTO, p.key, + p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) } - - p.sequence++ - - p.sendPacket++ - p.sendPacketSize += (uint64)(len(mb)) - - sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb, - SEND_PROTO, RECV_PROTO, p.key, - p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems) } if clientConn.fm.GetRecvBufferSize() > 0 { rr := clientConn.fm.GetRecvReadLineBuffer() - conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100)) n, err := conn.Write(rr) if err != nil { nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { - loggo.Error("Error write tcp %s %s %s", uuid, tcpsrcaddr.String(), err) + loggo.Info("Error write tcp %s %s %s", uuid, tcpsrcaddr.String(), err) + clientConn.fm.Close() break } } @@ -279,10 +281,66 @@ func (p *Client) AcceptTcpConn(conn *net.TCPConn) { diffrecv := now.Sub(clientConn.activeRecvTime) diffsend := now.Sub(clientConn.activeSendTime) - if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) { + tcpdiffrecv := now.Sub(tcpActiveRecvTime) + tcpdiffsend := now.Sub(tcpActiveSendTime) + if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) || + tcpdiffrecv > time.Second*(time.Duration(p.timeout)) || tcpdiffsend > time.Second*(time.Duration(p.timeout)) { loggo.Info("close inactive conn %s %s", clientConn.id, clientConn.tcpaddr.String()) + clientConn.fm.Close() break } + + if clientConn.fm.IsRemoteClosed() { + loggo.Info("closed by remote conn %s %s", clientConn.id, clientConn.tcpaddr.String()) + clientConn.fm.Close() + break + } + } + + startCloseTime := time.Now() + for { + now := time.Now() + + clientConn.fm.Update() + + sendlist := clientConn.fm.getSendList() + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) + mb, _ := proto.Marshal(f) + p.sequence++ + sendICMP(p.id, p.sequence, *p.conn, p.ipaddrServer, p.targetAddr, clientConn.id, (uint32)(MyMsg_DATA), mb, + SEND_PROTO, RECV_PROTO, p.key, + p.tcpmode, p.tcpmode_buffersize, p.tcpmode_maxwin, p.tcpmode_resend_timems) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) + } + + nodatarecv := true + if clientConn.fm.GetRecvBufferSize() > 0 { + rr := clientConn.fm.GetRecvReadLineBuffer() + conn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100)) + n, _ := conn.Write(rr) + if n > 0 { + clientConn.fm.SkipRecvBuffer(n) + nodatarecv = false + } + } + + diffclose := now.Sub(startCloseTime) + timeout := diffclose > time.Second*(time.Duration(p.timeout)) + remoteclosed := clientConn.fm.IsRemoteClosed() + + if timeout { + loggo.Info("close conn had timeout %s %s", clientConn.id, clientConn.tcpaddr.String()) + break + } + + if remoteclosed && nodatarecv { + loggo.Info("remote conn had closed %s %s", clientConn.id, clientConn.tcpaddr.String()) + break + } + + time.Sleep(time.Millisecond * 100) } loggo.Info("close tcp conn %s %s", clientConn.id, clientConn.tcpaddr.String()) @@ -302,7 +360,7 @@ func (p *Client) Accept() error { if err != nil { nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { - loggo.Error("Error read udp %s", err) + loggo.Info("Error read udp %s", err) continue } } @@ -379,7 +437,7 @@ func (p *Client) processPacket(packet *Packet) { } else { _, err := p.listenConn.WriteToUDP(packet.my.Data, addr) if err != nil { - loggo.Error("WriteToUDP Error read udp %s", err) + loggo.Info("WriteToUDP Error read udp %s", err) clientConn.close = true return } diff --git a/cmd/main.go b/cmd/main.go index 544101a..f41d19b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -76,7 +76,7 @@ func main() { loggo.Info("key %d", *key) if *t == "server" { - s, err := pingtunnel.NewServer(*timeout, *key) + s, err := pingtunnel.NewServer(*key) if err != nil { loggo.Error("ERROR: %s", err.Error()) return diff --git a/framemgr.go b/framemgr.go index 3930a2a..9b64b0f 100644 --- a/framemgr.go +++ b/framemgr.go @@ -22,6 +22,9 @@ type FrameMgr struct { recvwin *list.List recvlist *list.List recvid int + + close bool + remoteclosed bool } func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr { @@ -33,7 +36,8 @@ func NewFrameMgr(buffersize int, windowsize int, resend_timems int) *FrameMgr { recvlock: &sync.Mutex{}, windowsize: windowsize, resend_timems: resend_timems, sendwin: list.New(), sendlist: list.New(), sendid: 0, - recvwin: list.New(), recvlist: list.New(), recvid: 0} + recvwin: list.New(), recvlist: list.New(), recvid: 0, + close: false, remoteclosed: false} return fm } @@ -95,6 +99,18 @@ func (fm *FrameMgr) cutSendBufferToWindow() { fm.sendwin.PushBack(f) } + + if fm.sendb.Empty() && fm.close { + f := &Frame{Type: (int32)(Frame_DATA), Resend: false, Sendtime: 0, + Id: (int32)(fm.sendid), + Data: make([]byte, 0)} + fm.sendwin.PushBack(f) + + fm.sendid++ + if fm.sendid >= FRAME_MAX_ID { + fm.sendid = 0 + } + } } func (fm *FrameMgr) calSendList() { @@ -116,7 +132,6 @@ func (fm *FrameMgr) getSendList() *list.List { func (fm *FrameMgr) OnRecvFrame(f *Frame) { fm.recvlock.Lock() defer fm.recvlock.Unlock() - fm.recvlist.PushBack(f) } @@ -236,6 +251,9 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() { if f.Id == (int32)(id) { left := fm.recvb.Capacity() - fm.recvb.Size() if left >= len(f.Data) { + if len(f.Data) == 0 { + fm.remoteclosed = true + } fm.recvb.Write(f.Data) fm.recvwin.Remove(e) done = true @@ -261,16 +279,24 @@ func (fm *FrameMgr) combineWindowToRecvBuffer() { if f.Id != (int32)(id) { reqtmp[id]++ } else { - reqtmp[id]++ e = e.Next() } id++ - if fm.recvid >= FRAME_MAX_ID { - fm.recvid = 0 + if id >= FRAME_MAX_ID { + id = 0 } } + for len(reqtmp) < fm.windowsize { + reqtmp[id]++ + id++ + if id >= FRAME_MAX_ID { + id = 0 + } + break + } + f := &Frame{Type: (int32)(Frame_REQ), Resend: false, Sendtime: 0, Id: 0, Dataid: make([]int32, len(reqtmp))} @@ -293,3 +319,14 @@ func (fm *FrameMgr) GetRecvReadLineBuffer() []byte { func (fm *FrameMgr) SkipRecvBuffer(size int) { fm.recvb.SkipRead(size) } + +func (fm *FrameMgr) Close() { + fm.recvlock.Lock() + defer fm.recvlock.Unlock() + + fm.close = true +} + +func (fm *FrameMgr) IsRemoteClosed() bool { + return fm.remoteclosed +} diff --git a/msg.pb.go b/msg.pb.go index 2fa5630..df82ed3 100644 --- a/msg.pb.go +++ b/msg.pb.go @@ -88,6 +88,7 @@ type MyMsg struct { TcpmodeBuffersize int32 `protobuf:"varint,9,opt,name=tcpmode_buffersize,json=tcpmodeBuffersize,proto3" json:"tcpmode_buffersize,omitempty"` TcpmodeMaxwin int32 `protobuf:"varint,10,opt,name=tcpmode_maxwin,json=tcpmodeMaxwin,proto3" json:"tcpmode_maxwin,omitempty"` TcpmodeResendTimems int32 `protobuf:"varint,11,opt,name=tcpmode_resend_timems,json=tcpmodeResendTimems,proto3" json:"tcpmode_resend_timems,omitempty"` + Timeout int32 `protobuf:"varint,12,opt,name=timeout,proto3" json:"timeout,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -195,6 +196,13 @@ func (m *MyMsg) GetTcpmodeResendTimems() int32 { return 0 } +func (m *MyMsg) GetTimeout() int32 { + if m != nil { + return m.Timeout + } + return 0 +} + type Frame struct { Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"` Resend bool `protobuf:"varint,2,opt,name=resend,proto3" json:"resend,omitempty"` @@ -284,28 +292,29 @@ func init() { func init() { proto.RegisterFile("msg.proto", fileDescriptor_c06e4cca6c2cc899) } var fileDescriptor_c06e4cca6c2cc899 = []byte{ - // 366 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x51, 0xdd, 0x8a, 0xda, 0x40, - 0x18, 0xed, 0x24, 0x99, 0x18, 0x3f, 0xad, 0xc4, 0xe9, 0x0f, 0x43, 0xaf, 0x42, 0xa0, 0x34, 0x37, - 0xed, 0x45, 0xfb, 0x04, 0xd1, 0x5a, 0x91, 0x92, 0x62, 0x07, 0x6f, 0xda, 0x1b, 0x89, 0x66, 0x0c, - 0x43, 0x9b, 0x18, 0x92, 0x91, 0x5d, 0xf7, 0x9d, 0x96, 0x7d, 0x82, 0x7d, 0x9b, 0x7d, 0x90, 0x65, - 0x3e, 0x47, 0x41, 0xf6, 0x6a, 0xce, 0xf9, 0xce, 0x61, 0xbe, 0x8f, 0x73, 0xa0, 0x5f, 0x75, 0xe5, - 0x97, 0xa6, 0xdd, 0xeb, 0x7d, 0xfc, 0xe4, 0x00, 0xcd, 0x8e, 0x59, 0x57, 0xb2, 0x11, 0x38, 0xaa, - 0xe0, 0x24, 0x22, 0x49, 0x5f, 0x38, 0xaa, 0x60, 0x0c, 0x3c, 0x7d, 0x6c, 0x24, 0x77, 0x22, 0x92, - 0x50, 0x81, 0x98, 0xbd, 0x07, 0x5f, 0xe7, 0x6d, 0x29, 0x35, 0x77, 0xd1, 0x67, 0x99, 0xf1, 0x16, - 0xb9, 0xce, 0xb9, 0x17, 0x91, 0x64, 0x28, 0x10, 0x1b, 0x6f, 0x8b, 0x3b, 0x38, 0x8d, 0x48, 0x32, - 0x16, 0x96, 0xb1, 0xb7, 0x40, 0xab, 0xbc, 0x54, 0x5b, 0xee, 0xe3, 0xf8, 0x44, 0x58, 0x08, 0xee, - 0x3f, 0x79, 0xe4, 0x3d, 0x9c, 0x19, 0xc8, 0x38, 0xf4, 0xf4, 0xb6, 0xa9, 0xf6, 0x85, 0xe4, 0x01, - 0x9e, 0x70, 0xa6, 0xec, 0x33, 0x30, 0x0b, 0xd7, 0x9b, 0xc3, 0x6e, 0x27, 0xdb, 0x4e, 0xdd, 0x49, - 0xde, 0x47, 0xd3, 0xd8, 0x2a, 0x93, 0x8b, 0xc0, 0x3e, 0xc2, 0xe8, 0x6c, 0xaf, 0xf2, 0xdb, 0x1b, - 0x55, 0x73, 0x40, 0xeb, 0x6b, 0x3b, 0xcd, 0x70, 0xc8, 0xbe, 0xc2, 0xbb, 0xb3, 0xad, 0x95, 0x9d, - 0xac, 0x8b, 0xb5, 0x56, 0x95, 0xac, 0x3a, 0x3e, 0x40, 0xf7, 0x1b, 0x2b, 0x0a, 0xd4, 0x56, 0x28, - 0xc5, 0x9f, 0xc0, 0x5b, 0xfd, 0x59, 0xce, 0x58, 0x00, 0xde, 0xf7, 0x74, 0x95, 0x86, 0xaf, 0x0c, - 0x5a, 0x2e, 0x7e, 0xcd, 0x43, 0xc2, 0x06, 0x40, 0xb3, 0x74, 0xbe, 0x98, 0x86, 0xf7, 0x8f, 0x6e, - 0xfc, 0x40, 0x80, 0xfe, 0x68, 0xf3, 0x4a, 0x5e, 0x62, 0x25, 0xd7, 0xb1, 0x9e, 0x56, 0x62, 0xd8, - 0x81, 0xb0, 0x8c, 0x7d, 0x80, 0xc0, 0xbc, 0xe6, 0x0e, 0x0c, 0xdc, 0x15, 0x17, 0x6e, 0xeb, 0xf2, - 0xf0, 0x17, 0x5b, 0x17, 0x56, 0x40, 0xaf, 0x2b, 0x30, 0xaf, 0x2a, 0xb8, 0x1f, 0xb9, 0x09, 0x15, - 0x96, 0xc5, 0xf1, 0x8b, 0xb3, 0x7b, 0xe0, 0x8a, 0xd9, 0xef, 0x90, 0x18, 0x90, 0x4e, 0x7f, 0x86, - 0xce, 0x64, 0xf8, 0x17, 0x1a, 0x55, 0x97, 0xfa, 0x50, 0xd7, 0xf2, 0xff, 0xc6, 0xc7, 0xee, 0xbe, - 0x3d, 0x07, 0x00, 0x00, 0xff, 0xff, 0xa7, 0xdc, 0xfd, 0xb2, 0x3a, 0x02, 0x00, 0x00, + // 372 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x51, 0xdd, 0xca, 0xd3, 0x30, + 0x18, 0x36, 0x6d, 0xd3, 0x75, 0xef, 0x37, 0x3f, 0xba, 0xf8, 0x43, 0xf0, 0xa8, 0x14, 0xc4, 0x9e, + 0xe8, 0x81, 0x5e, 0x41, 0x37, 0xe7, 0x18, 0x52, 0x99, 0x61, 0x27, 0x7a, 0x32, 0xba, 0x35, 0x2b, + 0x41, 0xd3, 0x96, 0x36, 0x43, 0xe7, 0x3d, 0x89, 0x57, 0xe0, 0xad, 0x89, 0xe4, 0x5d, 0x36, 0x18, + 0xdf, 0x51, 0x9e, 0x3f, 0x92, 0x87, 0x3c, 0x30, 0xd6, 0x43, 0xfd, 0xa6, 0xeb, 0x5b, 0xd3, 0xa6, + 0xff, 0x3c, 0xa0, 0xc5, 0xa9, 0x18, 0x6a, 0x76, 0x0f, 0x9e, 0xaa, 0x38, 0x49, 0x48, 0x36, 0x16, + 0x9e, 0xaa, 0x18, 0x83, 0xc0, 0x9c, 0x3a, 0xc9, 0xbd, 0x84, 0x64, 0x54, 0x20, 0x66, 0xcf, 0x21, + 0x34, 0x65, 0x5f, 0x4b, 0xc3, 0x7d, 0xcc, 0x39, 0x66, 0xb3, 0x55, 0x69, 0x4a, 0x1e, 0x24, 0x24, + 0x9b, 0x08, 0xc4, 0x36, 0xdb, 0xe3, 0x1b, 0x9c, 0x26, 0x24, 0x9b, 0x0a, 0xc7, 0xd8, 0x53, 0xa0, + 0xba, 0xac, 0xd5, 0x9e, 0x87, 0x28, 0x9f, 0x09, 0x8b, 0xc1, 0xff, 0x26, 0x4f, 0x7c, 0x84, 0x9a, + 0x85, 0x8c, 0xc3, 0xc8, 0xec, 0x3b, 0xdd, 0x56, 0x92, 0x47, 0x58, 0xe1, 0x42, 0xd9, 0x6b, 0x60, + 0x0e, 0x6e, 0x77, 0xc7, 0xc3, 0x41, 0xf6, 0x83, 0xfa, 0x25, 0xf9, 0x18, 0x43, 0x53, 0xe7, 0xcc, + 0xae, 0x06, 0x7b, 0x09, 0xf7, 0x97, 0xb8, 0x2e, 0x7f, 0xfe, 0x50, 0x0d, 0x07, 0x8c, 0x3e, 0x76, + 0x6a, 0x81, 0x22, 0x7b, 0x0b, 0xcf, 0x2e, 0xb1, 0x5e, 0x0e, 0xb2, 0xa9, 0xb6, 0x46, 0x69, 0xa9, + 0x07, 0x7e, 0x87, 0xe9, 0x27, 0xce, 0x14, 0xe8, 0x6d, 0xd0, 0xc2, 0x8e, 0x4a, 0xcb, 0xf6, 0x68, + 0xf8, 0xc4, 0x75, 0x3c, 0xd3, 0xf4, 0x15, 0x04, 0x9b, 0x2f, 0xeb, 0x05, 0x8b, 0x20, 0x78, 0x9f, + 0x6f, 0xf2, 0xf8, 0x91, 0x45, 0xeb, 0xd5, 0xa7, 0x65, 0x4c, 0xd8, 0x1d, 0xd0, 0x22, 0x5f, 0xae, + 0xe6, 0xf1, 0xef, 0xbf, 0x7e, 0xfa, 0x87, 0x00, 0xfd, 0xd0, 0x97, 0x5a, 0x5e, 0x3f, 0x9c, 0xdc, + 0x7e, 0xf8, 0xb9, 0x0c, 0xce, 0x10, 0x09, 0xc7, 0xd8, 0x0b, 0x88, 0xec, 0x69, 0x5f, 0xc3, 0x29, + 0x7c, 0x71, 0xe5, 0x6e, 0xc8, 0x00, 0x6f, 0x71, 0x43, 0xe2, 0x38, 0xf4, 0x76, 0x1c, 0x7b, 0xaa, + 0x8a, 0x87, 0x89, 0x9f, 0x51, 0xe1, 0x58, 0x9a, 0x3e, 0xa8, 0x3d, 0x02, 0x5f, 0x2c, 0x3e, 0xc7, + 0xc4, 0x82, 0x7c, 0xfe, 0x31, 0xf6, 0x66, 0x93, 0xaf, 0xd0, 0xa9, 0xa6, 0x36, 0xc7, 0xa6, 0x91, + 0xdf, 0x77, 0x21, 0xae, 0xfa, 0xee, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6c, 0x65, 0x2c, 0xe9, + 0x54, 0x02, 0x00, 0x00, } diff --git a/msg.proto b/msg.proto index a539354..a0d5553 100644 --- a/msg.proto +++ b/msg.proto @@ -19,6 +19,7 @@ message MyMsg { int32 tcpmode_buffersize = 9; int32 tcpmode_maxwin = 10; int32 tcpmode_resend_timems = 11; + int32 timeout = 12; } message Frame { diff --git a/pingtunnel.go b/pingtunnel.go index a494526..59fb1b6 100644 --- a/pingtunnel.go +++ b/pingtunnel.go @@ -65,7 +65,7 @@ func sendICMP(id int, sequence int, conn icmp.PacketConn, server *net.IPAddr, ta continue } } - loggo.Error("sendICMP WriteTo error %s %s", server.String(), err) + loggo.Info("sendICMP WriteTo error %s %s", server.String(), err) } break } @@ -83,7 +83,7 @@ func recvICMP(conn icmp.PacketConn, recv chan<- *Packet) { if err != nil { nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { - loggo.Error("Error read icmp message %s", err) + loggo.Info("Error read icmp message %s", err) continue } } @@ -107,11 +107,6 @@ func recvICMP(conn icmp.PacketConn, recv chan<- *Packet) { continue } - if my.Data == nil { - loggo.Error("processPacket data nil %s", my.Id) - continue - } - recv <- &Packet{my: my, src: srcaddr.(*net.IPAddr), echoId: echoId, echoSeq: echoSeq} diff --git a/pingtunnel_test.go b/pingtunnel_test.go index 20d5a73..ff86c96 100644 --- a/pingtunnel_test.go +++ b/pingtunnel_test.go @@ -12,13 +12,14 @@ func Test0001(t *testing.T) { my.Id = "12345" my.Target = "111:11" my.Type = 12 - my.Data = make([]byte, 3) + my.Data = make([]byte, 0) dst, _ := proto.Marshal(my) fmt.Println("dst = ", dst) my1 := &MyMsg{} proto.Unmarshal(dst, my1) fmt.Println("my1 = ", my1) + fmt.Println("my1.Data = ", my1.Data) proto.Unmarshal(dst[0:4], my1) fmt.Println("my1 = ", my1) diff --git a/server.go b/server.go index e6a3b2e..62234a9 100644 --- a/server.go +++ b/server.go @@ -1,6 +1,7 @@ package pingtunnel import ( + "github.com/esrrhs/go-engine/src/common" "github.com/esrrhs/go-engine/src/loggo" "github.com/golang/protobuf/proto" "golang.org/x/net/icmp" @@ -8,16 +9,14 @@ import ( "time" ) -func NewServer(timeout int, key int) (*Server, error) { +func NewServer(key int) (*Server, error) { return &Server{ - timeout: timeout, - key: key, + key: key, }, nil } type Server struct { - timeout int - key int + key int conn *icmp.PacketConn @@ -33,6 +32,7 @@ type Server struct { } type ServerConn struct { + timeout int ipaddrTarget *net.UDPAddr conn *net.UDPConn tcpaddrTarget *net.TCPAddr @@ -118,7 +118,7 @@ func (p *Server) processPacket(packet *Packet) { fm := NewFrameMgr((int)(packet.my.TcpmodeBuffersize), (int)(packet.my.TcpmodeMaxwin), (int)(packet.my.TcpmodeResendTimems)) - localConn = &ServerConn{tcpconn: targetConn, tcpaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false, + localConn = &ServerConn{timeout: (int)(packet.my.Timeout), tcpconn: targetConn, tcpaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false, rproto: (int)(packet.my.Rproto), fm: fm, tcpmode: (int)(packet.my.Tcpmode)} p.localConnMap[id] = localConn @@ -140,7 +140,7 @@ func (p *Server) processPacket(packet *Packet) { return } - localConn = &ServerConn{conn: targetConn, ipaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false, + localConn = &ServerConn{timeout: (int)(packet.my.Timeout), conn: targetConn, ipaddrTarget: ipaddrTarget, id: id, activeRecvTime: now, activeSendTime: now, close: false, rproto: (int)(packet.my.Rproto), tcpmode: (int)(packet.my.Tcpmode)} p.localConnMap[id] = localConn @@ -166,7 +166,7 @@ func (p *Server) processPacket(packet *Packet) { } else { _, err := localConn.conn.Write(packet.my.Data) if err != nil { - loggo.Error("WriteToUDP Error %s", err) + loggo.Info("WriteToUDP Error %s", err) localConn.close = true return } @@ -183,72 +183,133 @@ func (p *Server) RecvTCP(conn *ServerConn, id string, src *net.IPAddr) { bytes := make([]byte, 10240) + tcpActiveRecvTime := time.Now() + tcpActiveSendTime := time.Now() + for { - left := conn.fm.GetSendBufferLeft() - if left >= len(bytes) { + now := time.Now() + + left := common.MinOfInt(conn.fm.GetSendBufferLeft(), len(bytes)) + if left > 0 { conn.tcpconn.SetReadDeadline(time.Now().Add(time.Millisecond * 100)) - n, err := conn.tcpconn.Read(bytes) + n, err := conn.tcpconn.Read(bytes[0:left]) if err != nil { nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { - loggo.Error("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + loggo.Info("Error read tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + conn.fm.Close() break } } if n > 0 { conn.fm.WriteSendBuffer(bytes[:n]) + tcpActiveRecvTime = now } } conn.fm.Update() sendlist := conn.fm.getSendList() - - now := time.Now() - conn.activeSendTime = now - - for e := sendlist.Front(); e != nil; e = e.Next() { - - f := e.Value.(*Frame) - mb, err := proto.Marshal(f) - if err != nil { - loggo.Error("Error tcp Marshal %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) - continue + if sendlist.Len() > 0 { + conn.activeSendTime = now + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) + mb, err := proto.Marshal(f) + if err != nil { + loggo.Error("Error tcp Marshal %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + continue + } + sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, + conn.rproto, -1, p.key, + 0, 0, 0, 0) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) } - - sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, - conn.rproto, -1, p.key, - 0, 0, 0, 0) - - p.sendPacket++ - p.sendPacketSize += (uint64)(len(mb)) } if conn.fm.GetRecvBufferSize() > 0 { rr := conn.fm.GetRecvReadLineBuffer() - conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100)) n, err := conn.tcpconn.Write(rr) if err != nil { nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { - loggo.Error("Error write tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + loggo.Info("Error write tcp %s %s %s", conn.id, conn.tcpaddrTarget.String(), err) + conn.fm.Close() break } } if n > 0 { conn.fm.SkipRecvBuffer(n) + tcpActiveSendTime = now } } diffrecv := now.Sub(conn.activeRecvTime) diffsend := now.Sub(conn.activeSendTime) - if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) { + tcpdiffrecv := now.Sub(tcpActiveRecvTime) + tcpdiffsend := now.Sub(tcpActiveSendTime) + if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) || + tcpdiffrecv > time.Second*(time.Duration(conn.timeout)) || tcpdiffsend > time.Second*(time.Duration(conn.timeout)) { loggo.Info("close inactive conn %s %s", conn.id, conn.tcpaddrTarget.String()) + conn.fm.Close() + break + } + + if conn.fm.IsRemoteClosed() { + loggo.Info("closed by remote conn %s %s", conn.id, conn.tcpaddrTarget.String()) + conn.fm.Close() break } } + startCloseTime := time.Now() + for { + now := time.Now() + + conn.fm.Update() + + sendlist := conn.fm.getSendList() + for e := sendlist.Front(); e != nil; e = e.Next() { + f := e.Value.(*Frame) + mb, _ := proto.Marshal(f) + sendICMP(p.echoId, p.echoSeq, *p.conn, src, "", id, (uint32)(MyMsg_DATA), mb, + conn.rproto, -1, p.key, + 0, 0, 0, 0) + p.sendPacket++ + p.sendPacketSize += (uint64)(len(mb)) + } + + nodatarecv := true + if conn.fm.GetRecvBufferSize() > 0 { + rr := conn.fm.GetRecvReadLineBuffer() + conn.tcpconn.SetWriteDeadline(time.Now().Add(time.Millisecond * 100)) + n, _ := conn.tcpconn.Write(rr) + if n > 0 { + conn.fm.SkipRecvBuffer(n) + nodatarecv = false + } + } + + diffclose := now.Sub(startCloseTime) + timeout := diffclose > time.Second*(time.Duration(conn.timeout)) + remoteclosed := conn.fm.IsRemoteClosed() + + if timeout { + loggo.Info("close conn had timeout %s %s", conn.id, conn.tcpaddrTarget.String()) + break + } + + if remoteclosed && nodatarecv { + loggo.Info("remote conn had closed %s %s", conn.id, conn.tcpaddrTarget.String()) + break + } + + time.Sleep(time.Millisecond * 100) + } + + time.Sleep(time.Second) + loggo.Info("close tcp conn %s %s", conn.id, conn.tcpaddrTarget.String()) p.Close(conn) } @@ -265,7 +326,7 @@ func (p *Server) Recv(conn *ServerConn, id string, src *net.IPAddr) { if err != nil { nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { - loggo.Error("ReadFromUDP Error read udp %s", err) + loggo.Info("ReadFromUDP Error read udp %s", err) conn.close = true return } @@ -304,7 +365,7 @@ func (p *Server) checkTimeoutConn() { } diffrecv := now.Sub(conn.activeRecvTime) diffsend := now.Sub(conn.activeSendTime) - if diffrecv > time.Second*(time.Duration(p.timeout)) || diffsend > time.Second*(time.Duration(p.timeout)) { + if diffrecv > time.Second*(time.Duration(conn.timeout)) || diffsend > time.Second*(time.Duration(conn.timeout)) { conn.close = true } }