From a9314957fcd39463ab0d4b84057bb31e7b241975 Mon Sep 17 00:00:00 2001 From: Luca Terracciano Date: Wed, 17 Sep 2025 14:49:40 +0200 Subject: [PATCH] Revert "refactor: channel creation" --- .gitmodules | 3 - CloudR | 1 - include/hllm/channelCreationImpl.hpp | 346 ++++++++++++--------------- include/hllm/engine.hpp | 51 +++- 4 files changed, 197 insertions(+), 204 deletions(-) delete mode 160000 CloudR diff --git a/.gitmodules b/.gitmodules index b669e15..61b41c3 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,6 +7,3 @@ [submodule "extern/DeployR"] path = extern/DeployR url = https://github.com/Algebraic-Programming/DeployR.git -[submodule "CloudR"] - path = CloudR - url = git@github.com:Algebraic-Programming/CloudR.git diff --git a/CloudR b/CloudR deleted file mode 160000 index d8df57d..0000000 --- a/CloudR +++ /dev/null @@ -1 +0,0 @@ -Subproject commit d8df57d878f00b230cd789bbd7e307505d668036 diff --git a/include/hllm/channelCreationImpl.hpp b/include/hllm/channelCreationImpl.hpp index 721368e..c198850 100644 --- a/include/hllm/channelCreationImpl.hpp +++ b/include/hllm/channelCreationImpl.hpp @@ -47,59 +47,105 @@ __INLINE__ std::unique_ptr Engine::moveCons return consumer; } -#define SIZES_BUFFER_KEY 0 -#define CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY 3 -#define CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY 4 -#define CONSUMER_PAYLOAD_KEY 5 -#define _CHANNEL_CREATION_ERROR 1 - -__INLINE__ std::vector allocateChannelBuffers( - HiCR::MemoryManager &memoryManager, - std::shared_ptr &memorySpace, - HiCR::GlobalMemorySlot::tag_t channelTag, - size_t bufferSize, - size_t bufferCapacity, - std::map> &localCoordinationBufferForSizesMap, - std::map> &localCoordinationBufferForPayloadsMap) +// void newcreate() { std::set tags; } + +void Engine::createChannels(HiCR::CommunicationManager &bufferedCommunicationManager, + HiCR::MemoryManager &bufferedMemoryManager, + std::shared_ptr &bufferedMemorySpace, + HiCR::CommunicationManager &unbufferedCommunicationManager, + HiCR::MemoryManager &unbufferedMemoryManager, + std::shared_ptr &unbufferedMemorySpace) { - // Vector to store slots to exchange - std::vector memorySlotsToExchange; + // Create the channels + auto channelId = 0; - // Getting required buffer sizes - auto sizesBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), bufferCapacity); + const auto &dependencies = _config["Dependencies"]; - // Allocating sizes buffer as a local memory slot - auto sizesBufferSlot = memoryManager.allocateLocalMemorySlot(memorySpace, sizesBufferSize); + std::set bufferedDependencies; + std::set unbufferedDependencies; - // Allocating payload buffer as a local memory slot - auto payloadBufferSlot = memoryManager.allocateLocalMemorySlot(memorySpace, bufferSize); + for (const auto &[name, dependency] : dependencies.items()) + { + if (hicr::json::getString(dependency, "Type") == "Buffered") { bufferedDependencies.emplace(name); } + else { unbufferedDependencies.emplace(name); } + } - // Getting required buffer size - auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); + // Create unbuffered dependencies channels + for (const auto &name : unbufferedDependencies) + { + // Get dependency informations + const auto &dependency = hicr::json::getObject(dependencies, name); - // Allocating coordination buffer for internal message size metadata - auto localCoordinationBufferForSizes = memoryManager.allocateLocalMemorySlot(memorySpace, coordinationBufferSize); + // Getting channel's producer + const auto &producerPartitionId = hicr::json::getNumber(dependency, "Producer"); - // Allocating coordination buffer for internal payload metadata - auto localCoordinationBufferForPayloads = memoryManager.allocateLocalMemorySlot(memorySpace, coordinationBufferSize); + // Getting channel's consumer + const auto &consumerPartitionId = hicr::json::getNumber(dependency, "Consumer"); - // Initializing coordination buffer (sets to zero the counters) - HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localCoordinationBufferForSizes); - HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localCoordinationBufferForPayloads); + // Getting buffer capacity (max token count) + const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); - // Adding memory slots to exchange - memorySlotsToExchange.push_back({SIZES_BUFFER_KEY, sizesBufferSlot}); - memorySlotsToExchange.push_back({CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY, localCoordinationBufferForSizes}); - memorySlotsToExchange.push_back({CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, localCoordinationBufferForPayloads}); - memorySlotsToExchange.push_back({CONSUMER_PAYLOAD_KEY, payloadBufferSlot}); + // Getting buffer size (bytes) + const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); - // Store coordination buffers - localCoordinationBufferForSizesMap[channelTag] = std::move(localCoordinationBufferForSizes); - localCoordinationBufferForPayloadsMap[channelTag] = std::move(localCoordinationBufferForPayloads); + // Creating channel object and increase channelId + //TODO: for malleability check the channel type to be used + auto [producer, consumer] = createChannel(channelId++, + name, + producerPartitionId == _partitionId, + consumerPartitionId == _partitionId, + unbufferedCommunicationManager, + unbufferedMemoryManager, + unbufferedMemorySpace, + bufferCapacity, + bufferSize); + + // Adding channel to map, only if defined + if (producer != nullptr) _producers.emplace(name, std::move(producer)); + if (consumer != nullptr) _consumers.emplace(name, std::move(consumer)); + } - return memorySlotsToExchange; + for (const auto &name : bufferedDependencies) + { + // Get dependency informations + const auto &dependency = hicr::json::getObject(dependencies, name); + + // Getting channel's producer + const auto &producerPartitionId = hicr::json::getNumber(dependency, "Producer"); + + // Getting channel's consumer + const auto &consumerPartitionId = hicr::json::getNumber(dependency, "Consumer"); + + // Getting buffer capacity (max token count) + const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); + + // Getting buffer size (bytes) + const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); + + // Creating channel object and increase channelId + //TODO: for malleability check the channel type to be used + auto [producer, consumer] = createChannel(channelId++, + name, + producerPartitionId == _partitionId, + consumerPartitionId == _partitionId, + bufferedCommunicationManager, + bufferedMemoryManager, + bufferedMemorySpace, + bufferCapacity, + bufferSize); + + // Adding channel to map, only if defined + if (producer != nullptr) _producers.emplace(name, std::move(producer)); + if (consumer != nullptr) _consumers.emplace(name, std::move(consumer)); + } } +#define SIZES_BUFFER_KEY 0 +#define CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY 3 +#define CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY 4 +#define CONSUMER_PAYLOAD_KEY 5 +#define _CHANNEL_CREATION_ERROR 1 + __INLINE__ std::unique_ptr createConsumer(HiCR::CommunicationManager &communicationManager, HiCR::GlobalMemorySlot::tag_t channelTag, size_t bufferSize, @@ -166,175 +212,91 @@ __INLINE__ std::unique_ptr createProducer(H bufferCapacity); } -__INLINE__ void createChannelBuffers(HiCR::MemoryManager &memoryManager, - std::shared_ptr &memorySpace, - std::map &consumers, - std::map> &memorySlotsToExchange, - std::map> &coordinationBufferSizesSlots, - std::map> &coordinationBufferPayloadsSlots) -{ - // Exchange unbuffered memory slots - for (const auto &[tag, dependency] : consumers) - { - // Getting buffer capacity (max token count) - const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); - - // Getting buffer size (bytes) - const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); - - // Create Global Memory Slots to exchange - memorySlotsToExchange[tag] = allocateChannelBuffers(memoryManager, memorySpace, tag, bufferSize, bufferCapacity, coordinationBufferSizesSlots, coordinationBufferPayloadsSlots); - } -} - -void createChannels(HiCR::MemoryManager &memoryManager, - HiCR::CommunicationManager &communicationManager, - std::shared_ptr &memorySpace, - std::map &consumersData, - std::map &producersData, - HiCR::GlobalMemorySlot::tag_t channelsIds, - std::map &channelTagNameMap, - std::unordered_map> &consumers, - std::unordered_map> &producers) +__INLINE__ std::vector exchangeGlobalMemorySlots(HiCR::MemoryManager &memoryManager, + std::shared_ptr &memorySpace, + HiCR::GlobalMemorySlot::tag_t channelTag, + size_t bufferSize, + size_t bufferCapacity, + std::shared_ptr &localCoordinationBufferForSizes, + std::shared_ptr &localCoordinationBufferForPayloads) { - std::map> memorySlotsToExchange; - std::map> coordinationBufferSizesSlots; - std::map> coordinationBufferPayloadsSlots; - - // Exchange unbuffered memory slots - for (const auto &[tag, dependency] : consumersData) - { - // Getting buffer capacity (max token count) - const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); + // Collection of memory slots to exchange and their keys + std::vector memorySlotsToExchange; + // Getting required buffer sizes + auto sizesBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), bufferCapacity); - // Getting buffer size (bytes) - const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); + // Allocating sizes buffer as a local memory slot + auto sizesBufferSlot = memoryManager.allocateLocalMemorySlot(memorySpace, sizesBufferSize); - // Create Global Memory Slots to exchange - memorySlotsToExchange[tag] = allocateChannelBuffers(memoryManager, memorySpace, tag, bufferSize, bufferCapacity, coordinationBufferSizesSlots, coordinationBufferPayloadsSlots); - } + // Allocating payload buffer as a local memory slot + auto payloadBufferSlot = memoryManager.allocateLocalMemorySlot(memorySpace, bufferSize); - // Exchange the Global Memory Slots - for (HiCR::GlobalMemorySlot::tag_t tag = 0; tag < channelsIds; ++tag) - { - if (memorySlotsToExchange.contains(tag)) - communicationManager.exchangeGlobalMemorySlots(tag, memorySlotsToExchange[tag]); - else - communicationManager.exchangeGlobalMemorySlots(tag, {}); - } + // Getting required buffer size + auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); - // Fence - for (HiCR::GlobalMemorySlot::tag_t tag = 0; tag < channelsIds; ++tag) communicationManager.fence(tag); + // Allocating coordination buffer for internal message size metadata + localCoordinationBufferForSizes = memoryManager.allocateLocalMemorySlot(memorySpace, coordinationBufferSize); - for (const auto &[tag, dependency] : consumersData) - { - // Getting buffer capacity (max token count) - const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); + // Allocating coordination buffer for internal payload metadata + localCoordinationBufferForPayloads = memoryManager.allocateLocalMemorySlot(memorySpace, coordinationBufferSize); - // Getting buffer size (bytes) - const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); - consumers.emplace(channelTagNameMap[tag], - createConsumer(communicationManager, tag, bufferSize, bufferCapacity, coordinationBufferSizesSlots[tag], coordinationBufferPayloadsSlots[tag])); - } + // Initializing coordination buffer (sets to zero the counters) + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localCoordinationBufferForSizes); + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localCoordinationBufferForPayloads); - for (const auto &[tag, dependency] : producersData) - { - // Getting buffer capacity (max token count) - const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); + // Adding memory slots to exchange + memorySlotsToExchange.push_back({SIZES_BUFFER_KEY, sizesBufferSlot}); + memorySlotsToExchange.push_back({CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY, localCoordinationBufferForSizes}); + memorySlotsToExchange.push_back({CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, localCoordinationBufferForPayloads}); + memorySlotsToExchange.push_back({CONSUMER_PAYLOAD_KEY, payloadBufferSlot}); - // Getting buffer size (bytes) - const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); - producers.emplace(channelTagNameMap[tag], createProducer(communicationManager, memoryManager, memorySpace, tag, bufferSize, bufferCapacity)); - } + return memorySlotsToExchange; } -void Engine::createDependencies() + +__INLINE__ std::pair, std::unique_ptr> Engine::createChannel( + const size_t channelTag, + const std::string channelName, + const bool isProducer, + const bool isConsumer, + HiCR::CommunicationManager &communicationManager, + HiCR::MemoryManager &memoryManager, + std::shared_ptr &memorySpace, + const size_t bufferCapacity, + const size_t bufferSize) { - // Tags for channels - HiCR::GlobalMemorySlot::tag_t bufferedChannelId = 0; - HiCR::GlobalMemorySlot::tag_t unbufferedChannelId = 0; + // Interfaces for the channel + std::unique_ptr consumerInterface = nullptr; + std::unique_ptr producerInterface = nullptr; - // Get the application dependencies - const auto &dependencies = _config["Dependencies"]; + // Collection of memory slots to exchange and their keys + std::vector memorySlotsToExchange; - // Extract dependency informations - std::map bufferedConsumers; - std::map bufferedProducers; - std::map unbufferedConsumers; - std::map unbufferedProducers; - std::map bufferedChannelTagNameMap; - std::map unbufferedChannelTagNameMap; + // Temporary storage for coordination buffer pointers + std::shared_ptr localCoordinationBufferForSizes = nullptr; + std::shared_ptr localCoordinationBufferForPayloads = nullptr; - for (const auto &[name, dependency] : dependencies.items()) + ////// Pre-Exchange: Create local slots and register them for exchange + if (isConsumer == true) { - // Getting channel's producer - const auto &producerPartitionId = hicr::json::getNumber(dependency, "Producer"); - - // Getting channel's consumer - const auto &consumerPartitionId = hicr::json::getNumber(dependency, "Consumer"); - - // Store the producer - if (_partitionId == producerPartitionId) - { - if (hicr::json::getString(dependency, "Type") == "Buffered") - { - // Store the producer - bufferedProducers[bufferedChannelId] = dependency; - - // Store tag name mapping - bufferedChannelTagNameMap[bufferedChannelId] = name; - } - else - { - // Store the producer - unbufferedProducers[unbufferedChannelId] = dependency; - } - } - - if (_partitionId == consumerPartitionId) - { - if (hicr::json::getString(dependency, "Type") == "Buffered") - { - // Store the consumer - bufferedConsumers[bufferedChannelId] = dependency; - - // Store the tag name mapping - bufferedChannelTagNameMap[bufferedChannelId] = name; - } - else - { - // Store the consumer - unbufferedConsumers[unbufferedChannelId] = dependency; - - // Store tag name mapping - unbufferedChannelTagNameMap[unbufferedChannelId] = name; - - // Increase unbuffered channel id locally. The unbuffered are instance local - unbufferedChannelId++; - } - } - - // Increase buffered channel id for everyone. All the instances should participate - bufferedChannelId++; + memorySlotsToExchange = + exchangeGlobalMemorySlots(memoryManager, memorySpace, channelTag, bufferSize, bufferCapacity, localCoordinationBufferForSizes, localCoordinationBufferForPayloads); } + ////// Exchange: local memory slots to become global for them to be used by the remote end + communicationManager.exchangeGlobalMemorySlots(channelTag, memorySlotsToExchange); + // Synchronizing so that all actors have finished registering their global memory slots + communicationManager.fence(channelTag); + + ///// Post exchange: create consumer/producer intefaces + // If I am a consumer, create consumer interface now + if (isConsumer == true) + { + consumerInterface = createConsumer(communicationManager, channelTag, bufferSize, bufferCapacity, localCoordinationBufferForSizes, localCoordinationBufferForPayloads); + } + + // If I am producer, create the producer interface for the channel + if (isProducer == true) { producerInterface = createProducer(communicationManager, memoryManager, memorySpace, channelTag, bufferSize, bufferCapacity); } - createChannels(*_unbufferedMemoryManager, - *_unbufferedCommunicationManager, - _unbufferedMemorySpace, - unbufferedConsumers, - unbufferedProducers, - unbufferedChannelId, - unbufferedChannelTagNameMap, - _consumers, - _producers); - createChannels(*_bufferedMemoryManager, - *_bufferedCommunicationManager, - _bufferedMemorySpace, - bufferedConsumers, - bufferedProducers, - bufferedChannelId, - bufferedChannelTagNameMap, - _consumers, - _producers); + return {std::move(producerInterface), std::move(consumerInterface)}; } } // namespace hLLM \ No newline at end of file diff --git a/include/hllm/engine.hpp b/include/hllm/engine.hpp index a44d0f2..01376a8 100644 --- a/include/hllm/engine.hpp +++ b/include/hllm/engine.hpp @@ -168,7 +168,36 @@ class Engine final /** * Function to create all the channels based on the previously passed configuration * */ - __INLINE__ void createDependencies(); + void createChannels(HiCR::CommunicationManager &bufferedCommunicationManager, + HiCR::MemoryManager &bufferedMemoryManager, + std::shared_ptr &bufferedMemorySpace, + HiCR::CommunicationManager &unbufferedCommunicationManager, + HiCR::MemoryManager &unbufferedMemoryManager, + std::shared_ptr &unbufferedMemorySpace); + + /** + * Function to create a variable-sized token locking channel between N producers and 1 consumer + * + * @note This is a collective operation. All instances must participate in this call, even if they don't play a producer or consumer role + * + * @param[in] channelTag The unique identifier for the channel. This tag should be unique for each channel + * @param[in] channelName The name of the channel. This will be the identifier used to retrieve the channel + * @param[in] isProducer whether a producer should be created + * @param[in] isConsumer whether a consumer should be created + * @param[in] bufferCapacity The number of tokens that can be simultaneously held in the channel's buffer + * @param[in] bufferSize The size (bytes) of the buffer. + * @return A shared pointer of the newly created channel + */ + __INLINE__ std::pair, std::unique_ptr> createChannel( + const size_t channelTag, + const std::string channelName, + const bool isProducer, + const bool isConsumer, + HiCR::CommunicationManager &communicationManager, + HiCR::MemoryManager &memoryManager, + std::shared_ptr &memorySpace, + const size_t bufferCapacity, + const size_t bufferSize); __INLINE__ void doLocalTermination() { @@ -321,11 +350,20 @@ class Engine final // If I am the deployer, search for its own partition id for (const auto &[partitionId, runnerId] : _partitionRunnerIdMap) { - if (runnerId == _deployr.getRunnerId()) { _partitionId = partitionId; } + if (runnerId == _deployr.getRunnerId()) + { + _partitionId = partitionId; + } } // Wait for incoming rpc from all the other_ =nner_ - for (size_t _ = 0; _ < _deployment.getRunners().size() - 1; _++) { _rpcEngine->listen(); } + for (size_t _ = 0; _ < _deployment.getRunners().size() - 1; _++) + { + _rpcEngine->listen(); + printf("listened to request %zu\n", _); + } + + printf("finished listening\n"); } else { @@ -339,8 +377,6 @@ class Engine final // Broadcast the partition id to all the instances broadcastPartitionId(); - printf("[hLLM] Partition %lu starts\n", _partitionId); - // Finding partition information on the configuration const auto &partitionsJs = hicr::json::getArray(_config, "Partitions"); nlohmann::json partitionConfig; @@ -353,10 +389,9 @@ class Engine final // Set partition name _partitionName = hicr::json::getString(partitionConfig, "Name"); - printf("[hLLM] Partition %lu starts creating its dependencies...\n", _partitionId); // Create channels - createDependencies(); - printf("[hLLM] Partition %lu created the dependencies\n", _partitionId); + createChannels( + *_bufferedCommunicationManager, *_bufferedMemoryManager, _bufferedMemorySpace, *_unbufferedCommunicationManager, *_unbufferedMemoryManager, _unbufferedMemorySpace); // Starting a new deployment _continueRunning = true;