703 lines
14 KiB
Go
Raw Permalink Normal View History

2021-12-04 16:42:11 +00:00
package kcp
import (
"crypto/sha1"
"fmt"
"io"
"log"
"net"
"net/http"
_ "net/http/pprof"
"sync"
"sync/atomic"
"testing"
"time"
"golang.org/x/crypto/pbkdf2"
)
var baseport = uint32(10000)
var key = []byte("testkey")
var pass = pbkdf2.Key(key, []byte("testsalt"), 4096, 32, sha1.New)
func init() {
go func() {
log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
}()
log.Println("beginning tests, encryption:salsa20, fec:10/3")
}
func dialEcho(port int) (*UDPSession, error) {
//block, _ := NewNoneBlockCrypt(pass)
//block, _ := NewSimpleXORBlockCrypt(pass)
//block, _ := NewTEABlockCrypt(pass[:16])
//block, _ := NewAESBlockCrypt(pass)
block, _ := NewSalsa20BlockCrypt(pass)
sess, err := DialWithOptions(fmt.Sprintf("127.0.0.1:%v", port), block, 10, 3)
if err != nil {
panic(err)
}
sess.SetStreamMode(true)
sess.SetStreamMode(false)
sess.SetStreamMode(true)
sess.SetWindowSize(1024, 1024)
sess.SetReadBuffer(16 * 1024 * 1024)
sess.SetWriteBuffer(16 * 1024 * 1024)
sess.SetStreamMode(true)
sess.SetNoDelay(1, 10, 2, 1)
sess.SetMtu(1400)
sess.SetMtu(1600)
sess.SetMtu(1400)
sess.SetACKNoDelay(true)
sess.SetACKNoDelay(false)
sess.SetDeadline(time.Now().Add(time.Minute))
return sess, err
}
func dialSink(port int) (*UDPSession, error) {
sess, err := DialWithOptions(fmt.Sprintf("127.0.0.1:%v", port), nil, 0, 0)
if err != nil {
panic(err)
}
sess.SetStreamMode(true)
sess.SetWindowSize(1024, 1024)
sess.SetReadBuffer(16 * 1024 * 1024)
sess.SetWriteBuffer(16 * 1024 * 1024)
sess.SetStreamMode(true)
sess.SetNoDelay(1, 10, 2, 1)
sess.SetMtu(1400)
sess.SetACKNoDelay(false)
sess.SetDeadline(time.Now().Add(time.Minute))
return sess, err
}
func dialTinyBufferEcho(port int) (*UDPSession, error) {
//block, _ := NewNoneBlockCrypt(pass)
//block, _ := NewSimpleXORBlockCrypt(pass)
//block, _ := NewTEABlockCrypt(pass[:16])
//block, _ := NewAESBlockCrypt(pass)
block, _ := NewSalsa20BlockCrypt(pass)
sess, err := DialWithOptions(fmt.Sprintf("127.0.0.1:%v", port), block, 10, 3)
if err != nil {
panic(err)
}
return sess, err
}
//////////////////////////
func listenEcho(port int) (net.Listener, error) {
//block, _ := NewNoneBlockCrypt(pass)
//block, _ := NewSimpleXORBlockCrypt(pass)
//block, _ := NewTEABlockCrypt(pass[:16])
//block, _ := NewAESBlockCrypt(pass)
block, _ := NewSalsa20BlockCrypt(pass)
return ListenWithOptions(fmt.Sprintf("127.0.0.1:%v", port), block, 10, 0)
}
func listenTinyBufferEcho(port int) (net.Listener, error) {
//block, _ := NewNoneBlockCrypt(pass)
//block, _ := NewSimpleXORBlockCrypt(pass)
//block, _ := NewTEABlockCrypt(pass[:16])
//block, _ := NewAESBlockCrypt(pass)
block, _ := NewSalsa20BlockCrypt(pass)
return ListenWithOptions(fmt.Sprintf("127.0.0.1:%v", port), block, 10, 3)
}
func listenSink(port int) (net.Listener, error) {
return ListenWithOptions(fmt.Sprintf("127.0.0.1:%v", port), nil, 0, 0)
}
func echoServer(port int) net.Listener {
l, err := listenEcho(port)
if err != nil {
panic(err)
}
go func() {
kcplistener := l.(*Listener)
kcplistener.SetReadBuffer(4 * 1024 * 1024)
kcplistener.SetWriteBuffer(4 * 1024 * 1024)
kcplistener.SetDSCP(46)
for {
s, err := l.Accept()
if err != nil {
return
}
// coverage test
s.(*UDPSession).SetReadBuffer(4 * 1024 * 1024)
s.(*UDPSession).SetWriteBuffer(4 * 1024 * 1024)
go handleEcho(s.(*UDPSession))
}
}()
return l
}
func sinkServer(port int) net.Listener {
l, err := listenSink(port)
if err != nil {
panic(err)
}
go func() {
kcplistener := l.(*Listener)
kcplistener.SetReadBuffer(4 * 1024 * 1024)
kcplistener.SetWriteBuffer(4 * 1024 * 1024)
kcplistener.SetDSCP(46)
for {
s, err := l.Accept()
if err != nil {
return
}
go handleSink(s.(*UDPSession))
}
}()
return l
}
func tinyBufferEchoServer(port int) net.Listener {
l, err := listenTinyBufferEcho(port)
if err != nil {
panic(err)
}
go func() {
for {
s, err := l.Accept()
if err != nil {
return
}
go handleTinyBufferEcho(s.(*UDPSession))
}
}()
return l
}
///////////////////////////
func handleEcho(conn *UDPSession) {
conn.SetStreamMode(true)
conn.SetWindowSize(4096, 4096)
conn.SetNoDelay(1, 10, 2, 1)
conn.SetDSCP(46)
conn.SetMtu(1400)
conn.SetACKNoDelay(false)
conn.SetReadDeadline(time.Now().Add(time.Hour))
conn.SetWriteDeadline(time.Now().Add(time.Hour))
buf := make([]byte, 65536)
for {
n, err := conn.Read(buf)
if err != nil {
return
}
conn.Write(buf[:n])
}
}
func handleSink(conn *UDPSession) {
conn.SetStreamMode(true)
conn.SetWindowSize(4096, 4096)
conn.SetNoDelay(1, 10, 2, 1)
conn.SetDSCP(46)
conn.SetMtu(1400)
conn.SetACKNoDelay(false)
conn.SetReadDeadline(time.Now().Add(time.Hour))
conn.SetWriteDeadline(time.Now().Add(time.Hour))
buf := make([]byte, 65536)
for {
_, err := conn.Read(buf)
if err != nil {
return
}
}
}
func handleTinyBufferEcho(conn *UDPSession) {
conn.SetStreamMode(true)
buf := make([]byte, 2)
for {
n, err := conn.Read(buf)
if err != nil {
return
}
conn.Write(buf[:n])
}
}
///////////////////////////
func TestTimeout(t *testing.T) {
port := int(atomic.AddUint32(&baseport, 1))
l := echoServer(port)
defer l.Close()
cli, err := dialEcho(port)
if err != nil {
panic(err)
}
buf := make([]byte, 10)
//timeout
cli.SetDeadline(time.Now().Add(time.Second))
<-time.After(2 * time.Second)
n, err := cli.Read(buf)
if n != 0 || err == nil {
t.Fail()
}
cli.Close()
}
func TestSendRecv(t *testing.T) {
port := int(atomic.AddUint32(&baseport, 1))
l := echoServer(port)
defer l.Close()
cli, err := dialEcho(port)
if err != nil {
panic(err)
}
cli.SetWriteDelay(true)
cli.SetDUP(1)
const N = 100
buf := make([]byte, 10)
for i := 0; i < N; i++ {
msg := fmt.Sprintf("hello%v", i)
cli.Write([]byte(msg))
if n, err := cli.Read(buf); err == nil {
if string(buf[:n]) != msg {
t.Fail()
}
} else {
panic(err)
}
}
cli.Close()
}
func TestSendVector(t *testing.T) {
port := int(atomic.AddUint32(&baseport, 1))
l := echoServer(port)
defer l.Close()
cli, err := dialEcho(port)
if err != nil {
panic(err)
}
cli.SetWriteDelay(false)
const N = 100
buf := make([]byte, 20)
v := make([][]byte, 2)
for i := 0; i < N; i++ {
v[0] = []byte(fmt.Sprintf("hello%v", i))
v[1] = []byte(fmt.Sprintf("world%v", i))
msg := fmt.Sprintf("hello%vworld%v", i, i)
cli.WriteBuffers(v)
if n, err := cli.Read(buf); err == nil {
if string(buf[:n]) != msg {
t.Error(string(buf[:n]), msg)
}
} else {
panic(err)
}
}
cli.Close()
}
func TestTinyBufferReceiver(t *testing.T) {
port := int(atomic.AddUint32(&baseport, 1))
l := tinyBufferEchoServer(port)
defer l.Close()
cli, err := dialTinyBufferEcho(port)
if err != nil {
panic(err)
}
const N = 100
snd := byte(0)
fillBuffer := func(buf []byte) {
for i := 0; i < len(buf); i++ {
buf[i] = snd
snd++
}
}
rcv := byte(0)
check := func(buf []byte) bool {
for i := 0; i < len(buf); i++ {
if buf[i] != rcv {
return false
}
rcv++
}
return true
}
sndbuf := make([]byte, 7)
rcvbuf := make([]byte, 7)
for i := 0; i < N; i++ {
fillBuffer(sndbuf)
cli.Write(sndbuf)
if n, err := io.ReadFull(cli, rcvbuf); err == nil {
if !check(rcvbuf[:n]) {
t.Fail()
}
} else {
panic(err)
}
}
cli.Close()
}
func TestClose(t *testing.T) {
var n int
var err error
port := int(atomic.AddUint32(&baseport, 1))
l := echoServer(port)
defer l.Close()
cli, err := dialEcho(port)
if err != nil {
panic(err)
}
// double close
cli.Close()
if cli.Close() == nil {
t.Fatal("double close misbehavior")
}
// write after close
buf := make([]byte, 10)
n, err = cli.Write(buf)
if n != 0 || err == nil {
t.Fatal("write after close misbehavior")
}
// write, close, read, read
cli, err = dialEcho(port)
if err != nil {
panic(err)
}
if n, err = cli.Write(buf); err != nil {
t.Fatal("write misbehavior")
}
// wait until data arrival
time.Sleep(2 * time.Second)
// drain
cli.Close()
n, err = io.ReadFull(cli, buf)
if err != nil {
t.Fatal("closed conn drain bytes failed", err, n)
}
// after drain, read should return error
n, err = cli.Read(buf)
if n != 0 || err == nil {
t.Fatal("write->close->drain->read misbehavior", err, n)
}
cli.Close()
}
func TestParallel1024CLIENT_64BMSG_64CNT(t *testing.T) {
port := int(atomic.AddUint32(&baseport, 1))
l := echoServer(port)
defer l.Close()
var wg sync.WaitGroup
wg.Add(1024)
for i := 0; i < 1024; i++ {
go parallel_client(&wg, port)
}
wg.Wait()
}
func parallel_client(wg *sync.WaitGroup, port int) (err error) {
cli, err := dialEcho(port)
if err != nil {
panic(err)
}
err = echo_tester(cli, 64, 64)
cli.Close()
wg.Done()
return
}
func BenchmarkEchoSpeed4K(b *testing.B) {
speedclient(b, 4096)
}
func BenchmarkEchoSpeed64K(b *testing.B) {
speedclient(b, 65536)
}
func BenchmarkEchoSpeed512K(b *testing.B) {
speedclient(b, 524288)
}
func BenchmarkEchoSpeed1M(b *testing.B) {
speedclient(b, 1048576)
}
func speedclient(b *testing.B, nbytes int) {
port := int(atomic.AddUint32(&baseport, 1))
l := echoServer(port)
defer l.Close()
b.ReportAllocs()
cli, err := dialEcho(port)
if err != nil {
panic(err)
}
if err := echo_tester(cli, nbytes, b.N); err != nil {
b.Fail()
}
b.SetBytes(int64(nbytes))
cli.Close()
}
func BenchmarkSinkSpeed4K(b *testing.B) {
sinkclient(b, 4096)
}
func BenchmarkSinkSpeed64K(b *testing.B) {
sinkclient(b, 65536)
}
func BenchmarkSinkSpeed256K(b *testing.B) {
sinkclient(b, 524288)
}
func BenchmarkSinkSpeed1M(b *testing.B) {
sinkclient(b, 1048576)
}
func sinkclient(b *testing.B, nbytes int) {
port := int(atomic.AddUint32(&baseport, 1))
l := sinkServer(port)
defer l.Close()
b.ReportAllocs()
cli, err := dialSink(port)
if err != nil {
panic(err)
}
sink_tester(cli, nbytes, b.N)
b.SetBytes(int64(nbytes))
cli.Close()
}
func echo_tester(cli net.Conn, msglen, msgcount int) error {
buf := make([]byte, msglen)
for i := 0; i < msgcount; i++ {
// send packet
if _, err := cli.Write(buf); err != nil {
return err
}
// receive packet
nrecv := 0
for {
n, err := cli.Read(buf)
if err != nil {
return err
} else {
nrecv += n
if nrecv == msglen {
break
}
}
}
}
return nil
}
func sink_tester(cli *UDPSession, msglen, msgcount int) error {
// sender
buf := make([]byte, msglen)
for i := 0; i < msgcount; i++ {
if _, err := cli.Write(buf); err != nil {
return err
}
}
return nil
}
func TestSNMP(t *testing.T) {
t.Log(DefaultSnmp.Copy())
t.Log(DefaultSnmp.Header())
t.Log(DefaultSnmp.ToSlice())
DefaultSnmp.Reset()
t.Log(DefaultSnmp.ToSlice())
}
func TestListenerClose(t *testing.T) {
port := int(atomic.AddUint32(&baseport, 1))
l, err := ListenWithOptions(fmt.Sprintf("127.0.0.1:%v", port), nil, 10, 3)
if err != nil {
t.Fail()
}
l.SetReadDeadline(time.Now().Add(time.Second))
l.SetWriteDeadline(time.Now().Add(time.Second))
l.SetDeadline(time.Now().Add(time.Second))
time.Sleep(2 * time.Second)
if _, err := l.Accept(); err == nil {
t.Fail()
}
l.Close()
fakeaddr, _ := net.ResolveUDPAddr("udp6", "127.0.0.1:1111")
if l.closeSession(fakeaddr) {
t.Fail()
}
}
// A wrapper for net.PacketConn that remembers when Close has been called.
type closedFlagPacketConn struct {
net.PacketConn
Closed bool
}
func (c *closedFlagPacketConn) Close() error {
c.Closed = true
return c.PacketConn.Close()
}
func newClosedFlagPacketConn(c net.PacketConn) *closedFlagPacketConn {
return &closedFlagPacketConn{c, false}
}
// Listener should close a net.PacketConn that it created.
// https://github.com/xtaci/kcp-go/issues/165
func TestListenerOwnedPacketConn(t *testing.T) {
// ListenWithOptions creates its own net.PacketConn.
l, err := ListenWithOptions("127.0.0.1:0", nil, 0, 0)
if err != nil {
panic(err)
}
defer l.Close()
// Replace the internal net.PacketConn with one that remembers when it
// has been closed.
pconn := newClosedFlagPacketConn(l.conn)
l.conn = pconn
if pconn.Closed {
t.Fatal("owned PacketConn closed before Listener.Close()")
}
err = l.Close()
if err != nil {
panic(err)
}
if !pconn.Closed {
t.Fatal("owned PacketConn not closed after Listener.Close()")
}
}
// Listener should not close a net.PacketConn that it did not create.
// https://github.com/xtaci/kcp-go/issues/165
func TestListenerNonOwnedPacketConn(t *testing.T) {
// Create a net.PacketConn not owned by the Listener.
c, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
panic(err)
}
defer c.Close()
// Make it remember when it has been closed.
pconn := newClosedFlagPacketConn(c)
l, err := ServeConn(nil, 0, 0, pconn)
if err != nil {
panic(err)
}
defer l.Close()
if pconn.Closed {
t.Fatal("non-owned PacketConn closed before Listener.Close()")
}
err = l.Close()
if err != nil {
panic(err)
}
if pconn.Closed {
t.Fatal("non-owned PacketConn closed after Listener.Close()")
}
}
// UDPSession should close a net.PacketConn that it created.
// https://github.com/xtaci/kcp-go/issues/165
func TestUDPSessionOwnedPacketConn(t *testing.T) {
l := sinkServer(0)
defer l.Close()
// DialWithOptions creates its own net.PacketConn.
client, err := DialWithOptions(l.Addr().String(), nil, 0, 0)
if err != nil {
panic(err)
}
defer client.Close()
// Replace the internal net.PacketConn with one that remembers when it
// has been closed.
pconn := newClosedFlagPacketConn(client.conn)
client.conn = pconn
if pconn.Closed {
t.Fatal("owned PacketConn closed before UDPSession.Close()")
}
err = client.Close()
if err != nil {
panic(err)
}
if !pconn.Closed {
t.Fatal("owned PacketConn not closed after UDPSession.Close()")
}
}
// UDPSession should not close a net.PacketConn that it did not create.
// https://github.com/xtaci/kcp-go/issues/165
func TestUDPSessionNonOwnedPacketConn(t *testing.T) {
l := sinkServer(0)
defer l.Close()
// Create a net.PacketConn not owned by the UDPSession.
c, err := net.ListenPacket("udp", "127.0.0.1:0")
if err != nil {
panic(err)
}
defer c.Close()
// Make it remember when it has been closed.
pconn := newClosedFlagPacketConn(c)
client, err := NewConn2(l.Addr(), nil, 0, 0, pconn)
if err != nil {
panic(err)
}
defer client.Close()
if pconn.Closed {
t.Fatal("non-owned PacketConn closed before UDPSession.Close()")
}
err = client.Close()
if err != nil {
panic(err)
}
if pconn.Closed {
t.Fatal("non-owned PacketConn closed after UDPSession.Close()")
}
}