
package graviton
import "io"
import "os"
import "fmt"
import "path/filepath"
import "sync"
import "encoding/binary"
import "golang.org/x/xerrors"
// all file operations go through this structure
// If this were implemented through an interface, it would trigger heap allocations;
// this crude implementation serves the purpose and also allows implementing arbitrary storage backends
type file struct {
diskfile *os.File // used for disk backend
memoryfile []byte // used for memory backend
size uint32
}
type storage_layer_type int8
const (
unknown_layer storage_layer_type = iota // default is unknown layer
disk
memory
)
// Store is the backend which is used to store the data in serialized form to disk.
// The data is split across multiple files and the total number of files can be up to 4 billion.
// Each file is up to 2 GB in size; this limit keeps files well below the 4 GB per-file restriction of FAT32.
type Store struct {
storage_layer storage_layer_type // identify storage layer
base_directory string
files map[uint32]*file
findex uint32
versionrootfile *file // only maintains recent version records
version_index int // version index to rotate inside version data
version_data [512]byte // stores version data pointers, 20 records * 24 bytes each
version_data_loaded bool // whether the version data has been loaded
//internal_value_root *inner // internal append only value root
commitsync sync.RWMutex // used to sync altroots value root, versioned root
discsync sync.Mutex // used to synchronise disk writes
}
// start a new memory-backed store, which may be useful for testing and other temporary use cases.
func NewMemStore() (*Store, error) {
s := &Store{storage_layer: memory, files: map[uint32]*file{}}
return s.init()
}
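// Minimal usage sketch of this file's internal API (illustrative only, error handling omitted;
// all identifiers below are defined in this file):
//   s, _ := NewMemStore()
//   findex, fpos, _ := s.write([]byte("hello"))
//   buf := make([]byte, 5)
//   s.read(findex, fpos, buf)
//   s.Close()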
// open/create a disk-based store; if the directory already exists, it is used as-is. Since we are an append-only
// key-value store, we never delete any data.
func NewDiskStore(basepath string) (*Store, error) {
if err := os.MkdirAll(basepath, 0700); err != nil {
return nil, fmt.Errorf("direction creation err %s dirpath %s \n", err, basepath)
}
s := &Store{storage_layer: disk, base_directory: basepath, files: map[uint32]*file{}}
return s.init()
}
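// Close closes every open data file handle and the version root file for the disk backend,
// and drops the in-memory data buffers for the memory backend.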
func (store *Store) Close() {
switch store.storage_layer {
case disk:
for _, f := range store.files {
f.diskfile.Close()
}
store.versionrootfile.diskfile.Close()
case memory:
for _, f := range store.files {
f.memoryfile = nil
}
default:
panic("unknown storage layer")
}
}
// init and load some items from the store
func (s *Store) init() (*Store, error) {
return s, s.loadfiles()
}
// 4 billion files of up to 2 GB each seems to be enough for quite some time; we will run out of file handles much earlier
// note that the structure is independent of these pointers and can thus be extended at any point in the future
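// e.g. index 0 maps to <base_directory>/0/0/0/0.dfs and index 5 maps to <base_directory>/0/0/0/5.dfs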
func (s *Store) uint_to_filename(n uint32) string {
switch s.storage_layer {
case disk:
d, c, b, a := n>>24, ((n >> 16) & 0xff), ((n >> 8) & 0xff), n
return filepath.Join(s.base_directory, fmt.Sprintf("%d", d), fmt.Sprintf("%d", c), fmt.Sprintf("%d", b), fmt.Sprintf("%d", a)+".dfs")
case memory:
fallthrough
default:
panic("unknown storage layer")
}
}
// load all existing data files from disk
// note: for stores with many files the OS open-file limit may need to be raised
func (s *Store) loadfiles() error {
if s.storage_layer == disk {
if file_handle, err := os.OpenFile(filepath.Join(s.base_directory, "version_root.bin"), os.O_CREATE|os.O_RDWR, 0600); err != nil {
return xerrors.Errorf("%w: index %d, filename %s", err, s.findex, s.uint_to_filename(uint32(s.findex)))
} else {
s.versionrootfile = &file{diskfile: file_handle}
}
} else if s.storage_layer == memory {
s.versionrootfile = &file{memoryfile: []byte{}, size: uint32(0)}
} else {
return fmt.Errorf("unknown storage layer")
}
for i := uint32(0); i < (4*1024*1024*1024)-1; i++ {
if s.storage_layer == disk {
filename := s.uint_to_filename(uint32(i))
finfo, err := os.Stat(filename)
if os.IsNotExist(err) { // no more data files, stop scanning
break
}
if finfo != nil && finfo.IsDir() {
return fmt.Errorf("expected file but found directory at path %s", filename)
}
file_handle, err := os.OpenFile(filename, os.O_RDWR, 0600)
if err != nil {
return fmt.Errorf("%s: filename:%s", err, filename)
}
s.files[i] = &file{diskfile: file_handle, size: uint32(finfo.Size())}
s.findex = i
} else if s.storage_layer == memory {
// nothing to do, memory always starts afresh
break
}
}
if len(s.files) == 0 {
return s.create_first_file()
}
return nil
}
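// create the very first data file and reserve its first byte so that (findex 0, fpos 0) never refers to a valid record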
func (s *Store) create_first_file() error {
if s.storage_layer == disk {
err := os.MkdirAll(filepath.Dir(s.uint_to_filename(0)), 0700)
if err != nil {
return fmt.Errorf("direction creation err %s filename %s \n", err, s.uint_to_filename(0))
}
if file_handle, err := os.OpenFile(s.uint_to_filename(0), os.O_CREATE|os.O_RDWR, 0600); err != nil {
return xerrors.Errorf("%w: index %d, filename %s", err, 0, s.uint_to_filename(uint32(0)))
} else {
file_handle.Write([]byte{0x0}) // write a single byte so that position (0,0) is marked invalid
s.findex = 0
s.files[s.findex] = &file{diskfile: file_handle, size: uint32(1)}
}
} else if s.storage_layer == memory {
s.findex = 0
s.files[s.findex] = &file{memoryfile: []byte{1}, size: uint32(1)} // write a byte so as mark 0,0 as invalid
}
return nil
}
// reaching here means we have a currently open file
// writes are serialized through the discsync mutex
func (s *Store) write(buf []byte) (uint32, uint32, error) {
var done int
var err error
s.discsync.Lock()
//defer s.discsync.Unlock() // defer has been removed to reduce overhead, so every return path below must unlock explicitly
cfile, ok := s.files[s.findex]
if !ok || len(s.files) < 1 {
s.discsync.Unlock()
return 0, 0, fmt.Errorf("invalid file structures")
}
// check whether appending would overflow the current file, in which case we need to open a new one
if cfile.size+uint32(len(buf)) > MAX_FILE_SIZE || cfile.size+uint32(len(buf)) < cfile.size {
s.findex++
if s.storage_layer == disk {
err := os.MkdirAll(filepath.Dir(s.uint_to_filename(uint32(s.findex))), 0700)
if err != nil {
s.discsync.Unlock()
return 0, 0, fmt.Errorf("direction creation err %s filename %s \n", err, s.uint_to_filename(s.findex))
}
if file_handle, err := os.OpenFile(s.uint_to_filename(uint32(s.findex)), os.O_CREATE|os.O_RDWR, 0600); err != nil {
s.discsync.Unlock()
return 0, 0, xerrors.Errorf("%w: index %d, filename %s", err, s.findex, s.uint_to_filename(uint32(s.findex)))
} else {
s.files[s.findex] = &file{diskfile: file_handle}
cfile = s.files[s.findex]
}
} else if s.storage_layer == memory {
s.files[s.findex] = &file{memoryfile: []byte{}, size: uint32(0)} // write a byte so as mark 0,0 as invalid
cfile = s.files[s.findex]
} else {
return 0, 0, fmt.Errorf("unknown storage layer")
}
}
pos := cfile.size
if s.storage_layer == disk {
done, err = cfile.diskfile.WriteAt(buf, int64(cfile.size))
} else if s.storage_layer == memory {
if int64(len(cfile.memoryfile)) != int64(cfile.size) {
// fmt.Printf("filesize %d , len of memory %d\n", cfile.size,len(cfile.memoryfile))
return 0, 0, fmt.Errorf("probable store is closed")
}
cfile.memoryfile = append(cfile.memoryfile, buf...)
done += len(buf)
}
cfile.size += uint32(done)
s.discsync.Unlock()
return s.findex, pos, err
}
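// read fills buf starting at offset fpos within the data file identified by findex (as returned by write)
// and returns the number of bytes read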
func (s *Store) read(findex, fpos uint32, buf []byte) (int, error) {
if cfile, ok := s.files[findex]; !ok {
return 0, fmt.Errorf("findex not available") //xerrors.Errorf("data file (indexed at %d) is NOT available", findex)
} else {
if s.storage_layer == disk {
c, err := cfile.diskfile.ReadAt(buf, int64(fpos))
return c, err
} else if s.storage_layer == memory {
if fpos < uint32(len(cfile.memoryfile)) {
c := copy(buf, cfile.memoryfile[fpos:])
return c, nil
} else if fpos == uint32(len(cfile.memoryfile)) {
return 0, io.EOF
} else {
return 0, fmt.Errorf("out of range")
}
} else {
return 0, fmt.Errorf("unknown storage layer")
}
}
}
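// writeVersionData stores the (version, findex, fpos) triple in the next slot of the rotating in-RAM version table
// and persists the whole 512-byte table to the version root file; each record occupies internal_VERSION_RECORD_SIZE
// bytes and the index wraps after internal_MAX_VERSIONS_TO_KEEP entries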
func (s *Store) writeVersionData(version uint64, findex, fpos uint32) error {
var buf [512]byte
s.discsync.Lock()
defer s.discsync.Unlock()
copy(buf[:], s.version_data[:])
index := (s.version_index + 1) % internal_MAX_VERSIONS_TO_KEEP
binary.LittleEndian.PutUint64(buf[index*internal_VERSION_RECORD_SIZE+0:], version)
binary.LittleEndian.PutUint64(buf[index*internal_VERSION_RECORD_SIZE+8:], uint64(findex))
binary.LittleEndian.PutUint64(buf[index*internal_VERSION_RECORD_SIZE+16:], uint64(fpos))
if s.storage_layer == disk {
if _, err := s.versionrootfile.diskfile.WriteAt(buf[:], 0); err != nil {
return err
}
} else if s.storage_layer == memory {
s.versionrootfile.memoryfile = append(s.versionrootfile.memoryfile[:0], buf[:]...)
} else {
return fmt.Errorf("unknown storage layer")
}
copy(s.version_data[:], buf[:])
s.version_index = index
return nil
}
// load the recent snapshot list into RAM
func (s *Store) loadsnapshottablestoram() (err error) {
var buf [512]byte
if s.storage_layer == disk {
if finfo, err := s.versionrootfile.diskfile.Stat(); err == nil { // a newly created file reports size 0
if finfo.Size() == 0 {
copy(s.version_data[:], buf[:])
s.version_data_loaded = true
return nil
}
} else {
return err
}
} else if s.storage_layer == memory {
if len(s.versionrootfile.memoryfile) == 0 {
copy(s.version_data[:], buf[:])
s.version_data_loaded = true
return nil
}
} else {
return fmt.Errorf("unknown storage layer")
}
var bytes_count int
if s.storage_layer == disk {
bytes_count, err = s.versionrootfile.diskfile.ReadAt(buf[:], 0)
} else if s.storage_layer == memory {
bytes_count = copy(buf[:], s.versionrootfile.memoryfile)
}
if bytes_count == 512 {
copy(s.version_data[:], buf[:])
s.version_data_loaded = true
s.version_index, _, _, _ = s.findhighestsnapshotinram() // setup index properly
return nil
}
return err
}