11 Commits

SHA1 Message Date
4bd7266c89 don't try to read objects we know don't exist when committing
such as new chunks with random ids
2025-04-13 11:57:22 +02:00
bb65aab166 Server: remember opened files ids 2025-04-13 11:52:50 +02:00
a4810c7ee4 Objects: fix cache reading twice 2025-04-12 20:49:29 +02:00
e42e076b77 Objects: disable dhfs.objects.lru.print-stats by default
quite annoying
2025-04-09 11:02:46 +02:00
513cbd717d Server: remove "dhfs.objects" from proto files 2025-04-05 20:17:29 +02:00
075867daaa Server: less bad chunk merging 2025-04-05 17:50:43 +02:00
8e4ea67e53 Server: a little jmap cleanup 2025-04-04 17:26:12 +02:00
fb128882cb Server: use StringUtils.leftPad for JMapLongKey toString
much faster, without regex parsing every time!
2025-04-03 23:00:36 +02:00
cb8c50000a Objects: simplify merging iterator even more^2
initialMaxValue streams can be simplified too
2025-04-03 22:31:59 +02:00
4c5cbfb5bf Objects: simplify merging iterator even more
no need for the hashmap step of iterator construction
2025-04-03 22:23:23 +02:00
6bcec4a260 Objects: simplify merging iterator
remove the first match "optimization", as it doesn't really
matter with the separate read object methods
2025-04-03 22:13:34 +02:00
18 changed files with 209 additions and 357 deletions

View File: MergingKvIterator.java

@@ -4,71 +4,56 @@ import io.quarkus.logging.Log;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvIterator<K, V> {
private final NavigableMap<K, CloseableKvIterator<K, V>> _sortedIterators = new TreeMap<>();
private final String _name;
private final IteratorStart _initialStartType;
private final K _initialStartKey;
private final List<IterProdFn<K, V>> _pendingIterators;
private Map<CloseableKvIterator<K, V>, Integer> _iterators;
// Fast path for the first element
private FirstMatchState<K, V> _firstMatchState;
private final Map<CloseableKvIterator<K, V>, Integer> _iterators;
public MergingKvIterator(String name, IteratorStart startType, K startKey, List<IterProdFn<K, V>> iterators) {
_goingForward = true;
_name = name;
_initialStartType = startType;
_initialStartKey = startKey;
{
int counter = 0;
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
for (var iteratorFn : iterators) {
var iterator = iteratorFn.get(startType, startKey);
if ((counter == 0) // Not really a requirement but simplifies some things for now
&& (startType == IteratorStart.GE || startType == IteratorStart.LE)
&& iterator.hasNext()
&& iterator.peekNextKey().equals(startKey)) {
_firstMatchState = new FirstMatchFound<>(iterator);
_pendingIterators = iterators;
Log.tracev("{0} Created fast match: {1}", _name, _firstMatchState);
return;
}
iteratorsTmp.put(iterator, counter++);
}
_iterators = Map.copyOf(iteratorsTmp);
_pendingIterators = null;
}
_iterators = Map.ofEntries(
IntStream.range(0, iterators.size())
.mapToObj(i -> Pair.of(iterators.get(i).get(startType, startKey), i))
.toArray(Pair[]::new)
);
_firstMatchState = new FirstMatchNone<>();
doInitialAdvance();
}
@SafeVarargs
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
private void doInitialAdvance() {
if (_initialStartType == IteratorStart.LT || _initialStartType == IteratorStart.LE) {
if (startType == IteratorStart.LT || startType == IteratorStart.LE) {
// Starting at the greatest key less than / less than or equal to the start key:
// each iterator has given us its "greatest LT/LE key";
// we pick the greatest of those to start with.
// If some iterators don't have such a lesser key, we pick the smallest of their keys instead.
var found = _iterators.keySet().stream()
.filter(CloseableKvIterator::hasNext)
.map((i) -> {
var peeked = i.peekNextKey();
// Log.warnv("peeked: {0}, from {1}", peeked, i.getClass());
return peeked;
}).distinct().collect(Collectors.partitioningBy(e -> _initialStartType == IteratorStart.LE ? e.compareTo(_initialStartKey) <= 0 : e.compareTo(_initialStartKey) < 0));
K greatestLess = null;
K smallestMore = null;
for (var it : _iterators.keySet()) {
if (it.hasNext()) {
var peeked = it.peekNextKey();
if (startType == IteratorStart.LE ? peeked.compareTo(startKey) <= 0 : peeked.compareTo(startKey) < 0) {
if (greatestLess == null || peeked.compareTo(greatestLess) > 0) {
greatestLess = peeked;
}
} else {
if (smallestMore == null || peeked.compareTo(smallestMore) < 0) {
smallestMore = peeked;
}
}
}
}
K initialMaxValue;
if (!found.get(true).isEmpty())
initialMaxValue = found.get(true).stream().max(Comparator.naturalOrder()).orElse(null);
if (greatestLess != null)
initialMaxValue = greatestLess;
else
initialMaxValue = found.get(false).stream().min(Comparator.naturalOrder()).orElse(null);
initialMaxValue = smallestMore;
if (initialMaxValue == null) {
// Empty iterators
}
for (var iterator : _iterators.keySet()) {
while (iterator.hasNext() && iterator.peekNextKey().compareTo(initialMaxValue) < 0) {
@@ -82,7 +67,7 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
}
Log.tracev("{0} Initialized: {1}", _name, _sortedIterators);
switch (_initialStartType) {
switch (startType) {
// case LT -> {
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) < 0;
// }
@@ -90,37 +75,17 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
// assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(initialStartKey) <= 0;
// }
case GT -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) > 0;
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) > 0;
}
case GE -> {
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(_initialStartKey) >= 0;
assert _sortedIterators.isEmpty() || _sortedIterators.firstKey().compareTo(startKey) >= 0;
}
}
}
private void doHydrate() {
if (_firstMatchState instanceof FirstMatchNone) {
return;
}
boolean consumed = _firstMatchState instanceof FirstMatchConsumed;
if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
iterator.close();
}
_firstMatchState = new FirstMatchNone<>();
{
int counter = 0;
var iteratorsTmp = new HashMap<CloseableKvIterator<K, V>, Integer>();
for (var iteratorFn : _pendingIterators) {
var iterator = iteratorFn.get(consumed ? IteratorStart.GT : IteratorStart.GE, _initialStartKey);
iteratorsTmp.put(iterator, counter++);
}
_iterators = Map.copyOf(iteratorsTmp);
}
doInitialAdvance();
@SafeVarargs
public MergingKvIterator(String name, IteratorStart startType, K startKey, IterProdFn<K, V>... iterators) {
this(name, startType, startKey, List.of(iterators));
}
private void advanceIterator(CloseableKvIterator<K, V> iterator) {
@@ -151,17 +116,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected void reverse() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
doHydrate();
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
}
default -> {
}
}
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
Log.tracev("{0} Reversing from {1}", _name, cur);
_goingForward = !_goingForward;
@@ -185,18 +139,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected K peekImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
return firstMatchFound.iterator.peekNextKey();
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
if (_sortedIterators.isEmpty())
throw new NoSuchElementException();
return _goingForward ? _sortedIterators.firstKey() : _sortedIterators.lastKey();
@@ -204,22 +146,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected void skipImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
var curVal = firstMatchFound.iterator.next();
firstMatchFound.iterator.close();
_firstMatchState = new FirstMatchConsumed<>();
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
return;
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
if (cur == null) {
throw new NoSuchElementException();
@@ -231,38 +157,11 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
protected boolean hasImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
return true;
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
return !_sortedIterators.isEmpty();
}
@Override
protected Pair<K, V> nextImpl() {
switch (_firstMatchState) {
case FirstMatchFound<K, V> firstMatchFound -> {
var curVal = firstMatchFound.iterator.next();
firstMatchFound.iterator.close();
_firstMatchState = new FirstMatchConsumed<>();
// Log.tracev("{0} Read from {1}: {2}, next: {3}", _name, firstMatchFound.iterator, curVal, _sortedIterators.keySet());
return curVal;
}
case FirstMatchConsumed<K, V> firstMatchConsumed -> {
doHydrate();
break;
}
default -> {
}
}
var cur = _goingForward ? _sortedIterators.pollFirstEntry() : _sortedIterators.pollLastEntry();
if (cur == null) {
throw new NoSuchElementException();
@@ -275,9 +174,6 @@ public class MergingKvIterator<K extends Comparable<K>, V> extends ReversibleKvI
@Override
public void close() {
if (_firstMatchState instanceof FirstMatchFound(CloseableKvIterator iterator)) {
iterator.close();
}
for (CloseableKvIterator<K, V> iterator : _iterators.keySet()) {
iterator.close();
}
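The LE/LT branch of doInitialAdvance above boils down to a simple rule: take the greatest peeked key at or below the start key, and only if no iterator has one, fall back to the smallest peeked key above it. A minimal self-contained sketch of that rule, using plain JDK types (pickInitialKey and the sample values are hypothetical, not part of the class):

import java.util.Collection;
import java.util.List;

class InitialAdvanceSketch {
    // Greatest key <= startKey (LE) / < startKey (LT); otherwise the smallest key above it.
    static Long pickInitialKey(Collection<Long> peekedKeys, long startKey, boolean le) {
        Long greatestLess = null, smallestMore = null;
        for (long k : peekedKeys) {
            if (le ? k <= startKey : k < startKey) {
                if (greatestLess == null || k > greatestLess) greatestLess = k;
            } else {
                if (smallestMore == null || k < smallestMore) smallestMore = k;
            }
        }
        return greatestLess != null ? greatestLess : smallestMore;
    }

    public static void main(String[] args) {
        // Iterators peeked 3, 7 and 12 while starting LE 10: the merged view starts at 7,
        // and every iterator is then advanced past keys smaller than 7.
        System.out.println(pickInitialKey(List.of(3L, 7L, 12L), 10, true)); // prints 7
    }
}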

View File: CachingObjectPersistentStore.java

@@ -42,13 +42,16 @@ public class CachingObjectPersistentStore {
if (old != null)
newSize -= old.size();
TreePMap<JObjectKey, CacheEntry> newCache = map().plus(key, entry);
TreePMap<JObjectKey, CacheEntry> newCache = map();
while (newSize > sizeLimit) {
var del = newCache.firstEntry();
newCache = newCache.minusFirstEntry();
newSize -= del.getValue().size();
}
newCache = newCache.plus(key, entry);
return new Cache(
newCache,
newSize,
@@ -189,7 +192,7 @@ public class CachingObjectPersistentStore {
}
var read = _backing.readObject(name);
maybeCache(name, read);
return _backing.readObject(name);
return read;
}
@Override

View File: CurrentTransaction.java

@@ -43,4 +43,9 @@ public class CurrentTransaction implements Transaction {
public <T extends JData> void put(JData obj) {
transactionManager.current().put(obj);
}
@Override
public <T extends JData> void putNew(JData obj) {
transactionManager.current().putNew(obj);
}
}

View File: JObjectManager.java

@@ -1,11 +1,11 @@
package com.usatiuk.objects.transaction;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.snapshot.SnapshotManager;
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
import io.quarkus.logging.Log;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
@@ -16,7 +16,6 @@ import jakarta.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
import java.util.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
@@ -106,10 +105,17 @@ public class JObjectManager {
for (var entry : curIteration.entrySet()) {
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
if (entry.getValue() instanceof TxRecord.TxObjectRecordNewWrite<?> newWrite) {
lastCurHookSeen.put(entry.getKey(), entry.getValue());
hook.onCreate(newWrite.key(), newWrite.data());
continue;
}
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
case TxRecord.TxObjectRecordWriteChecked<?> write -> {
if (oldObj == null) {
hook.onCreate(write.key(), write.data());
} else {
@@ -221,8 +227,10 @@ public class JObjectManager {
writes.values().stream()
.filter(r -> {
if (!same)
if (r instanceof TxRecord.TxObjectRecordWrite<?>(JData data)) {
if (r instanceof TxRecord.TxObjectRecordWriteChecked<?>(JData data)) {
var dep = dependenciesLocked.get(data.key());
if (dep == null)
return true;
if (dep.isPresent() && dep.get().version() > snapshotId) {
Log.trace("Skipping write " + data.key() + " - dependency " + dep.get().version() + " vs " + snapshotId);
return false;

View File: Transaction.java

@@ -14,6 +14,7 @@ public interface Transaction extends TransactionHandle {
<T extends JData> Optional<T> get(Class<T> type, JObjectKey key, LockingStrategy strategy);
<T extends JData> void put(JData obj);
<T extends JData> void putNew(JData obj);
void delete(JObjectKey key);
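A rough usage sketch of the new method, tying it to the top commit message (a fragment: ChunkData, JObjectKey and curTx are the project types used elsewhere in this diff, bytes is an illustrative ByteString):

// put(): a checked write; on commit the previous version of the object is still looked up.
// putNew(): the transaction created this object itself, so commit can skip the backing-store
// read that would otherwise be a guaranteed miss (e.g. a new chunk with a random UUID key).
var chunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
curTx.putNew(chunk);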

View File: TransactionFactoryImpl.java

@@ -57,6 +57,7 @@ public class TransactionFactoryImpl implements TransactionFactory {
private class TransactionImpl implements TransactionPrivate {
private final Map<JObjectKey, TransactionObject<?>> _readSet = new HashMap<>();
private final NavigableMap<JObjectKey, TxRecord.TxObjectRecord<?>> _writes = new TreeMap<>();
private final HashSet<JObjectKey> _totallyNew = new HashSet<>();
private final List<Runnable> _onCommit = new ArrayList<>();
private final List<Runnable> _onFlush = new ArrayList<>();
private final Snapshot<JObjectKey, JDataVersionedWrapper> _snapshot;
@@ -97,8 +98,13 @@ public class TransactionFactoryImpl implements TransactionFactory {
var got = _readSet.get(key);
if (got == null) {
if (_totallyNew.contains(key)) {
return Optional.empty();
}
var read = _snapshot.readObject(key);
_readSet.put(key, new TransactionObjectNoLock<>(read));
// Log.infov("Read object {0} from source, type {1}", key, type);
return read.map(JDataVersionedWrapper::data).map(type::cast);
}
@@ -175,8 +181,15 @@ public class TransactionFactoryImpl implements TransactionFactory {
@Override
public void put(JData obj) {
_writes.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWrite<>(obj));
_writes.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordWriteChecked<>(obj));
}
@Override
public <T extends JData> void putNew(JData obj) {
_writes.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj));
_newWrites.put(obj.key(), new TxRecord.TxObjectRecordNewWrite<>(obj));
_totallyNew.add(obj.key());
}
@Override

View File: TxRecord.java

@@ -8,7 +8,18 @@ public class TxRecord {
JObjectKey key();
}
public record TxObjectRecordWrite<T extends JData>(JData data) implements TxObjectRecord<T> {
public interface TxObjectRecordWrite<T> extends TxObjectRecord<T> {
JData data();
}
public record TxObjectRecordWriteChecked<T extends JData>(JData data) implements TxObjectRecordWrite<T> {
@Override
public JObjectKey key() {
return data.key();
}
}
public record TxObjectRecordNewWrite<T extends JData>(JData data) implements TxObjectRecordWrite<T> {
@Override
public JObjectKey key() {
return data.key();
@@ -17,4 +28,5 @@ public class TxRecord {
public record TxObjectRecordDeleted(JObjectKey key) implements TxObjectRecord<JData> {
}
}

View File: application.properties (objects module)

@@ -1,7 +1,7 @@
dhfs.objects.persistence=lmdb
dhfs.objects.writeback.limit=134217728
dhfs.objects.lru.limit=134217728
dhfs.objects.lru.print-stats=true
dhfs.objects.lru.print-stats=false
dhfs.objects.lock_timeout_secs=15
dhfs.objects.persistence.files.root=${HOME}/dhfs_default/data/objs
dhfs.objects.persistence.snapshot-extra-checks=false

View File: RemoteTransaction.java

@@ -1,9 +1,9 @@
package com.usatiuk.dhfs;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.dhfs.repository.PersistentPeerDataService;
import com.usatiuk.dhfs.repository.RemoteObjectServiceClient;
import com.usatiuk.dhfs.repository.SyncHandler;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
@@ -123,6 +123,11 @@ public class RemoteTransaction {
curTx.put(newData);
}
public <T extends JDataRemote> void putDataNew(T obj) {
curTx.putNew(new RemoteObjectMeta(obj, persistentPeerDataService.getSelfUuid()));
curTx.putNew(new RemoteObjectDataWrapper<>(obj));
}
public Optional<RemoteObjectMeta> getMeta(JObjectKey key) {
return getMeta(key, LockingStrategy.OPTIMISTIC);
}

View File: DhfsFileServiceImpl.java

@@ -7,10 +7,6 @@ import com.usatiuk.dhfs.RemoteObjectMeta;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.files.objects.ChunkData;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.dhfs.jkleppmanntree.JKleppmannTreeManager;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNode;
import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMeta;
@@ -19,10 +15,13 @@ import com.usatiuk.dhfs.jkleppmanntree.structs.JKleppmannTreeNodeMetaFile;
import com.usatiuk.dhfs.jmap.JMapEntry;
import com.usatiuk.dhfs.jmap.JMapHelper;
import com.usatiuk.dhfs.jmap.JMapLongKey;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import com.usatiuk.objects.JData;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.iterators.IteratorStart;
import com.usatiuk.objects.transaction.LockingStrategy;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.dhfs.utils.StatusRuntimeExceptionNoStacktrace;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -48,21 +47,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@Inject
TransactionManager jObjectTxManager;
@ConfigProperty(name = "dhfs.files.target_chunk_alignment")
int targetChunkAlignment;
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@ConfigProperty(name = "dhfs.files.write_merge_threshold")
float writeMergeThreshold;
@ConfigProperty(name = "dhfs.files.write_merge_max_chunk_to_take")
float writeMergeMaxChunkToTake;
@ConfigProperty(name = "dhfs.files.write_merge_limit")
float writeMergeLimit;
@ConfigProperty(name = "dhfs.files.write_last_chunk_limit")
float writeLastChunkLimit;
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;
@@ -91,7 +81,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
private ChunkData createChunk(ByteString bytes) {
var newChunk = new ChunkData(JObjectKey.of(UUID.randomUUID().toString()), bytes);
remoteTx.putData(newChunk);
remoteTx.putDataNew(newChunk);
return newChunk;
}
@@ -356,22 +346,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
return readChunk(uuid).size();
}
private void cleanupChunks(File f, Collection<JObjectKey> uuids) {
// FIXME:
// var inFile = useHashForChunks ? new HashSet<>(f.getChunks().values()) : Collections.emptySet();
// for (var cuuid : uuids) {
// try {
// if (inFile.contains(cuuid)) continue;
// jObjectManager.get(cuuid)
// .ifPresent(jObject -> jObject.runWriteLocked(JObjectManager.ResolutionStrategy.NO_RESOLUTION,
// (m, d, b, v) -> {
// m.removeRef(f.getName());
// return null;
// }));
// } catch (Exception e) {
// Log.error("Error when cleaning chunk " + cuuid, e);
// }
// }
private long alignDown(long num, long n) {
return num & -(1L << n);
}
@Override
@@ -380,7 +356,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
if (offset < 0)
throw new StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("Offset should be more than zero: " + offset));
// FIXME:
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
if (file == null) {
Log.error("File not found when trying to write: " + fileUuid);
@@ -397,144 +372,58 @@ public class DhfsFileServiceImpl implements DhfsFileService {
file = remoteTx.getData(File.class, fileUuid).orElse(null);
}
Pair<JMapLongKey, JMapEntry<JMapLongKey>> first;
Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
Log.tracev("Getting last");
try (var it = jMapHelper.getIterator(file, IteratorStart.LT, JMapLongKey.of(offset + data.size()))) {
last = it.hasNext() ? it.next() : null;
Log.tracev("Last: {0}", last);
}
NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
long start = 0;
long realOffset = targetChunkAlignment >= 0 ? alignDown(offset, targetChunkAlignment) : offset;
long writeEnd = offset + data.size();
long start = realOffset;
ByteString pendingPrefix = ByteString.empty();
ByteString pendingSuffix = ByteString.empty();
try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(offset))) {
first = it.hasNext() ? it.next() : null;
Log.tracev("First: {0}", first);
boolean empty = last == null;
if (first != null && getChunkSize(first.getValue().ref()) + first.getKey().key() <= offset) {
first = null;
last = null;
start = offset;
} else if (!empty) {
assert first != null;
removedChunks.put(first.getKey().key(), first.getValue().ref());
while (it.hasNext() && it.peekNextKey().compareTo(last.getKey()) <= 0) {
var next = it.next();
Log.tracev("Next: {0}", next);
removedChunks.put(next.getKey().key(), next.getValue().ref());
try (var it = jMapHelper.getIterator(file, IteratorStart.LE, JMapLongKey.of(realOffset))) {
while (it.hasNext()) {
var curEntry = it.next();
long curChunkStart = curEntry.getKey().key();
var curChunkId = curEntry.getValue().ref();
long curChunkEnd = curChunkStart + getChunkSize(curChunkId);
if (curChunkEnd <= realOffset) break;
removedChunks.put(curEntry.getKey().key(), curChunkId);
if (curChunkStart < offset) {
if (curChunkStart < start)
start = curChunkStart;
var readChunk = readChunk(curChunkId);
pendingPrefix = pendingPrefix.concat(readChunk.substring(0, Math.min(readChunk.size(), (int) (offset - curChunkStart))));
}
removedChunks.put(last.getKey().key(), last.getValue().ref());
start = first.getKey().key();
if (curChunkEnd > writeEnd) {
var readChunk = readChunk(curChunkId);
pendingSuffix = pendingSuffix.concat(readChunk.substring((int) (writeEnd - curChunkStart), readChunk.size()));
}
if (curChunkEnd >= writeEnd) break;
}
}
// NavigableMap<Long, JObjectKey> beforeFirst = first != null ? chunksAll.headMap(first.getKey(), false) : Collections.emptyNavigableMap();
// NavigableMap<Long, JObjectKey> afterLast = last != null ? chunksAll.tailMap(last.getKey(), false) : Collections.emptyNavigableMap();
// if (first != null && (getChunkSize(first.getValue()) + first.getKey() <= offset)) {
// beforeFirst = chunksAll;
// afterLast = Collections.emptyNavigableMap();
// first = null;
// last = null;
// start = offset;
// } else if (!chunksAll.isEmpty()) {
// var between = chunksAll.subMap(first.getKey(), true, last.getKey(), true);
// removedChunks.putAll(between);
// start = first.getKey();
// }
ByteString pendingWrites = ByteString.empty();
if (first != null && first.getKey().key() < offset) {
var chunkBytes = readChunk(first.getValue().ref());
pendingWrites = pendingWrites.concat(chunkBytes.substring(0, (int) (offset - first.getKey().key())));
}
pendingWrites = pendingWrites.concat(data);
if (last != null) {
var lchunkBytes = readChunk(last.getValue().ref());
if (last.getKey().key() + lchunkBytes.size() > offset + data.size()) {
var startInFile = offset + data.size();
var startInChunk = startInFile - last.getKey().key();
pendingWrites = pendingWrites.concat(lchunkBytes.substring((int) startInChunk, lchunkBytes.size()));
}
}
ByteString pendingWrites = pendingPrefix.concat(data).concat(pendingSuffix);
int combinedSize = pendingWrites.size();
if (targetChunkSize > 0) {
// if (combinedSize < (targetChunkSize * writeMergeThreshold)) {
// boolean leftDone = false;
// boolean rightDone = false;
// while (!leftDone && !rightDone) {
// if (beforeFirst.isEmpty()) leftDone = true;
// if (!beforeFirst.isEmpty() || !leftDone) {
// var takeLeft = beforeFirst.lastEntry();
//
// var cuuid = takeLeft.getValue();
//
// if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
// leftDone = true;
// continue;
// }
//
// if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
// leftDone = true;
// continue;
// }
//
// // FIXME: (and test this)
// beforeFirst = beforeFirst.headMap(takeLeft.getKey(), false);
// start = takeLeft.getKey();
// pendingWrites = readChunk(cuuid).concat(pendingWrites);
// combinedSize += getChunkSize(cuuid);
// removedChunks.put(takeLeft.getKey(), takeLeft.getValue());
// }
// if (afterLast.isEmpty()) rightDone = true;
// if (!afterLast.isEmpty() && !rightDone) {
// var takeRight = afterLast.firstEntry();
//
// var cuuid = takeRight.getValue();
//
// if (getChunkSize(cuuid) >= (targetChunkSize * writeMergeMaxChunkToTake)) {
// rightDone = true;
// continue;
// }
//
// if ((combinedSize + getChunkSize(cuuid)) > (targetChunkSize * writeMergeLimit)) {
// rightDone = true;
// continue;
// }
//
// // FIXME: (and test this)
// afterLast = afterLast.tailMap(takeRight.getKey(), false);
// pendingWrites = pendingWrites.concat(readChunk(cuuid));
// combinedSize += getChunkSize(cuuid);
// removedChunks.put(takeRight.getKey(), takeRight.getValue());
// }
// }
// }
}
NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
{
int targetChunkSize = 1 << targetChunkAlignment;
int cur = 0;
while (cur < combinedSize) {
int end;
if (targetChunkSize <= 0)
if (targetChunkAlignment < 0)
end = combinedSize;
else {
if ((combinedSize - cur) > (targetChunkSize * writeLastChunkLimit)) {
end = Math.min(cur + targetChunkSize, combinedSize);
} else {
end = combinedSize;
}
}
else
end = Math.min(cur + targetChunkSize, combinedSize);
var thisChunk = pendingWrites.substring(cur, end);
@@ -557,7 +446,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
remoteTx.putData(file);
cleanupChunks(file, removedChunks.values());
return (long) data.size();
});
@@ -576,17 +464,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
if (length == 0) {
try (var it = jMapHelper.getIterator(file, IteratorStart.GE, JMapLongKey.of(0))) {
while (it.hasNext()) {
var next = it.next();
jMapHelper.delete(file, next.getKey());
}
}
// var oldChunks = file.chunks();
//
// file = file.withChunks(TreePMap.empty()).withMTime(System.currentTimeMillis());
jMapHelper.deleteAll(file);
remoteTx.putData(file);
// cleanupChunks(file, oldChunks.values());
return true;
}
@@ -689,7 +568,6 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
remoteTx.putData(file);
cleanupChunks(file, removedChunks.values());
return true;
});
}
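A worked example of the alignment arithmetic above (values are illustrative and assume targetChunkAlignment = 19, the default set in the configuration change below):

// alignDown(num, n) = num & -(1L << n) clears the low n bits, rounding num down to a
// multiple of 2^n. For n = 19 (2^19 = 524288 bytes):
//   alignDown(1_050_000, 19) = 1_048_576   (= 2 * 524288)
// A write at offset 1_050_000 therefore starts scanning chunks from 1_048_576; bytes of
// overlapping chunks before the offset become pendingPrefix, bytes past offset + data.size()
// in the last overlapping chunk become pendingSuffix, and the combined buffer is re-split
// into 2^19-byte chunks.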

View File: DhfsFuse.java

@@ -7,6 +7,7 @@ import com.usatiuk.dhfs.files.service.DirectoryNotEmptyException;
import com.usatiuk.dhfs.files.service.GetattrRes;
import com.usatiuk.dhfs.supportlib.UninitializedByteBuffer;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.quarkus.logging.Log;
@@ -30,6 +31,8 @@ import ru.serce.jnrfuse.struct.Timespec;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static jnr.posix.FileStat.*;
@@ -50,6 +53,24 @@ public class DhfsFuse extends FuseStubFS {
@Inject
DhfsFileService fileService;
private final ConcurrentHashMap<Long, JObjectKey> _openHandles = new ConcurrentHashMap<>();
private final AtomicLong _fh = new AtomicLong(1);
private long allocateHandle(JObjectKey key) {
while (true) {
var newFh = _fh.getAndIncrement();
if (newFh == 0) continue;
if (_openHandles.putIfAbsent(newFh, key) == null) {
return newFh;
}
}
}
private JObjectKey getFromHandle(long handle) {
assert handle != 0;
return _openHandles.get(handle);
}
void init(@Observes @Priority(100000) StartupEvent event) {
if (!enabled) return;
Paths.get(root).toFile().mkdirs();
@@ -174,7 +195,9 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int open(String path, FuseFileInfo fi) {
try {
if (fileService.open(path).isEmpty()) return -ErrorCodes.ENOENT();
var opened = fileService.open(path);
if (opened.isEmpty()) return -ErrorCodes.ENOENT();
fi.fh.set(allocateHandle(opened.get()));
return 0;
} catch (Throwable e) {
Log.error("When open " + path, e);
@@ -182,15 +205,20 @@ public class DhfsFuse extends FuseStubFS {
}
}
@Override
public int release(String path, FuseFileInfo fi) {
assert fi.fh.get() != 0;
_openHandles.remove(fi.fh.get());
return 0;
}
@Override
public int read(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
if (size < 0) return -ErrorCodes.EINVAL();
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var file = fileOpt.get();
var read = fileService.read(fileOpt.get(), offset, (int) size);
var fileKey = getFromHandle(fi.fh.get());
var read = fileService.read(fileKey, offset, (int) size);
if (read.isEmpty()) return 0;
UnsafeByteOperations.unsafeWriteTo(read.get(), new JnrPtrByteOutput(jnrPtrByteOutputAccessors, buf, size));
return read.get().size();
@@ -204,8 +232,7 @@ public class DhfsFuse extends FuseStubFS {
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileOpt = fileService.open(path);
if (fileOpt.isEmpty()) return -ErrorCodes.ENOENT();
var fileKey = getFromHandle(fi.fh.get());
var buffer = UninitializedByteBuffer.allocateUninitialized((int) size);
if (buffer.isDirect()) {
@@ -218,7 +245,7 @@ public class DhfsFuse extends FuseStubFS {
buf.get(0, buffer.array(), 0, (int) size);
}
var written = fileService.write(fileOpt.get(), offset, UnsafeByteOperations.unsafeWrap(buffer));
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
return written.intValue();
} catch (Throwable e) {
Log.error("When writing " + path, e);
@@ -231,7 +258,8 @@ public class DhfsFuse extends FuseStubFS {
try {
var ret = fileService.create(path, mode);
if (ret.isEmpty()) return -ErrorCodes.ENOSPC();
else return 0;
fi.fh.set(allocateHandle(ret.get()));
return 0;
} catch (Throwable e) {
Log.error("When creating " + path, e);
return -ErrorCodes.EIO();

View File: JMapHelper.java

@@ -17,23 +17,27 @@ public class JMapHelper {
Transaction curTx;
static <K extends JMapKey> JObjectKey makePrefix(JObjectKey holder) {
return JObjectKey.of(holder.name() + "/");
return JObjectKey.of(holder.name() + "=");
}
static <K extends JMapKey> JObjectKey makeKeyFirst(JObjectKey holder) {
return JObjectKey.of(holder.name() + "<");
}
static <K extends JMapKey> JObjectKey makeKey(JObjectKey holder, K key) {
return JObjectKey.of(makePrefix(holder).name() + key.toString());
}
static <K extends JMapKey> JObjectKey makeKeyLast(JObjectKey holder) {
return JObjectKey.of(holder.name() + ">");
}
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, IteratorStart start, K key) {
return new JMapIterator<>(curTx.getIterator(start, makeKey(holder.key(), key)), holder);
}
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder, K key) {
return getIterator(holder, IteratorStart.GE, key);
}
public <K extends JMapKey> CloseableKvIterator<K, JMapEntry<K>> getIterator(JMapHolder<K> holder) {
return new JMapIterator<>(curTx.getIterator(IteratorStart.GE, makePrefix(holder.key())), holder);
return new JMapIterator<>(curTx.getIterator(IteratorStart.GT, makeKeyFirst(holder.key())), holder);
}
public <K extends JMapKey> void put(JMapHolder<K> holder, K key, JObjectKey ref) {
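Why '<', '=' and '>' work as delimiters here is a one-line ASCII argument ('<' is 60, '=' is 61, '>' is 62). A tiny illustrative check (the holder name "dir1" and the padded key value are hypothetical):

// For any holder H, every entry key "H=" + paddedKey sorts strictly between the sentinels
// "H<" (makeKeyFirst) and "H>" (makeKeyLast), so iterating GT from makeKeyFirst visits
// exactly the holder's entries and nothing else.
assert "dir1<".compareTo("dir1=00000000000000000042") < 0;
assert "dir1=00000000000000000042".compareTo("dir1>") < 0;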

View File: JMapLongKey.java

@@ -1,5 +1,7 @@
package com.usatiuk.dhfs.jmap;
import org.apache.commons.lang3.StringUtils;
import javax.annotation.Nonnull;
import java.io.Serializable;
@@ -14,7 +16,7 @@ public record JMapLongKey(long key) implements JMapKey, Comparable<JMapKey>, Ser
@Override
public String toString() {
return String.format("%016d", key);
return StringUtils.leftPad(String.valueOf(key), 20, '0');
}
@Override
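Two things are worth noting about the replacement above (the second is an inference from how these keys are embedded into JObjectKey strings by JMapHelper): StringUtils.leftPad avoids re-parsing a format string on every call, and the padded width grows from 16 to 20 because Long.MAX_VALUE has 19 decimal digits, so all non-negative keys stay fixed-width and keep sorting correctly as strings.

// e.g. StringUtils.leftPad(String.valueOf(42L), 20, '0')  ->  "00000000000000000042"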

View File: peer discovery proto

@@ -4,7 +4,7 @@ option java_multiple_files = true;
option java_package = "com.usatiuk.dhfs.repository.peerdiscovery";
option java_outer_classname = "DhfsObjectPeerDiscoveryApi";
package dhfs.objects.peerdiscovery;
package dhfs.peerdiscovery;
message PeerDiscoveryInfo {
string uuid = 1;

View File: persistence proto

@@ -4,7 +4,7 @@ option java_multiple_files = true;
option java_package = "com.usatiuk.dhfs.persistence";
option java_outer_classname = "DhfsObjectPersistence";
package dhfs.objects.persistence;
package dhfs.persistence;
message JObjectKeyP {
string name = 1;

View File: sync proto

@@ -1,12 +1,12 @@
syntax = "proto3";
import "dhfs_objects_serial.proto";
import "dhfs_serial.proto";
option java_multiple_files = true;
option java_package = "com.usatiuk.dhfs.repository";
option java_outer_classname = "DhfsObjectSyncApi";
package dhfs.objects.sync;
package dhfs.sync;
service DhfsObjectSyncGrpc {
rpc OpPush (OpPushRequest) returns (OpPushReply) {}
@@ -22,22 +22,22 @@ message PingRequest {}
message PingReply {}
message GetObjectRequest {
dhfs.objects.persistence.JObjectKeyP name = 2;
dhfs.persistence.JObjectKeyP name = 2;
}
message GetObjectReply {
dhfs.objects.persistence.ObjectChangelog changelog = 5;
dhfs.objects.persistence.JDataRemoteDtoP pushedData = 6;
dhfs.persistence.ObjectChangelog changelog = 5;
dhfs.persistence.JDataRemoteDtoP pushedData = 6;
}
message CanDeleteRequest {
dhfs.objects.persistence.JObjectKeyP name = 2;
repeated dhfs.objects.persistence.JObjectKeyP ourReferrers = 3;
dhfs.persistence.JObjectKeyP name = 2;
repeated dhfs.persistence.JObjectKeyP ourReferrers = 3;
}
message CanDeleteReply {
bool deletionCandidate = 2;
repeated dhfs.objects.persistence.JObjectKeyP referrers = 3;
repeated dhfs.persistence.JObjectKeyP referrers = 3;
}
message OpPushRequest {

View File: application.properties (server module)

@@ -15,13 +15,7 @@ dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.allow_recursive_delete=false
dhfs.files.target_chunk_size=2097152
# Writes strictly smaller than this will try to merge with blocks nearby
dhfs.files.write_merge_threshold=0.8
# If a merge would result in a block of greater size than this, stop merging
dhfs.files.write_merge_limit=1.2
# Don't take blocks of this size and above when merging
dhfs.files.write_merge_max_chunk_to_take=1
dhfs.files.write_last_chunk_limit=1.5
dhfs.files.target_chunk_alignment=19
dhfs.objects.deletion.delay=1000
dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true
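Arithmetic note: target_chunk_alignment=19 means chunk boundaries at multiples of 2^19 = 524288 bytes, i.e. 512 KiB, per the alignDown logic in DhfsFileServiceImpl above.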

View File: DhfsFileServiceSimpleTestImpl.java

@@ -1,12 +1,12 @@
package com.usatiuk.dhfs.files;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.dhfs.TempDataProfile;
import com.usatiuk.dhfs.files.objects.File;
import com.usatiuk.dhfs.files.service.DhfsFileService;
import com.usatiuk.dhfs.RemoteTransaction;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.transaction.Transaction;
import com.usatiuk.objects.transaction.TransactionManager;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.RepeatedTest;
@@ -27,6 +27,7 @@ class Profiles {
protected void getConfigOverrides(Map<String, String> ret) {
ret.put("dhfs.fuse.enabled", "false");
ret.put("dhfs.files.target_chunk_size", "-1");
ret.put("dhfs.files.target_chunk_alignment", "-1");
}
}
@@ -35,6 +36,7 @@ class Profiles {
protected void getConfigOverrides(Map<String, String> ret) {
ret.put("dhfs.fuse.enabled", "false");
ret.put("dhfs.files.target_chunk_size", "3");
ret.put("dhfs.files.target_chunk_alignment", "2");
}
}
}
@@ -150,6 +152,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
fileService.truncate(uuid, 20);
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
fileService.write(uuid, 5, new byte[]{10, 11, 12, 13, 14, 15, 16, 17});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 10, 11, 12, 13, 14, 15, 16, 17, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
}