6 Commits

14 changed files with 129 additions and 97 deletions

View File

@@ -367,16 +367,12 @@ public class DhfsFileServiceImpl implements DhfsFileService {
+ offset + " " + data.size());
}
if (size(fileUuid) < offset) {
truncate(fileUuid, offset);
file = remoteTx.getData(File.class, fileUuid).orElse(null);
}
NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
long realOffset = targetChunkAlignment >= 0 ? alignDown(offset, targetChunkAlignment) : offset;
long writeEnd = offset + data.size();
long start = realOffset;
long existingEnd = 0;
ByteString pendingPrefix = ByteString.empty();
ByteString pendingSuffix = ByteString.empty();
@@ -385,8 +381,8 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var curEntry = it.next();
long curChunkStart = curEntry.getKey().key();
var curChunkId = curEntry.getValue().ref();
long curChunkEnd = curChunkStart + getChunkSize(curChunkId);
long curChunkEnd = it.hasNext() ? it.peekNextKey().key() : curChunkStart + getChunkSize(curChunkId);
existingEnd = curChunkEnd;
if (curChunkEnd <= realOffset) break;
removedChunks.put(curEntry.getKey().key(), curChunkId);
@@ -408,12 +404,23 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
}
NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
if (existingEnd < offset) {
if (!pendingPrefix.isEmpty()) {
int diff = Math.toIntExact(offset - existingEnd);
pendingPrefix = pendingPrefix.concat(ByteString.copyFrom(new byte[diff]));
} else {
fillZeros(existingEnd, offset, newChunks);
start = offset;
}
}
ByteString pendingWrites = pendingPrefix.concat(data).concat(pendingSuffix);
int combinedSize = pendingWrites.size();
NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
{
int targetChunkSize = 1 << targetChunkAlignment;
int cur = 0;
@@ -476,38 +483,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
if (curSize < length) {
long combinedSize = (length - curSize);
long start = curSize;
// Hack
HashMap<Long, ChunkData> zeroCache = new HashMap<>();
{
long cur = 0;
while (cur < combinedSize) {
long end;
if (targetChunkSize <= 0)
end = combinedSize;
else {
if ((combinedSize - cur) > (targetChunkSize * 1.5)) {
end = cur + targetChunkSize;
} else {
end = combinedSize;
}
}
if (!zeroCache.containsKey(end - cur))
zeroCache.put(end - cur, createChunk(UnsafeByteOperations.unsafeWrap(new byte[Math.toIntExact(end - cur)])));
ChunkData newChunkData = zeroCache.get(end - cur);
newChunks.put(start, newChunkData.key());
start += newChunkData.data().size();
cur = end;
}
}
fillZeros(curSize, length, newChunks);
} else {
// Pair<JMapLongKey, JMapEntry<JMapLongKey>> first;
Pair<JMapLongKey, JMapEntry<JMapLongKey>> last;
@@ -572,6 +548,41 @@ public class DhfsFileServiceImpl implements DhfsFileService {
});
}
private void fillZeros(long fillStart, long length, NavigableMap<Long, JObjectKey> newChunks) {
long combinedSize = (length - fillStart);
long start = fillStart;
// Hack
HashMap<Long, ChunkData> zeroCache = new HashMap<>();
{
long cur = 0;
while (cur < combinedSize) {
long end;
if (targetChunkSize <= 0)
end = combinedSize;
else {
if ((combinedSize - cur) > (targetChunkSize * 1.5)) {
end = cur + targetChunkSize;
} else {
end = combinedSize;
}
}
if (!zeroCache.containsKey(end - cur))
zeroCache.put(end - cur, createChunk(UnsafeByteOperations.unsafeWrap(new byte[Math.toIntExact(end - cur)])));
ChunkData newChunkData = zeroCache.get(end - cur);
newChunks.put(start, newChunkData.key());
start += newChunkData.data().size();
cur = end;
}
}
}
@Override
public String readlink(JObjectKey uuid) {
return jObjectTxManager.executeTx(() -> {

View File

@@ -169,6 +169,7 @@ public abstract class DhfsFileServiceSimpleTestImpl {
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, fileService.read(uuid, 0, 10).get().toByteArray());
fileService.truncate(uuid, 20);
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, fileService.read(uuid, 0, 20).get().toByteArray());
fileService.write(uuid, 10, new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20});
Assertions.assertArrayEquals(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, fileService.read(uuid, 0, 20).get().toByteArray());
} finally {

View File

@@ -1,6 +1,7 @@
package com.usatiuk.objects.stores;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyMax;
import com.usatiuk.objects.JObjectKeyMin;
@@ -128,7 +129,12 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
public Optional<ByteString> readObject(JObjectKey name) {
assert !_closed;
var got = _db.get(_txn.get(), name.toByteBuffer());
var ret = Optional.ofNullable(got).map(ByteString::copyFrom);
var ret = Optional.ofNullable(got).map(read -> {
var uninitBb = UninitializedByteBuffer.allocateUninitialized(got.remaining());
uninitBb.put(got);
uninitBb.flip();
return UnsafeByteOperations.unsafeWrap(uninitBb);
});
return ret;
}
@@ -153,11 +159,8 @@ public class LmdbObjectPersistentStore implements ObjectPersistentStore {
var txn = _env.txnWrite();
try {
for (var written : names.written()) {
// TODO:
var bb = UninitializedByteBuffer.allocateUninitialized(written.getValue().size());
bb.put(written.getValue().asReadOnlyByteBuffer());
bb.flip();
_db.put(txn, written.getKey().toByteBuffer(), bb);
var putBb = _db.reserve(txn, written.getKey().toByteBuffer(), written.getValue().size());
written.getValue().copyTo(putBb);
}
for (JObjectKey key : names.deleted()) {
_db.delete(txn, key.toByteBuffer());

View File

@@ -3,7 +3,6 @@ package com.usatiuk.objects.stores;
import com.usatiuk.objects.JDataVersionedWrapper;
import com.usatiuk.objects.JDataVersionedWrapperImpl;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.objects.JObjectKeyImpl;
import com.usatiuk.objects.iterators.*;
import com.usatiuk.objects.snapshot.Snapshot;
import com.usatiuk.objects.transaction.TxCommitException;
@@ -115,10 +114,10 @@ public class WritebackObjectPersistentStore {
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
diff -= toCompress.size();
bundle.compress(toCompress);
}
diff += bundle.calculateTotalSize();
diff += bundle.size();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
@@ -130,11 +129,11 @@ public class WritebackObjectPersistentStore {
for (var e : bundle._entries.values()) {
switch (e) {
case TxBundle.CommittedEntry(JObjectKey key, JDataVersionedWrapper data, int size) -> {
Log.trace("Writing new " + key);
Log.tracev("Writing new {0}", key);
toWrite.add(Pair.of(key, data));
}
case TxBundle.DeletedEntry(JObjectKey key) -> {
Log.trace("Deleting from persistent storage " + key);
Log.tracev("Deleting from persistent storage {0}", key);
toDelete.add(key);
}
default -> throw new IllegalStateException("Unexpected value: " + e);
@@ -145,21 +144,21 @@ public class WritebackObjectPersistentStore {
new TxManifestObj<>(
Collections.unmodifiableList(toWrite),
Collections.unmodifiableList(toDelete)
), bundle.getId());
), bundle.id());
Log.trace("Bundle " + bundle.getId() + " committed");
Log.tracev("Bundle {0} committed", bundle.id());
while (true) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : bundle._entries.values()) {
var cur = curPwMap.get(e.key());
if (cur.bundleId() <= bundle.getId())
if (cur.bundleId() <= bundle.id())
curPwMap = curPwMap.minus(e.key());
}
var newCurPw = new PendingWriteData(
curPwMap,
bundle.getId(),
bundle.id(),
curPw.lastCommittedId()
);
if (_pendingWrites.compareAndSet(curPw, newCurPw))
@@ -168,15 +167,15 @@ public class WritebackObjectPersistentStore {
List<List<Runnable>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenId.set(bundle.getId());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.getId()) {
_lastWrittenId.set(bundle.id());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.id()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(Runnable::run));
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.calculateTotalSize();
currentSize -= bundle.size();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
@@ -208,7 +207,7 @@ public class WritebackObjectPersistentStore {
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.trace("Thread " + Thread.currentThread().getName() + " waited for tx bundle for " + waited + " ms");
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
wait = false;
}
}
@@ -219,13 +218,13 @@ public class WritebackObjectPersistentStore {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
long diff = -target.calculateTotalSize();
long diff = -target.size();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.calculateTotalSize();
diff -= toCompress.size();
target.compress(toCompress);
}
diff += target.calculateTotalSize();
diff += target.size();
currentSize += diff;
_pendingBundles.addFirst(target);
}
@@ -241,17 +240,17 @@ public class WritebackObjectPersistentStore {
synchronized (_notFlushedBundles) {
bundle = new TxBundle(_lastCommittedId.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.getId(), bundle);
_notFlushedBundles.put(bundle.id(), bundle);
}
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.trace("Flushing object " + write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.getId()));
Log.tracev("Flushing object {0}", write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.trace("Deleting object " + deleted.key());
Log.tracev("Deleting object {0}", deleted.key());
bundle.delete(deleted.key());
}
default -> {
@@ -266,10 +265,10 @@ public class WritebackObjectPersistentStore {
for (var e : ((TxBundle) bundle)._entries.values()) {
switch (e) {
case TxBundle.CommittedEntry c -> {
curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.getId()));
curPwMap = curPwMap.plus(c.key(), new PendingWrite(c.data, bundle.id()));
}
case TxBundle.DeletedEntry d -> {
curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.getId()));
curPwMap = curPwMap.plus(d.key(), new PendingDelete(d.key, bundle.id()));
}
default -> throw new IllegalStateException("Unexpected value: " + e);
}
@@ -278,7 +277,7 @@ public class WritebackObjectPersistentStore {
var newCurPw = new PendingWriteData(
curPwMap,
curPw.lastFlushedId(),
bundle.getId()
bundle.id()
);
if (!_pendingWrites.compareAndSet(curPw, newCurPw))
@@ -288,10 +287,10 @@ public class WritebackObjectPersistentStore {
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundle) bundle).calculateTotalSize();
currentSize += ((TxBundle) bundle).size();
}
return bundle.getId();
return bundle.id();
}
}
}
@@ -404,14 +403,14 @@ public class WritebackObjectPersistentStore {
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private long _txId;
private volatile boolean _ready = false;
private long _size = -1;
private long _size = 0;
private boolean _wasCommitted = false;
private TxBundle(long txId) {
_txId = txId;
}
public long getId() {
public long id() {
return _txId;
}
@@ -433,21 +432,23 @@ public class WritebackObjectPersistentStore {
}
}
public void commit(JDataVersionedWrapper obj) {
synchronized (_entries) {
_entries.put(obj.data().key(), new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize()));
private void putEntry(BundleEntry entry) {
var old = _entries.put(entry.key(), entry);
if (old != null) {
_size -= old.size();
}
_size += entry.size();
}
public void commit(JDataVersionedWrapper obj) {
putEntry(new CommittedEntry(obj.data().key(), obj, obj.data().estimateSize()));
}
public void delete(JObjectKey obj) {
synchronized (_entries) {
_entries.put(obj, new DeletedEntry(obj));
}
putEntry(new DeletedEntry(obj));
}
public long calculateTotalSize() {
if (_size >= 0) return _size;
_size = _entries.values().stream().mapToInt(BundleEntry::size).sum();
public long size() {
return _size;
}
@@ -456,9 +457,16 @@ public class WritebackObjectPersistentStore {
throw new IllegalArgumentException("Compressing an older bundle into newer");
_txId = other._txId;
_size = -1;
_entries.putAll(other._entries);
for (var entry : other._entries.values()) {
putEntry(entry);
}
synchronized (_callbacks) {
assert !_wasCommitted;
assert !other._wasCommitted;
_callbacks.addAll(other._callbacks);
}
}
private interface BundleEntry {

View File

@@ -6,10 +6,11 @@ import com.usatiuk.objects.iterators.CloseableKvIterator;
import com.usatiuk.objects.iterators.IteratorStart;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.util.Optional;
@ApplicationScoped
@Singleton
public class CurrentTransaction implements Transaction {
@Inject
TransactionManager transactionManager;

View File

@@ -193,7 +193,7 @@ public class JObjectManager {
var dep = dependenciesLocked.get(read.getKey());
if (dep.isEmpty() != read.getValue().data().isEmpty()) {
Log.trace("Checking read dependency " + read.getKey() + " - not found");
Log.tracev("Checking read dependency {0} - not found", read.getKey());
throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().data().isEmpty());
}
@@ -205,11 +205,11 @@ public class JObjectManager {
}
if (dep.get().version() > snapshotId) {
Log.trace("Checking dependency " + read.getKey() + " - newer than");
Log.tracev("Checking dependency {0} - newer than", read.getKey());
throw new TxCommitException("Serialization hazard: " + dep.get().data().key() + " " + dep.get().version() + " vs " + snapshotId);
}
Log.trace("Checking dependency " + read.getKey() + " - ok with read");
Log.tracev("Checking dependency {0} - ok with read", read.getKey());
}
} else {
Log.tracev("Skipped dependency checks: no changes");

View File

@@ -3,11 +3,12 @@ package com.usatiuk.objects.transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.commons.lang3.tuple.Pair;
import java.util.Collection;
@ApplicationScoped
@Singleton
public class TransactionManagerImpl implements TransactionManager {
private static final ThreadLocal<TransactionPrivate> _currentTransaction = new ThreadLocal<>();
@Inject

View File

@@ -7,8 +7,9 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@ApplicationScoped
@Singleton
public class DeleterTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;

View File

@@ -7,8 +7,9 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@ApplicationScoped
@Singleton
public class RefcounterTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;

View File

@@ -8,8 +8,9 @@ import com.usatiuk.objects.transaction.PreCommitTxHook;
import com.usatiuk.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@ApplicationScoped
@Singleton
public class RemoteObjPusherTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;

View File

@@ -9,8 +9,9 @@ import com.usatiuk.objects.transaction.PreCommitTxHook;
import com.usatiuk.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@ApplicationScoped
@Singleton
public class JMapHolderRefcounterTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;

View File

@@ -9,8 +9,9 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@ApplicationScoped
@Singleton
public class JMapRefcounterTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;

View File

@@ -8,9 +8,10 @@ import com.usatiuk.objects.transaction.PreCommitTxHook;
import com.usatiuk.objects.transaction.Transaction;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.eclipse.microprofile.config.inject.ConfigProperty;
@ApplicationScoped
@Singleton
public class AutosyncTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;

View File

@@ -14,8 +14,9 @@ import com.usatiuk.objects.transaction.Transaction;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
@ApplicationScoped
@Singleton
public class PeerInfoCertUpdateTxHook implements PreCommitTxHook {
@Inject
Transaction curTx;