350 lines
10 KiB
Go
Raw Normal View History

2021-11-08 16:39:17 +00:00
package graviton
import "io"
import "os"
import "fmt"
import "path/filepath"
import "sync"
import "encoding/binary"
import "golang.org/x/xerrors"
// file is the handle through which all file operations go.
// Implementing this through an interface would trigger memory allocations on the heap;
// this crude implementation serves the purpose and still allows arbitrary storage backends to be implemented.
type file struct {
diskfile *os.File // used for disk backend
memoryfile []byte // used for memory backend
size uint32 // for data files: bytes stored so far, which is also the offset write uses for the next record
}
// storage_layer_type identifies which backend a Store keeps its data in.
type storage_layer_type int8
// Known storage backends; the zero value is deliberately the unknown layer.
const (
unknown_layer storage_layer_type = iota // default is unknown layer
disk // data files are *os.File handles
memory // data files are in-process byte slices
)
// Store is the backend which is used to store the data in serialized form to disk.
// The data is stored in files in split format and the total number of files can be 4 billion.
// Each file is up to 2 GB in size; this limit has been placed to support FAT32, which restricts files to 4GB.
// NOTE(review): the enforced per-file limit is MAX_FILE_SIZE, which is declared outside this section — confirm
// it matches the 2 GB stated here (another comment in this file mentions 4 GB).
type Store struct {
storage_layer storage_layer_type // identify storage layer
base_directory string // root directory of the disk backend
files map[uint32]*file // open data files keyed by file index
findex uint32 // index of the data file currently being written to
versionrootfile *file // only maintains recent version records, each version is 8 bytes and stores the file index and fpos
//internal_value_root *inner // internal append only value root
commitsync sync.RWMutex // used to sync altroots value root, versioned root
discsync sync.Mutex // used to synchronise disk writes and version record access
}
// NewMemStore starts a new memory backed store, which may be useful for
// testing and other temporary use cases. Nothing is persisted.
func NewMemStore() (*Store, error) {
	store := &Store{
		storage_layer: memory,
		files:         make(map[uint32]*file),
	}
	return store.init()
}
// NewDiskStore opens (or creates) a disk based store rooted at basepath.
// If the directory already exists it is used as is: the store is an append
// only key/value store, so existing data is never deleted.
func NewDiskStore(basepath string) (*Store, error) {
	err := os.MkdirAll(basepath, 0700)
	if err != nil {
		return nil, fmt.Errorf("direction creation err %s dirpath %s \n", err, basepath)
	}

	store := &Store{
		storage_layer:  disk,
		base_directory: basepath,
		files:          make(map[uint32]*file),
	}
	return store.init()
}
// Close releases the resources held by the store.
//
// Disk backend: every data file handle and the version root handle are
// closed. Close errors are discarded because the method has no error result.
//
// Memory backend: the data buffers are dropped. write compares the buffer
// length against the recorded size, so a later write to a file that held data
// fails with "probable store is closed".
//
// Close panics if the store has an unknown storage layer.
func (store *Store) Close() {
	switch store.storage_layer {
	case disk:
		for _, f := range store.files {
			f.diskfile.Close()
		}
		// The constructors hand back the store even when loading failed, in
		// which case the version root may never have been opened.
		if store.versionrootfile != nil {
			store.versionrootfile.diskfile.Close()
		}
	case memory:
		for _, f := range store.files {
			f.memoryfile = nil
		}
	default:
		panic("unknown storage layer")
	}
}
// init loads the store's files and hands the store back together with any
// load error. The store pointer is returned even when loading fails.
func (s *Store) init() (*Store, error) {
	err := s.loadfiles()
	return s, err
}
// uint_to_filename maps a data file index to its path on disk.
//
// 4 billion files each of 4 GB seems to be enough for quite some time, we
// will run out of handles much earlier. The structure is independent of these
// paths and can thus be extended at any point in the future.
//
// The three most significant bytes of n become nested directory names under
// base_directory; the file itself is named after the complete index n (not
// just its low byte) with a ".dfs" suffix.
//
// Only the disk backend has file names: any other storage layer panics.
func (s *Store) uint_to_filename(n uint32) string {
	if s.storage_layer != disk {
		panic("unknown storage layer")
	}

	top := n >> 24
	upper := (n >> 16) & 0xff
	lower := (n >> 8) & 0xff
	leaf := fmt.Sprint(n) + ".dfs"

	return filepath.Join(s.base_directory, fmt.Sprint(top), fmt.Sprint(upper), fmt.Sprint(lower), leaf)
}
// loadfiles prepares the version root and opens every existing data file.
// We may need to increase the file handle limit for large stores.
//
// Disk backend: version_root.bin is opened (created if missing) inside
// base_directory, then data files are opened in index order starting at 0
// until the first index whose file does not exist. findex ends up at the last
// file found. Memory backend: always starts afresh with an empty version root.
//
// If no data file was loaded, the first file is created via create_first_file.
//
// Returns an error for an unknown storage layer, when any file cannot be
// inspected or opened, or when a directory sits where a data file is expected.
func (s *Store) loadfiles() error {
	switch s.storage_layer {
	case disk:
		versionpath := filepath.Join(s.base_directory, "version_root.bin")
		handle, err := os.OpenFile(versionpath, os.O_CREATE|os.O_RDWR, 0600)
		if err != nil {
			// report the file that actually failed, not a data file index/path
			return xerrors.Errorf("%w: filename %s", err, versionpath)
		}
		s.versionrootfile = &file{diskfile: handle}
	case memory:
		s.versionrootfile = &file{memoryfile: []byte{}, size: uint32(0)}
	default:
		return fmt.Errorf("unknown storage layer")
	}

	// only the disk backend has anything to discover
	if s.storage_layer == disk {
		for i := uint32(0); i < (4*1024*1024*1024)-1; i++ {
			filename := s.uint_to_filename(i)
			finfo, err := os.Stat(filename)
			if os.IsNotExist(err) { // first gap in the sequence ends the scan
				break
			}
			if err != nil {
				// any other stat failure leaves finfo nil; bail out instead of
				// dereferencing it below
				return fmt.Errorf("%s: filename:%s", err, filename)
			}
			if finfo.IsDir() {
				return fmt.Errorf("expected file but found directory at path %s", filename)
			}
			handle, err := os.OpenFile(filename, os.O_RDWR, 0600)
			if err != nil {
				return fmt.Errorf("%s: filename:%s", err, filename)
			}
			s.files[i] = &file{diskfile: handle, size: uint32(finfo.Size())}
			s.findex = i
		}
	}

	if len(s.files) == 0 {
		return s.create_first_file()
	}
	return nil
}
// create_first_file creates data file 0 and registers it as the current file.
//
// A single marker byte is stored at the start of the file so that the
// position (findex 0, fpos 0) never refers to real data and can be treated as
// invalid. The file is registered with size 1.
//
// Disk backend: the parent directories are created as needed. An error is
// returned if the directory or file cannot be created, or if the marker byte
// cannot be written (the handle is closed again in that case so it does not
// leak). Any other storage layer than disk or memory is silently ignored.
func (s *Store) create_first_file() error {
	switch s.storage_layer {
	case disk:
		filename := s.uint_to_filename(0)
		if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil {
			return fmt.Errorf("direction creation err %s filename %s \n", err, filename)
		}
		handle, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0600)
		if err != nil {
			return xerrors.Errorf("%w: index %d, filename %s", err, 0, filename)
		}
		// write a byte so as to mark 0,0 as invalid; the size recorded below
		// is only true if this write succeeded
		if _, err := handle.Write([]byte{0x0}); err != nil {
			handle.Close()
			return xerrors.Errorf("%w: writing marker byte, filename %s", err, filename)
		}
		s.findex = 0
		s.files[s.findex] = &file{diskfile: handle, size: uint32(1)}
	case memory:
		s.findex = 0
		s.files[s.findex] = &file{memoryfile: []byte{1}, size: uint32(1)} // marker byte so that 0,0 is invalid
	}
	return nil
}
// write appends buf to the current data file and returns the file index and
// the offset inside that file at which buf was stored.
//
// Writers are serialised by discsync. The lock is released with defer so that
// every return path unlocks; this includes the "unknown storage layer" and
// "probable store is closed" errors, and it also means findex is read for the
// return value while the lock is still held.
//
// If appending buf would push the current file past MAX_FILE_SIZE (or
// overflow the 32 bit size counter) a new data file is started first. The new
// file only becomes the current one after it has been created successfully,
// so a failed rollover leaves the store usable for a later retry.
//
// Errors: "invalid file structures" when no current file is registered,
// directory/file creation errors during rollover, "unknown storage layer"
// during rollover, "probable store is closed" when a memory file no longer
// matches its recorded size, and any error from the underlying disk write.
// On a disk write error the size is still advanced by the bytes written.
func (s *Store) write(buf []byte) (uint32, uint32, error) {
	s.discsync.Lock()
	defer s.discsync.Unlock()

	cfile, ok := s.files[s.findex]
	if !ok || len(s.files) < 1 {
		return 0, 0, fmt.Errorf("invalid file structures")
	}

	// check whether we need to open a new file, or whether the size counter would overflow
	if cfile.size+uint32(len(buf)) > MAX_FILE_SIZE || cfile.size+uint32(len(buf)) < cfile.size {
		next := s.findex + 1
		var nfile *file
		switch s.storage_layer {
		case disk:
			filename := s.uint_to_filename(next)
			if err := os.MkdirAll(filepath.Dir(filename), 0700); err != nil {
				return 0, 0, fmt.Errorf("direction creation err %s filename %s \n", err, filename)
			}
			handle, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0600)
			if err != nil {
				return 0, 0, xerrors.Errorf("%w: index %d, filename %s", err, next, filename)
			}
			nfile = &file{diskfile: handle}
		case memory:
			nfile = &file{memoryfile: []byte{}, size: uint32(0)}
		default:
			return 0, 0, fmt.Errorf("unknown storage layer")
		}
		// commit the rollover only now that the new file exists
		s.files[next] = nfile
		s.findex = next
		cfile = nfile
	}

	var done int
	var err error
	pos := cfile.size
	switch s.storage_layer {
	case disk:
		done, err = cfile.diskfile.WriteAt(buf, int64(cfile.size))
	case memory:
		// Close drops the buffer, so a length mismatch means the store was closed
		if int64(len(cfile.memoryfile)) != int64(cfile.size) {
			return 0, 0, fmt.Errorf("probable store is closed")
		}
		cfile.memoryfile = append(cfile.memoryfile, buf...)
		done += len(buf)
	}
	cfile.size += uint32(done)
	return s.findex, pos, err
}
// read fills buf with data stored in data file findex starting at offset fpos
// and returns the number of bytes copied.
//
// Disk backend: the result of ReadAt on the file handle is returned as is.
// Memory backend: as many bytes as are available (up to len(buf)) are copied
// with a nil error; reading exactly at the end yields io.EOF and reading past
// the end yields an "out of range" error.
//
// An unregistered findex or an unknown storage layer is reported as an error.
func (s *Store) read(findex, fpos uint32, buf []byte) (int, error) {
	cfile, found := s.files[findex]
	if !found {
		return 0, fmt.Errorf("findex not available")
	}

	switch s.storage_layer {
	case disk:
		return cfile.diskfile.ReadAt(buf, int64(fpos))
	case memory:
		end := uint32(len(cfile.memoryfile))
		switch {
		case fpos < end:
			return copy(buf, cfile.memoryfile[fpos:]), nil
		case fpos == end:
			return 0, io.EOF
		default:
			return 0, fmt.Errorf("out of range")
		}
	default:
		return 0, fmt.Errorf("unknown storage layer")
	}
}
// writeVersionData records where the root of the given version lives.
//
// Versions are 1 based: the record of version v occupies the 8 bytes at
// offset (v-1)*8 of the version root, laid out as little-endian findex
// followed by little-endian fpos.
//
// Version 0, and versions so large that the byte offset would not fit in an
// int64, are rejected with an error instead of wrapping around.
//
// Memory backend: the buffer is zero-extended up to the end of the record
// when needed (like a sparse disk write would) and an existing record is
// overwritten in place without growing the buffer.
//
// Returns an error for an invalid version, a failed disk write, or an unknown
// storage layer.
func (s *Store) writeVersionData(version uint64, findex, fpos uint32) error {
	const maxVersion = (1<<63 - 1) / 8 // largest version whose record end still fits in an int64 offset
	if version == 0 || version > maxVersion {
		return fmt.Errorf("invalid version %d", version)
	}

	var record [8]byte
	binary.LittleEndian.PutUint32(record[0:], findex)
	binary.LittleEndian.PutUint32(record[4:], fpos)
	offset := (version - 1) * 8

	s.discsync.Lock()
	defer s.discsync.Unlock()

	switch s.storage_layer {
	case disk:
		if _, err := s.versionrootfile.diskfile.WriteAt(record[:], int64(offset)); err != nil {
			return err
		}
	case memory:
		mem := s.versionrootfile.memoryfile
		// grow only when the record does not fit yet, and by as much as is needed
		if need := offset + 8; uint64(len(mem)) < need {
			mem = append(mem, make([]byte, need-uint64(len(mem)))...)
		}
		copy(mem[offset:], record[:])
		s.versionrootfile.memoryfile = mem
	default:
		return fmt.Errorf("unknown storage layer")
	}
	return nil
}
// ReadVersionData returns the file index and file position recorded for the
// given version. Versions are 1 based: the record is read from byte offset
// (version-1)*8 of the version root as two little-endian uint32 values.
//
// The disk backend returns whatever error the underlying read reports. The
// memory backend returns an "invalid version" error when the record lies
// beyond the stored data. An unknown storage layer is an error as well.
func (s *Store) ReadVersionData(version uint64) (findex uint32, fpos uint32, err error) {
	s.discsync.Lock()
	defer s.discsync.Unlock()

	version-- // convert to a 0 based slot number
	offset := version * 8

	switch s.storage_layer {
	case disk:
		var record [512]byte
		if _, rerr := s.versionrootfile.diskfile.ReadAt(record[:8], int64(offset)); rerr != nil {
			return 0, 0, rerr
		}
		return binary.LittleEndian.Uint32(record[0:]), binary.LittleEndian.Uint32(record[4:]), nil
	case memory:
		mem := s.versionrootfile.memoryfile
		if uint64(len(mem)) <= offset {
			return 0, 0, fmt.Errorf("invalid version %d %d", version, len(mem))
		}
		return binary.LittleEndian.Uint32(mem[offset:]), binary.LittleEndian.Uint32(mem[offset+4:]), nil
	default:
		return 0, 0, fmt.Errorf("unknown storage layer")
	}
}
// findhighestsnapshotinram reports the highest version recorded in the
// version root together with the file index and position stored for it.
//
// The version count is the version root's length divided by the 8 byte record
// size (file size on disk, buffer length in memory). When no version has been
// recorded all results are zero and err is nil. The index result is never
// assigned and is always 0.
//
// Errors come from stat-ing the version root file, from ReadVersionData, or
// from an unknown storage layer.
func (s *Store) findhighestsnapshotinram() (index int, version uint64, findex, fpos uint32, err error) {
	switch s.storage_layer {
	case disk:
		var info os.FileInfo
		info, err = s.versionrootfile.diskfile.Stat()
		if err != nil {
			return index, version, findex, fpos, err
		}
		version = uint64(info.Size() / 8)
	case memory:
		version = uint64(len(s.versionrootfile.memoryfile) / 8)
	default:
		return index, version, findex, fpos, fmt.Errorf("unknown storage layer")
	}

	// nothing recorded yet: leave every result at its zero value
	if version != 0 {
		findex, fpos, err = s.ReadVersionData(version)
	}
	return index, version, findex, fpos, err
}