From 70784168dfa705c03bdc1c1a887c77bfdc8b9973 Mon Sep 17 00:00:00 2001 From: akucukoduk16 Date: Tue, 17 May 2022 16:36:47 +0300 Subject: [PATCH 1/3] Distributed Storage For Hub done. --- ...LightChainDistributedStorageException.java | 5 +- .../java/model/lightchain/Identifier.java | 41 +++++- src/main/java/network/NetworkAdapter.java | 11 ++ src/test/java/networking/MockConduit.java | 5 +- src/test/java/networking/stub/Hub.java | 126 +++++++++++++++++- .../java/networking/stub/StubNetwork.java | 21 ++- .../stub/StubNetworkStorageTest.java | 93 +++++++++++++ 7 files changed, 291 insertions(+), 11 deletions(-) diff --git a/src/main/java/model/exceptions/LightChainDistributedStorageException.java b/src/main/java/model/exceptions/LightChainDistributedStorageException.java index 5e5713a6..c7784fc0 100644 --- a/src/main/java/model/exceptions/LightChainDistributedStorageException.java +++ b/src/main/java/model/exceptions/LightChainDistributedStorageException.java @@ -3,4 +3,7 @@ /** * Represents a runtime exception happens on distributed storage layer of LightChain. */ -public class LightChainDistributedStorageException extends Exception{} +public class LightChainDistributedStorageException extends Exception{ + public LightChainDistributedStorageException(String could_not_get_the_entity) { + } +} diff --git a/src/main/java/model/lightchain/Identifier.java b/src/main/java/model/lightchain/Identifier.java index 7306ffbf..8286c0b3 100644 --- a/src/main/java/model/lightchain/Identifier.java +++ b/src/main/java/model/lightchain/Identifier.java @@ -4,11 +4,12 @@ import java.util.Arrays; import io.ipfs.multibase.Multibase; +import org.jetbrains.annotations.NotNull; /** * Represents a 32-byte unique identifier for an entity. Normally is computed as the hash value of the entity. */ -public class Identifier implements Serializable { +public class Identifier implements Serializable,Comparable { public static final int Size = 32; private final byte[] value; @@ -78,4 +79,42 @@ public int comparedTo(Identifier other) { int result = Arrays.compare(this.value, other.value); return Integer.compare(result, 0); } + + /** + * Compares this object with the specified object for order. Returns a + * negative integer, zero, or a positive integer as this object is less + * than, equal to, or greater than the specified object. + * + *

The implementor must ensure {@link Integer#signum + * signum}{@code (x.compareTo(y)) == -signum(y.compareTo(x))} for + * all {@code x} and {@code y}. (This implies that {@code + * x.compareTo(y)} must throw an exception if and only if {@code + * y.compareTo(x)} throws an exception.) + * + *

The implementor must also ensure that the relation is transitive: + * {@code (x.compareTo(y) > 0 && y.compareTo(z) > 0)} implies + * {@code x.compareTo(z) > 0}. + * + *

Finally, the implementor must ensure that {@code + * x.compareTo(y)==0} implies that {@code signum(x.compareTo(z)) + * == signum(y.compareTo(z))}, for all {@code z}. + * + * @param o the object to be compared. + * @return a negative integer, zero, or a positive integer as this object + * is less than, equal to, or greater than the specified object. + * @throws NullPointerException if the specified object is null + * @throws ClassCastException if the specified object's type prevents it + * from being compared to this object. + * @apiNote It is strongly recommended, but not strictly required that + * {@code (x.compareTo(y)==0) == (x.equals(y))}. Generally speaking, any + * class that implements the {@code Comparable} interface and violates + * this condition should clearly indicate this fact. The recommended + * language is "Note: this class has a natural ordering that is + * inconsistent with equals." + */ + @Override + public int compareTo(@NotNull Identifier o) { + int result = Arrays.compare(this.value, o.value); + return Integer.compare(result, 0); + } } diff --git a/src/main/java/network/NetworkAdapter.java b/src/main/java/network/NetworkAdapter.java index e5151104..14e46911 100644 --- a/src/main/java/network/NetworkAdapter.java +++ b/src/main/java/network/NetworkAdapter.java @@ -5,6 +5,8 @@ import model.exceptions.LightChainNetworkingException; import model.lightchain.Identifier; +import java.util.ArrayList; + /** * NetworkAdapter models the interface that is exposed to the conduits from the networking layer. */ @@ -38,4 +40,13 @@ public interface NetworkAdapter { * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entity. */ Entity get(Identifier identifier, String namespace) throws LightChainDistributedStorageException; + + /** + * Retrieves all entities stored on the underlying DHT of nodes that stored on this channel. + * + * @param namespace the namespace on which this query is resolved. + * @return list of all entities stored on this channel from underlying DHT. + * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entities. + */ + ArrayList allEntities(String namespace) throws LightChainDistributedStorageException; } diff --git a/src/test/java/networking/MockConduit.java b/src/test/java/networking/MockConduit.java index 86f2191e..93199be2 100644 --- a/src/test/java/networking/MockConduit.java +++ b/src/test/java/networking/MockConduit.java @@ -53,6 +53,7 @@ public void unicast(Entity e, Identifier target) throws LightChainNetworkingExce */ @Override public void put(Entity e) throws LightChainDistributedStorageException { + this.networkAdapter.put(e, channel); } @@ -66,12 +67,12 @@ public void put(Entity e) throws LightChainDistributedStorageException { */ @Override public Entity get(Identifier identifier) throws LightChainDistributedStorageException { - return null; + return this.networkAdapter.get(identifier, channel); } @Override public ArrayList allEntities() throws LightChainDistributedStorageException { - return null; + return this.networkAdapter.allEntities(channel); } public boolean hasSent(Identifier entityId) { diff --git a/src/test/java/networking/stub/Hub.java b/src/test/java/networking/stub/Hub.java index 3a53543c..52ac3751 100644 --- a/src/test/java/networking/stub/Hub.java +++ b/src/test/java/networking/stub/Hub.java @@ -1,8 +1,11 @@ package networking.stub; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import model.Entity; +import model.exceptions.LightChainDistributedStorageException; import model.lightchain.Identifier; import network.Network; @@ -10,30 +13,119 @@ * Models the core communication part of the networking layer that allows stub network instances to talk to each other. */ public class Hub { + private final ReentrantReadWriteLock lock; + private final String channel1 = "test-network-channel-1"; + private final String channel2 = "test-network-channel-2"; + private final List identifierSet; private final ConcurrentHashMap networks; + private final ConcurrentHashMap> channelMap1; + private final ConcurrentHashMap> channelMap2; /** * Create a hub. */ public Hub() { + this.lock = new ReentrantReadWriteLock(); this.networks = new ConcurrentHashMap<>(); + this.channelMap1 = new ConcurrentHashMap<>(); + this.channelMap2 = new ConcurrentHashMap<>(); + this.identifierSet = new ArrayList<>(); } /** * Registeration of a network to the Hub. * * @param identifier identifier of network. - * @param network to be registered. + * @param network to be registered. */ public void registerNetwork(Identifier identifier, Network network) { - networks.put(identifier, network); + networks.putIfAbsent(identifier, network); + channelMap1.putIfAbsent(identifier, new ConcurrentHashMap<>()); + channelMap2.putIfAbsent(identifier, new ConcurrentHashMap<>()); + identifierSet.add(identifier); + sortMaps(); + + } + + /** + * Sort the identifier of networks. + */ + public void sortMaps() { + Collections.sort(identifierSet); + } + + /** + * Put Entity to Node. + * + * @param e entitiy. + * @param namespace channel name. + */ + public void putEntityToChannel(Entity e, String namespace) throws LightChainDistributedStorageException { + try { + lock.writeLock().lock(); + Identifier identifierOfNetwork = binarySearch(e.id()); + if (namespace.equals(channel1)) { + channelMap1.get(identifierOfNetwork).put(e.id(), e); + } else if (namespace.equals(channel2)) { + channelMap2.get(identifierOfNetwork).put(e.id(), e); + } else { + throw new LightChainDistributedStorageException("entity could not be put the given channel"); + } + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Get entity from channel. + * + * @param identifier of entity. + * @param namespace channel name. + * @return entity. + */ + public Entity getEntityFromChannel(Identifier identifier, String namespace) throws LightChainDistributedStorageException { + try { + lock.readLock().lock(); + Identifier identifierOfNetwork = binarySearch(identifier); + if (namespace.equals(channel1)) { + return channelMap1.get(identifierOfNetwork).get(identifier); + } else if (namespace.equals(channel2)) { + return channelMap2.get(identifierOfNetwork).get(identifier); + } else { + throw new LightChainDistributedStorageException("could not get the entity"); + } + } finally { + lock.readLock().unlock(); + } + } + + /** + * Get the identifier of the network that store the entity. + * + * @param identifier identifier of the entity to be stored. + * @return identifier of the network that store the entity. + */ + public Identifier binarySearch(Identifier identifier) { + int left = 0, right = identifierSet.size() - 1; + while (left < right) { + int mid = left + (right - left) / 2; + if (identifierSet.get(mid).compareTo(identifier) == 0) { + return identifierSet.get(mid); + } + if (identifier.compareTo(identifierSet.get(mid)) > 0) { + left = mid + 1; + } else { + right = mid - 1; + } + } + return identifierSet.get(left); } /** * Transfer entity from to another network on the same channel. * - * @param entity entity to be transferred. - * @param target identifier of target. + * @param entity entity to be transferred. + * @param target identifier of target. * @param channel channel on which the entity is delivered to target. */ public void transferEntity(Entity entity, Identifier target, String channel) throws IllegalStateException { @@ -54,4 +146,30 @@ public void transferEntity(Entity entity, Identifier target, String channel) thr private StubNetwork getNetwork(Identifier identifier) { return (StubNetwork) networks.get(identifier); } + + /** + * Retrieves all entities stored on the underlying DHT of nodes that stored on this channel. + * + * @param namespace the namespace on which this query is resolved. + * @return list of all entities stored on this channel from underlying DHT. + * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entities. + */ + public ArrayList getAllEntities(String namespace) throws LightChainDistributedStorageException { + ArrayList allEntities = new ArrayList<>(); + if (namespace.equals(channel1)) { + for (Identifier identifier : channelMap1.keySet()) { + List entityList = new ArrayList<>(channelMap1.get(identifier).values()); + allEntities.addAll(entityList); + } + return allEntities; + } else if (namespace.equals(channel2)) { + for (Identifier identifier : channelMap2.keySet()) { + List entityList = new ArrayList<>(channelMap2.get(identifier).values()); + allEntities.addAll(entityList); + } + return allEntities; + } else { + throw new LightChainDistributedStorageException("could not get the entities"); + } + } } \ No newline at end of file diff --git a/src/test/java/networking/stub/StubNetwork.java b/src/test/java/networking/stub/StubNetwork.java index 7eed4979..0e94dacf 100644 --- a/src/test/java/networking/stub/StubNetwork.java +++ b/src/test/java/networking/stub/StubNetwork.java @@ -1,5 +1,6 @@ package networking.stub; +import java.util.ArrayList; import java.util.concurrent.ConcurrentHashMap; import model.Entity; @@ -17,6 +18,8 @@ * A mock implementation of networking layer as a test util. */ public class StubNetwork implements Network, NetworkAdapter { + private final String channel1 = "test-network-channel-1"; + private final String channel2 = "test-network-channel-2"; private final ConcurrentHashMap engines; private final Hub hub; private final Identifier identifier; @@ -98,7 +101,6 @@ public void unicast(Entity e, Identifier target, String channel) throws LightCha } catch (IllegalStateException ex) { throw new LightChainNetworkingException("stub network could not transfer entity", ex); } - } /** @@ -110,7 +112,7 @@ public void unicast(Entity e, Identifier target, String channel) throws LightCha */ @Override public void put(Entity e, String namespace) throws LightChainDistributedStorageException { - + this.hub.putEntityToChannel(e, namespace); } /** @@ -124,6 +126,19 @@ public void put(Entity e, String namespace) throws LightChainDistributedStorageE */ @Override public Entity get(Identifier identifier, String namespace) throws LightChainDistributedStorageException { - return null; + return this.hub.getEntityFromChannel(identifier, namespace); } + + /** + * Retrieves all entities stored on the underlying DHT of nodes that stored on this channel. + * + * @param namespace the namespace on which this query is resolved. + * @return list of all entities stored on this channel from underlying DHT. + * @throws LightChainDistributedStorageException any unhappy path taken on retrieving the Entities. + */ + @Override + public ArrayList allEntities(String namespace) throws LightChainDistributedStorageException { + return this.hub.getAllEntities(namespace); + } + } \ No newline at end of file diff --git a/src/test/java/networking/stub/StubNetworkStorageTest.java b/src/test/java/networking/stub/StubNetworkStorageTest.java index bb73a1bb..9d75b36b 100644 --- a/src/test/java/networking/stub/StubNetworkStorageTest.java +++ b/src/test/java/networking/stub/StubNetworkStorageTest.java @@ -1,9 +1,25 @@ package networking.stub; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import model.Entity; +import model.exceptions.LightChainDistributedStorageException; +import network.Conduit; +import networking.MockEngine; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import unittest.fixtures.EntityFixture; + /** * Encapsulates tests for the storage side of the stub network. */ public class StubNetworkStorageTest { + private final String channel1 = "test-network-channel-1"; + private final String channel2 = "test-network-channel-2"; + // TODO: implement test scenarios // Use mock engines with stub network. // 1. Engine A1 (on one network) puts an entity on channel1 and Engine B1 on another network can get it on the @@ -14,4 +30,81 @@ public class StubNetworkStorageTest { // 3. Engine A1 (on one network) can CONCURRENTLY put 100 different entities on channel1, and // Engine B1 on another network can get all of them at once using allEntities method, // while Engine B2 on another channel2 can't get none of them using all. + + /** + * With 10 networks each with one engine on channel 1 and one engine on channel 2. Each engine concurrently puts 100 + * entities on the channel that it is registered on. Then, all engines registering on channel 1 should be able to get + * all entities that other engines have put on this channel (total of 1000), while could not get any of entities that + * have been put on channel 2. The same should be true for engines of channel 2. + */ + @Test + void putAndGetConcurrently() throws LightChainDistributedStorageException { + int concurrencyDegree = 2000; + AtomicInteger threadError = new AtomicInteger(); + Thread[] putThreads = new Thread[concurrencyDegree]; + CountDownLatch putDone = new CountDownLatch(concurrencyDegree); + ArrayList conduits1 = new ArrayList<>(); + ArrayList conduits2 = new ArrayList<>(); + ArrayList allEntitiesChannel1 = new ArrayList<>(); + ArrayList allEntitiesChannel2 = new ArrayList<>(); + Hub hub = new Hub(); + for (int i = 0; i < 10; i++) { + StubNetwork stubNetwork = new StubNetwork(hub); + MockEngine engine1 = new MockEngine(); + MockEngine engine2 = new MockEngine(); + Conduit conduit1 = stubNetwork.register(engine1, channel1); + Conduit conduit2 = stubNetwork.register(engine2, channel2); + conduits1.add(conduit1); + conduits2.add(conduit2); + } + int counter = 0; + for (int k = 0; k < 2; k++) { + int finalK = k; + for (int j = 0; j < 10; j++) { + int finalJ = j; + for (int i = 0; i < 100; i++) { + putThreads[counter] = new Thread(() -> { + try { + Conduit conduit; + if (finalK == 0) { + Entity entity1 = new EntityFixture(); + allEntitiesChannel1.add(entity1); + conduit = conduits1.get(finalJ); + conduit.put(entity1); + } else { + Entity entity1 = new EntityFixture(); + allEntitiesChannel2.add(entity1); + conduit = conduits2.get(finalJ); + conduit.put(entity1); + } + + putDone.countDown(); + } catch (LightChainDistributedStorageException e) { + threadError.getAndIncrement(); + } + }); + counter++; + } + } + } + for (Thread t : putThreads) { + t.start(); + } + try { + boolean doneOneTime = putDone.await(60, TimeUnit.SECONDS); + Assertions.assertTrue(doneOneTime); + } catch (InterruptedException e) { + Assertions.fail(); + } + for (Conduit conduit : conduits1) { + Assertions.assertTrue(conduit.allEntities().containsAll(allEntitiesChannel1)); + Assertions.assertEquals(conduit.allEntities().size(), 1000); + + } + for (Conduit conduit : conduits2) { + Assertions.assertTrue(conduit.allEntities().containsAll(allEntitiesChannel2)); + Assertions.assertEquals(conduit.allEntities().size(), 1000); + } + Assertions.assertEquals(0, threadError.get()); + } } From 5dbfc4753794bc5f468111b8ba2b5e00e2d50f0c Mon Sep 17 00:00:00 2001 From: akucukoduk16 Date: Tue, 17 May 2022 16:42:42 +0300 Subject: [PATCH 2/3] style fix --- .../exceptions/LightChainDistributedStorageException.java | 4 ++-- src/main/java/model/lightchain/Identifier.java | 2 +- src/main/java/network/NetworkAdapter.java | 4 ++-- src/test/java/networking/stub/Hub.java | 6 ++++-- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/java/model/exceptions/LightChainDistributedStorageException.java b/src/main/java/model/exceptions/LightChainDistributedStorageException.java index c7784fc0..cdb4d93f 100644 --- a/src/main/java/model/exceptions/LightChainDistributedStorageException.java +++ b/src/main/java/model/exceptions/LightChainDistributedStorageException.java @@ -3,7 +3,7 @@ /** * Represents a runtime exception happens on distributed storage layer of LightChain. */ -public class LightChainDistributedStorageException extends Exception{ - public LightChainDistributedStorageException(String could_not_get_the_entity) { +public class LightChainDistributedStorageException extends Exception { + public LightChainDistributedStorageException(String e) { } } diff --git a/src/main/java/model/lightchain/Identifier.java b/src/main/java/model/lightchain/Identifier.java index 8286c0b3..a04355a6 100644 --- a/src/main/java/model/lightchain/Identifier.java +++ b/src/main/java/model/lightchain/Identifier.java @@ -9,7 +9,7 @@ /** * Represents a 32-byte unique identifier for an entity. Normally is computed as the hash value of the entity. */ -public class Identifier implements Serializable,Comparable { +public class Identifier implements Serializable, Comparable { public static final int Size = 32; private final byte[] value; diff --git a/src/main/java/network/NetworkAdapter.java b/src/main/java/network/NetworkAdapter.java index 14e46911..956f435c 100644 --- a/src/main/java/network/NetworkAdapter.java +++ b/src/main/java/network/NetworkAdapter.java @@ -1,12 +1,12 @@ package network; +import java.util.ArrayList; + import model.Entity; import model.exceptions.LightChainDistributedStorageException; import model.exceptions.LightChainNetworkingException; import model.lightchain.Identifier; -import java.util.ArrayList; - /** * NetworkAdapter models the interface that is exposed to the conduits from the networking layer. */ diff --git a/src/test/java/networking/stub/Hub.java b/src/test/java/networking/stub/Hub.java index 52ac3751..9ad126d9 100644 --- a/src/test/java/networking/stub/Hub.java +++ b/src/test/java/networking/stub/Hub.java @@ -83,7 +83,8 @@ public void putEntityToChannel(Entity e, String namespace) throws LightChainDist * @param namespace channel name. * @return entity. */ - public Entity getEntityFromChannel(Identifier identifier, String namespace) throws LightChainDistributedStorageException { + public Entity getEntityFromChannel(Identifier identifier, String namespace) + throws LightChainDistributedStorageException { try { lock.readLock().lock(); Identifier identifierOfNetwork = binarySearch(identifier); @@ -106,7 +107,8 @@ public Entity getEntityFromChannel(Identifier identifier, String namespace) thro * @return identifier of the network that store the entity. */ public Identifier binarySearch(Identifier identifier) { - int left = 0, right = identifierSet.size() - 1; + int left = 0; + int right = identifierSet.size() - 1; while (left < right) { int mid = left + (right - left) / 2; if (identifierSet.get(mid).compareTo(identifier) == 0) { From 569e3b6c5c94594641f70d62629496e34bc4c662 Mon Sep 17 00:00:00 2001 From: akucukoduk16 Date: Thu, 9 Jun 2022 21:05:46 +0300 Subject: [PATCH 3/3] binary search fix --- src/test/java/networking/stub/Hub.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/networking/stub/Hub.java b/src/test/java/networking/stub/Hub.java index 9ad126d9..e88f9e66 100644 --- a/src/test/java/networking/stub/Hub.java +++ b/src/test/java/networking/stub/Hub.java @@ -109,15 +109,15 @@ public Entity getEntityFromChannel(Identifier identifier, String namespace) public Identifier binarySearch(Identifier identifier) { int left = 0; int right = identifierSet.size() - 1; - while (left < right) { + while (left + 1 < right) { int mid = left + (right - left) / 2; if (identifierSet.get(mid).compareTo(identifier) == 0) { return identifierSet.get(mid); } if (identifier.compareTo(identifierSet.get(mid)) > 0) { - left = mid + 1; + left = mid; } else { - right = mid - 1; + right = mid; } } return identifierSet.get(left);