diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index e7ba2bc..75fa4ae 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -1503,6 +1503,7 @@ func (chain *Blockchain) get_ordered_past(tip crypto.Hash, tillheight int64) (or // this will flip chain top, depending on which block has more work // more worked block is normally identified in < 2 secs func (chain *Blockchain) flip_top() { + return chain.Lock() defer chain.Unlock() diff --git a/blockchain/hardcoded_contracts.go b/blockchain/hardcoded_contracts.go index a15b5b7..b30fd9e 100644 --- a/blockchain/hardcoded_contracts.go +++ b/blockchain/hardcoded_contracts.go @@ -1,4 +1,4 @@ -// Copyright 2017-2021 DERO Project. All rights reserved. +// Copyright 2017-2022 DERO Project. All rights reserved. // Use of this source code in any form is governed by RESEARCH license. // license can be found in the LICENSE file. // GPG: 0F39 E425 8C65 3947 702A 8234 08B2 0360 A03A 9DE8 @@ -18,30 +18,8 @@ package blockchain // this file installs hard coded contracts -//import "fmt" import _ "embed" -/* -import "strings" -import "strconv" -import "encoding/hex" -import "encoding/binary" -import "math/big" -import "golang.org/x/xerrors" - - -import "github.com/deroproject/derohe/cryptography/bn256" -import "github.com/deroproject/derohe/transaction" -import "github.com/deroproject/derohe/config" -import "github.com/deroproject/derohe/premine" -import "github.com/deroproject/derohe/globals" -import "github.com/deroproject/derohe/block" -import "github.com/deroproject/derohe/rpc" - - - -*/ - import "github.com/deroproject/graviton" import "github.com/deroproject/derohe/dvm" import "github.com/deroproject/derohe/cryptography/crypto" @@ -49,28 +27,45 @@ import "github.com/deroproject/derohe/cryptography/crypto" //go:embed hardcoded_sc/nameservice.bas var source_nameservice string +//go:embed hardcoded_sc/nameservice_updateable.bas +var source_nameservice_updateable string + // process the miner tx, giving fees, miner rewatd etc func (chain *Blockchain) install_hardcoded_contracts(cache map[crypto.Hash]*graviton.Tree, ss *graviton.Snapshot, balance_tree *graviton.Tree, sc_tree *graviton.Tree, height uint64) (err error) { - if height != 0 { + if height == 0 { + + if _, _, err = dvm.ParseSmartContract(source_nameservice); err != nil { + logger.Error(err, "error Parsing hard coded sc") + panic(err) + return + } + + var name crypto.Hash + name[31] = 1 + if err = chain.install_hardcoded_sc(cache, ss, balance_tree, sc_tree, source_nameservice, name); err != nil { + panic(err) + return + } + return } - if _, _, err = dvm.ParseSmartContract(source_nameservice); err != nil { - logger.Error(err, "error Parsing hard coded sc") - panic(err) - return - } + if height == 21480 { // update SC at specific height + if _, _, err = dvm.ParseSmartContract(source_nameservice_updateable); err != nil { + logger.Error(err, "error Parsing hard coded sc") + panic(err) + return + } - var name crypto.Hash - name[31] = 1 - if err = chain.install_hardcoded_sc(cache, ss, balance_tree, sc_tree, source_nameservice, name); err != nil { - panic(err) - return + var name crypto.Hash + name[31] = 1 + if err = chain.install_hardcoded_sc(cache, ss, balance_tree, sc_tree, source_nameservice_updateable, name); err != nil { + panic(err) + return + } } - //fmt.Printf("source code embedded %s\n",source_nameservice) - return } diff --git a/blockchain/hardcoded_sc/nameservice_updateable.bas b/blockchain/hardcoded_sc/nameservice_updateable.bas new file mode 100644 index 0000000..6858c0a --- /dev/null +++ b/blockchain/hardcoded_sc/nameservice_updateable.bas @@ -0,0 +1,54 @@ +/* Name Service SMART CONTRACT in DVM-BASIC. + Allows a user to register names which could be looked by wallets for easy to use name while transfer +*/ + + + // This function is used to initialize parameters during install time + Function Initialize() Uint64 + 10 RETURN 0 + End Function + + // Register a name, limit names of 5 or less length + Function Register(name String) Uint64 + 10 IF EXISTS(name) THEN GOTO 50 // if name is already used, it cannot reregistered + 15 IF STRLEN(name) >= 64 THEN GOTO 50 // skip names misuse + 20 IF STRLEN(name) >= 6 THEN GOTO 40 + 30 IF SIGNER() == address_raw("deto1qyvyeyzrcm2fzf6kyq7egkes2ufgny5xn77y6typhfx9s7w3mvyd5qqynr5hx") THEN GOTO 40 + 35 IF SIGNER() != address_raw("deto1qy0ehnqjpr0wxqnknyc66du2fsxyktppkr8m8e6jvplp954klfjz2qqdzcd8p") THEN GOTO 50 + 40 STORE(name,SIGNER()) + 50 RETURN 0 + End Function + + + // This function is used to change owner of Name is an string form of address + Function TransferOwnership(name String,newowner String) Uint64 + 10 IF LOAD(name) != SIGNER() THEN GOTO 30 + 20 STORE(name,ADDRESS_RAW(newowner)) + 30 RETURN 0 + End Function + + + // This function is used to change SC owner + Function TransferSCOwnership(newowner String) Uint64 + 10 IF LOAD("owner") == SIGNER() THEN GOTO 30 + 20 RETURN 1 + 30 STORE("own1",ADDRESS_RAW(newowner)) + 40 RETURN 0 + End Function + + // Until the new owner claims ownership, existing owner remains owner + Function ClaimSCOwnership() Uint64 + 10 IF LOAD("own1") == SIGNER() THEN GOTO 30 + 20 RETURN 1 + 30 STORE("owner",SIGNER()) // ownership claim successful + 40 RETURN 0 + End Function + + // If signer is owner, provide him rights to update code anytime + // make sure update is always available to SC + Function UpdateCode(SC_CODE String) Uint64 + 10 IF LOAD("owner") == SIGNER() THEN GOTO 30 + 20 RETURN 1 + 30 UPDATE_SC_CODE(SC_CODE) + 40 RETURN 0 + End Function diff --git a/blockchain/hardcoded_sc/warning_DO_NOT_TOUCH_THIS_FOLDER.txt b/blockchain/hardcoded_sc/warning_DO_NOT_TOUCH_THIS_FOLDER.txt new file mode 100644 index 0000000..e69de29 diff --git a/blockchain/hardfork_core.go b/blockchain/hardfork_core.go index 3636ff2..844ba99 100644 --- a/blockchain/hardfork_core.go +++ b/blockchain/hardfork_core.go @@ -46,7 +46,7 @@ var mainnet_hard_forks = []Hard_fork{ // {1, 0,0,0,0,true}, // dummy entry so as we can directly use the fork index into this entry {1, 0, 0, 0, 0, true}, // version 1 hard fork where genesis block landed and chain migration occurs // version 1 has difficulty hardcoded to 1 - //{2, 95551, 0, 0, 0, true}, // version 2 hard fork where Atlantis bootstraps , it's mandatory + //{2, 10, 0, 0, 0, true}, // version 2 hard fork where SC gets update functionality , it's mandatory // {3, 721000, 0, 0, 0, true}, // version 3 hard fork emission fix, it's mandatory } diff --git a/blockchain/storetopo.go b/blockchain/storetopo.go index c352cf9..0ec2e28 100644 --- a/blockchain/storetopo.go +++ b/blockchain/storetopo.go @@ -91,6 +91,8 @@ func (s *storetopofs) Write(index int64, blid [32]byte, state_version uint64, he _, err = s.topomapping.WriteAt(buf[:], index*TOPORECORD_SIZE) + s.topomapping.Sync() // looks like this is the cause of corruption + return err } diff --git a/build_all.sh b/build_all.sh index 76a2c36..e8beb55 100644 --- a/build_all.sh +++ b/build_all.sh @@ -17,7 +17,7 @@ bash $ABSDIR/build_package.sh "./cmd/explorer" bash $ABSDIR/build_package.sh "./cmd/dero-wallet-cli" bash $ABSDIR/build_package.sh "./cmd/dero-miner" #bash $ABSDIR/build_package.sh "./cmd/simulator" -bash $ABSDIR/build_package.sh "./cmd/rpc_examples/pong_server" +#bash $ABSDIR/build_package.sh "./cmd/rpc_examples/pong_server" for d in build/*; do cp Start.md "$d"; done diff --git a/cmd/dero-wallet-cli/prompt.go b/cmd/dero-wallet-cli/prompt.go index 80de635..c86b185 100644 --- a/cmd/dero-wallet-cli/prompt.go +++ b/cmd/dero-wallet-cli/prompt.go @@ -895,7 +895,6 @@ func usage(w io.Writer) { io.WriteString(w, "\t\033[1mhelp\033[0m\t\tthis help\n") io.WriteString(w, "\t\033[1maddress\033[0m\t\tDisplay user address\n") io.WriteString(w, "\t\033[1mbalance\033[0m\t\tDisplay user balance\n") - io.WriteString(w, "\t\033[1mget_tx_key\033[0m\tDisplay tx proof to prove receiver for specific transaction\n") io.WriteString(w, "\t\033[1mintegrated_address\033[0m\tDisplay random integrated address (with encrypted payment ID)\n") io.WriteString(w, "\t\033[1mmenu\033[0m\t\tEnable menu mode\n") io.WriteString(w, "\t\033[1mrescan_bc\033[0m\tRescan blockchain to re-obtain transaction history \n") @@ -909,7 +908,6 @@ func usage(w io.Writer) { io.WriteString(w, "\t\033[1mtransfer\033[0m\tTransfer/Send DERO to another address\n") io.WriteString(w, "\t\t\tEg. transfer
\n") io.WriteString(w, "\t\033[1mtransfer_all\033[0m\tTransfer everything to another address\n") - io.WriteString(w, "\t\033[1mflush\033[0m\tFlush local wallet pool (for testing purposes)\n") io.WriteString(w, "\t\033[1mversion\033[0m\t\tShow version\n") io.WriteString(w, "\t\033[1mbye\033[0m\t\tQuit wallet\n") io.WriteString(w, "\t\033[1mexit\033[0m\t\tQuit wallet\n") diff --git a/cmd/derod/main.go b/cmd/derod/main.go index beacdc4..1b25385 100644 --- a/cmd/derod/main.go +++ b/cmd/derod/main.go @@ -223,6 +223,42 @@ func main() { p2p.Broadcast_MiniBlock(mbl, peerid) } + { + current_blid, err := chain.Load_Block_Topological_order_at_index(17600) + if err == nil { + + current_blid := current_blid + for { + height := chain.Load_Height_for_BL_ID(current_blid) + + if height < 17500 { + break + } + + r, err := chain.Store.Topo_store.Read(int64(height)) + if err != nil { + panic(err) + } + if r.BLOCK_ID != current_blid { + fmt.Printf("Fixing corruption r %+v , current_blid %s current_blid_height %d\n", r, current_blid, height) + + fix_commit_version, err := chain.ReadBlockSnapshotVersion(current_blid) + if err != nil { + panic(err) + } + + chain.Store.Topo_store.Write(int64(height), current_blid, fix_commit_version, int64(height)) + + } + + fix_bl, err := chain.Load_BL_FROM_ID(current_blid) + if err != nil { + panic(err) + } + current_blid = fix_bl.Tips[0] + } + } + } globals.Cron.Start() // start cron jobs // This tiny goroutine continuously updates status as required @@ -624,6 +660,50 @@ restart_loop: err, _ = chain.Add_Complete_Block(cbl) fmt.Printf("err adding block %s\n", err) + case command == "fix": + tips := chain.Get_TIPS() + + current_blid := tips[0] + for { + height := chain.Load_Height_for_BL_ID(current_blid) + + //fmt.Printf("checking height %d\n", height) + + if height < 1 { + break + } + + r, err := chain.Store.Topo_store.Read(int64(height)) + if err != nil { + panic(err) + } + if r.BLOCK_ID != current_blid { + fmt.Printf("corruption due to XYZ r %+v , current_blid %s current_blid_height\n", r, current_blid, height) + + fix_commit_version, err := chain.ReadBlockSnapshotVersion(current_blid) + if err != nil { + panic(err) + } + + chain.Store.Topo_store.Write(int64(height), current_blid, fix_commit_version, int64(height)) + + } + + fix_bl, err := chain.Load_BL_FROM_ID(current_blid) + if err != nil { + panic(err) + } + + current_blid = fix_bl.Tips[0] + + /* fix_commit_version, err = chain.ReadBlockSnapshotVersion(current_block_id) + if err != nil { + panic(err) + } + */ + + } + case command == "print_block": fmt.Printf("printing block\n") diff --git a/cmd/derod/rpc/websocket_getwork_server.go b/cmd/derod/rpc/websocket_getwork_server.go index b3e7adb..535fd90 100644 --- a/cmd/derod/rpc/websocket_getwork_server.go +++ b/cmd/derod/rpc/websocket_getwork_server.go @@ -75,11 +75,8 @@ func SendJob() { var params rpc.GetBlockTemplate_Result - var buf bytes.Buffer - encoder := json.NewEncoder(&buf) - // get a block template, and then we will fill the address here as optimization - bl, mbl, _, _, err := chain.Create_new_block_template_mining(chain.IntegratorAddress()) + bl, mbl_main, _, _, err := chain.Create_new_block_template_mining(chain.IntegratorAddress()) if err != nil { return } @@ -94,7 +91,7 @@ func SendJob() { params.Height = bl.Height params.Prev_Hash = prev_hash - if mbl.HighDiff { + if mbl_main.HighDiff { diff.Mul(diff, new(big.Int).SetUint64(config.MINIBLOCK_HIGHDIFF)) } params.Difficultyuint64 = diff.Uint64() @@ -102,33 +99,44 @@ func SendJob() { client_list_mutex.Lock() defer client_list_mutex.Unlock() - for k, v := range client_list { - if !mbl.Final { //write miners address only if possible - copy(mbl.KeyHash[:], v.address_sum[:]) - } + for rk, rv := range client_list { - for i := range mbl.Nonce { // give each user different work - mbl.Nonce[i] = globals.Global_Random.Uint32() // fill with randomness - } + go func(k *websocket.Conn, v *user_session) { + defer globals.Recover(2) + var buf bytes.Buffer + encoder := json.NewEncoder(&buf) - if v.lasterr != "" { - params.LastError = v.lasterr - v.lasterr = "" - } + mbl := mbl_main - if !v.valid_address && !chain.IsAddressHashValid(false, v.address_sum) { - params.LastError = "unregistered miner or you need to wait 15 mins" - } else { - params.LastError = "" - v.valid_address = true - } - params.Blockhashing_blob = fmt.Sprintf("%x", mbl.Serialize()) - params.Blocks = v.blocks - params.MiniBlocks = v.miniblocks + if !mbl.Final { //write miners address only if possible + copy(mbl.KeyHash[:], v.address_sum[:]) + } - encoder.Encode(params) - k.WriteMessage(websocket.TextMessage, buf.Bytes()) - buf.Reset() + for i := range mbl.Nonce { // give each user different work + mbl.Nonce[i] = globals.Global_Random.Uint32() // fill with randomness + } + + if v.lasterr != "" { + params.LastError = v.lasterr + v.lasterr = "" + } + + if !v.valid_address && !chain.IsAddressHashValid(false, v.address_sum) { + params.LastError = "unregistered miner or you need to wait 15 mins" + } else { + params.LastError = "" + v.valid_address = true + } + params.Blockhashing_blob = fmt.Sprintf("%x", mbl.Serialize()) + params.Blocks = v.blocks + params.MiniBlocks = v.miniblocks + + encoder.Encode(params) + k.SetWriteDeadline(time.Now().Add(100 * time.Millisecond)) + k.WriteMessage(websocket.TextMessage, buf.Bytes()) + buf.Reset() + + }(rk, rv) } @@ -139,7 +147,7 @@ func newUpgrader() *websocket.Upgrader { u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) { // echo - c.WriteMessage(messageType, data) + //c.WriteMessage(messageType, data) if messageType != websocket.TextMessage { return @@ -148,7 +156,7 @@ func newUpgrader() *websocket.Upgrader { sess := c.Session().(*user_session) client_list_mutex.Lock() - client_list_mutex.Unlock() + defer client_list_mutex.Unlock() var p rpc.SubmitBlock_Params @@ -180,8 +188,9 @@ func newUpgrader() *websocket.Upgrader { }) u.OnClose(func(c *websocket.Conn, err error) { client_list_mutex.Lock() + defer client_list_mutex.Unlock() delete(client_list, c) - client_list_mutex.Unlock() + }) return u @@ -213,8 +222,9 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) { wsConn.SetSession(&session) client_list_mutex.Lock() + defer client_list_mutex.Unlock() client_list[wsConn] = &session - client_list_mutex.Unlock() + } func Getwork_server() { diff --git a/config/version.go b/config/version.go index 960e48e..a2255da 100644 --- a/config/version.go +++ b/config/version.go @@ -20,4 +20,4 @@ import "github.com/blang/semver/v4" // right now it has to be manually changed // do we need to include git commitsha?? -var Version = semver.MustParse("3.4.123-0.DEROHE.STARGATE+26022022") +var Version = semver.MustParse("3.4.130-0.DEROHE.STARGATE+26022022") diff --git a/p2p/controller.go b/p2p/controller.go index 86cd803..40b02e0 100644 --- a/p2p/controller.go +++ b/p2p/controller.go @@ -71,6 +71,8 @@ var ClockOffset time.Duration //Clock Offset related to all the peer2 connected var backoff = map[string]int64{} // if server receives a connection, then it will not initiate connection to that ip for another 60 secs var backoff_mutex = sync.Mutex{} +var Min_Peers = int64(31) // we need to expose this to be modifieable at runtime without taking daemon offline + // return true if we should back off else we can connect func shouldwebackoff(ip string) bool { backoff_mutex.Lock() @@ -250,7 +252,7 @@ func P2P_engine() { func tunekcp(conn *kcp.UDPSession) { conn.SetACKNoDelay(true) - conn.SetNoDelay(1, 10, 2, 1) // tuning paramters for local stack + conn.SetNoDelay(0, 40, 0, 0) // tuning paramters for local stack } // will try to connect with given endpoint @@ -396,7 +398,6 @@ func maintain_seed_node_connection() { // keep building connections to network, we are talking outgoing connections func maintain_connection_to_peers() { - Min_Peers := int64(31) // we need to expose this to be modifieable at runtime without taking daemon offline // check how many connections are active if _, ok := globals.Arguments["--min-peers"]; ok && globals.Arguments["--min-peers"] != nil { // user specified a limit, use it if possible i, err := strconv.ParseInt(globals.Arguments["--min-peers"].(string), 10, 64) @@ -469,6 +470,13 @@ func P2P_Server_v2() { connection := &Connection{Client: c, Conn: conn, ConnTls: tlsconn, Addr: remote_addr, State: HANDSHAKE_PENDING, Incoming: true} connection.logger = logger.WithName("incoming").WithName(remote_addr.String()) + in, out := Peer_Direction_Count() + + if int64(in+out) > Min_Peers { // do not allow incoming ddos + connection.exit() + return + } + c.State.Set("c", connection) // set pointer to connection //connection.logger.Info("connected OnConnect") diff --git a/p2p/peer_pool.go b/p2p/peer_pool.go index a7bb78a..785b277 100644 --- a/p2p/peer_pool.go +++ b/p2p/peer_pool.go @@ -323,3 +323,21 @@ func get_peer_list() (peers []Peer_Info) { } return } + +func get_peer_list_specific(addr string) (peers []Peer_Info) { + plist := get_peer_list() + sort.SliceStable(plist, func(i, j int) bool { return plist[i].Addr < plist[j].Addr }) + + if len(plist) <= 7 { + peers = plist + } else { + index := sort.Search(len(plist), func(i int) bool { return plist[i].Addr < addr }) + for i := range plist { + peers = append(peers, plist[(i+index)%len(plist)]) + if len(peers) >= 7 { + break + } + } + } + return peers +} diff --git a/p2p/rpc_handshake.go b/p2p/rpc_handshake.go index d70eaec..a4da23d 100644 --- a/p2p/rpc_handshake.go +++ b/p2p/rpc_handshake.go @@ -45,8 +45,6 @@ func (handshake *Handshake_Struct) Fill() { // handshake.Flags = // add any flags necessary - //scan our peer list and send peers which have been recently communicated - handshake.PeerList = get_peer_list() copy(handshake.Network_ID[:], globals.Config.Network_ID[:]) } @@ -57,6 +55,9 @@ func (connection *Connection) dispatch_test_handshake() { var request, response Handshake_Struct request.Fill() + //scan our peer list and send peers which have been recently communicated + request.PeerList = get_peer_list_specific(Address(connection)) + ctx, _ := context.WithTimeout(context.Background(), 4*time.Second) if err := connection.Client.CallWithContext(ctx, "Peer.Handshake", request, &response); err != nil { connection.logger.V(4).Error(err, "cannot handshake") diff --git a/vendor/github.com/lesismal/nbio/.github/workflows/close_inactive_issues.yml b/vendor/github.com/lesismal/nbio/.github/workflows/close_inactive_issues.yml new file mode 100644 index 0000000..246c34d --- /dev/null +++ b/vendor/github.com/lesismal/nbio/.github/workflows/close_inactive_issues.yml @@ -0,0 +1,22 @@ +name: Close inactive issues +on: + schedule: + - cron: "30 1 * * *" + +jobs: + close-issues: + runs-on: ubuntu-latest + permissions: + issues: write + pull-requests: write + steps: + - uses: actions/stale@v3 + with: + days-before-issue-stale: 30 + days-before-issue-close: 14 + stale-issue-label: "stale" + stale-issue-message: "This issue is stale because it has been open for 30 days with no activity." + close-issue-message: "This issue was closed because it has been inactive for 14 days since being marked as stale." + days-before-pr-stale: -1 + days-before-pr-close: -1 + repo-token: ${{ secrets.GITHUB_TOKEN }} diff --git a/vendor/github.com/lesismal/nbio/README.md b/vendor/github.com/lesismal/nbio/README.md index 4b587d6..2a4d144 100644 --- a/vendor/github.com/lesismal/nbio/README.md +++ b/vendor/github.com/lesismal/nbio/README.md @@ -61,16 +61,17 @@ ## Features -- [x] linux: epoll +- [x] linux: epoll, both ET/LT(default) supported - [x] macos(bsd): kqueue - [x] windows: golang std net - [x] nbio.Conn implements a non-blocking net.Conn(except windows) - [x] writev supported -- [x] least dependency +- [x] concurrent write/close supported(both nbio.Conn and nbio/nbhttp/websocket.Conn) - [x] TLS supported - [x] HTTP/HTTPS 1.x -- [x] Websocket, [Passes the Autobahn Test Suite](https://lesismal.github.io/nbio/websocket) -- [ ] HTTP 2.0 +- [x] Websocket, [Passes the Autobahn Test Suite](https://lesismal.github.io/nbio/websocket), `OnOpen/OnMessage/OnClose` order guaranteed +- [ ] HTTP 2.0(no plans, 2.0 is not good enough) + ## Installation diff --git a/vendor/github.com/lesismal/nbio/autobahn/main.go b/vendor/github.com/lesismal/nbio/autobahn/main.go index e9d0446..c0373a3 100644 --- a/vendor/github.com/lesismal/nbio/autobahn/main.go +++ b/vendor/github.com/lesismal/nbio/autobahn/main.go @@ -240,7 +240,7 @@ func min(a, b int) int { func handlerIndex() func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { if *verbose { - log.Printf("reqeust to %s", r.URL) + log.Printf("reqeust to %s", r.URL.Path) } if r.URL.Path != "/" { w.WriteHeader(http.StatusNotFound) diff --git a/vendor/github.com/lesismal/nbio/conn_unix.go b/vendor/github.com/lesismal/nbio/conn_unix.go index 4d1382c..6f26f82 100644 --- a/vendor/github.com/lesismal/nbio/conn_unix.go +++ b/vendor/github.com/lesismal/nbio/conn_unix.go @@ -61,9 +61,9 @@ func (c *Conn) Read(b []byte) (int, error) { c.mux.Unlock() return 0, errClosed } - c.mux.Unlock() - + n, err := syscall.Read(c.fd, b) + c.mux.Unlock() if err == nil { c.g.afterRead(c) } diff --git a/vendor/github.com/lesismal/nbio/gopher.go b/vendor/github.com/lesismal/nbio/gopher.go index 634733a..5aa2596 100644 --- a/vendor/github.com/lesismal/nbio/gopher.go +++ b/vendor/github.com/lesismal/nbio/gopher.go @@ -6,6 +6,7 @@ package nbio import ( "container/heap" + "context" "net" "runtime" "sync" @@ -78,7 +79,7 @@ type Config struct { LockPoller bool // EpollMod sets the epoll mod, EPOLLLT by default. - EpollMod int + EpollMod uint32 } // Gopher is a manager of poller. @@ -87,6 +88,8 @@ type Gopher struct { mux sync.Mutex tmux sync.Mutex + wgConn sync.WaitGroup + Name string network string @@ -97,7 +100,7 @@ type Gopher struct { maxWriteBufferSize int maxReadTimesPerEventLoop int minConnCacheSize int - epollMod int + epollMod uint32 lockListener bool lockPoller bool @@ -130,40 +133,68 @@ type Gopher struct { Execute func(f func()) } -// Stop pollers. +// Stop closes listeners/pollers/conns/timer. func (g *Gopher) Stop() { - g.onStop() - - g.trigger.Stop() - close(g.chTimer) - for _, l := range g.listeners { l.stop() } - for i := 0; i < g.pollerNum; i++ { - g.pollers[i].stop() - } + g.mux.Lock() conns := g.connsStd g.connsStd = map[*Conn]struct{}{} connsUnix := g.connsUnix g.mux.Unlock() + g.wgConn.Done() for c := range conns { if c != nil { - c.Close() + cc := c + g.atOnce(func() { + cc.Close() + }) } } for _, c := range connsUnix { if c != nil { - go c.Close() + cc := c + g.atOnce(func() { + cc.Close() + }) } } + g.wgConn.Wait() + time.Sleep(time.Second / 5) + + g.onStop() + + g.trigger.Stop() + close(g.chTimer) + + for i := 0; i < g.pollerNum; i++ { + g.pollers[i].stop() + } + g.Wait() logging.Info("Gopher[%v] stop", g.Name) } +// Shutdown stops Gopher gracefully with context. +func (g *Gopher) Shutdown(ctx context.Context) error { + ch := make(chan struct{}) + go func() { + g.Stop() + close(ch) + }() + + select { + case <-ch: + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + // AddConn adds conn to a poller. func (g *Gopher) AddConn(conn net.Conn) (*Conn, error) { c, err := NBConn(conn) @@ -179,7 +210,10 @@ func (g *Gopher) OnOpen(h func(c *Conn)) { if h == nil { panic("invalid nil handler") } - g.onOpen = h + g.onOpen = func(c *Conn) { + g.wgConn.Add(1) + h(c) + } } // OnClose registers callback for disconnected. @@ -188,9 +222,10 @@ func (g *Gopher) OnClose(h func(c *Conn, err error)) { panic("invalid nil handler") } g.onClose = func(c *Conn, err error) { - g.atOnce(func() { - h(c, err) - }) + // g.atOnce(func() { + defer g.wgConn.Done() + h(c, err) + // }) } } @@ -425,6 +460,7 @@ func (g *Gopher) PollerBuffer(c *Conn) []byte { } func (g *Gopher) initHandlers() { + g.wgConn.Add(1) g.OnOpen(func(c *Conn) {}) g.OnClose(func(c *Conn, err error) {}) // g.OnRead(func(c *Conn, b []byte) ([]byte, error) { diff --git a/vendor/github.com/lesismal/nbio/nbhttp/client.go b/vendor/github.com/lesismal/nbio/nbhttp/client.go index 6cb3232..6398f5f 100644 --- a/vendor/github.com/lesismal/nbio/nbhttp/client.go +++ b/vendor/github.com/lesismal/nbio/nbhttp/client.go @@ -173,9 +173,7 @@ func (c *Client) Do(req *http.Request, handler func(res *http.Response, conn net handler(nil, nil, err) return } - if hc.closed { - hc.Reset() - } + hc.Reset() hc.Do(req, func(res *http.Response, conn net.Conn, err error) { hcs.releaseConn(hc) handler(res, conn, err) diff --git a/vendor/github.com/lesismal/nbio/nbhttp/client_conn.go b/vendor/github.com/lesismal/nbio/nbhttp/client_conn.go index 27941b1..d0766b1 100644 --- a/vendor/github.com/lesismal/nbio/nbhttp/client_conn.go +++ b/vendor/github.com/lesismal/nbio/nbhttp/client_conn.go @@ -50,9 +50,13 @@ type ClientConn struct { // Reset . func (c *ClientConn) Reset() { - c.conn = nil - c.handlers = nil - c.closed = false + c.mux.Lock() + if c.closed { + c.conn = nil + c.handlers = nil + c.closed = false + } + c.mux.Unlock() } // OnClose . @@ -78,21 +82,24 @@ func (c *ClientConn) Close() { // CloseWithError . func (c *ClientConn) CloseWithError(err error) { c.mux.Lock() - closed := c.closed - c.closed = true - c.mux.Unlock() - if !closed { + defer c.mux.Unlock() + if !c.closed { + c.closed = true c.closeWithErrorWithoutLock(err) } } func (c *ClientConn) closeWithErrorWithoutLock(err error) { + if err == nil { + err = io.EOF + } for _, h := range c.handlers { h.h(nil, c.conn, err) } c.handlers = nil if c.conn != nil { c.conn.Close() + c.conn = nil } if c.onClose != nil { c.onClose() diff --git a/vendor/github.com/lesismal/nbio/nbhttp/engine.go b/vendor/github.com/lesismal/nbio/nbhttp/engine.go index 74a012d..95a98ba 100644 --- a/vendor/github.com/lesismal/nbio/nbhttp/engine.go +++ b/vendor/github.com/lesismal/nbio/nbhttp/engine.go @@ -329,12 +329,16 @@ func (e *Engine) stopListeners() { // Start . func (e *Engine) Start() error { - err := e.startListeners() + err := e.Gopher.Start() if err != nil { return err } - - return e.Gopher.Start() + err = e.startListeners() + if err != nil { + e.Gopher.Stop() + return err + } + return err } // Stop . @@ -387,9 +391,9 @@ func (e *Engine) Shutdown(ctx context.Context) error { } Exit: - e.Stop() + err := e.Shutdown(ctx) logging.Info("Gopher[%v] shutdown", e.Gopher.Name) - return nil + return err } // InitTLSBuffers . diff --git a/vendor/github.com/lesismal/nbio/nbhttp/server.go b/vendor/github.com/lesismal/nbio/nbhttp/server.go index 537d93f..25ed433 100644 --- a/vendor/github.com/lesismal/nbio/nbhttp/server.go +++ b/vendor/github.com/lesismal/nbio/nbhttp/server.go @@ -47,5 +47,7 @@ func NewServerTLS(conf Config, v ...interface{}) *Server { conf.TLSConfig = tlsConfig } } + conf.AddrsTLS = append(conf.AddrsTLS, conf.Addrs...) + conf.Addrs = nil return &Server{Engine: NewEngine(conf)} } diff --git a/vendor/github.com/lesismal/nbio/net_unix.go b/vendor/github.com/lesismal/nbio/net_unix.go index 4c0a052..93d4385 100644 --- a/vendor/github.com/lesismal/nbio/net_unix.go +++ b/vendor/github.com/lesismal/nbio/net_unix.go @@ -40,11 +40,11 @@ func dupStdConn(conn net.Conn) (*Conn, error) { conn.Close() - err = syscall.SetNonblock(newFd, true) - if err != nil { - syscall.Close(newFd) - return nil, err - } + // err = syscall.SetNonblock(newFd, true) + // if err != nil { + // syscall.Close(newFd) + // return nil, err + // } return &Conn{ fd: newFd, diff --git a/vendor/github.com/lesismal/nbio/poller_epoll.go b/vendor/github.com/lesismal/nbio/poller_epoll.go index 9c92e46..7c00da2 100644 --- a/vendor/github.com/lesismal/nbio/poller_epoll.go +++ b/vendor/github.com/lesismal/nbio/poller_epoll.go @@ -28,13 +28,9 @@ const ( ) const ( - epollEventsRead = syscall.EPOLLPRI | syscall.EPOLLIN - epollEventsWrite = syscall.EPOLLOUT - epollEventsReadWrite = syscall.EPOLLPRI | syscall.EPOLLIN | syscall.EPOLLOUT - epollEventsError = syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP - - epollEventsReadET = syscall.EPOLLPRI | syscall.EPOLLIN | EPOLLET - epollEventsReadWriteET = syscall.EPOLLPRI | syscall.EPOLLIN | syscall.EPOLLOUT | EPOLLET + epollEventsRead = syscall.EPOLLPRI | syscall.EPOLLIN + epollEventsWrite = syscall.EPOLLOUT + epollEventsError = syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP ) type poller struct { @@ -142,7 +138,7 @@ func (p *poller) readWriteLoop() { msec := -1 events := make([]syscall.EpollEvent, 1024) - if p.g.onRead == nil && p.g.epollMod == syscall.EPOLLET { + if p.g.onRead == nil && p.g.epollMod == EPOLLET { p.g.maxReadTimesPerEventLoop = 1<<31 - 1 } @@ -192,7 +188,7 @@ func (p *poller) readWriteLoop() { if errors.Is(err, syscall.EAGAIN) { break } - if err != nil || n == 0 { + if err != nil { c.closeWithError(err) } if n < len(buffer) { @@ -224,11 +220,11 @@ func (p *poller) stop() { } func (p *poller) addRead(fd int) error { - switch int64(p.g.epollMod) { - case int64(EPOLLET): - return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: epollEventsReadET}) + switch p.g.epollMod { + case EPOLLET: + return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN | EPOLLET}) default: - return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: epollEventsRead}) + return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN}) } } @@ -237,11 +233,11 @@ func (p *poller) addRead(fd int) error { // } func (p *poller) modWrite(fd int) error { - switch int64(p.g.epollMod) { - case int64(EPOLLET): - return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: epollEventsReadWriteET}) + switch p.g.epollMod { + case EPOLLET: + return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN | syscall.EPOLLOUT | EPOLLET}) default: - return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_MOD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: epollEventsReadWrite}) + return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_MOD, fd, &syscall.EpollEvent{Fd: int32(fd), Events: syscall.EPOLLERR | syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLPRI | syscall.EPOLLIN | syscall.EPOLLOUT}) } } diff --git a/walletapi/rpcserver/rpc_transfer.go b/walletapi/rpcserver/rpc_transfer.go index 9a07385..b66aeb9 100644 --- a/walletapi/rpcserver/rpc_transfer.go +++ b/walletapi/rpcserver/rpc_transfer.go @@ -20,6 +20,7 @@ import "fmt" import "sync" import "context" import "runtime/debug" +import "encoding/base64" import "github.com/deroproject/derohe/rpc" import "github.com/deroproject/derohe/transaction" import "github.com/deroproject/derohe/cryptography/crypto" @@ -54,7 +55,13 @@ func Transfer(ctx context.Context, p rpc.Transfer_Params) (result rpc.Transfer_R //fmt.Printf("incoming transfer params %+v\n", p) - if p.SC_Code != "" { + if len(p.SC_Code) >= 1 { // decode SC from base64 if possible, since json has limitations + if sc, err := base64.StdEncoding.DecodeString(p.SC_Code); err == nil { + p.SC_Code = string(sc) + } + } + + if p.SC_Code != "" && p.SC_ID == "" { p.SC_RPC = append(p.SC_RPC, rpc.Argument{Name: rpc.SCACTION, DataType: rpc.DataUint64, Value: uint64(rpc.SC_INSTALL)}) p.SC_RPC = append(p.SC_RPC, rpc.Argument{Name: rpc.SCCODE, DataType: rpc.DataString, Value: p.SC_Code}) } @@ -62,6 +69,9 @@ func Transfer(ctx context.Context, p rpc.Transfer_Params) (result rpc.Transfer_R if p.SC_ID != "" { p.SC_RPC = append(p.SC_RPC, rpc.Argument{Name: rpc.SCACTION, DataType: rpc.DataUint64, Value: uint64(rpc.SC_CALL)}) p.SC_RPC = append(p.SC_RPC, rpc.Argument{Name: rpc.SCID, DataType: rpc.DataHash, Value: crypto.HashHexToHash(p.SC_ID)}) + if p.SC_Code != "" { + p.SC_RPC = append(p.SC_RPC, rpc.Argument{Name: rpc.SCCODE, DataType: rpc.DataString, Value: p.SC_Code}) + } } var tx *transaction.Transaction diff --git a/walletapi/wallet_transfer.go b/walletapi/wallet_transfer.go index 67af1f9..161b164 100644 --- a/walletapi/wallet_transfer.go +++ b/walletapi/wallet_transfer.go @@ -63,6 +63,10 @@ func (w *Wallet_Memory) TransferPayload0(transfers []rpc.Transfer, ringsize uint w.transfer_mutex.Lock() defer w.transfer_mutex.Unlock() + //if len(transfers) == 0 { + // return nil, fmt.Error("transfers is nil, cannot send.") + //} + if ringsize == 0 { ringsize = uint64(w.account.Ringsize) // use wallet ringsize, if ringsize not provided } else { // we need to use supplied ringsize