package graviton import "io" import "os" import "fmt" import "path/filepath" import "sync" import "encoding/binary" import "golang.org/x/xerrors" // all file operations will go through this // If this is implemented through an interface, it will trigger memory allocations on heap // this crude implementation serves the purpose and also allows to implement arbitary storage backends type file struct { diskfile *os.File // used for disk backend memoryfile []byte // used for memory backend size uint32 } type storage_layer_type int8 const ( unknown_layer storage_layer_type = iota // default is unknown layer disk memory ) // Store is the backend which is used to store the data in serialized form to disk. // The data is stored in files in split format and total number of files can be 4 billion. // each file is upto 2 GB in size, this limit has been placed to support FAT32 which restricts files to 4GB type Store struct { storage_layer storage_layer_type // identify storage layer base_directory string files map[uint32]*file findex uint32 versionrootfile *file // only maintains recent version records, each version is 8 bytes and stores the file index and fpos //internal_value_root *inner // internal append only value root commitsync sync.RWMutex // used to sync altroots value root, versioned root discsync sync.Mutex // used to syncronise disc swrites } // start a new memory backed store which may be useful for testing and other temporaray use cases. func NewMemStore() (*Store, error) { s := &Store{storage_layer: memory, files: map[uint32]*file{}} return s.init() } // open/create a disk based store, if the directory pre-exists, it is used as is. Since we are an append only keyvalue // store, we do not delete any data. func NewDiskStore(basepath string) (*Store, error) { if err := os.MkdirAll(basepath, 0700); err != nil { return nil, fmt.Errorf("direction creation err %s dirpath %s \n", err, basepath) } s := &Store{storage_layer: disk, base_directory: basepath, files: map[uint32]*file{}} return s.init() } func (store *Store) Close() { switch store.storage_layer { case disk: for _, f := range store.files { f.diskfile.Close() } store.versionrootfile.diskfile.Close() case memory: for _, f := range store.files { f.memoryfile = nil } default: panic("unknown storage layer") } } // init and load some items from the store func (s *Store) init() (*Store, error) { return s, s.loadfiles() } // 4 billion files each of 4 GB seems to be enough for quite some time, we will run out of handles much earlier // note that the structure is independant of these pointers and can thus be extended at any point in time in future func (s *Store) uint_to_filename(n uint32) string { switch s.storage_layer { case disk: d, c, b, a := n>>24, ((n >> 16) & 0xff), ((n >> 8) & 0xff), n return filepath.Join(s.base_directory, fmt.Sprintf("%d", d), fmt.Sprintf("%d", c), fmt.Sprintf("%d", b), fmt.Sprintf("%d", a)+".dfs") case memory: fallthrough default: panic("unknown storage layer") } } // load all files from the disk // we may need to increase file handles func (s *Store) loadfiles() error { if s.storage_layer == disk { if file_handle, err := os.OpenFile(filepath.Join(s.base_directory, "version_root.bin"), os.O_CREATE|os.O_RDWR, 0600); err != nil { return xerrors.Errorf("%w: index %d, filename %s", err, s.findex, s.uint_to_filename(uint32(s.findex))) } else { s.versionrootfile = &file{diskfile: file_handle} } } else if s.storage_layer == memory { s.versionrootfile = &file{memoryfile: []byte{}, size: uint32(0)} } else { return fmt.Errorf("unknown storage layer") } for i := uint32(0); i < (4*1024*1024*1024)-1; i++ { if s.storage_layer == disk { filename := s.uint_to_filename(uint32(i)) finfo, err := os.Stat(filename) if os.IsNotExist(err) { // path/to/whatever does not exist break } if finfo != nil && finfo.IsDir() { return fmt.Errorf("expected file but found directory at path %s", filename) } file_handle, err := os.OpenFile(filename, os.O_RDWR, 0600) if err != nil { return fmt.Errorf("%s: filename:%s", err, filename) } s.files[i] = &file{diskfile: file_handle, size: uint32(finfo.Size())} s.findex = i } else if s.storage_layer == memory { // nothing to do memory always starts afresh break } } if len(s.files) == 0 { return s.create_first_file() } return nil } func (s *Store) create_first_file() error { if s.storage_layer == disk { err := os.MkdirAll(filepath.Dir(s.uint_to_filename(0)), 0700) if err != nil { return fmt.Errorf("direction creation err %s filename %s \n", err, s.uint_to_filename(0)) } if file_handle, err := os.OpenFile(s.uint_to_filename(0), os.O_CREATE|os.O_RDWR, 0600); err != nil { return xerrors.Errorf("%w: index %d, filename %s", err, 0, s.uint_to_filename(uint32(0))) } else { file_handle.Write([]byte{0x0}) // write a byte so as mark 0,0 as invalid s.findex = 0 s.files[s.findex] = &file{diskfile: file_handle, size: uint32(1)} } } else if s.storage_layer == memory { s.findex = 0 s.files[s.findex] = &file{memoryfile: []byte{1}, size: uint32(1)} // write a byte so as mark 0,0 as invalid } return nil } // we are here means we have a currently open file // this function is single threaded func (s *Store) write(buf []byte) (uint32, uint32, error) { var done int var err error s.discsync.Lock() //defer s.discsync.Unlock() // defer has been removed to removed overhead cfile, ok := s.files[s.findex] if !ok || len(s.files) < 1 { s.discsync.Unlock() return 0, 0, fmt.Errorf("invalid file structures") } // // check whether we need to open a new file or overflowing if cfile.size+uint32(len(buf)) > MAX_FILE_SIZE || cfile.size+uint32(len(buf)) < cfile.size { s.findex++ if s.storage_layer == disk { err := os.MkdirAll(filepath.Dir(s.uint_to_filename(uint32(s.findex))), 0700) if err != nil { s.discsync.Unlock() return 0, 0, fmt.Errorf("direction creation err %s filename %s \n", err, s.uint_to_filename(s.findex)) } if file_handle, err := os.OpenFile(s.uint_to_filename(uint32(s.findex)), os.O_CREATE|os.O_RDWR, 0600); err != nil { s.discsync.Unlock() return 0, 0, xerrors.Errorf("%w: index %d, filename %s", err, s.findex, s.uint_to_filename(uint32(s.findex))) } else { s.files[s.findex] = &file{diskfile: file_handle} cfile = s.files[s.findex] } } else if s.storage_layer == memory { s.files[s.findex] = &file{memoryfile: []byte{}, size: uint32(0)} // write a byte so as mark 0,0 as invalid cfile = s.files[s.findex] } else { return 0, 0, fmt.Errorf("unknown storage layer") } } pos := cfile.size if s.storage_layer == disk { done, err = cfile.diskfile.WriteAt(buf, int64(cfile.size)) } else if s.storage_layer == memory { if int64(len(cfile.memoryfile)) != int64(cfile.size) { // fmt.Printf("filesize %d , len of memory %d\n", cfile.size,len(cfile.memoryfile)) return 0, 0, fmt.Errorf("probable store is closed") } cfile.memoryfile = append(cfile.memoryfile, buf...) done += len(buf) } cfile.size += uint32(done) s.discsync.Unlock() return s.findex, pos, err } func (s *Store) read(findex, fpos uint32, buf []byte) (int, error) { if cfile, ok := s.files[findex]; !ok { return 0, fmt.Errorf("findex not available") //xerrors.Errorf("data file (indexed at %d) is NOT available", findex) } else { if s.storage_layer == disk { c, err := cfile.diskfile.ReadAt(buf, int64(fpos)) return c, err } else if s.storage_layer == memory { if fpos < uint32(len(cfile.memoryfile)) { c := copy(buf, cfile.memoryfile[fpos:]) return c, nil } else if fpos == uint32(len(cfile.memoryfile)) { return 0, io.EOF } else { return 0, fmt.Errorf("out of range") } } else { return 0, fmt.Errorf("unknown storage layer") } } } // versions are 1 based func (s *Store) writeVersionData(version uint64, findex, fpos uint32) error { var buf [512]byte s.discsync.Lock() defer s.discsync.Unlock() binary.LittleEndian.PutUint32(buf[0:], findex) binary.LittleEndian.PutUint32(buf[4:], fpos) version-- if s.storage_layer == disk { if _, err := s.versionrootfile.diskfile.WriteAt(buf[:8], int64(version*8)); err != nil { return err } } else if s.storage_layer == memory { if uint64(len(s.versionrootfile.memoryfile)) <= (version+1)*8 { s.versionrootfile.memoryfile = append(s.versionrootfile.memoryfile, []byte{0, 0, 0, 0, 0, 0, 0, 0}...) } copy(s.versionrootfile.memoryfile[version*8:], buf[:8]) } else { return fmt.Errorf("unknown storage layer") } return nil } // versions are 1 based func (s *Store) ReadVersionData(version uint64) (findex uint32, fpos uint32, err error) { var buf [512]byte s.discsync.Lock() defer s.discsync.Unlock() version-- if s.storage_layer == disk { if _, err := s.versionrootfile.diskfile.ReadAt(buf[:8], int64(version*8)); err != nil { return 0, 0, err } else { findex = binary.LittleEndian.Uint32(buf[0:]) fpos = binary.LittleEndian.Uint32(buf[4:]) } } else if s.storage_layer == memory { if uint64(len(s.versionrootfile.memoryfile)) <= (version)*8 { return 0, 0, fmt.Errorf("invalid version %d %d", version, len(s.versionrootfile.memoryfile)) } else { findex = binary.LittleEndian.Uint32(s.versionrootfile.memoryfile[version*8+0:]) fpos = binary.LittleEndian.Uint32(s.versionrootfile.memoryfile[version*8+4:]) } } else { return 0, 0, fmt.Errorf("unknown storage layer") } return } func (s *Store) findhighestsnapshotinram() (index int, version uint64, findex, fpos uint32, err error) { if s.storage_layer == disk { var fstat os.FileInfo if fstat, err = s.versionrootfile.diskfile.Stat(); err != nil { return } if version = uint64(fstat.Size() / 8); version == 0 { return } findex, fpos, err = s.ReadVersionData(version) } else if s.storage_layer == memory { version = uint64(len(s.versionrootfile.memoryfile) / 8) if version == 0 { return } findex, fpos, err = s.ReadVersionData(version) } else { err = fmt.Errorf("unknown storage layer") } return }