diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 0bebcb9..d14b5f6 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -28,9 +28,10 @@ import "bytes" import "runtime/debug" import "strings" -//import "runtime" -//import "bufio" +import "runtime" +import "context" import "golang.org/x/crypto/sha3" +import "golang.org/x/sync/semaphore" import "github.com/go-logr/logr" import "sync/atomic" @@ -588,13 +589,19 @@ func (chain *Blockchain) Add_Complete_Block(cbl *block.Complete_Block) (err erro wg.Add(len(cbl.Txs)) // add total number of tx as work hf_version := chain.Get_Current_Version_at_Height(chain.Calculate_Height_At_Tips(bl.Tips)) + + sem := semaphore.NewWeighted(int64(runtime.NumCPU())) + for i := 0; i < len(cbl.Txs); i++ { + sem.Acquire(context.Background(), 1) go func(j int) { + defer sem.Release(1) + defer wg.Done() if err := chain.Verify_Transaction_NonCoinbase_CheckNonce_Tips(hf_version, cbl.Txs[j], bl.Tips); err != nil { // transaction verification failed atomic.AddInt32(&fail_count, 1) // increase fail count by 1 block_logger.Error(err, "tx nonce verification failed", "txid", cbl.Txs[j].GetHash()) } - wg.Done() + }(i) } @@ -611,13 +618,18 @@ func (chain *Blockchain) Add_Complete_Block(cbl *block.Complete_Block) (err erro wg := sync.WaitGroup{} wg.Add(len(cbl.Txs)) // add total number of tx as work + sem := semaphore.NewWeighted(int64(runtime.NumCPU())) + for i := 0; i < len(cbl.Txs); i++ { + sem.Acquire(context.Background(), 1) + go func(j int) { + defer sem.Release(1) + defer wg.Done() if err := chain.Verify_Transaction_NonCoinbase(cbl.Txs[j]); err != nil { // transaction verification failed atomic.AddInt32(&fail_count, 1) // increase fail count by 1 block_logger.Error(err, "tx verification failed", "txid", cbl.Txs[j].GetHash()) } - wg.Done() }(i) } @@ -1054,16 +1066,6 @@ func (chain *Blockchain) Get_Difficulty() uint64 { return chain.Get_Difficulty_At_Tips(chain.Get_TIPS()).Uint64() } -/* -func (chain *Blockchain) Get_Cumulative_Difficulty() uint64 { - - return 0 //chain.Load_Block_Cumulative_Difficulty(chain.Top_ID) -} - -func (chain *Blockchain) Get_Median_Block_Size() uint64 { // get current cached median size - return chain.Median_Block_Size -} -*/ func (chain *Blockchain) Get_Network_HashRate() uint64 { return chain.Get_Difficulty() } @@ -1162,7 +1164,7 @@ func (chain *Blockchain) Add_TX_To_Pool(tx *transaction.Transaction) error { } if err := chain.Verify_Transaction_NonCoinbase_CheckNonce_Tips(hf_version, tx, chain.Get_TIPS()); err != nil { // transaction verification failed - logger.V(1).Error(err, "Incoming TX nonce verification failed", "txid", txhash) + logger.V(1).Error(err, "Incoming TX nonce verification failed", "txid", txhash, "stacktrace", globals.StackTrace(false)) return fmt.Errorf("Incoming TX %s nonce verification failed, err %s", txhash, err) } @@ -1263,104 +1265,6 @@ func (chain *Blockchain) IS_TX_Valid(txhash crypto.Hash) (valid_blid crypto.Hash return } -/* - - -// runs the client protocol which includes the following operations -// if any TX are being duplicate or double-spend ignore them -// mark all the valid transactions as valid -// mark all invalid transactions as invalid -// calculate total fees based on valid TX -// we need NOT check ranges/ring signatures here, as they have been done already by earlier steps -func (chain *Blockchain) client_protocol(dbtx storage.DBTX, bl *block.Block, blid crypto.Hash, height int64, topoheight int64) (total_fees uint64) { - // run client protocol for all TXs - for i := range bl.Tx_hashes { - tx, err := chain.Load_TX_FROM_ID(dbtx, bl.Tx_hashes[i]) - if err != nil { - panic(fmt.Errorf("Cannot load tx for %x err %s ", bl.Tx_hashes[i], err)) - } - // mark TX found in this block also for explorer - chain.store_TX_in_Block(dbtx, blid, bl.Tx_hashes[i]) - - // check all key images as double spend, if double-spend detected mark invalid, else consider valid - if chain.Verify_Transaction_NonCoinbase_DoubleSpend_Check(dbtx, tx) { - - chain.consume_keyimages(dbtx, tx, height) // mark key images as consumed - total_fees += tx.RctSignature.Get_TX_Fee() - - chain.Store_TX_Height(dbtx, bl.Tx_hashes[i], topoheight) // link the tx with the topo height - - //mark tx found in this block is valid - chain.mark_TX(dbtx, blid, bl.Tx_hashes[i], true) - - } else { // TX is double spend or reincluded by 2 blocks simultaneously - rlog.Tracef(1,"Double spend TX is being ignored %s %s", blid, bl.Tx_hashes[i]) - chain.mark_TX(dbtx, blid, bl.Tx_hashes[i], false) - } - } - - return total_fees -} - -// this undoes everything that is done by client protocol -// NOTE: this will have any effect, only if client protocol has been run on this block earlier -func (chain *Blockchain) client_protocol_reverse(dbtx storage.DBTX, bl *block.Block, blid crypto.Hash) { - // run client protocol for all TXs - for i := range bl.Tx_hashes { - tx, err := chain.Load_TX_FROM_ID(dbtx, bl.Tx_hashes[i]) - if err != nil { - panic(fmt.Errorf("Cannot load tx for %x err %s ", bl.Tx_hashes[i], err)) - } - // only the valid TX must be revoked - if chain.IS_TX_Valid(dbtx, blid, bl.Tx_hashes[i]) { - chain.revoke_keyimages(dbtx, tx) // mark key images as not used - - chain.Store_TX_Height(dbtx, bl.Tx_hashes[i], -1) // unlink the tx with the topo height - - //mark tx found in this block is invalid - chain.mark_TX(dbtx, blid, bl.Tx_hashes[i], false) - - } else { // TX is double spend or reincluded by 2 blocks simultaneously - // invalid tx is related - } - } - - return -} - -// scavanger for transactions from rusty/stale tips to reinsert them into pool -func (chain *Blockchain) transaction_scavenger(dbtx storage.DBTX, blid crypto.Hash) { - defer func() { - if r := recover(); r != nil { - logger.Warnf("Recovered while transaction scavenging, Stack trace below ") - logger.Warnf("Stack trace \n%s", debug.Stack()) - } - }() - - logger.Debugf("scavenging transactions from blid %s", blid) - reachable_blocks := chain.BuildReachableBlocks(dbtx, []crypto.Hash{blid}) - reachable_blocks[blid] = true // add self - for k, _ := range reachable_blocks { - if chain.Is_Block_Orphan(k) { - bl, err := chain.Load_BL_FROM_ID(dbtx, k) - if err == nil { - for i := range bl.Tx_hashes { - tx, err := chain.Load_TX_FROM_ID(dbtx, bl.Tx_hashes[i]) - if err != nil { - rlog.Warnf("err while scavenging blid %s txid %s err %s", k, bl.Tx_hashes[i], err) - } else { - // add tx to pool, it will do whatever is necessarry - chain.Add_TX_To_Pool(tx) - } - } - } else { - rlog.Warnf("err while scavenging blid %s err %s", k, err) - } - } - } -} - -*/ // Finds whether a block is orphan // since we donot store any fields, we need to calculate/find the block as orphan // using an algorithm diff --git a/blockchain/miner_block.go b/blockchain/miner_block.go index a1a109b..7866580 100644 --- a/blockchain/miner_block.go +++ b/blockchain/miner_block.go @@ -20,7 +20,6 @@ import "fmt" import "bytes" import "sort" import "sync" -import "math/rand" import "runtime/debug" import "encoding/binary" @@ -181,56 +180,38 @@ func (chain *Blockchain) Create_new_miner_block(miner_address rpc.Address) (cbl var tx_hash_list_included []crypto.Hash // these tx will be included ( due to block size limit ) sizeoftxs := uint64(0) // size of all non coinbase tx included within this block - //fees_collected := uint64(0) - - _ = sizeoftxs // add upto 100 registration tx each registration tx is 99 bytes, so 100 tx will take 9900 bytes or 10KB { tx_hash_list_sorted := chain.Regpool.Regpool_List_TX() // hash of all tx expected to be included within this block , sorted by fees - for i := range tx_hash_list_sorted { - - tx := chain.Regpool.Regpool_Get_TX(tx_hash_list_sorted[i]) - if tx != nil { - _, err = balance_tree.Get(tx.MinerAddress[:]) - if err != nil { + if tx := chain.Regpool.Regpool_Get_TX(tx_hash_list_sorted[i]); tx != nil { + if _, err = balance_tree.Get(tx.MinerAddress[:]); err != nil { if xerrors.Is(err, graviton.ErrNotFound) { // address needs registration - cbl.Txs = append(cbl.Txs, tx) tx_hash_list_included = append(tx_hash_list_included, tx_hash_list_sorted[i]) - } else { - return } } } - } } hf_version := chain.Get_Current_Version_at_Height(height) //rlog.Infof("Total tx in pool %d", len(tx_hash_list_sorted)) - //reachable_key_images := chain.BuildReachabilityKeyImages(dbtx, &bl) // this requires only bl.Tips - - // select 10% tx based on fees - // select 90% tx randomly - // random selection helps us to easily reach 80 TPS + // select tx based on fees // first of lets find the tx fees collected by consuming txs from mempool tx_hash_list_sorted := chain.Mempool.Mempool_List_TX_SortedInfo() // hash of all tx expected to be included within this block , sorted by fees logger.V(8).Info("mempool returned tx list", "tx_list", tx_hash_list_sorted) var pre_check cbl_verify // used to verify sanity of new block - i := 0 - for ; i < len(tx_hash_list_sorted); i++ { - - if (sizeoftxs + tx_hash_list_sorted[i].Size) > (10*config.STARGATE_HE_MAX_BLOCK_SIZE)/100 { // limit block to max possible + for i := range tx_hash_list_sorted { + if (sizeoftxs + tx_hash_list_sorted[i].Size) > (99*config.STARGATE_HE_MAX_BLOCK_SIZE)/100 { // limit block to max possible break } - tx := chain.Mempool.Mempool_Get_TX(tx_hash_list_sorted[i].Hash) - if tx != nil { + if tx := chain.Mempool.Mempool_Get_TX(tx_hash_list_sorted[i].Hash); tx != nil { if int64(tx.Height) < height { if history[tx.BLID] != true { logger.V(8).Info("not selecting tx since the reference with which it was made is not in history", "txid", tx_hash_list_sorted[i].Hash) @@ -240,11 +221,10 @@ func (chain *Blockchain) Create_new_miner_block(miner_address rpc.Address) (cbl if nil == chain.Verify_Transaction_NonCoinbase_CheckNonce_Tips(hf_version, tx, bl.Tips) { if nil == pre_check.check(tx, false) { pre_check.check(tx, true) - //rlog.Tracef(1, "Adding Top Sorted tx %s to Complete_Block current size %.2f KB max possible %.2f KB\n", tx_hash_list_sorted[i].Hash, float32(sizeoftxs+tx_hash_list_sorted[i].Size)/1024.0, float32(config.STARGATE_HE_MAX_BLOCK_SIZE)/1024.0) sizeoftxs += tx_hash_list_sorted[i].Size cbl.Txs = append(cbl.Txs, tx) tx_hash_list_included = append(tx_hash_list_included, tx_hash_list_sorted[i].Hash) - logger.V(8).Info("tx selecting for mining ", "txlist", tx_hash_list_sorted[i].Hash) + logger.V(8).Info("tx selected for mining ", "txlist", tx_hash_list_sorted[i].Hash) } else { logger.V(8).Info("not selecting tx due to pre_check failure", "txid", tx_hash_list_sorted[i].Hash) } @@ -261,48 +241,6 @@ func (chain *Blockchain) Create_new_miner_block(miner_address rpc.Address) (cbl logger.V(8).Info("not selecting tx since tx is nil", "txid", tx_hash_list_sorted[i].Hash) } } - // any left over transactions, should be randomly selected - tx_hash_list_sorted = tx_hash_list_sorted[i:] - - // do random shuffling, can we get away with len/2 random shuffling - rand.Shuffle(len(tx_hash_list_sorted), func(i, j int) { - tx_hash_list_sorted[i], tx_hash_list_sorted[j] = tx_hash_list_sorted[j], tx_hash_list_sorted[i] - }) - - // if we were crossing limit, transactions would be randomly selected - // otherwise they will sorted by fees - - // now select as much as possible - for i := range tx_hash_list_sorted { - if (sizeoftxs + tx_hash_list_sorted[i].Size) > (config.STARGATE_HE_MAX_BLOCK_SIZE) { // limit block to max possible - break - } - - tx := chain.Mempool.Mempool_Get_TX(tx_hash_list_sorted[i].Hash) - if tx != nil { - if int64(tx.Height) < height { - if height-int64(tx.Height) < TX_VALIDITY_HEIGHT { - if history[tx.BLID] != true { - logger.V(8).Info("not selecting tx since the reference with which it was made is not in history", "txid", tx_hash_list_sorted[i].Hash) - continue - } - if nil == chain.Verify_Transaction_NonCoinbase_CheckNonce_Tips(hf_version, tx, bl.Tips) { - - if nil == pre_check.check(tx, false) { - pre_check.check(tx, true) - - //rlog.Tracef(1, "Adding Random tx %s to Complete_Block current size %.2f KB max possible %.2f KB\n", tx_hash_list_sorted[i].Hash, float32(sizeoftxs+tx_hash_list_sorted[i].Size)/1024.0, float32(config.STARGATE_HE_MAX_BLOCK_SIZE)/1024.0) - sizeoftxs += tx_hash_list_sorted[i].Size - cbl.Txs = append(cbl.Txs, tx) - tx_hash_list_included = append(tx_hash_list_included, tx_hash_list_sorted[i].Hash) - } - } - } - } - } - } - - // collect tx list + their fees // now we have all major parts of block, assemble the block bl.Major_Version = uint64(chain.Get_Current_Version_at_Height(height)) @@ -320,19 +258,13 @@ func (chain *Blockchain) Create_new_miner_block(miner_address rpc.Address) (cbl } // check whether the miner address is registered - - _, err = balance_tree.Get(bl.Miner_TX.MinerAddress[:]) - if err != nil { + if _, err = balance_tree.Get(bl.Miner_TX.MinerAddress[:]); err != nil { if xerrors.Is(err, graviton.ErrNotFound) { // address needs registration err = fmt.Errorf("miner address is not registered") - return - } else { - return } + return } - //bl.Prev_Hash = top_hash - for i := range tx_hash_list_included { bl.Tx_hashes = append(bl.Tx_hashes, tx_hash_list_included[i]) } @@ -615,26 +547,24 @@ func (chain *Blockchain) Accept_new_block(tstamp uint64, miniblock_blob []byte) for i := range bl.Tx_hashes { var tx *transaction.Transaction + var tx_bytes []byte if tx = chain.Mempool.Mempool_Get_TX(bl.Tx_hashes[i]); tx != nil { cbl.Txs = append(cbl.Txs, tx) continue } else if tx = chain.Regpool.Regpool_Get_TX(bl.Tx_hashes[i]); tx != nil { cbl.Txs = append(cbl.Txs, tx) continue - } - - var tx_bytes []byte - if tx_bytes, err = chain.Store.Block_tx_store.ReadTX(bl.Tx_hashes[i]); err != nil { - return - } - - tx = &transaction.Transaction{} - if err = tx.Deserialize(tx_bytes); err != nil { + } else if tx_bytes, err = chain.Store.Block_tx_store.ReadTX(bl.Tx_hashes[i]); err == nil { + tx = &transaction.Transaction{} + if err = tx.Deserialize(tx_bytes); err != nil { + logger.V(1).Error(err, "Tx could not be loaded from disk", "txid", bl.Tx_hashes[i].String()) + return + } + cbl.Txs = append(cbl.Txs, tx) + } else { logger.V(1).Error(err, "Tx not found in pool or DB, rejecting submitted block", "txid", bl.Tx_hashes[i].String()) return } - cbl.Txs = append(cbl.Txs, tx) - } cbl.Bl = &bl // the block is now complete, lets try to add it to chain diff --git a/config/version.go b/config/version.go index 78b173e..4d4b877 100644 --- a/config/version.go +++ b/config/version.go @@ -20,4 +20,4 @@ import "github.com/blang/semver/v4" // right now it has to be manually changed // do we need to include git commitsha?? -var Version = semver.MustParse("3.4.65-1.DEROHE.STARGATE+13112021") +var Version = semver.MustParse("3.4.67-1.DEROHE.STARGATE+14112021") diff --git a/dvm/dvm.go b/dvm/dvm.go index 5a9a84f..2e7b65e 100644 --- a/dvm/dvm.go +++ b/dvm/dvm.go @@ -590,7 +590,7 @@ func (dvm *DVM_Interpreter) interpret_PRINT(args []string) (newIP uint64, err er } } - _, err = fmt.Printf(strings.Trim(args[0], "\"")+"\n", params...) + //_, err = fmt.Printf(strings.Trim(args[0], "\"")+"\n", params...) } return } diff --git a/globals/globals.go b/globals/globals.go index e3d2297..306a979 100644 --- a/globals/globals.go +++ b/globals/globals.go @@ -369,5 +369,5 @@ func ParseAmount(str string) (amount uint64, err error) { // gets a stack trace of all func StackTrace(all bool) string { - return "" + return string(debug.Stack()) } diff --git a/p2p/bans.go b/p2p/bans.go index d3110a9..0065592 100644 --- a/p2p/bans.go +++ b/p2p/bans.go @@ -106,11 +106,13 @@ func save_ban_list() { // clean up ban list every 20 seconds func ban_clean_up_goroutine() { + + delay := time.NewTicker(20 * time.Second) for { select { case <-Exit_Event: return - case <-time.After(20 * time.Second): + case <-delay.C: } ban_clean_up() } diff --git a/p2p/chain_sync.go b/p2p/chain_sync.go index 3b5f9a4..e34d9d4 100644 --- a/p2p/chain_sync.go +++ b/p2p/chain_sync.go @@ -168,7 +168,11 @@ func (connection *Connection) process_object_response(response Objects, sent int if err != nil { // we have a block which could not be deserialized ban peer connection.logger.V(2).Error(err, "Incoming block could not be deserilised") connection.exit() - return nil + if syncing { + return nil + } else { + return err + } } // give the chain some more time to respond @@ -188,7 +192,11 @@ func (connection *Connection) process_object_response(response Objects, sent int connection.logger.V(2).Error(err, "Incoming TX could not be deserilised") connection.exit() - return nil + if syncing { + return nil + } else { + return err + } } cbl.Txs = append(cbl.Txs, &tx) } @@ -198,12 +206,20 @@ func (connection *Connection) process_object_response(response Objects, sent int if !ok && err == errormsg.ErrInvalidPoW { connection.logger.V(2).Error(err, "This peer should be banned") connection.exit() - return nil + if syncing { + return nil + } else { + return err + } } if !ok && err == errormsg.ErrPastMissing { connection.logger.V(2).Error(err, "Incoming Block could not be added due to missing past, so skipping future block") - return nil + if syncing { + return nil + } else { + return err + } } if !ok { diff --git a/p2p/chunk_server.go b/p2p/chunk_server.go index 24f7c07..05cb39e 100644 --- a/p2p/chunk_server.go +++ b/p2p/chunk_server.go @@ -11,6 +11,7 @@ import "github.com/fxamacker/cbor/v2" import "github.com/klauspost/reedsolomon" import "github.com/deroproject/derohe/block" +import "github.com/deroproject/derohe/globals" import "github.com/deroproject/derohe/errormsg" import "github.com/deroproject/derohe/config" import "github.com/deroproject/derohe/metrics" @@ -24,23 +25,24 @@ var single_construction sync.Mutex // used to single threaded processing while r const MAX_CHUNKS uint8 = 255 type Chunks_Per_Block_Data struct { - Chunks [MAX_CHUNKS]*Block_Chunk // nil means we donot have the chunk - Created time.Time // when was this structure allocated - Processed bool // whether processing has completed successfully - Complete bool // whether everything is complete + ChunkCollection [MAX_CHUNKS]*Block_Chunk // nil means we donot have the chunk + Created time.Time // when was this structure allocated + Processed bool // whether processing has completed successfully + Complete bool // whether everything is complete + Sent int64 // at what time, the original send sent it + bl *block.Block // this is used internally sync.Mutex } // cleans up chunks every minute func chunks_clean_up() { - for { time.Sleep(5 * time.Second) // cleanup every 5 seconds chunk_map.Range(func(key, value interface{}) bool { - chunk := value.(*Chunks_Per_Block_Data) - if time.Now().Sub(chunk.Created) > time.Second*180 { + chunks_per_block := value.(*Chunks_Per_Block_Data) + if time.Now().Sub(chunks_per_block.Created) > time.Second*180 { chunk_map.Delete(key) } return true @@ -50,16 +52,13 @@ func chunks_clean_up() { // return whether chunk exist func is_chunk_exist(hhash [32]byte, cid uint8) *Block_Chunk { - chunksi, ok := chunk_map.Load(hhash) + chunksi, ok := chunk_map.Load(fmt.Sprintf("%x", hhash)) if !ok { //debug.PrintStack() return nil } - chunks := chunksi.(*Chunks_Per_Block_Data) - chunks.Lock() - defer chunks.Unlock() - - return chunks.Chunks[cid] + chunks_per_block := chunksi.(*Chunks_Per_Block_Data) + return chunks_per_block.ChunkCollection[cid] } // feed a chunk until we are able to fully decode a chunk @@ -102,13 +101,33 @@ func (connection *Connection) feed_chunk(chunk *Block_Chunk, sent int64) error { return nil } - var bl block.Block + chunks_per_block := new(Chunks_Per_Block_Data) + if chunksi, ok := chunk_map.LoadOrStore(fmt.Sprintf("%x", chunk.HHash), chunks_per_block); ok { + chunks_per_block = chunksi.(*Chunks_Per_Block_Data) - var chunks *Chunks_Per_Block_Data - if chunksi, ok := chunk_map.Load(chunk.HHash); ok { - chunks = chunksi.(*Chunks_Per_Block_Data) - } else { + // make sure we are matching what is stored already + var existing *Block_Chunk + for j := range chunks_per_block.ChunkCollection { + if existing = chunks_per_block.ChunkCollection[j]; existing != nil { + break + } + } + if existing != nil { // compare current headers wuth what we have + if chunk.HHash != existing.HHash || len(chunk.CHUNK_HASH) != len(existing.CHUNK_HASH) { + return nil // this collision can never occur + } + for j := range chunk.CHUNK_HASH { + if chunk.CHUNK_HASH[j] != existing.CHUNK_HASH[j] { + return nil // again this is impossible + } + } + } + } + + if chunks_per_block.bl == nil { + + var bl block.Block if err := bl.Deserialize(chunk.BLOCK); err != nil { logger.V(1).Error(err, "error deserializing block") return nil @@ -134,39 +153,33 @@ func (connection *Connection) feed_chunk(chunk *Block_Chunk, sent int64) error { } } - chunks = new(Chunks_Per_Block_Data) - chunks.Created = time.Now() - chunk_map.Store(chunk.HHash, chunks) + chunks_per_block.Created = time.Now() + chunks_per_block.Sent = sent + chunks_per_block.bl = &bl + chunks_per_block.ChunkCollection[chunk.CHUNK_ID] = chunk } - if chunks.Processed { + if chunks_per_block.Processed { return nil } - if chunks.Chunks[chunk.CHUNK_ID] == nil { + if chunks_per_block.ChunkCollection[chunk.CHUNK_ID] == nil { + chunks_per_block.ChunkCollection[chunk.CHUNK_ID] = chunk broadcast_Chunk(chunk, 0, sent) // broadcast chunk INV } - chunks.Lock() - defer chunks.Unlock() - - if chunks.Processed { - return nil - } - - chunks.Chunks[chunk.CHUNK_ID] = chunk - chunk_count := 0 - for _, c := range chunks.Chunks { + for _, c := range chunks_per_block.ChunkCollection { if c != nil { chunk_count++ } } - logger.V(3).Info("Have chunks", "have", chunk_count, "total", chunk.CHUNK_COUNT) + logger.V(3).Info("Have chunks", "have", chunk_count, "total", chunk.CHUNK_COUNT, "tx_count", len(chunks_per_block.bl.Tx_hashes)) var cbl Complete_Block - if len(bl.Tx_hashes) >= 1 { // if txs are present, then we need to join chunks, else we are already done + cbl.Block = chunk.BLOCK + if len(chunks_per_block.bl.Tx_hashes) >= 1 { // if txs are present, then we need to join chunks, else we are already done if uint(chunk_count) < chunk.CHUNK_NEED { // we do not have enough chunks return nil @@ -174,10 +187,10 @@ func (connection *Connection) feed_chunk(chunk *Block_Chunk, sent int64) error { var shards [][]byte for i := 0; i < int(chunk.CHUNK_COUNT); i++ { - if chunks.Chunks[i] == nil { + if chunks_per_block.ChunkCollection[i] == nil { shards = append(shards, nil) } else { - shards = append(shards, chunks.Chunks[i].CHUNK_DATA) + shards = append(shards, chunks_per_block.ChunkCollection[i].CHUNK_DATA) } } @@ -187,7 +200,6 @@ func (connection *Connection) feed_chunk(chunk *Block_Chunk, sent int64) error { logger.V(3).Error(err, "error reconstructing data ") return nil } - chunks.Processed = true // we have successfully reconstructed data,so we give it a try var writer bytes.Buffer @@ -202,15 +214,19 @@ func (connection *Connection) feed_chunk(chunk *Block_Chunk, sent int64) error { } } - // first complete all our chunks, so as we can give to others - logger.V(2).Info("successfully reconstructed using chunks", "have", chunk_count, "total", chunk.CHUNK_COUNT) + chunks_per_block.Processed = true // we have successfully reconstructed data,so we give it a try - metrics.Set.GetOrCreateHistogram("block_propagation_duration_histogram_seconds").UpdateDuration(chunks.Created) - - cbl.Block = chunk.BLOCK object := Objects{CBlocks: []Complete_Block{cbl}} - if err := connection.processChunkedBlock(object, false, true, int(chunk.CHUNK_NEED), int(chunk.CHUNK_COUNT-chunk.CHUNK_NEED)); err != nil { + // first complete all our chunks, so as we can give to others + logger.V(2).Info("successfully reconstructed using chunks", "blid", chunks_per_block.bl.GetHash(), "have", chunk_count, "total", chunk.CHUNK_COUNT, "tx_count", len(cbl.Txs)) + + if chunks_per_block.Sent != 0 && chunks_per_block.Sent < globals.Time().UTC().UnixMicro() { + time_to_receive := float64(globals.Time().UTC().UnixMicro()-chunks_per_block.Sent) / 1000000 + metrics.Set.GetOrCreateHistogram("block_propagation_duration_histogram_seconds").Update(time_to_receive) + } + + if err := connection.processChunkedBlock(object, int(chunk.CHUNK_NEED), int(chunk.CHUNK_COUNT-chunk.CHUNK_NEED)); err != nil { //fmt.Printf("error inserting block received using chunks, err %s", err) } @@ -221,9 +237,9 @@ func (connection *Connection) feed_chunk(chunk *Block_Chunk, sent int64) error { func is_already_chunked_by_us(blid crypto.Hash, data_shard_count, parity_shard_count int) (hash [32]byte, chunk_count int) { chunk_map.Range(func(key, value interface{}) bool { - chunk := value.(*Chunks_Per_Block_Data) - for _, c := range chunk.Chunks { - if c != nil && c.BLID == blid && int(c.CHUNK_NEED) == data_shard_count && int(c.CHUNK_COUNT-c.CHUNK_NEED) == parity_shard_count && chunk.Complete { + chunks_per_block := value.(*Chunks_Per_Block_Data) + for _, c := range chunks_per_block.ChunkCollection { + if c != nil && c.BLID == blid && int(c.CHUNK_NEED) == data_shard_count && int(c.CHUNK_COUNT-c.CHUNK_NEED) == parity_shard_count && chunks_per_block.Complete { hash = c.HeaderHash() chunk_count = data_shard_count + parity_shard_count return false @@ -253,13 +269,13 @@ func convert_block_to_chunks(cbl *block.Complete_Block, data_shard_count, parity panic(err) } - if hhash, count := is_already_chunked_by_us(blid, data_shard_count, parity_shard_count); count > 0 { - return hhash, count - } + //if hhash, count := is_already_chunked_by_us(blid, data_shard_count, parity_shard_count); count > 0 { + // return hhash, count + //} // loop through all the data chunks and overide from there chunk_map.Range(func(key, value interface{}) bool { chunk := value.(*Chunks_Per_Block_Data) - for _, c := range chunk.Chunks { + for _, c := range chunk.ChunkCollection { if c != nil && c.BLID == blid { if data_shard_count != int(c.CHUNK_NEED) || parity_shard_count != int(c.CHUNK_COUNT-c.CHUNK_NEED) { data_shard_count = int(c.CHUNK_NEED) @@ -321,12 +337,12 @@ func convert_block_to_chunks(cbl *block.Complete_Block, data_shard_count, parity } chunks := new(Chunks_Per_Block_Data) for i := 0; i < data_shard_count+parity_shard_count; i++ { - chunks.Chunks[i] = &chunk[i] + chunks.ChunkCollection[i] = &chunk[i] } chunks.Created = time.Now() chunks.Processed = true chunks.Complete = true - chunk_map.Store(chunk[0].HeaderHash(), chunks) - return chunk[0].HeaderHash(), len(shards) + chunk_map.Store(fmt.Sprintf("%x", chunk[0].HHash), chunks) + return chunk[0].HeaderHash(), data_shard_count + parity_shard_count } diff --git a/p2p/common.go b/p2p/common.go index 6ed48cf..ac7b49e 100644 --- a/p2p/common.go +++ b/p2p/common.go @@ -80,7 +80,7 @@ func (connection *Connection) update(common *Common_Struct) { // parse delivered peer list as grey list if len(common.PeerList) > 1 { - connection.logger.V(2).Info("Peer provides peers", "count", len(common.PeerList)) + connection.logger.V(4).Info("Peer provides peers", "count", len(common.PeerList)) for i := range common.PeerList { if i < 13 { Peer_Add(&Peer{Address: common.PeerList[i].Addr, LastConnected: uint64(time.Now().UTC().Unix())}) diff --git a/p2p/connection_pool.go b/p2p/connection_pool.go index d7ab2fa..0010f12 100644 --- a/p2p/connection_pool.go +++ b/p2p/connection_pool.go @@ -30,7 +30,6 @@ import "strings" import "math/rand" import "sync/atomic" import "runtime/debug" -import "encoding/binary" import "github.com/go-logr/logr" @@ -77,17 +76,15 @@ type Connection struct { BytesOut uint64 // total bytes out Latency int64 // time.Duration // latency to this node when sending timed sync - Incoming bool // is connection incoming or outgoing - Addr *net.TCPAddr // endpoint on the other end - Port uint32 // port advertised by other end as its server,if it's 0 server cannot accept connections - Peer_ID uint64 // Remote peer id - SyncNode bool // whether the peer has been added to command line as sync node - Top_Version uint64 // current hard fork version supported by peer - TXpool_cache map[uint64]uint32 // used for ultra blocks in miner mode,cache where we keep TX which have been broadcasted to this peer - TXpool_cache_lock sync.RWMutex - ProtocolVersion string - Tag string // tag for the other end - DaemonVersion string + Incoming bool // is connection incoming or outgoing + Addr *net.TCPAddr // endpoint on the other end + Port uint32 // port advertised by other end as its server,if it's 0 server cannot accept connections + Peer_ID uint64 // Remote peer id + SyncNode bool // whether the peer has been added to command line as sync node + Top_Version uint64 // current hard fork version supported by peer + ProtocolVersion string + Tag string // tag for the other end + DaemonVersion string //Exit chan bool // Exit marker that connection needs to be killed ExitCounter int32 State uint32 // state of the connection @@ -105,6 +102,8 @@ type Connection struct { request_time atomic.Value //time.Time // used to track latency writelock sync.Mutex // used to Serialize writes + previous_mbl []byte // single slot cache + peer_sent_time time.Time // contains last time when peerlist was sent clock_index int @@ -394,98 +393,39 @@ func Peer_Direction_Count() (Incoming uint64, Outgoing uint64) { return } -func Broadcast_Block(cbl *block.Complete_Block, PeerID uint64) { +func broadcast_Block_tester(topo int64) (err error) { - //Broadcast_Block_Ultra(cbl,PeerID) + blid, err := chain.Load_Block_Topological_order_at_index(topo) + if err != nil { + return fmt.Errorf("err occurred topo %d err %s\n", topo, err) + } + var cbl block.Complete_Block + bl, err := chain.Load_BL_FROM_ID(blid) + if err != nil { + return err + } - Broadcast_Block_Coded(cbl, PeerID) + cbl.Bl = bl + for j := range bl.Tx_hashes { + var tx_bytes []byte + if tx_bytes, err = chain.Store.Block_tx_store.ReadTX(bl.Tx_hashes[j]); err != nil { + return err + } + var tx transaction.Transaction + if err = tx.Deserialize(tx_bytes); err != nil { + return err + } + cbl.Txs = append(cbl.Txs, &tx) // append all the txs + + } + + Broadcast_Block(&cbl, 0) + return nil } -// broad cast a block to all connected peers -// we can only broadcast a block which is in our db -// this function is trigger from 2 points, one when we receive a unknown block which can be successfully added to chain -// second from the blockchain which has to relay locally mined blocks as soon as possible -func Broadcast_Block_Ultra(cbl *block.Complete_Block, PeerID uint64) { // if peerid is provided it is skipped - var cblock_serialized Complete_Block - - defer globals.Recover(3) - - /*if IsSyncing() { // if we are syncing, do NOT broadcast the block - return - }*/ - - cblock_serialized.Block = cbl.Bl.Serialize() - - for i := range cbl.Txs { - cblock_serialized.Txs = append(cblock_serialized.Txs, cbl.Txs[i].Serialize()) - } - - our_height := chain.Get_Height() - // build the request once and dispatch it to all possible peers - count := 0 - unique_map := UniqueConnections() - - for _, v := range unique_map { - select { - case <-Exit_Event: - return - default: - } - if atomic.LoadUint32(&v.State) != HANDSHAKE_PENDING && PeerID != v.Peer_ID { // skip pre-handshake connections - - // if the other end is > 50 blocks behind, do not broadcast block to hime - // this is an optimisation, since if the other end is syncing - // every peer will keep on broadcasting and thus making it more lagging - // due to overheads - peer_height := atomic.LoadInt64(&v.Height) - if (our_height - peer_height) > 25 { - continue - } - - count++ - go func(connection *Connection) { - defer globals.Recover(3) - - { // everyone needs ultra compact block if possible - var peer_specific_block Objects - - var cblock Complete_Block - cblock.Block = cblock_serialized.Block - - sent := 0 - skipped := 0 - connection.TXpool_cache_lock.RLock() - for i := range cbl.Bl.Tx_hashes { - // in ultra compact mode send a transaction only if we know that we have not sent that transaction earlier - // send only tx not found in cache - - if _, ok := connection.TXpool_cache[binary.LittleEndian.Uint64(cbl.Bl.Tx_hashes[i][:])]; !ok { - cblock.Txs = append(cblock.Txs, cblock_serialized.Txs[i]) - sent++ - - } else { - skipped++ - } - - } - connection.TXpool_cache_lock.RUnlock() - - connection.logger.V(3).Info("Sending ultra block to peer ", "total", len(cbl.Bl.Tx_hashes), "tx skipped", skipped, "sent", sent) - peer_specific_block.CBlocks = append(peer_specific_block.CBlocks, cblock) - var dummy Dummy - fill_common(&peer_specific_block.Common) // fill common info - if err := connection.RConn.Client.Call("Peer.NotifyBlock", peer_specific_block, &dummy); err != nil { - return - } - connection.update(&dummy.Common) // update common information - - } - }(v) - } - - } - //rlog.Infof("Broadcasted block %s to %d peers", cbl.Bl.GetHash(), count) +func Broadcast_Block(cbl *block.Complete_Block, PeerID uint64) { + Broadcast_Block_Coded(cbl, PeerID) } // broad cast a block to all connected peers in cut up in chunks with erasure coding @@ -506,6 +446,8 @@ func broadcast_Block_Coded(cbl *block.Complete_Block, PeerID uint64, first_seen blid := cbl.Bl.GetHash() + logger.V(1).Info("Will broadcast block", "blid", blid, "tx_count", len(cbl.Bl.Tx_hashes), "txs", len(cbl.Txs)) + hhash, chunk_count := convert_block_to_chunks(cbl, 16, 32) our_height := chain.Get_Height() @@ -521,7 +463,7 @@ func broadcast_Block_Coded(cbl *block.Complete_Block, PeerID uint64, first_seen return default: } - if atomic.LoadUint32(&v.State) != HANDSHAKE_PENDING && PeerID != v.Peer_ID { // skip pre-handshake connections + if atomic.LoadUint32(&v.State) != HANDSHAKE_PENDING && PeerID != v.Peer_ID && v.Peer_ID != GetPeerID() { // skip pre-handshake connections // if the other end is > 50 blocks behind, do not broadcast block to hime // this is an optimisation, since if the other end is syncing @@ -588,7 +530,7 @@ func broadcast_Chunk(chunk *Block_Chunk, PeerID uint64, first_seen int64) { // i count := 0 unique_map := UniqueConnections() - chash := chunk.HeaderHash() + hhash := chunk.HHash for _, v := range unique_map { select { @@ -596,7 +538,7 @@ func broadcast_Chunk(chunk *Block_Chunk, PeerID uint64, first_seen int64) { // i return default: } - if atomic.LoadUint32(&v.State) != HANDSHAKE_PENDING && PeerID != v.Peer_ID { // skip pre-handshake connections + if atomic.LoadUint32(&v.State) != HANDSHAKE_PENDING && PeerID != v.Peer_ID && v.Peer_ID != GetPeerID() { // skip pre-handshake connections // if the other end is > 50 blocks behind, do not broadcast block to hime // this is an optimisation, since if the other end is syncing @@ -616,11 +558,11 @@ func broadcast_Chunk(chunk *Block_Chunk, PeerID uint64, first_seen int64) { // i var chunkid [33 + 32]byte copy(chunkid[:], chunk.BLID[:]) chunkid[32] = byte(chunk.CHUNK_ID) - copy(chunkid[33:], chash[:]) + copy(chunkid[33:], hhash[:]) peer_specific_list.Sent = first_seen peer_specific_list.Chunk_list = append(peer_specific_list.Chunk_list, chunkid) - connection.logger.V(3).Info("Sending erasure coded chunk to peer ", "cid", chunk.CHUNK_ID) + connection.logger.V(3).Info("Sending erasure coded chunk INV to peer ", "raw", fmt.Sprintf("%x", chunkid), "blid", fmt.Sprintf("%x", chunk.BLID), "cid", chunk.CHUNK_ID, "hhash", fmt.Sprintf("%x", hhash), "exists", nil != is_chunk_exist(hhash, uint8(chunk.CHUNK_ID))) var dummy Dummy fill_common(&peer_specific_list.Common) // fill common info if err := connection.RConn.Client.Call("Peer.NotifyINV", peer_specific_list, &dummy); err != nil { @@ -656,13 +598,15 @@ func broadcast_MiniBlock(mbl block.MiniBlock, PeerID uint64, first_seen int64) { count := 0 unique_map := UniqueConnections() + //connection.logger.V(4).Info("Sending mini block to peer ") + for _, v := range unique_map { select { case <-Exit_Event: return default: } - if atomic.LoadUint32(&v.State) != HANDSHAKE_PENDING && PeerID != v.Peer_ID { // skip pre-handshake connections + if atomic.LoadUint32(&v.State) != HANDSHAKE_PENDING && PeerID != v.Peer_ID && v.Peer_ID != GetPeerID() { // skip pre-handshake connections // if the other end is > 50 blocks behind, do not broadcast block to hime // this is an optimisation, since if the other end is syncing @@ -676,7 +620,7 @@ func broadcast_MiniBlock(mbl block.MiniBlock, PeerID uint64, first_seen int64) { count++ go func(connection *Connection) { defer globals.Recover(3) - connection.logger.V(4).Info("Sending mini block to peer ") + var dummy Dummy if err := connection.RConn.Client.Call("Peer.NotifyMiniBlock", peer_specific_block, &dummy); err != nil { return @@ -717,7 +661,7 @@ func broadcast_Tx(tx *transaction.Transaction, PeerID uint64, sent int64) (relay return default: } - if atomic.LoadUint32(&v.State) != HANDSHAKE_PENDING && PeerID != v.Peer_ID { // skip pre-handshake connections + if atomic.LoadUint32(&v.State) != HANDSHAKE_PENDING && PeerID != v.Peer_ID && v.Peer_ID != GetPeerID() { // skip pre-handshake connections // if the other end is > 50 blocks behind, do not broadcast block to hime // this is an optimisation, since if the other end is syncing @@ -736,29 +680,13 @@ func broadcast_Tx(tx *transaction.Transaction, PeerID uint64, sent int64) (relay } }() - resend := true - // disable cache if not possible due to options - // assuming the peer is good, he would like to obtain the tx ASAP - - connection.TXpool_cache_lock.Lock() - if _, ok := connection.TXpool_cache[binary.LittleEndian.Uint64(txhash[:])]; !ok { - connection.TXpool_cache[binary.LittleEndian.Uint64(txhash[:])] = uint32(time.Now().Unix()) - resend = true - } else { - resend = false - } - connection.TXpool_cache_lock.Unlock() - - if resend { - var dummy Dummy - fill_common(&dummy.Common) // fill common info - if err := connection.RConn.Client.Call("Peer.NotifyINV", request, &dummy); err != nil { - return - } - connection.update(&dummy.Common) // update common information - - atomic.AddInt32(&relayed_count, 1) + var dummy Dummy + fill_common(&dummy.Common) // fill common info + if err := connection.RConn.Client.Call("Peer.NotifyINV", request, &dummy); err != nil { + return } + connection.update(&dummy.Common) // update common information + atomic.AddInt32(&relayed_count, 1) }(v) } @@ -899,11 +827,12 @@ func Abs(n int64) int64 { // detect whether we are behind any of the connected peers and trigger sync ASAP // randomly with one of the peers func syncroniser() { + delay := time.NewTicker(time.Second) for { select { case <-Exit_Event: return - case <-time.After(1000 * time.Millisecond): + case <-delay.C: } calculate_network_time() // calculate time every sec diff --git a/p2p/controller.go b/p2p/controller.go index b2166c2..ffc68bf 100644 --- a/p2p/controller.go +++ b/p2p/controller.go @@ -266,11 +266,12 @@ func connect_with_endpoint(endpoint string, sync_node bool) { // maintains a persistant connection to endpoint // if connection drops, tries again after 4 secs func maintain_outgoing_priority_connection(endpoint string, sync_node bool) { + delay := time.NewTicker(4 * time.Second) for { select { case <-Exit_Event: return - case <-time.After(4 * time.Second): + case <-delay.C: } connect_with_endpoint(endpoint, sync_node) } @@ -278,12 +279,14 @@ func maintain_outgoing_priority_connection(endpoint string, sync_node bool) { // this will maintain connection to 1 seed node randomly func maintain_seed_node_connection() { - for { + delay := time.NewTicker(2 * time.Second) + + for { select { case <-Exit_Event: return - case <-time.After(2 * time.Second): + case <-delay.C: } endpoint := "" if globals.IsMainnet() { // choose mainnet seed node @@ -319,11 +322,13 @@ func maintain_connection_to_peers() { logger.Info("Min outgoing peers", "min-peers", Min_Peers) } + delay := time.NewTicker(time.Second) + for { select { case <-Exit_Event: return - case <-time.After(1000 * time.Millisecond): + case <-delay.C: } // check number of connections, if limit is reached, trigger new connections if we have peers diff --git a/p2p/rpc_handshake.go b/p2p/rpc_handshake.go index 9777e91..6619141 100644 --- a/p2p/rpc_handshake.go +++ b/p2p/rpc_handshake.go @@ -119,10 +119,8 @@ func (connection *Connection) dispatch_test_handshake() { Peer_Add(&p) } - connection.TXpool_cache = map[uint64]uint32{} - // parse delivered peer list as grey list - connection.logger.V(2).Info("Peer provides peers", "count", len(response.PeerList)) + connection.logger.V(4).Info("Peer provides peers", "count", len(response.PeerList)) for i := range response.PeerList { if i < 13 { Peer_Add(&Peer{Address: response.PeerList[i].Addr, LastConnected: uint64(time.Now().UTC().Unix())}) diff --git a/p2p/rpc_notifications.go b/p2p/rpc_notifications.go index 63a30c5..ac4d7e1 100644 --- a/p2p/rpc_notifications.go +++ b/p2p/rpc_notifications.go @@ -34,6 +34,8 @@ func (c *Connection) NotifyINV(request ObjectList, response *Dummy) (err error) var need ObjectList var dirty = false + c.logger.V(3).Info("incoming INV", "request", request) + if len(request.Block_list) >= 1 { // handle incoming blocks list for i := range request.Block_list { // if !chain.Is_Block_Topological_order(request.Block_list[i]) { // block is not in our chain @@ -78,7 +80,7 @@ func (c *Connection) NotifyINV(request ObjectList, response *Dummy) (err error) if !chain.Block_Exists(blid) { // check whether the block can be loaded from disk if nil == is_chunk_exist(hhash, cid) { // if chunk does not exist - // fmt.Printf("requesting chunk %d INV %x\n",cid, request.Chunk_list[i][:]) + c.logger.V(3).Info("requesting INV chunk", "blid", fmt.Sprintf("%x", blid), "cid", cid, "hhash", fmt.Sprintf("%x", hhash), "raw", fmt.Sprintf("%x", request.Chunk_list[i])) need.Chunk_list = append(need.Chunk_list, request.Chunk_list[i]) dirty = true } @@ -89,9 +91,8 @@ func (c *Connection) NotifyINV(request ObjectList, response *Dummy) (err error) if dirty { // request inventory only if we want it var oresponse Objects fill_common(&need.Common) // fill common info - //need.Sent = request.Sent // send time if err = c.RConn.Client.Call("Peer.GetObject", need, &oresponse); err != nil { - c.logger.V(2).Error(err, "Call failed GetObject", err) + c.logger.V(2).Error(err, "Call failed GetObject", "need_objects", need) c.exit() return } else { // process the response @@ -108,56 +109,6 @@ func (c *Connection) NotifyINV(request ObjectList, response *Dummy) (err error) } -// Peer has notified us of a new transaction -func (c *Connection) NotifyTx(request Objects, response *Dummy) error { - defer handle_connection_panic(c) - var err error - var tx transaction.Transaction - - c.update(&request.Common) // update common information - - if len(request.CBlocks) != 0 { - err = fmt.Errorf("Notify TX cannot notify blocks") - c.logger.V(3).Error(err, "Should be banned") - c.exit() - return err - } - - if len(request.Txs) != 1 { - err = fmt.Errorf("Notify TX can only notify 1 tx") - c.logger.V(3).Error(err, "Should be banned") - c.exit() - return err - } - - err = tx.Deserialize(request.Txs[0]) - if err != nil { // we have a tx which could not be deserialized ban peer - c.logger.V(3).Error(err, "Should be banned") - c.exit() - return err - } - - // track transaction propagation - if request.Sent != 0 && request.Sent < globals.Time().UTC().UnixMicro() { - time_to_receive := float64(globals.Time().UTC().UnixMicro()-request.Sent) / 1000000 - metrics.Set.GetOrCreateHistogram("tx_propagation_duration_histogram_seconds").Update(time_to_receive) - } - - // try adding tx to pool - if err = chain.Add_TX_To_Pool(&tx); err == nil { - // add tx to cache of the peer who sent us this tx - txhash := tx.GetHash() - c.TXpool_cache_lock.Lock() - c.TXpool_cache[binary.LittleEndian.Uint64(txhash[:])] = uint32(time.Now().Unix()) - c.TXpool_cache_lock.Unlock() - broadcast_Tx(&tx, 0, request.Sent) - } - - fill_common(&response.Common) // fill common info - - return nil -} - // only miniblocks carry extra info, which leads to better time tracking func (c *Connection) NotifyMiniBlock(request Objects, response *Dummy) (err error) { defer handle_connection_panic(c) @@ -171,11 +122,17 @@ func (c *Connection) NotifyMiniBlock(request Objects, response *Dummy) (err erro fill_common_T1(&request.Common) c.update(&request.Common) // update common information - for i := range request.MiniBlocks { + var mbl_arrays [][]byte + if len(c.previous_mbl) > 0 { + mbl_arrays = append(mbl_arrays, c.previous_mbl) + } + mbl_arrays = append(mbl_arrays, request.MiniBlocks...) + + for i := range mbl_arrays { var mbl block.MiniBlock var ok bool - if err = mbl.Deserialize(request.MiniBlocks[i]); err != nil { + if err = mbl.Deserialize(mbl_arrays[i]); err != nil { return err } @@ -196,8 +153,10 @@ func (c *Connection) NotifyMiniBlock(request Objects, response *Dummy) (err erro // first check whether the incoming minblock can be added to sub chains if !chain.MiniBlocks.IsConnected(mbl) { + c.previous_mbl = mbl.Serialize() c.logger.V(3).Error(err, "Disconnected miniblock") - return fmt.Errorf("Disconnected miniblock") + //return fmt.Errorf("Disconnected miniblock") + continue } var miner_hash crypto.Hash @@ -254,7 +213,7 @@ func (c *Connection) NotifyMiniBlock(request Objects, response *Dummy) (err erro if err, ok = chain.InsertMiniBlock(mbl); !ok { return err } else { // rebroadcast miniblock - broadcast_MiniBlock(mbl, c.Peer_ID, request.Sent) // do not send back to the original peer + defer broadcast_MiniBlock(mbl, c.Peer_ID, request.Sent) // do not send back to the original peer } } fill_common(&response.Common) // fill common info @@ -262,27 +221,7 @@ func (c *Connection) NotifyMiniBlock(request Objects, response *Dummy) (err erro return nil } -func (c *Connection) NotifyBlock(request Objects, response *Dummy) error { - defer handle_connection_panic(c) - var err error - if len(request.CBlocks) != 1 { - err = fmt.Errorf("Notify Block cannot only notify single block") - c.logger.V(3).Error(err, "Should be banned") - c.exit() - return err - } - c.update(&request.Common) // update common information - - err = c.processChunkedBlock(request, true, false, 0, 0) - if err != nil { // we have a block which could not be deserialized ban peer - return err - } - - fill_common(&response.Common) // fill common info - return nil -} - -func (c *Connection) processChunkedBlock(request Objects, isnotified bool, waschunked bool, data_shard_count, parity_shard_count int) error { +func (c *Connection) processChunkedBlock(request Objects, data_shard_count, parity_shard_count int) error { var err error var cbl block.Complete_Block // parse incoming block and deserialize it @@ -298,14 +237,6 @@ func (c *Connection) processChunkedBlock(request Objects, isnotified bool, wasch blid := bl.GetHash() - // track block propagation only if its notified - if isnotified { // otherwise its tracked in chunks - if request.Sent != 0 && request.Sent < globals.Time().UTC().UnixMicro() { - time_to_receive := float64(globals.Time().UTC().UnixMicro()-request.Sent) / 1000000 - metrics.Set.GetOrCreateHistogram("block_propagation_duration_histogram_seconds").Update(time_to_receive) - } - } - // object is already is in our chain, we need not relay it if chain.Is_Block_Topological_order(blid) || chain.Is_Block_Tip(blid) { return nil @@ -324,75 +255,9 @@ func (c *Connection) processChunkedBlock(request Objects, isnotified bool, wasch } cbl.Txs = append(cbl.Txs, &tx) } - - // fill all shards which we might be missing but only we received it in chunk - if waschunked { - convert_block_to_chunks(&cbl, data_shard_count, parity_shard_count) - } - } else { // the block is NOT complete, we consider it as an ultra compact block - c.logger.V(2).Info("Received an ultra compact block", "blid", blid, "txcount", len(bl.Tx_hashes), "skipped", len(bl.Tx_hashes)-len(request.CBlocks[0].Txs)) - for j := range request.CBlocks[0].Txs { - var tx transaction.Transaction - err = tx.Deserialize(request.CBlocks[0].Txs[j]) - if err != nil { // we have a tx which could not be deserialized ban peer - c.logger.V(3).Error(err, "tx cannot be deserialized.Should be banned") - c.exit() - return err - } - chain.Add_TX_To_Pool(&tx) // add tx to pool - } - - // lets build a complete block ( tx from db or mempool ) - for i := range bl.Tx_hashes { - - retry_count := 0 - retry_tx: - - if retry_count > 10 { - err = fmt.Errorf("TX %s could not be obtained after %d tries", bl.Tx_hashes[i], retry_count) - c.logger.V(3).Error(err, "Missing TX") - } - retry_count++ - - tx := chain.Mempool.Mempool_Get_TX(bl.Tx_hashes[i]) - if tx != nil { - cbl.Txs = append(cbl.Txs, tx) - continue - } else { - tx = chain.Regpool.Regpool_Get_TX(bl.Tx_hashes[i]) - if tx != nil { - cbl.Txs = append(cbl.Txs, tx) - continue - } - } - - var tx_bytes []byte - if tx_bytes, err = chain.Store.Block_tx_store.ReadTX(bl.Tx_hashes[i]); err != nil { - // the tx mentioned in ultra compact block could not be found, request a full block - - err = c.request_tx([][32]byte{bl.Tx_hashes[i]}, isnotified) - if err == nil { - goto retry_tx - } - //connection.Send_ObjectRequest([]crypto.Hash{blid}, []crypto.Hash{}) - //logger.Debugf("Ultra compact block %s missing TX %s, requesting full block", blid, bl.Tx_hashes[i]) - return err - } - - tx = &transaction.Transaction{} - if err = tx.Deserialize(tx_bytes); err != nil { - err = c.request_tx([][32]byte{bl.Tx_hashes[i]}, isnotified) - if err == nil { - goto retry_tx - } - // the tx mentioned in ultra compact block could not be found, request a full block - //connection.Send_ObjectRequest([]crypto.Hash{blid}, []crypto.Hash{}) - //logger.Debugf("Ultra compact block %s missing TX %s, requesting full block", blid, bl.Tx_hashes[i]) - return err - } - cbl.Txs = append(cbl.Txs, tx) // tx is from disk - } + c.logger.V(2).Info("Received an ultra compact block", "blid", blid, "txcount", len(bl.Tx_hashes), "skipped", len(bl.Tx_hashes)-len(request.CBlocks[0].Txs), "stacktrace", globals.StackTrace(false)) + // how } // make sure connection does not timeout and be killed while processing huge blocks diff --git a/p2p/rpc_object_request.go b/p2p/rpc_object_request.go index dad9d83..0248e23 100644 --- a/p2p/rpc_object_request.go +++ b/p2p/rpc_object_request.go @@ -68,7 +68,7 @@ func (connection *Connection) GetObject(request ObjectList, response *Objects) e copy(hhash[:], request.Chunk_list[i][33:]) if chunk := is_chunk_exist(hhash, cid); chunk == nil { - return fmt.Errorf("no such chunk %x%x%x cid %d %2x", blid, cid, hhash, cid, cid) + return fmt.Errorf("no such chunk %x %x %x cid %d %2x", blid, cid, hhash, cid, cid) } else { // we do have the chunk, pass it on response.Chunks = append(response.Chunks, *chunk) // append the chunk } diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go index cac83bf..046a3d3 100644 --- a/vendor/github.com/hashicorp/yamux/session.go +++ b/vendor/github.com/hashicorp/yamux/session.go @@ -80,7 +80,7 @@ type Session struct { // or to directly send a header type sendReady struct { Hdr []byte - Body io.Reader + Body []byte Err chan error } @@ -352,7 +352,7 @@ func (s *Session) keepalive() { } // waitForSendErr waits to send a header, checking for a potential shutdown -func (s *Session) waitForSend(hdr header, body io.Reader) error { +func (s *Session) waitForSend(hdr header, body []byte) error { errCh := make(chan error, 1) return s.waitForSendErr(hdr, body, errCh) } @@ -360,7 +360,7 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error { // waitForSendErr waits to send a header with optional data, checking for a // potential shutdown. Since there's the expectation that sends can happen // in a timely manner, we enforce the connection write timeout here. -func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { +func (s *Session) waitForSendErr(hdr header, body []byte, errCh chan error) error { t := timerPool.Get() timer := t.(*time.Timer) timer.Reset(s.config.ConnectionWriteTimeout) @@ -440,7 +440,7 @@ func (s *Session) send() { // Send data from a body if given if ready.Body != nil { - _, err := io.Copy(s.conn, ready.Body) + _, err := s.conn.Write(ready.Body) if err != nil { s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) asyncSendErr(ready.Err, err) diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go index 4e2c5e5..f444bdc 100644 --- a/vendor/github.com/hashicorp/yamux/stream.go +++ b/vendor/github.com/hashicorp/yamux/stream.go @@ -169,7 +169,7 @@ func (s *Stream) Write(b []byte) (n int, err error) { func (s *Stream) write(b []byte) (n int, err error) { var flags uint16 var max uint32 - var body io.Reader + var body []byte START: s.stateLock.Lock() switch s.state { @@ -195,7 +195,7 @@ START: // Send up to our send window max = min(window, uint32(len(b))) - body = bytes.NewReader(b[:max]) + body = b[:max] // Send the header s.sendHdr.encode(typeData, flags, s.id, max)