8 Commits

13 changed files with 253 additions and 224 deletions

View File

@@ -1,17 +1,16 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main"/>
<module name="dhfs-app"/>
<option name="VM_PARAMETERS"
value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011"/>
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*"/>
<option name="ENABLED" value="true"/>
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true"/>
</method>
</configuration>
<configuration default="false" name="Main 2" type="QsApplicationConfigurationType" factoryName="QuarkusApplication">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main" />
<module name="dhfs-app" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+DebugNonSafepoints --enable-preview --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/2/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/2/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/2/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=false -Dquarkus.http.port=9020 -Dquarkus.http.ssl-port=9021 -Ddhfs.peerdiscovery.preset-uuid=22000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=11000000-0000-0000-0000-000000000000:127.0.0.1:9010:9011" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>

View File

@@ -1,18 +1,16 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication"
nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main"/>
<module name="dhfs-app"/>
<option name="VM_PARAMETERS"
value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0"/>
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*"/>
<option name="ENABLED" value="true"/>
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true"/>
</method>
</configuration>
<configuration default="false" name="Main" type="QsApplicationConfigurationType" factoryName="QuarkusApplication" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="com.usatiuk.dhfsapp.Main" />
<module name="dhfs-app" />
<option name="VM_PARAMETERS" value="-XX:+UnlockDiagnosticVMOptions -XX:+UseParallelGC --enable-preview -XX:+DebugNonSafepoints --add-exports java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-exports java.base/jdk.internal.access=ALL-UNNAMED -ea -Xmx2G -Ddhfs.webui.root=$ProjectFileDir$/../webui/dist -Ddhfs.fuse.root=${HOME}/dhfs_test/1/fuse -Ddhfs.objects.persistence.files.root=${HOME}/dhfs_test/1/data -Ddhfs.objects.persistence.stuff.root=${HOME}/dhfs_test/1/data/stuff -Ddhfs.objects.peerdiscovery.broadcast=true -Dquarkus.http.port=9010 -Dquarkus.http.ssl-port=9011 -Ddhfs.peerdiscovery.preset-uuid=11000000-0000-0000-0000-000000000000 -Ddhfs.peerdiscovery.static-peers=22000000-0000-0000-0000-000000000000:127.0.0.1:9020:9021 -Dquarkus.http.host=0.0.0.0" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.usatiuk.dhfs.remoteobj.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>

View File

@@ -14,8 +14,9 @@ dhfs.objects.persistence.stuff.root=${HOME}/dhfs_default/data/stuff
dhfs.fuse.debug=false
dhfs.fuse.enabled=true
dhfs.files.allow_recursive_delete=false
dhfs.files.target_chunk_size=2097152
dhfs.files.target_chunk_alignment=19
dhfs.files.target_chunk_size=524288
dhfs.files.max_chunk_size=524288
dhfs.files.target_chunk_alignment=17
dhfs.objects.deletion.delay=1000
dhfs.objects.deletion.can-delete-retry-delay=10000
dhfs.objects.ref_verification=true

View File

@@ -70,6 +70,7 @@ public class DhfsImage implements Future<String> {
"--add-exports", "java.base/sun.nio.ch=ALL-UNNAMED",
"--add-exports", "java.base/jdk.internal.access=ALL-UNNAMED",
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--enable-preview",
"-Ddhfs.objects.peerdiscovery.interval=1s",
"-Ddhfs.objects.invalidation.delay=100",
"-Ddhfs.objects.deletion.delay=0",

View File

@@ -55,6 +55,9 @@ public class DhfsFileServiceImpl implements DhfsFileService {
@ConfigProperty(name = "dhfs.files.target_chunk_size")
int targetChunkSize;
@ConfigProperty(name = "dhfs.files.max_chunk_size", defaultValue = "524288")
int maxChunkSize;
@ConfigProperty(name = "dhfs.files.use_hash_for_chunks")
boolean useHashForChunks;
@@ -360,16 +363,10 @@ public class DhfsFileServiceImpl implements DhfsFileService {
var file = remoteTx.getData(File.class, fileUuid, LockingStrategy.WRITE).orElse(null);
if (file == null) {
Log.error("File not found when trying to write: " + fileUuid);
return -1L;
throw new StatusRuntimeException(Status.NOT_FOUND.withDescription("File not found when trying to write: " + fileUuid));
}
if (writeLogging) {
Log.info("Writing to file: " + file.key() + " size=" + size(fileUuid) + " "
+ offset + " " + data.size());
}
NavigableMap<Long, JObjectKey> removedChunks = new TreeMap<>();
Map<Long, JObjectKey> removedChunks = new HashMap<>();
long realOffset = targetChunkAlignment >= 0 ? alignDown(offset, targetChunkAlignment) : offset;
long writeEnd = offset + data.size();
@@ -407,7 +404,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
}
NavigableMap<Long, JObjectKey> newChunks = new TreeMap<>();
Map<Long, JObjectKey> newChunks = new HashMap<>();
if (existingEnd < offset) {
if (!pendingPrefix.isEmpty()) {
@@ -424,12 +421,13 @@ public class DhfsFileServiceImpl implements DhfsFileService {
int combinedSize = pendingWrites.size();
{
int targetChunkSize = 1 << targetChunkAlignment;
int cur = 0;
while (cur < combinedSize) {
int end;
if (targetChunkAlignment < 0)
if (combinedSize - cur < maxChunkSize)
end = combinedSize;
else if (targetChunkAlignment < 0)
end = combinedSize;
else
end = Math.min(cur + targetChunkSize, combinedSize);
@@ -550,7 +548,7 @@ public class DhfsFileServiceImpl implements DhfsFileService {
});
}
private void fillZeros(long fillStart, long length, NavigableMap<Long, JObjectKey> newChunks) {
private void fillZeros(long fillStart, long length, Map<Long, JObjectKey> newChunks) {
long combinedSize = (length - fillStart);
long start = fillStart;

View File

@@ -1,12 +1,14 @@
package com.usatiuk.dhfsfuse;
import com.google.protobuf.UnsafeByteOperations;
import com.kenai.jffi.MemoryIO;
import com.sun.security.auth.module.UnixSystem;
import com.usatiuk.dhfsfs.service.DhfsFileService;
import com.usatiuk.dhfsfs.service.DirectoryNotEmptyException;
import com.usatiuk.dhfsfs.service.GetattrRes;
import com.usatiuk.kleppmanntree.AlreadyExistsException;
import com.usatiuk.objects.JObjectKey;
import com.usatiuk.utils.UninitializedByteBuffer;
import com.usatiuk.utils.UnsafeAccessor;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
@@ -18,15 +20,18 @@ import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import jnr.ffi.Pointer;
import jnr.ffi.Runtime;
import jnr.ffi.Struct;
import jnr.ffi.types.off_t;
import jnr.ffi.types.size_t;
import org.apache.commons.lang3.SystemUtils;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import ru.serce.jnrfuse.ErrorCodes;
import ru.serce.jnrfuse.FuseFillDir;
import ru.serce.jnrfuse.FuseStubFS;
import ru.serce.jnrfuse.struct.FileStat;
import ru.serce.jnrfuse.struct.FuseFileInfo;
import ru.serce.jnrfuse.struct.Statvfs;
import ru.serce.jnrfuse.struct.Timespec;
import ru.serce.jnrfuse.NotImplemented;
import ru.serce.jnrfuse.flags.FuseBufFlags;
import ru.serce.jnrfuse.struct.*;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
@@ -240,17 +245,19 @@ public class DhfsFuse extends FuseStubFS {
@Override
public int write(String path, Pointer buf, long size, long offset, FuseFileInfo fi) {
var buffer = UninitializedByteBuffer.allocate((int) size);
UnsafeAccessor.get().getUnsafe().copyMemory(
buf.address(),
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
size
);
return write(path, buffer, offset, fi);
}
public int write(String path, ByteBuffer buffer, long offset, FuseFileInfo fi) {
if (offset < 0) return -ErrorCodes.EINVAL();
try {
var fileKey = getFromHandle(fi.fh.get());
var buffer = ByteBuffer.allocateDirect((int) size);
UnsafeAccessor.get().getUnsafe().copyMemory(
buf.address(),
UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer),
size
);
var written = fileService.write(fileKey, offset, UnsafeByteOperations.unsafeWrap(buffer));
return written.intValue();
} catch (Exception e) {
@@ -421,4 +428,29 @@ public class DhfsFuse extends FuseStubFS {
return -ErrorCodes.EIO();
}
}
@Override
public int write_buf(String path, FuseBufvec buf, @off_t long off, FuseFileInfo fi) {
int size = (int) libFuse.fuse_buf_size(buf);
FuseBufvec tmpVec = new FuseBufvec(Runtime.getSystemRuntime());
long tmpVecAddr = MemoryIO.getInstance().allocateMemory(Struct.size(tmpVec), false);
try {
tmpVec.useMemory(Pointer.wrap(Runtime.getSystemRuntime(), tmpVecAddr));
FuseBufvec.init(tmpVec, size);
var bb = UninitializedByteBuffer.allocate(size);
var mem = UninitializedByteBuffer.getAddress(bb);
tmpVec.buf.mem.set(mem);
tmpVec.buf.size.set(size);
int res = (int) libFuse.fuse_buf_copy(tmpVec, buf, 0);
if (res != size) {
Log.errorv("fuse_buf_copy failed: {0} != {1}", res, size);
return -ErrorCodes.ENOMEM();
}
return write(path, bb, off, fi);
} finally {
if (tmpVecAddr != 0) {
MemoryIO.getInstance().freeMemory(tmpVecAddr);
}
}
}
}

View File

@@ -1,5 +1,7 @@
package com.usatiuk.objects;
import com.usatiuk.utils.UninitializedByteBuffer;
import java.io.Serial;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
@@ -46,7 +48,7 @@ public final class JObjectKeyImpl implements JObjectKey {
synchronized (this) {
if (_bb != null) return _bb;
var bytes = value.getBytes(StandardCharsets.ISO_8859_1);
var directBb = ByteBuffer.allocateDirect(bytes.length);
var directBb = UninitializedByteBuffer.allocate(bytes.length);
directBb.put(bytes);
directBb.flip();
_bb = directBb;

View File

@@ -27,23 +27,33 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Stream;
@ApplicationScoped
public class WritebackObjectPersistentStore {
private final LinkedList<TxBundle> _pendingBundles = new LinkedList<>();
private final LinkedHashMap<Long, TxBundle> _notFlushedBundles = new LinkedHashMap<>();
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastWrittenId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong();
private final AtomicLong _waitedTotal = new AtomicLong(0);
@Inject
CachingObjectPersistentStore cachedStore;
@ConfigProperty(name = "dhfs.objects.writeback.limit")
long sizeLimit;
private final AtomicReference<TxBundle> _pendingBundle = new AtomicReference<>(null);
private final AtomicReference<PendingWriteData> _pendingWrites = new AtomicReference<>(null);
private final Object _flushWaitSynchronizer = new Object();
private final AtomicLong _lastFlushedId = new AtomicLong(-1);
private final AtomicLong _lastCommittedId = new AtomicLong(-1);
private final AtomicLong _waitedTotal = new AtomicLong(0);
private long currentSize = 0;
private ExecutorService _writebackExecutor;
private ExecutorService _statusExecutor;
@Inject
ExecutorService _callbackExecutor;
private volatile boolean _ready = false;
void init(@Observes @Priority(120) StartupEvent event) {
@@ -72,7 +82,7 @@ public class WritebackObjectPersistentStore {
lastTxId = s.id();
}
_lastCommittedId.set(lastTxId);
_lastWrittenId.set(lastTxId);
_lastFlushedId.set(lastTxId);
_pendingWrites.set(new PendingWriteData(TreePMap.empty(), lastTxId, lastTxId));
_ready = true;
}
@@ -98,21 +108,10 @@ public class WritebackObjectPersistentStore {
private void writeback() {
while (!Thread.interrupted()) {
try {
TxBundle bundle = new TxBundle(0);
synchronized (_pendingBundles) {
while (_pendingBundles.isEmpty() || !_pendingBundles.peek()._ready)
_pendingBundles.wait();
long diff = 0;
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.size();
bundle.compress(toCompress);
}
diff += bundle.size();
synchronized (_flushWaitSynchronizer) {
currentSize += diff;
}
TxBundle bundle;
synchronized (_pendingBundle) {
while ((bundle = _pendingBundle.getAndSet(null)) == null)
_pendingBundle.wait();
}
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper>>();
@@ -132,15 +131,11 @@ public class WritebackObjectPersistentStore {
}
}
cachedStore.commitTx(
new TxManifestObj<>(
Collections.unmodifiableList(toWrite),
Collections.unmodifiableList(toDelete)
), bundle.id());
cachedStore.commitTx(new TxManifestObj<>(toWrite, toDelete), bundle.id());
Log.tracev("Bundle {0} committed", bundle.id());
while (true) {
synchronized (_pendingWrites) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : bundle._entries.values()) {
@@ -153,22 +148,17 @@ public class WritebackObjectPersistentStore {
bundle.id(),
curPw.lastCommittedId()
);
if (_pendingWrites.compareAndSet(curPw, newCurPw))
break;
_pendingWrites.compareAndSet(curPw, newCurPw);
}
List<List<Runnable>> callbacks = new ArrayList<>();
synchronized (_notFlushedBundles) {
_lastWrittenId.set(bundle.id());
while (!_notFlushedBundles.isEmpty() && _notFlushedBundles.firstEntry().getKey() <= bundle.id()) {
callbacks.add(_notFlushedBundles.pollFirstEntry().getValue().setCommitted());
}
}
callbacks.forEach(l -> l.forEach(Runnable::run));
_lastFlushedId.set(bundle.id());
var callbacks = bundle.getCallbacks();
_callbackExecutor.submit(() -> {
callbacks.forEach(Runnable::run);
});
synchronized (_flushWaitSynchronizer) {
currentSize -= bundle.size();
// FIXME:
if (currentSize <= sizeLimit || !_ready)
_flushWaitSynchronizer.notifyAll();
}
@@ -184,65 +174,39 @@ public class WritebackObjectPersistentStore {
public long commitBundle(Collection<TxRecord.TxObjectRecord<?>> writes) {
verifyReady();
boolean wait = false;
while (true) {
if (wait) {
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (_flushWaitSynchronizer) {
long started = System.currentTimeMillis();
while (currentSize > sizeLimit) {
try {
_flushWaitSynchronizer.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
wait = false;
}
long waited = System.currentTimeMillis() - started;
_waitedTotal.addAndGet(waited);
if (Log.isTraceEnabled())
Log.tracev("Thread {0} waited for tx bundle for {1} ms", Thread.currentThread().getName(), waited);
}
synchronized (_pendingBundles) {
synchronized (_pendingBundle) {
synchronized (_flushWaitSynchronizer) {
if (currentSize > sizeLimit) {
if (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var target = _pendingBundles.poll();
long diff = -target.size();
while (!_pendingBundles.isEmpty() && _pendingBundles.peek()._ready) {
var toCompress = _pendingBundles.poll();
diff -= toCompress.size();
target.compress(toCompress);
}
diff += target.size();
currentSize += diff;
_pendingBundles.addFirst(target);
}
}
if (currentSize > sizeLimit) {
wait = true;
continue;
}
}
TxBundle bundle;
synchronized (_notFlushedBundles) {
bundle = new TxBundle(_lastCommittedId.incrementAndGet());
_pendingBundles.addLast(bundle);
_notFlushedBundles.put(bundle.id(), bundle);
}
TxBundle bundle = new TxBundle(_lastCommittedId.incrementAndGet());
for (var action : writes) {
switch (action) {
case TxRecord.TxObjectRecordWrite<?> write -> {
Log.tracev("Flushing object {0}", write.key());
// Log.tracev("Flushing object {0}", write.key());
bundle.commit(new JDataVersionedWrapperImpl(write.data(), bundle.id()));
}
case TxRecord.TxObjectRecordDeleted deleted -> {
Log.tracev("Deleting object {0}", deleted.key());
// Log.tracev("Deleting object {0}", deleted.key());
bundle.delete(deleted.key());
}
default -> {
@@ -251,7 +215,7 @@ public class WritebackObjectPersistentStore {
}
}
while (true) {
synchronized (_pendingWrites) {
var curPw = _pendingWrites.get();
var curPwMap = curPw.pendingWrites();
for (var e : ((TxBundle) bundle)._entries.values()) {
@@ -272,18 +236,24 @@ public class WritebackObjectPersistentStore {
bundle.id()
);
if (!_pendingWrites.compareAndSet(curPw, newCurPw))
continue;
((TxBundle) bundle).setReady();
if (_pendingBundles.peek() == bundle)
_pendingBundles.notify();
synchronized (_flushWaitSynchronizer) {
currentSize += ((TxBundle) bundle).size();
}
return bundle.id();
_pendingWrites.compareAndSet(curPw, newCurPw);
}
var curBundle = _pendingBundle.get();
long oldSize = 0;
if (curBundle != null) {
oldSize = curBundle.size();
curBundle.compress(bundle);
} else {
curBundle = bundle;
}
_pendingBundle.set(curBundle);
_pendingBundle.notifyAll();
synchronized (_flushWaitSynchronizer) {
currentSize += (curBundle.size() - oldSize);
}
return bundle.id();
}
}
}
@@ -291,16 +261,21 @@ public class WritebackObjectPersistentStore {
public void asyncFence(long bundleId, Runnable fn) {
verifyReady();
if (bundleId < 0) throw new IllegalArgumentException("txId should be >0!");
if (_lastWrittenId.get() >= bundleId) {
if (_lastFlushedId.get() >= bundleId) {
fn.run();
return;
}
synchronized (_notFlushedBundles) {
if (_lastWrittenId.get() >= bundleId) {
synchronized (_pendingBundle) {
if (_lastFlushedId.get() >= bundleId) {
fn.run();
return;
}
_notFlushedBundles.get(bundleId).addCallback(fn);
var pendingBundle = _pendingBundle.get();
if (pendingBundle == null) {
fn.run();
return;
}
pendingBundle.addCallback(fn);
}
}
@@ -381,21 +356,20 @@ public class WritebackObjectPersistentStore {
}
}
public interface VerboseReadResult {
}
private record PendingWriteData(TreePMap<JObjectKey, PendingWriteEntry> pendingWrites,
long lastFlushedId,
long lastCommittedId) {
}
private static class TxBundle {
private final LinkedHashMap<JObjectKey, BundleEntry> _entries = new LinkedHashMap<>();
private final HashMap<JObjectKey, BundleEntry> _entries = new HashMap<>();
private final ArrayList<Runnable> _callbacks = new ArrayList<>();
private long _txId;
private volatile boolean _ready = false;
private long _size = 0;
private boolean _wasCommitted = false;
ArrayList<Runnable> getCallbacks() {
return _callbacks;
}
private TxBundle(long txId) {
_txId = txId;
@@ -405,22 +379,8 @@ public class WritebackObjectPersistentStore {
return _txId;
}
public void setReady() {
_ready = true;
}
public void addCallback(Runnable callback) {
synchronized (_callbacks) {
if (_wasCommitted) throw new IllegalStateException();
_callbacks.add(callback);
}
}
public List<Runnable> setCommitted() {
synchronized (_callbacks) {
_wasCommitted = true;
return Collections.unmodifiableList(_callbacks);
}
_callbacks.add(callback);
}
private void putEntry(BundleEntry entry) {
@@ -453,11 +413,7 @@ public class WritebackObjectPersistentStore {
putEntry(entry);
}
synchronized (_callbacks) {
assert !_wasCommitted;
assert !other._wasCommitted;
_callbacks.addAll(other._callbacks);
}
_callbacks.addAll(other._callbacks);
}
private interface BundleEntry {
@@ -478,10 +434,4 @@ public class WritebackObjectPersistentStore {
}
}
}
public record VerboseReadResultPersisted(Optional<JDataVersionedWrapper> data) implements VerboseReadResult {
}
public record VerboseReadResultPending(PendingWriteEntry pending) implements VerboseReadResult {
}
}

View File

@@ -70,7 +70,7 @@ public class JObjectManager {
CommitHookIterationData[] hookIterationDataArray = new CommitHookIterationData[_preCommitTxHooks.size()];
for (int i = 0; i < _preCommitTxHooks.size(); i++) {
var hook = _preCommitTxHooks.get(i);
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>(), new HashMap<>());
hookIterationDataArray[i] = new CommitHookIterationData(hook, new HashMap<>());
}
hookIterationData = List.of(hookIterationDataArray);
}
@@ -83,6 +83,18 @@ public class JObjectManager {
writes.put(n.key(), n);
}
var lastIterationState = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
var curIterationWrites = new HashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
Function<JObjectKey, JData> getPrev =
key -> switch (lastIterationState.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
// Run hooks for all objects
// Every hook should see every change made to every object, yet the object's evolution
// should be consistent from the view point of each individual hook
@@ -92,25 +104,14 @@ public class JObjectManager {
while (pendingCount > 0) {
for (var hookId : hookIterationData) {
var hook = hookId.hook();
var lastCurHookSeen = hookId.lastWrites();
Function<JObjectKey, JData> getPrev =
key -> switch (lastCurHookSeen.get(key)) {
case TxRecord.TxObjectRecordWrite<?> write -> write.data();
case TxRecord.TxObjectRecordDeleted deleted -> null;
case null -> tx.getFromSource(JData.class, key).orElse(null);
default -> {
throw new TxCommitException("Unexpected value: " + writes.get(key));
}
};
var curIteration = hookId.pendingWrites();
// Log.trace("Commit iteration with " + curIteration.size() + " records for hook " + hook.getClass());
for (var entry : curIteration.entrySet()) {
// Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
// Log.trace("Running pre-commit hook " + hook.getClass() + " for " + entry.getKey());
var oldObj = getPrev.apply(entry.getKey());
lastCurHookSeen.put(entry.getKey(), entry.getValue());
switch (entry.getValue()) {
case TxRecord.TxObjectRecordWrite<?> write -> {
if (oldObj == null) {
@@ -131,18 +132,19 @@ public class JObjectManager {
for (var n : tx.drainNewWrites()) {
for (var hookPut : hookIterationData) {
if (hookPut == hookId) {
lastCurHookSeen.put(n.key(), n);
continue;
if (hookPut != hookId) {
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
var before = hookPut.pendingWrites().put(n.key(), n);
if (before == null)
pendingCount++;
}
writes.put(n.key(), n);
curIterationWrites.put(n.key(), n);
}
}
lastIterationState.putAll(curIterationWrites);
curIterationWrites.clear();
}
writes.putAll(lastIterationState);
} catch (Throwable e) {
for (var read : tx.reads().entrySet()) {
if (read.getValue() instanceof TransactionObjectLocked<?> locked) {
@@ -164,11 +166,11 @@ public class JObjectManager {
toLock.add(read.getKey());
}
}
for (var write : writes.entrySet()) {
if (!readSet.containsKey(write.getKey()))
toLock.add(write.getKey());
for (var write : writes.keySet()) {
if (!readSet.containsKey(write))
toLock.add(write);
}
Collections.sort(toLock);
toLock.sort(null);
for (var key : toLock) {
var lock = lockManager.lockObject(key);
toUnlock.add(lock);
@@ -192,14 +194,16 @@ public class JObjectManager {
writebackObjectPersistentStore.asyncFence(finalVersion, r);
};
var onCommit = tx.getOnCommit();
var onFlush = tx.getOnFlush();
return Pair.of(
Stream.concat(
tx.getOnCommit().stream(),
Stream.<Runnable>of(() -> {
for (var f : tx.getOnFlush())
fenceFn.accept(f);
})
).toList(),
List.of(() -> {
for (var f : onCommit)
f.run();
for (var f : onFlush)
fenceFn.accept(f);
}),
new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
@@ -221,7 +225,7 @@ public class JObjectManager {
}
if (current.isEmpty()) {
// TODO: Every write gets a dependency due to hooks
// Every write gets a dependency due to hooks
continue;
// assert false;
// throw new TxCommitException("Serialization hazard: " + dep.isEmpty() + " vs " + read.getValue().value().isEmpty());
@@ -246,7 +250,7 @@ public class JObjectManager {
}
return Pair.of(
List.copyOf(tx.getOnCommit()),
tx.getOnCommit(),
new TransactionHandle() {
@Override
public void onFlush(Runnable runnable) {
@@ -278,7 +282,6 @@ public class JObjectManager {
}
private record CommitHookIterationData(PreCommitTxHook hook,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> lastWrites,
Map<JObjectKey, TxRecord.TxObjectRecord<?>> pendingWrites) {
}
}

View File

@@ -102,6 +102,7 @@
<arg>-parameters</arg>
<arg>--add-exports</arg>
<arg>java.base/jdk.internal.access=ALL-UNNAMED</arg>
<arg>--enable-preview</arg>
</compilerArgs>
</configuration>
</plugin>
@@ -119,6 +120,7 @@
--add-exports java.base/sun.nio.ch=ALL-UNNAMED
--add-exports java.base/jdk.internal.access=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--enable-preview
</argLine>
<skipTests>${skip.unit}</skipTests>
<redirectTestOutputToFile>true</redirectTestOutputToFile>

View File

@@ -34,11 +34,9 @@ public class HashSetDelayedBlockingQueue<T> {
synchronized (this) {
if (_closed) throw new IllegalStateException("Adding to a queue that is closed!");
if (_set.containsKey(el))
if (_set.putIfAbsent(el, new SetElement<>(el, System.currentTimeMillis())) != null)
return false;
_set.put(el, new SetElement<>(el, System.currentTimeMillis()));
this.notify();
return true;
}

View File

@@ -0,0 +1,44 @@
package com.usatiuk.utils;
import java.lang.foreign.*;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
public class UninitializedByteBuffer {
private static final Linker LINKER = Linker.nativeLinker();
private static final MethodHandle malloc = LINKER.downcallHandle(
LINKER.defaultLookup().find("malloc").orElseThrow(),
FunctionDescriptor.of(ValueLayout.ADDRESS, ValueLayout.JAVA_LONG)
);
private static final MethodHandle free = LINKER.downcallHandle(
LINKER.defaultLookup().find("free").orElseThrow(),
FunctionDescriptor.ofVoid(ValueLayout.ADDRESS)
);
public static ByteBuffer allocate(int capacity) {
UnsafeAccessor.get().getNioAccess().reserveMemory(capacity, capacity);
MemorySegment segment = null;
try {
segment = (MemorySegment) malloc.invokeExact((long) capacity);
} catch (Throwable e) {
throw new RuntimeException(e);
}
Consumer<MemorySegment> cleanup = s -> {
try {
free.invokeExact(s);
UnsafeAccessor.get().getNioAccess().unreserveMemory(capacity, capacity);
} catch (Throwable e) {
throw new RuntimeException(e);
}
};
var reint = segment.reinterpret(capacity, Arena.ofAuto(), cleanup);
return reint.asByteBuffer();
}
public static long getAddress(ByteBuffer buffer) {
return UnsafeAccessor.get().getNioAccess().getBufferAddress(buffer);
}
}

View File

@@ -25,6 +25,7 @@ echo "Extra options: $EXTRAOPTS_PARSED"
java \
-Xmx512M \
--enable-preview \
-Ddhfs.objects.writeback.limit=134217728 \
-Ddhfs.objects.lru.limit=134217728 \
--add-exports java.base/sun.nio.ch=ALL-UNNAMED \