Compare commits
No commits in common. "398f5ba442b8f9785b7a1615d72fc44e93da96f7" and "6fe051eea5ebd119fa48feedb0a0ec60d1e4420b" have entirely different histories.
398f5ba442
...
6fe051eea5
2 changed files with 6 additions and 185 deletions
|
@ -3,141 +3,21 @@
|
|||
package externalsort
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"container/heap"
|
||||
"io"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// lineReader is simply a wrapper around bufffered reader
|
||||
type lineReader struct {
|
||||
br bufio.Reader
|
||||
}
|
||||
|
||||
func (lr *lineReader) ReadLine() (string, error) {
|
||||
s, err := lr.br.ReadString('\n')
|
||||
if err != nil {
|
||||
return s, err
|
||||
}
|
||||
s = s[:len(s)-1]
|
||||
return s, err
|
||||
}
|
||||
|
||||
type lineWriter struct {
|
||||
w io.Writer
|
||||
}
|
||||
|
||||
func (lw *lineWriter) Write(l string) error {
|
||||
_, err := lw.w.Write(append([]byte(l), '\n'))
|
||||
return err
|
||||
}
|
||||
|
||||
func NewReader(r io.Reader) LineReader {
|
||||
return &lineReader{*bufio.NewReader(r)}
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func NewWriter(w io.Writer) LineWriter {
|
||||
return &lineWriter{w}
|
||||
}
|
||||
|
||||
// item for the reader heap
|
||||
// stores reader to get the next line from,
|
||||
// the line from reader which is used for sorting the heap
|
||||
// lastLine bool which determines whether the item has no more lines to read
|
||||
// and it can be safely removed from the heap
|
||||
type readerHeapItem struct {
|
||||
reader *LineReader
|
||||
line string
|
||||
lastLine bool
|
||||
}
|
||||
|
||||
type readerHeap []readerHeapItem
|
||||
|
||||
func (h readerHeap) Len() int {
|
||||
return len(h)
|
||||
}
|
||||
|
||||
func (h readerHeap) Less(i, j int) bool {
|
||||
return h[i].line < h[j].line
|
||||
}
|
||||
|
||||
func (h readerHeap) Swap(i, j int) {
|
||||
h[i], h[j] = h[j], h[i]
|
||||
}
|
||||
|
||||
func (h *readerHeap) Push(x any) {
|
||||
*h = append(*h, x.(readerHeapItem))
|
||||
}
|
||||
|
||||
func (h *readerHeap) Pop() any {
|
||||
old := *h
|
||||
n := len(old)
|
||||
x := old[n-1]
|
||||
*h = old[0 : n-1]
|
||||
return x
|
||||
}
|
||||
|
||||
// push reader back to the heap if it still has lines
|
||||
// and update its lastLine value
|
||||
// otherwise do nothing
|
||||
// returns any errors that occured while reading (expect io.EOF)
|
||||
func pushIfNotLast(h *readerHeap, reader *LineReader) error {
|
||||
line, err := (*reader).ReadLine()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return err
|
||||
}
|
||||
heap.Push(h, readerHeapItem{reader, line, true})
|
||||
return nil
|
||||
}
|
||||
heap.Push(h, readerHeapItem{reader, line, false})
|
||||
return nil
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func Merge(w LineWriter, readers ...LineReader) error {
|
||||
h := make(readerHeap, 0, len(readers))
|
||||
heap.Init(&h)
|
||||
// push initial values to heap
|
||||
for _, reader := range readers {
|
||||
err := pushIfNotLast(&h, &reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// write lines from the top of the heap while
|
||||
// the heap is not empty
|
||||
for len(h) > 0 {
|
||||
it := heap.Pop(&h).(readerHeapItem)
|
||||
w.Write(it.line)
|
||||
if it.lastLine {
|
||||
continue
|
||||
}
|
||||
err := pushIfNotLast(&h, it.reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func Sort(w io.Writer, in ...string) error {
|
||||
readers := make([]LineReader, 0, len(in))
|
||||
for _, fn := range in {
|
||||
b, err := os.ReadFile(fn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s := strings.TrimSuffix(string(b), "\n")
|
||||
// do not sort empty files
|
||||
if len(s) == 0 {
|
||||
continue
|
||||
}
|
||||
lines := strings.Split(s, "\n")
|
||||
sort.Strings(lines)
|
||||
s = strings.Join(lines, "\n")
|
||||
readers = append(readers, NewReader(strings.NewReader(s)))
|
||||
}
|
||||
return Merge(NewWriter(w), readers...)
|
||||
panic("implement me")
|
||||
}
|
||||
|
|
|
@ -2,67 +2,8 @@
|
|||
|
||||
package retryupdate
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/gofrs/uuid"
|
||||
"gitlab.com/slon/shad-go/retryupdate/kvapi"
|
||||
)
|
||||
|
||||
var (
|
||||
authError *kvapi.AuthError
|
||||
conflictError *kvapi.ConflictError
|
||||
)
|
||||
import "gitlab.com/slon/shad-go/retryupdate/kvapi"
|
||||
|
||||
func UpdateValue(c kvapi.Client, key string, updateFn func(oldValue *string) (newValue string, err error)) error {
|
||||
newVersion := uuid.Must(uuid.NewV4())
|
||||
get:
|
||||
for {
|
||||
old, err := c.Get(&kvapi.GetRequest{Key: key})
|
||||
var value *string
|
||||
oldUuid := uuid.Nil
|
||||
if err != nil {
|
||||
switch {
|
||||
case errors.As(err, &authError):
|
||||
return err
|
||||
case !errors.Is(err, kvapi.ErrKeyNotFound):
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
value = &old.Value
|
||||
oldUuid = old.Version
|
||||
}
|
||||
for {
|
||||
new, err := updateFn(value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.Set(&kvapi.SetRequest{
|
||||
Key: key,
|
||||
Value: new,
|
||||
OldVersion: oldUuid,
|
||||
NewVersion: newVersion,
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
switch {
|
||||
case errors.As(err, &conflictError):
|
||||
if conflictError.ExpectedVersion == newVersion {
|
||||
return nil
|
||||
}
|
||||
continue get
|
||||
case errors.Is(err, kvapi.ErrKeyNotFound):
|
||||
value = nil
|
||||
oldUuid = uuid.Nil
|
||||
continue
|
||||
case errors.As(err, &authError):
|
||||
return err
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
panic("implement me")
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue