382 lines
9.5 KiB
Go
382 lines
9.5 KiB
Go
package kcp
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"sync/atomic"
|
|
|
|
"github.com/klauspost/reedsolomon"
|
|
)
|
|
|
|
// Wire-format and queue-sizing constants used by the FEC encoder and decoder.
const (
	// fecHeaderSize is the length in bytes of the FEC header: a 4-byte
	// sequence id followed by a 2-byte type flag.
	fecHeaderSize = 6

	// fecHeaderSizePlus2 is the FEC header plus the 2-byte data size field
	// that immediately follows it.
	fecHeaderSizePlus2 = fecHeaderSize + 2

	// typeData is the type flag written into data packets.
	typeData = 0xf1

	// typeParity is the type flag written into parity packets.
	typeParity = 0xf2

	// fecExpire is the age limit for queued packets; it is compared against
	// differences of currentMs() timestamps (presumably milliseconds).
	fecExpire = 60000

	// rxFECMulti sizes the receive queue: the decoder keeps at most
	// rxFECMulti * (dataShards + parityShards) ordered packets in memory.
	rxFECMulti = 3
)
|
|
|
|
// fecPacket is a received FEC packet viewed as raw bytes.
//
// Layout (little-endian): bytes 0-3 hold the sequence id, bytes 4-5 hold the
// type flag, and everything from byte 6 onwards is the payload.
type fecPacket []byte

// seqid returns the 32-bit sequence id stored at the start of the packet.
func (p fecPacket) seqid() uint32 {
	return binary.LittleEndian.Uint32(p)
}

// flag returns the 16-bit type flag stored at byte offset 4.
func (p fecPacket) flag() uint16 {
	return binary.LittleEndian.Uint16(p[4:])
}

// data returns the bytes following the 6-byte header. The returned slice
// shares its backing array with the packet.
func (p fecPacket) data() []byte {
	return p[6:]
}
|
|
|
|
// fecElement has auxcilliary time field
|
|
type fecElement struct {
|
|
fecPacket
|
|
ts uint32
|
|
}
|
|
|
|
// fecDecoder for decoding incoming packets
|
|
type fecDecoder struct {
|
|
rxlimit int // queue size limit
|
|
dataShards int
|
|
parityShards int
|
|
shardSize int
|
|
rx []fecElement // ordered receive queue
|
|
|
|
// caches
|
|
decodeCache [][]byte
|
|
flagCache []bool
|
|
|
|
// zeros
|
|
zeros []byte
|
|
|
|
// RS decoder
|
|
codec reedsolomon.Encoder
|
|
|
|
// auto tune fec parameter
|
|
autoTune autoTune
|
|
}
|
|
|
|
func newFECDecoder(dataShards, parityShards int) *fecDecoder {
|
|
if dataShards <= 0 || parityShards <= 0 {
|
|
return nil
|
|
}
|
|
|
|
dec := new(fecDecoder)
|
|
dec.dataShards = dataShards
|
|
dec.parityShards = parityShards
|
|
dec.shardSize = dataShards + parityShards
|
|
dec.rxlimit = rxFECMulti * dec.shardSize
|
|
codec, err := reedsolomon.New(dataShards, parityShards)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
dec.codec = codec
|
|
dec.decodeCache = make([][]byte, dec.shardSize)
|
|
dec.flagCache = make([]bool, dec.shardSize)
|
|
dec.zeros = make([]byte, mtuLimit)
|
|
return dec
|
|
}
|
|
|
|
// decode a fec packet
|
|
func (dec *fecDecoder) decode(in fecPacket) (recovered [][]byte) {
|
|
// sample to auto FEC tuner
|
|
if in.flag() == typeData {
|
|
dec.autoTune.Sample(true, in.seqid())
|
|
} else {
|
|
dec.autoTune.Sample(false, in.seqid())
|
|
}
|
|
|
|
// check if FEC parameters is out of sync
|
|
var shouldTune bool
|
|
if int(in.seqid())%dec.shardSize < dec.dataShards {
|
|
if in.flag() != typeData { // expect typeData
|
|
shouldTune = true
|
|
}
|
|
} else {
|
|
if in.flag() != typeParity {
|
|
shouldTune = true
|
|
}
|
|
}
|
|
|
|
if shouldTune {
|
|
autoDS := dec.autoTune.FindPeriod(true)
|
|
autoPS := dec.autoTune.FindPeriod(false)
|
|
|
|
// edges found, we can tune parameters now
|
|
if autoDS > 0 && autoPS > 0 && autoDS < 256 && autoPS < 256 {
|
|
// and make sure it's different
|
|
if autoDS != dec.dataShards || autoPS != dec.parityShards {
|
|
dec.dataShards = autoDS
|
|
dec.parityShards = autoPS
|
|
dec.shardSize = autoDS + autoPS
|
|
dec.rxlimit = rxFECMulti * dec.shardSize
|
|
codec, err := reedsolomon.New(autoDS, autoPS)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
dec.codec = codec
|
|
dec.decodeCache = make([][]byte, dec.shardSize)
|
|
dec.flagCache = make([]bool, dec.shardSize)
|
|
//log.Println("autotune to :", dec.dataShards, dec.parityShards)
|
|
}
|
|
}
|
|
}
|
|
|
|
// insertion
|
|
n := len(dec.rx) - 1
|
|
insertIdx := 0
|
|
for i := n; i >= 0; i-- {
|
|
if in.seqid() == dec.rx[i].seqid() { // de-duplicate
|
|
return nil
|
|
} else if _itimediff(in.seqid(), dec.rx[i].seqid()) > 0 { // insertion
|
|
insertIdx = i + 1
|
|
break
|
|
}
|
|
}
|
|
|
|
// make a copy
|
|
pkt := fecPacket(xmitBuf.Get().([]byte)[:len(in)])
|
|
copy(pkt, in)
|
|
elem := fecElement{pkt, currentMs()}
|
|
|
|
// insert into ordered rx queue
|
|
if insertIdx == n+1 {
|
|
dec.rx = append(dec.rx, elem)
|
|
} else {
|
|
dec.rx = append(dec.rx, fecElement{})
|
|
copy(dec.rx[insertIdx+1:], dec.rx[insertIdx:]) // shift right
|
|
dec.rx[insertIdx] = elem
|
|
}
|
|
|
|
// shard range for current packet
|
|
shardBegin := pkt.seqid() - pkt.seqid()%uint32(dec.shardSize)
|
|
shardEnd := shardBegin + uint32(dec.shardSize) - 1
|
|
|
|
// max search range in ordered queue for current shard
|
|
searchBegin := insertIdx - int(pkt.seqid()%uint32(dec.shardSize))
|
|
if searchBegin < 0 {
|
|
searchBegin = 0
|
|
}
|
|
searchEnd := searchBegin + dec.shardSize - 1
|
|
if searchEnd >= len(dec.rx) {
|
|
searchEnd = len(dec.rx) - 1
|
|
}
|
|
|
|
// re-construct datashards
|
|
if searchEnd-searchBegin+1 >= dec.dataShards {
|
|
var numshard, numDataShard, first, maxlen int
|
|
|
|
// zero caches
|
|
shards := dec.decodeCache
|
|
shardsflag := dec.flagCache
|
|
for k := range dec.decodeCache {
|
|
shards[k] = nil
|
|
shardsflag[k] = false
|
|
}
|
|
|
|
// shard assembly
|
|
for i := searchBegin; i <= searchEnd; i++ {
|
|
seqid := dec.rx[i].seqid()
|
|
if _itimediff(seqid, shardEnd) > 0 {
|
|
break
|
|
} else if _itimediff(seqid, shardBegin) >= 0 {
|
|
shards[seqid%uint32(dec.shardSize)] = dec.rx[i].data()
|
|
shardsflag[seqid%uint32(dec.shardSize)] = true
|
|
numshard++
|
|
if dec.rx[i].flag() == typeData {
|
|
numDataShard++
|
|
}
|
|
if numshard == 1 {
|
|
first = i
|
|
}
|
|
if len(dec.rx[i].data()) > maxlen {
|
|
maxlen = len(dec.rx[i].data())
|
|
}
|
|
}
|
|
}
|
|
|
|
if numDataShard == dec.dataShards {
|
|
// case 1: no loss on data shards
|
|
dec.rx = dec.freeRange(first, numshard, dec.rx)
|
|
} else if numshard >= dec.dataShards {
|
|
// case 2: loss on data shards, but it's recoverable from parity shards
|
|
for k := range shards {
|
|
if shards[k] != nil {
|
|
dlen := len(shards[k])
|
|
shards[k] = shards[k][:maxlen]
|
|
copy(shards[k][dlen:], dec.zeros)
|
|
} else if k < dec.dataShards {
|
|
shards[k] = xmitBuf.Get().([]byte)[:0]
|
|
}
|
|
}
|
|
if err := dec.codec.ReconstructData(shards); err == nil {
|
|
for k := range shards[:dec.dataShards] {
|
|
if !shardsflag[k] {
|
|
// recovered data should be recycled
|
|
recovered = append(recovered, shards[k])
|
|
}
|
|
}
|
|
}
|
|
dec.rx = dec.freeRange(first, numshard, dec.rx)
|
|
}
|
|
}
|
|
|
|
// keep rxlimit
|
|
if len(dec.rx) > dec.rxlimit {
|
|
if dec.rx[0].flag() == typeData { // track the unrecoverable data
|
|
atomic.AddUint64(&DefaultSnmp.FECShortShards, 1)
|
|
}
|
|
dec.rx = dec.freeRange(0, 1, dec.rx)
|
|
}
|
|
|
|
// timeout policy
|
|
current := currentMs()
|
|
numExpired := 0
|
|
for k := range dec.rx {
|
|
if _itimediff(current, dec.rx[k].ts) > fecExpire {
|
|
numExpired++
|
|
continue
|
|
}
|
|
break
|
|
}
|
|
if numExpired > 0 {
|
|
dec.rx = dec.freeRange(0, numExpired, dec.rx)
|
|
}
|
|
return
|
|
}
|
|
|
|
// free a range of fecPacket
|
|
func (dec *fecDecoder) freeRange(first, n int, q []fecElement) []fecElement {
|
|
for i := first; i < first+n; i++ { // recycle buffer
|
|
xmitBuf.Put([]byte(q[i].fecPacket))
|
|
}
|
|
|
|
if first == 0 && n < cap(q)/2 {
|
|
return q[n:]
|
|
}
|
|
copy(q[first:], q[first+n:])
|
|
return q[:len(q)-n]
|
|
}
|
|
|
|
// release all segments back to xmitBuf
|
|
func (dec *fecDecoder) release() {
|
|
if n := len(dec.rx); n > 0 {
|
|
dec.rx = dec.freeRange(0, n, dec.rx)
|
|
}
|
|
}
|
|
|
|
type (
|
|
// fecEncoder for encoding outgoing packets
|
|
fecEncoder struct {
|
|
dataShards int
|
|
parityShards int
|
|
shardSize int
|
|
paws uint32 // Protect Against Wrapped Sequence numbers
|
|
next uint32 // next seqid
|
|
|
|
shardCount int // count the number of datashards collected
|
|
maxSize int // track maximum data length in datashard
|
|
|
|
headerOffset int // FEC header offset
|
|
payloadOffset int // FEC payload offset
|
|
|
|
// caches
|
|
shardCache [][]byte
|
|
encodeCache [][]byte
|
|
|
|
// zeros
|
|
zeros []byte
|
|
|
|
// RS encoder
|
|
codec reedsolomon.Encoder
|
|
}
|
|
)
|
|
|
|
func newFECEncoder(dataShards, parityShards, offset int) *fecEncoder {
|
|
if dataShards <= 0 || parityShards <= 0 {
|
|
return nil
|
|
}
|
|
enc := new(fecEncoder)
|
|
enc.dataShards = dataShards
|
|
enc.parityShards = parityShards
|
|
enc.shardSize = dataShards + parityShards
|
|
enc.paws = 0xffffffff / uint32(enc.shardSize) * uint32(enc.shardSize)
|
|
enc.headerOffset = offset
|
|
enc.payloadOffset = enc.headerOffset + fecHeaderSize
|
|
|
|
codec, err := reedsolomon.New(dataShards, parityShards)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
enc.codec = codec
|
|
|
|
// caches
|
|
enc.encodeCache = make([][]byte, enc.shardSize)
|
|
enc.shardCache = make([][]byte, enc.shardSize)
|
|
for k := range enc.shardCache {
|
|
enc.shardCache[k] = make([]byte, mtuLimit)
|
|
}
|
|
enc.zeros = make([]byte, mtuLimit)
|
|
return enc
|
|
}
|
|
|
|
// encodes the packet, outputs parity shards if we have collected quorum datashards
|
|
// notice: the contents of 'ps' will be re-written in successive calling
|
|
func (enc *fecEncoder) encode(b []byte) (ps [][]byte) {
|
|
// The header format:
|
|
// | FEC SEQID(4B) | FEC TYPE(2B) | SIZE (2B) | PAYLOAD(SIZE-2) |
|
|
// |<-headerOffset |<-payloadOffset
|
|
enc.markData(b[enc.headerOffset:])
|
|
binary.LittleEndian.PutUint16(b[enc.payloadOffset:], uint16(len(b[enc.payloadOffset:])))
|
|
|
|
// copy data from payloadOffset to fec shard cache
|
|
sz := len(b)
|
|
enc.shardCache[enc.shardCount] = enc.shardCache[enc.shardCount][:sz]
|
|
copy(enc.shardCache[enc.shardCount][enc.payloadOffset:], b[enc.payloadOffset:])
|
|
enc.shardCount++
|
|
|
|
// track max datashard length
|
|
if sz > enc.maxSize {
|
|
enc.maxSize = sz
|
|
}
|
|
|
|
// Generation of Reed-Solomon Erasure Code
|
|
if enc.shardCount == enc.dataShards {
|
|
// fill '0' into the tail of each datashard
|
|
for i := 0; i < enc.dataShards; i++ {
|
|
shard := enc.shardCache[i]
|
|
slen := len(shard)
|
|
copy(shard[slen:enc.maxSize], enc.zeros)
|
|
}
|
|
|
|
// construct equal-sized slice with stripped header
|
|
cache := enc.encodeCache
|
|
for k := range cache {
|
|
cache[k] = enc.shardCache[k][enc.payloadOffset:enc.maxSize]
|
|
}
|
|
|
|
// encoding
|
|
if err := enc.codec.Encode(cache); err == nil {
|
|
ps = enc.shardCache[enc.dataShards:]
|
|
for k := range ps {
|
|
enc.markParity(ps[k][enc.headerOffset:])
|
|
ps[k] = ps[k][:enc.maxSize]
|
|
}
|
|
}
|
|
|
|
// counters resetting
|
|
enc.shardCount = 0
|
|
enc.maxSize = 0
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (enc *fecEncoder) markData(data []byte) {
|
|
binary.LittleEndian.PutUint32(data, enc.next)
|
|
binary.LittleEndian.PutUint16(data[4:], typeData)
|
|
enc.next++
|
|
}
|
|
|
|
func (enc *fecEncoder) markParity(data []byte) {
|
|
binary.LittleEndian.PutUint32(data, enc.next)
|
|
binary.LittleEndian.PutUint16(data[4:], typeParity)
|
|
// sequence wrap will only happen at parity shard
|
|
enc.next = (enc.next + 1) % enc.paws
|
|
}
|