mirror of
https://github.com/usatiuk/dhfs.git
synced 2025-10-28 20:47:49 +01:00
do not forget the transaction id
This commit is contained in:
@@ -0,0 +1,28 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
public class JDataDummy implements JData {
|
||||
public static final JObjectKey TX_ID_OBJ_NAME = JObjectKey.of("tx_id");
|
||||
private static final JDataDummy INSTANCE = new JDataDummy();
|
||||
|
||||
public static JDataDummy getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JObjectKey key() {
|
||||
return TX_ID_OBJ_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
// hashCode
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,10 @@ import com.usatiuk.dhfs.objects.transaction.*;
|
||||
import com.usatiuk.dhfs.utils.AutoCloseableNoThrow;
|
||||
import com.usatiuk.dhfs.utils.DataLocker;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.StartupEvent;
|
||||
import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.enterprise.inject.Instance;
|
||||
import jakarta.inject.Inject;
|
||||
|
||||
@@ -26,16 +29,30 @@ public class JObjectManager {
|
||||
private final DataLocker _objLocker = new DataLocker();
|
||||
private final ConcurrentHashMap<JObjectKey, JDataWrapper<?>> _objects = new ConcurrentHashMap<>();
|
||||
private final AtomicLong _txCounter = new AtomicLong();
|
||||
private boolean _ready = false;
|
||||
@Inject
|
||||
WritebackObjectPersistentStore writebackObjectPersistentStore;
|
||||
@Inject
|
||||
TransactionFactory transactionFactory;
|
||||
|
||||
private void verifyReady() {
|
||||
if (!_ready) throw new IllegalStateException("Wrong service order!");
|
||||
}
|
||||
|
||||
void init(@Observes @Priority(200) StartupEvent event) {
|
||||
var read = writebackObjectPersistentStore.readObject(JDataDummy.TX_ID_OBJ_NAME).orElse(null);
|
||||
if (read != null) {
|
||||
_txCounter.set(read.version());
|
||||
}
|
||||
_ready = true;
|
||||
}
|
||||
|
||||
JObjectManager(Instance<PreCommitTxHook> preCommitTxHooks) {
|
||||
_preCommitTxHooks = preCommitTxHooks.stream().sorted(Comparator.comparingInt(PreCommitTxHook::getPriority)).toList();
|
||||
}
|
||||
|
||||
private <T extends JData> JDataVersionedWrapper<T> get(Class<T> type, JObjectKey key) {
|
||||
verifyReady();
|
||||
while (true) {
|
||||
{
|
||||
var got = _objects.get(key);
|
||||
@@ -73,24 +90,30 @@ public class JObjectManager {
|
||||
}
|
||||
|
||||
private <T extends JData> TransactionObjectNoLock<T> getObj(Class<T> type, JObjectKey key) {
|
||||
verifyReady();
|
||||
var got = get(type, key);
|
||||
return new TransactionObjectNoLock<>(Optional.ofNullable(got));
|
||||
}
|
||||
|
||||
private <T extends JData> TransactionObjectLocked<T> getObjLock(Class<T> type, JObjectKey key) {
|
||||
verifyReady();
|
||||
var lock = _objLocker.lock(key);
|
||||
var got = get(type, key);
|
||||
return new TransactionObjectLocked<>(Optional.ofNullable(got), lock);
|
||||
}
|
||||
|
||||
public TransactionPrivate createTransaction() {
|
||||
verifyReady();
|
||||
var counter = _txCounter.getAndIncrement();
|
||||
Log.trace("Creating transaction " + counter);
|
||||
return transactionFactory.createTransaction(counter, new TransactionObjectSourceImpl(counter));
|
||||
}
|
||||
|
||||
public void commit(TransactionPrivate tx) {
|
||||
verifyReady();
|
||||
Log.trace("Committing transaction " + tx.getId());
|
||||
// FIXME: Better way?
|
||||
tx.put(JDataDummy.getInstance());
|
||||
|
||||
var current = new LinkedHashMap<JObjectKey, TxRecord.TxObjectRecord<?>>();
|
||||
var dependenciesLocked = new LinkedHashMap<JObjectKey, TransactionObjectLocked<?>>();
|
||||
@@ -101,7 +124,7 @@ public class JObjectManager {
|
||||
key -> {
|
||||
dependenciesLocked.computeIfAbsent(key, k -> {
|
||||
var got = getObjLock(JData.class, k);
|
||||
Log.trace("Adding dependency " + k.toString() + " -> " + got);
|
||||
Log.trace("Adding dependency " + k.toString() + " -> " + got.data().map(JDataVersionedWrapper::data).map(JData::key).orElse(null));
|
||||
toUnlock.add(got.lock);
|
||||
return got;
|
||||
});
|
||||
@@ -127,7 +150,6 @@ public class JObjectManager {
|
||||
// Check that their version is not higher than the version of transaction being committed
|
||||
// TODO: check deletions, inserts
|
||||
try {
|
||||
Collection<TxRecord.TxObjectRecord<?>> drained;
|
||||
try {
|
||||
boolean somethingChanged;
|
||||
do {
|
||||
@@ -143,8 +165,12 @@ public class JObjectManager {
|
||||
.forEach(addDependency);
|
||||
|
||||
for (var entry : currentIteration.entrySet()) {
|
||||
// FIXME: Kinda hack?
|
||||
if (entry.getKey().equals(JDataDummy.TX_ID_OBJ_NAME)) {
|
||||
continue;
|
||||
}
|
||||
somethingChanged = true;
|
||||
Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.toString());
|
||||
Log.trace("Running pre-commit hook " + hook.getClass() + " for" + entry.getKey());
|
||||
var oldObj = getCurrent.apply(entry.getKey());
|
||||
switch (entry.getValue()) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> {
|
||||
@@ -173,37 +199,39 @@ public class JObjectManager {
|
||||
}
|
||||
}
|
||||
|
||||
for (var dep : dependenciesLocked.entrySet()) {
|
||||
if (dep.getValue().data().isEmpty()) {
|
||||
Log.trace("Checking dependency " + dep.getKey() + " - not found");
|
||||
for (var read : reads.entrySet()) {
|
||||
var dep = dependenciesLocked.get(read.getKey());
|
||||
|
||||
if (dep.data().isEmpty()) {
|
||||
Log.trace("Checking read dependency " + read.getKey() + " - not found");
|
||||
continue;
|
||||
}
|
||||
|
||||
if (dep.getValue().data().get().version() >= tx.getId()) {
|
||||
Log.trace("Checking dependency " + dep.getKey() + " - newer than");
|
||||
throw new TxCommitException("Serialization hazard: " + dep.getValue().data().get().version() + " vs " + tx.getId());
|
||||
if (dep.data().get().version() >= tx.getId()) {
|
||||
Log.trace("Checking dependency " + read.getKey() + " - newer than");
|
||||
throw new TxCommitException("Serialization hazard: " + dep.data().get().version() + " vs " + tx.getId());
|
||||
}
|
||||
|
||||
var read = reads.get(dep.getKey());
|
||||
if (read != null && read.data().orElse(null) != dep.getValue().data().orElse(null)) {
|
||||
Log.trace("Checking dependency " + dep.getKey() + " - read mismatch");
|
||||
throw new TxCommitException("Read mismatch for " + dep.getKey() + ": " + read + " vs " + dep.getValue());
|
||||
}
|
||||
|
||||
Log.trace("Checking dependency " + dep.getKey() + " - ok with read " + read);
|
||||
Log.trace("Checking dependency " + read.getKey() + " - ok with read");
|
||||
}
|
||||
|
||||
Log.tracef("Flushing transaction %d to storage", tx.getId());
|
||||
|
||||
for (var action : current.entrySet()) {
|
||||
var dep = dependenciesLocked.get(action.getKey());
|
||||
if (dep.data().isPresent() && dep.data.get().version() >= tx.getId()) {
|
||||
Log.trace("Skipping write " + action.getKey() + " - dependency " + dep.data().get().version() + " vs " + tx.getId());
|
||||
continue;
|
||||
}
|
||||
|
||||
switch (action.getValue()) {
|
||||
case TxRecord.TxObjectRecordWrite<?> write -> {
|
||||
Log.trace("Flushing object " + action.getKey());
|
||||
Log.trace("Writing " + action.getKey());
|
||||
var wrapped = new JDataVersionedWrapper<>(write.data(), tx.getId());
|
||||
_objects.put(action.getKey(), new JDataWrapper<>(wrapped));
|
||||
}
|
||||
case TxRecord.TxObjectRecordDeleted deleted -> {
|
||||
Log.trace("Deleting object " + action.getKey());
|
||||
Log.trace("Deleting " + action.getKey());
|
||||
_objects.remove(action.getKey());
|
||||
}
|
||||
default -> {
|
||||
@@ -225,6 +253,7 @@ public class JObjectManager {
|
||||
}
|
||||
|
||||
public void rollback(TransactionPrivate tx) {
|
||||
verifyReady();
|
||||
Log.trace("Rolling back transaction " + tx.getId());
|
||||
tx.reads().forEach((key, value) -> {
|
||||
if (value instanceof TransactionObjectLocked<?> locked) {
|
||||
@@ -271,25 +300,21 @@ public class JObjectManager {
|
||||
|
||||
@Override
|
||||
public <T extends JData> TransactionObject<T> get(Class<T> type, JObjectKey key) {
|
||||
return getObj(type, key);
|
||||
// return getObj(type, key).map(got -> {
|
||||
// if (got.data().getVersion() > _txId) {
|
||||
// throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId);
|
||||
// }
|
||||
// return got;
|
||||
// });
|
||||
var got = getObj(type, key);
|
||||
if (got.data().isPresent() && got.data().get().version() > _txId) {
|
||||
throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
|
||||
}
|
||||
return got;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends JData> TransactionObject<T> getWriteLocked(Class<T> type, JObjectKey key) {
|
||||
return getObjLock(type, key);
|
||||
// return getObjLock(type, key).map(got -> {
|
||||
// if (got.data().getVersion() > _txId) {
|
||||
// got.lock.close();
|
||||
// throw new IllegalStateException("Serialization race for " + key + ": " + got.data().getVersion() + " vs " + _txId);
|
||||
// }
|
||||
// return got;
|
||||
// });
|
||||
var got = getObjLock(type, key);
|
||||
if (got.data().isPresent() && got.data().get().version() > _txId) {
|
||||
got.lock().close();
|
||||
throw new TxCommitException("Serialization race for " + key + ": " + got.data().get().version() + " vs " + _txId);
|
||||
}
|
||||
return got;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,11 +19,11 @@ public interface TransactionManager {
|
||||
}
|
||||
|
||||
begin();
|
||||
T ret;
|
||||
try {
|
||||
var ret = supplier.get();
|
||||
commit();
|
||||
return ret;
|
||||
ret = supplier.get();
|
||||
} catch (TxCommitException txCommitException) {
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
@@ -33,6 +33,16 @@ public interface TransactionManager {
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
try {
|
||||
commit();
|
||||
return ret;
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
return runTries(supplier, tries - 1);
|
||||
}
|
||||
}
|
||||
|
||||
default void runTries(VoidFn fn, int tries) {
|
||||
@@ -44,6 +54,19 @@ public interface TransactionManager {
|
||||
begin();
|
||||
try {
|
||||
fn.apply();
|
||||
} catch (TxCommitException txCommitException) {
|
||||
rollback();
|
||||
if (tries == 0) {
|
||||
Log.error("Transaction commit failed", txCommitException);
|
||||
throw txCommitException;
|
||||
}
|
||||
runTries(fn, tries - 1);
|
||||
return;
|
||||
} catch (Throwable e) {
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
try {
|
||||
commit();
|
||||
} catch (TxCommitException txCommitException) {
|
||||
if (tries == 0) {
|
||||
@@ -51,9 +74,6 @@ public interface TransactionManager {
|
||||
throw txCommitException;
|
||||
}
|
||||
runTries(fn, tries - 1);
|
||||
} catch (Throwable e) {
|
||||
rollback();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
package com.usatiuk.dhfs.objects;
|
||||
|
||||
import com.usatiuk.dhfs.objects.persistence.CachingObjectPersistentStore;
|
||||
import com.usatiuk.dhfs.objects.persistence.TxManifest;
|
||||
import com.usatiuk.dhfs.objects.persistence.TxManifestObj;
|
||||
import com.usatiuk.dhfs.utils.VoidFn;
|
||||
import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.ShutdownEvent;
|
||||
@@ -11,10 +11,14 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@ApplicationScoped
|
||||
@@ -108,39 +112,27 @@ public class TxWritebackImpl implements TxWriteback {
|
||||
}
|
||||
}
|
||||
|
||||
var latch = new CountDownLatch((int) bundle._entries.values().stream().filter(e -> e instanceof TxBundleImpl.CommittedEntry).count());
|
||||
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
|
||||
var toWrite = new ArrayList<Pair<JObjectKey, JDataVersionedWrapper<?>>>();
|
||||
var toDelete = new ArrayList<JObjectKey>();
|
||||
|
||||
for (var e : bundle._entries.values()) {
|
||||
switch (e) {
|
||||
case TxBundleImpl.CommittedEntry c -> _commitExecutor.execute(() -> {
|
||||
try {
|
||||
Log.trace("Writing new " + c.key());
|
||||
objectPersistentStore.writeObject(c.key(), c.data());
|
||||
} catch (Throwable t) {
|
||||
Log.error("Error writing " + c.key(), t);
|
||||
errors.add(t);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
case TxBundleImpl.DeletedEntry d -> {
|
||||
if (Log.isDebugEnabled())
|
||||
Log.debug("Deleting from persistent storage " + d.key()); // FIXME: For tests
|
||||
case TxBundleImpl.CommittedEntry(JObjectKey key, JDataVersionedWrapper<?> data, int size) -> {
|
||||
Log.trace("Writing new " + key);
|
||||
toWrite.add(Pair.of(key, data));
|
||||
}
|
||||
case TxBundleImpl.DeletedEntry(JObjectKey key) -> {
|
||||
Log.trace("Deleting from persistent storage " + key);
|
||||
toDelete.add(key);
|
||||
}
|
||||
default -> throw new IllegalStateException("Unexpected value: " + e);
|
||||
}
|
||||
}
|
||||
|
||||
latch.await();
|
||||
if (!errors.isEmpty()) {
|
||||
throw new RuntimeException("Errors in writeback!");
|
||||
}
|
||||
|
||||
objectPersistentStore.commitTx(
|
||||
new TxManifest(
|
||||
bundle._entries.values().stream().filter(e -> e instanceof TxBundleImpl.CommittedEntry).map(TxBundleImpl.BundleEntry::key).toList(),
|
||||
bundle._entries.values().stream().filter(e -> e instanceof TxBundleImpl.DeletedEntry).map(TxBundleImpl.BundleEntry::key).toList()
|
||||
new TxManifestObj<>(
|
||||
Collections.unmodifiableList(toWrite),
|
||||
Collections.unmodifiableList(toDelete)
|
||||
));
|
||||
|
||||
Log.trace("Bundle " + bundle.getId() + " committed");
|
||||
|
||||
@@ -7,6 +7,7 @@ import io.quarkus.logging.Log;
|
||||
import io.quarkus.runtime.Startup;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
@@ -89,15 +90,12 @@ public class CachingObjectPersistentStore {
|
||||
}
|
||||
}
|
||||
|
||||
public void writeObject(JObjectKey name, JDataVersionedWrapper<?> object) {
|
||||
delegate.writeObject(name, object);
|
||||
}
|
||||
|
||||
public void commitTx(TxManifest names) {
|
||||
public void commitTx(TxManifestObj<? extends JDataVersionedWrapper<?>> names) {
|
||||
// During commit, readObject shouldn't be called for these items,
|
||||
// it should be handled by the upstream store
|
||||
synchronized (_cache) {
|
||||
for (var key : Stream.concat(names.written().stream(), names.deleted().stream()).toList()) {
|
||||
for (var key : Stream.concat(names.written().stream().map(Pair::getLeft),
|
||||
names.deleted().stream()).toList()) {
|
||||
_curSize -= Optional.ofNullable(_cache.get(key)).map(CacheEntry::size).orElse(0L);
|
||||
_cache.remove(key);
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ import jakarta.annotation.Priority;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.enterprise.event.Observes;
|
||||
import net.openhft.hashing.LongHashFunction;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
@@ -27,10 +26,11 @@ import java.nio.file.Files;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
|
||||
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
|
||||
@@ -50,7 +50,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
private final Path _txManifest;
|
||||
private ExecutorService _flushExecutor;
|
||||
private RandomAccessFile _txFile;
|
||||
private volatile boolean _ready = false;
|
||||
private boolean _ready = false;
|
||||
|
||||
public FileObjectPersistentStore(@ConfigProperty(name = "dhfs.objects.persistence.files.root") String root) {
|
||||
this._root = Path.of(root).resolve("objects");
|
||||
@@ -69,13 +69,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
Files.createFile(_txManifest);
|
||||
}
|
||||
_txFile = new RandomAccessFile(_txManifest.toFile(), "rw");
|
||||
{
|
||||
BasicThreadFactory factory = new BasicThreadFactory.Builder()
|
||||
.namingPattern("persistent-commit-%d")
|
||||
.build();
|
||||
|
||||
_flushExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), factory);
|
||||
}
|
||||
_flushExecutor = Executors.newVirtualThreadPerTaskExecutor();
|
||||
|
||||
tryReplay();
|
||||
Log.info("Transaction replay done");
|
||||
@@ -181,18 +175,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeObject(JObjectKey name, ByteString obj) {
|
||||
verifyReady();
|
||||
try {
|
||||
var tmpPath = getTmpObjPath(name);
|
||||
writeObjectImpl(tmpPath, obj, true);
|
||||
} catch (IOException e) {
|
||||
Log.error("Error writing new file " + name, e);
|
||||
}
|
||||
}
|
||||
|
||||
private TxManifest readTxManifest() {
|
||||
private TxManifestRaw readTxManifest() {
|
||||
try {
|
||||
var channel = _txFile.getChannel();
|
||||
|
||||
@@ -219,7 +202,7 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
}
|
||||
|
||||
private void putTxManifest(TxManifest manifest) {
|
||||
private void putTxManifest(TxManifestRaw manifest) {
|
||||
try {
|
||||
var channel = _txFile.getChannel();
|
||||
var data = SerializationHelper.serializeArray(manifest);
|
||||
@@ -237,62 +220,58 @@ public class FileObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitTx(TxManifest manifest) {
|
||||
public void commitTx(TxManifestRaw manifest) {
|
||||
verifyReady();
|
||||
try {
|
||||
_flushExecutor.invokeAll(
|
||||
manifest.written().stream().map(p -> (Callable<Void>) () -> {
|
||||
var tmpPath = getTmpObjPath(p.getKey());
|
||||
writeObjectImpl(tmpPath, p.getValue(), true);
|
||||
return null;
|
||||
}).toList()
|
||||
).forEach(p -> {
|
||||
try {
|
||||
p.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
commitTxImpl(manifest, true);
|
||||
}
|
||||
|
||||
public void commitTxImpl(TxManifest manifest, boolean failIfNotFound) {
|
||||
public void commitTxImpl(TxManifestRaw manifest, boolean failIfNotFound) {
|
||||
if (manifest.deleted().isEmpty() && manifest.written().isEmpty()) {
|
||||
Log.debug("Empty manifest, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
putTxManifest(manifest);
|
||||
|
||||
try {
|
||||
if (manifest.deleted().isEmpty() && manifest.written().isEmpty()) {
|
||||
Log.debug("Empty manifest, skipping");
|
||||
return;
|
||||
}
|
||||
|
||||
putTxManifest(manifest);
|
||||
|
||||
var latch = new CountDownLatch(manifest.written().size() + manifest.deleted().size());
|
||||
ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
|
||||
|
||||
for (var n : manifest.written()) {
|
||||
_flushExecutor.execute(() -> {
|
||||
try {
|
||||
Files.move(getTmpObjPath(n), getObjPath(n), ATOMIC_MOVE, REPLACE_EXISTING);
|
||||
} catch (Throwable t) {
|
||||
if (!failIfNotFound && (t instanceof NoSuchFileException)) return;
|
||||
Log.error("Error writing " + n, t);
|
||||
errors.add(t);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
for (var d : manifest.deleted()) {
|
||||
_flushExecutor.execute(() -> {
|
||||
try {
|
||||
deleteImpl(getObjPath(d));
|
||||
} catch (Throwable t) {
|
||||
Log.error("Error deleting " + d, t);
|
||||
errors.add(t);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
latch.await();
|
||||
|
||||
if (!errors.isEmpty()) {
|
||||
throw new RuntimeException("Errors when commiting tx!");
|
||||
}
|
||||
|
||||
// No real need to truncate here
|
||||
// try (var channel = _txFile.getChannel()) {
|
||||
// channel.truncate(0);
|
||||
// }
|
||||
// } catch (IOException e) {
|
||||
// Log.error("Failed committing transaction to disk: ", e);
|
||||
// throw new RuntimeException(e);
|
||||
_flushExecutor.invokeAll(
|
||||
Stream.concat(manifest.written().stream().map(p -> (Callable<Void>) () -> {
|
||||
try {
|
||||
Files.move(getTmpObjPath(p.getKey()), getObjPath(p.getKey()), ATOMIC_MOVE, REPLACE_EXISTING);
|
||||
} catch (NoSuchFileException n) {
|
||||
if (failIfNotFound)
|
||||
throw n;
|
||||
}
|
||||
return null;
|
||||
}),
|
||||
manifest.deleted().stream().map(p -> (Callable<Void>) () -> {
|
||||
deleteImpl(getObjPath(p));
|
||||
return null;
|
||||
})).toList()
|
||||
).forEach(p -> {
|
||||
try {
|
||||
p.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ import java.util.Optional;
|
||||
@IfBuildProperty(name = "dhfs.objects.persistence", stringValue = "memory")
|
||||
public class MemoryObjectPersistentStore implements ObjectPersistentStore {
|
||||
private final Map<JObjectKey, ByteString> _objects = new HashMap<>();
|
||||
private final Map<JObjectKey, ByteString> _pending = new HashMap<>();
|
||||
|
||||
@Nonnull
|
||||
@Override
|
||||
@@ -34,17 +33,10 @@ public class MemoryObjectPersistentStore implements ObjectPersistentStore {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeObject(JObjectKey name, ByteString object) {
|
||||
public void commitTx(TxManifestRaw names) {
|
||||
synchronized (this) {
|
||||
_pending.put(name, object);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commitTx(TxManifest names) {
|
||||
synchronized (this) {
|
||||
for (JObjectKey key : names.written()) {
|
||||
_objects.put(key, _pending.get(key));
|
||||
for (var written : names.written()) {
|
||||
_objects.put(written.getKey(), written.getValue());
|
||||
}
|
||||
for (JObjectKey key : names.deleted()) {
|
||||
_objects.remove(key);
|
||||
|
||||
@@ -16,9 +16,7 @@ public interface ObjectPersistentStore {
|
||||
@Nonnull
|
||||
Optional<ByteString> readObject(JObjectKey name);
|
||||
|
||||
void writeObject(JObjectKey name, ByteString object);
|
||||
|
||||
void commitTx(TxManifest names);
|
||||
void commitTx(TxManifestRaw names);
|
||||
|
||||
long getTotalSpace();
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.ObjectSerializer;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.Collection;
|
||||
@@ -28,11 +29,11 @@ public class SerializingObjectPersistentStore {
|
||||
return delegate.readObject(name).map(serializer::deserialize);
|
||||
}
|
||||
|
||||
void writeObject(JObjectKey name, JDataVersionedWrapper<?> object) {
|
||||
delegate.writeObject(name, serializer.serialize(object));
|
||||
}
|
||||
|
||||
void commitTx(TxManifest names) {
|
||||
delegate.commitTx(names);
|
||||
void commitTx(TxManifestObj<? extends JDataVersionedWrapper<?>> names) {
|
||||
delegate.commitTx(new TxManifestRaw(
|
||||
names.written().stream()
|
||||
.map(e -> Pair.of(e.getKey(), serializer.serialize(e.getValue())))
|
||||
.toList()
|
||||
, names.deleted()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
package com.usatiuk.dhfs.objects.persistence;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
|
||||
// FIXME: Serializable
|
||||
public record TxManifest(Collection<JObjectKey> written, Collection<JObjectKey> deleted) implements Serializable {
|
||||
}
|
||||
@@ -0,0 +1,12 @@
|
||||
package com.usatiuk.dhfs.objects.persistence;
|
||||
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
|
||||
// FIXME: Serializable
|
||||
public record TxManifestObj<T>(Collection<Pair<JObjectKey, T>> written,
|
||||
Collection<JObjectKey> deleted) implements Serializable {
|
||||
}
|
||||
@@ -0,0 +1,13 @@
|
||||
package com.usatiuk.dhfs.objects.persistence;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import org.apache.commons.lang3.tuple.Pair;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
|
||||
// FIXME: Serializable
|
||||
public record TxManifestRaw(Collection<Pair<JObjectKey, ByteString>> written,
|
||||
Collection<JObjectKey> deleted) implements Serializable {
|
||||
}
|
||||
@@ -7,6 +7,7 @@ import io.quarkus.logging.Log;
|
||||
import io.quarkus.test.junit.QuarkusTest;
|
||||
import jakarta.inject.Inject;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
@@ -113,6 +114,7 @@ public class ObjectsTest {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
void createObjectConflict() throws InterruptedException {
|
||||
AtomicBoolean thread1Failed = new AtomicBoolean(true);
|
||||
AtomicBoolean thread2Failed = new AtomicBoolean(true);
|
||||
|
||||
@@ -24,40 +24,52 @@ import java.util.Date;
|
||||
|
||||
public class CertificateTools {
|
||||
|
||||
public static X509Certificate certFromBytes(byte[] bytes) throws CertificateException {
|
||||
CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
|
||||
InputStream in = new ByteArrayInputStream(bytes);
|
||||
return (X509Certificate) certFactory.generateCertificate(in);
|
||||
public static X509Certificate certFromBytes(byte[] bytes) {
|
||||
try {
|
||||
CertificateFactory certFactory = CertificateFactory.getInstance("X.509");
|
||||
InputStream in = new ByteArrayInputStream(bytes);
|
||||
return (X509Certificate) certFactory.generateCertificate(in);
|
||||
} catch (CertificateException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static KeyPair generateKeyPair() throws NoSuchAlgorithmException {
|
||||
KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
|
||||
keyGen.initialize(2048); //FIXME:
|
||||
return keyGen.generateKeyPair();
|
||||
public static KeyPair generateKeyPair() {
|
||||
try {
|
||||
KeyPairGenerator keyGen = KeyPairGenerator.getInstance("RSA");
|
||||
keyGen.initialize(2048); //FIXME:
|
||||
return keyGen.generateKeyPair();
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static X509Certificate generateCertificate(KeyPair keyPair, String subject) throws CertificateException, CertIOException, NoSuchAlgorithmException, OperatorCreationException {
|
||||
Provider bcProvider = new BouncyCastleProvider();
|
||||
Security.addProvider(bcProvider);
|
||||
public static X509Certificate generateCertificate(KeyPair keyPair, String subject) {
|
||||
try {
|
||||
Provider bcProvider = new BouncyCastleProvider();
|
||||
Security.addProvider(bcProvider);
|
||||
|
||||
Date startDate = new Date();
|
||||
Date startDate = new Date();
|
||||
|
||||
X500Name cnName = new X500Name("CN=" + subject);
|
||||
BigInteger certSerialNumber = new BigInteger(DigestUtils.sha256(subject));
|
||||
X500Name cnName = new X500Name("CN=" + subject);
|
||||
BigInteger certSerialNumber = new BigInteger(DigestUtils.sha256(subject));
|
||||
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
calendar.setTime(startDate);
|
||||
calendar.add(Calendar.YEAR, 999);
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
calendar.setTime(startDate);
|
||||
calendar.add(Calendar.YEAR, 999);
|
||||
|
||||
Date endDate = calendar.getTime();
|
||||
Date endDate = calendar.getTime();
|
||||
|
||||
ContentSigner contentSigner = new JcaContentSignerBuilder("SHA256WithRSA").build(keyPair.getPrivate());
|
||||
ContentSigner contentSigner = new JcaContentSignerBuilder("SHA256WithRSA").build(keyPair.getPrivate());
|
||||
|
||||
JcaX509v3CertificateBuilder certBuilder = new JcaX509v3CertificateBuilder(cnName, certSerialNumber, startDate, endDate, cnName, keyPair.getPublic());
|
||||
JcaX509v3CertificateBuilder certBuilder = new JcaX509v3CertificateBuilder(cnName, certSerialNumber, startDate, endDate, cnName, keyPair.getPublic());
|
||||
|
||||
BasicConstraints basicConstraints = new BasicConstraints(false);
|
||||
certBuilder.addExtension(new ASN1ObjectIdentifier("2.5.29.19"), true, basicConstraints);
|
||||
BasicConstraints basicConstraints = new BasicConstraints(false);
|
||||
certBuilder.addExtension(new ASN1ObjectIdentifier("2.5.29.19"), true, basicConstraints);
|
||||
|
||||
return new JcaX509CertificateConverter().setProvider(bcProvider).getCertificate(certBuilder.build(contentSigner));
|
||||
return new JcaX509CertificateConverter().setProvider(bcProvider).getCertificate(certBuilder.build(contentSigner));
|
||||
} catch (OperatorCreationException | CertificateException | CertIOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.KeyPair;
|
||||
import java.security.cert.CertificateEncodingException;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
@@ -54,15 +55,15 @@ public class PersistentPeerDataService {
|
||||
_selfKeyPair = selfData.selfKeyPair();
|
||||
return;
|
||||
} else {
|
||||
_selfUuid = presetUuid.map(s -> PeerId.of(UUID.fromString(s))).orElseGet(() -> PeerId.of(UUID.randomUUID()));
|
||||
try {
|
||||
_selfUuid = presetUuid.map(s -> PeerId.of(UUID.fromString(s))).orElseGet(() -> PeerId.of(UUID.randomUUID()));
|
||||
Log.info("Generating a key pair, please wait");
|
||||
_selfKeyPair = CertificateTools.generateKeyPair();
|
||||
_selfCertificate = CertificateTools.generateCertificate(_selfKeyPair, _selfUuid.toString());
|
||||
|
||||
curTx.put(new PersistentRemoteHostsData(_selfUuid, 0, _selfCertificate, _selfKeyPair));
|
||||
peerInfoService.putPeer(_selfUuid, _selfCertificate.getEncoded());
|
||||
} catch (Exception e) {
|
||||
} catch (CertificateEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,10 @@
|
||||
package com.usatiuk.dhfs.objects.repository.peersync;
|
||||
|
||||
import com.usatiuk.autoprotomap.runtime.ProtoMirror;
|
||||
import com.usatiuk.dhfs.objects.JDataRemote;
|
||||
import com.usatiuk.dhfs.objects.JObjectKey;
|
||||
import com.usatiuk.dhfs.objects.PeerId;
|
||||
import com.usatiuk.dhfs.objects.persistence.ChunkDataP;
|
||||
import com.usatiuk.dhfs.objects.persistence.PeerInfoP;
|
||||
import com.usatiuk.dhfs.objects.repository.CertificateTools;
|
||||
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.X509Certificate;
|
||||
|
||||
public record PeerInfo(JObjectKey key, PeerId id, byte[] cert) implements JDataRemote {
|
||||
@@ -17,10 +13,6 @@ public record PeerInfo(JObjectKey key, PeerId id, byte[] cert) implements JDataR
|
||||
}
|
||||
|
||||
public X509Certificate parsedCert() {
|
||||
try {
|
||||
return CertificateTools.certFromBytes(cert);
|
||||
} catch (CertificateException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return CertificateTools.certFromBytes(cert);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user