From ad1784898162bb84a6f1d576e5ce42c194ec8733 Mon Sep 17 00:00:00 2001 From: Luca Terracciano Date: Wed, 17 Sep 2025 13:04:44 +0200 Subject: [PATCH] refactor: exchange and fence are done all together in channel creation --- include/hllm/channelCreationImpl.hpp | 346 +++++++++++++++------------ include/hllm/engine.hpp | 51 +--- 2 files changed, 200 insertions(+), 197 deletions(-) diff --git a/include/hllm/channelCreationImpl.hpp b/include/hllm/channelCreationImpl.hpp index c198850..721368e 100644 --- a/include/hllm/channelCreationImpl.hpp +++ b/include/hllm/channelCreationImpl.hpp @@ -47,105 +47,59 @@ __INLINE__ std::unique_ptr Engine::moveCons return consumer; } -// void newcreate() { std::set tags; } - -void Engine::createChannels(HiCR::CommunicationManager &bufferedCommunicationManager, - HiCR::MemoryManager &bufferedMemoryManager, - std::shared_ptr &bufferedMemorySpace, - HiCR::CommunicationManager &unbufferedCommunicationManager, - HiCR::MemoryManager &unbufferedMemoryManager, - std::shared_ptr &unbufferedMemorySpace) -{ - // Create the channels - auto channelId = 0; - - const auto &dependencies = _config["Dependencies"]; - - std::set bufferedDependencies; - std::set unbufferedDependencies; - - for (const auto &[name, dependency] : dependencies.items()) - { - if (hicr::json::getString(dependency, "Type") == "Buffered") { bufferedDependencies.emplace(name); } - else { unbufferedDependencies.emplace(name); } - } - - // Create unbuffered dependencies channels - for (const auto &name : unbufferedDependencies) - { - // Get dependency informations - const auto &dependency = hicr::json::getObject(dependencies, name); +#define SIZES_BUFFER_KEY 0 +#define CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY 3 +#define CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY 4 +#define CONSUMER_PAYLOAD_KEY 5 +#define _CHANNEL_CREATION_ERROR 1 - // Getting channel's producer - const auto &producerPartitionId = hicr::json::getNumber(dependency, "Producer"); +__INLINE__ std::vector allocateChannelBuffers( + HiCR::MemoryManager &memoryManager, + std::shared_ptr &memorySpace, + HiCR::GlobalMemorySlot::tag_t channelTag, + size_t bufferSize, + size_t bufferCapacity, + std::map> &localCoordinationBufferForSizesMap, + std::map> &localCoordinationBufferForPayloadsMap) +{ + // Vector to store slots to exchange + std::vector memorySlotsToExchange; - // Getting channel's consumer - const auto &consumerPartitionId = hicr::json::getNumber(dependency, "Consumer"); + // Getting required buffer sizes + auto sizesBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), bufferCapacity); - // Getting buffer capacity (max token count) - const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); + // Allocating sizes buffer as a local memory slot + auto sizesBufferSlot = memoryManager.allocateLocalMemorySlot(memorySpace, sizesBufferSize); - // Getting buffer size (bytes) - const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); + // Allocating payload buffer as a local memory slot + auto payloadBufferSlot = memoryManager.allocateLocalMemorySlot(memorySpace, bufferSize); - // Creating channel object and increase channelId - //TODO: for malleability check the channel type to be used - auto [producer, consumer] = createChannel(channelId++, - name, - producerPartitionId == _partitionId, - consumerPartitionId == _partitionId, - unbufferedCommunicationManager, - unbufferedMemoryManager, - unbufferedMemorySpace, - bufferCapacity, - bufferSize); - - // Adding channel to map, only if defined - if (producer != nullptr) _producers.emplace(name, std::move(producer)); - if (consumer != nullptr) _consumers.emplace(name, std::move(consumer)); - } + // Getting required buffer size + auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); - for (const auto &name : bufferedDependencies) - { - // Get dependency informations - const auto &dependency = hicr::json::getObject(dependencies, name); + // Allocating coordination buffer for internal message size metadata + auto localCoordinationBufferForSizes = memoryManager.allocateLocalMemorySlot(memorySpace, coordinationBufferSize); - // Getting channel's producer - const auto &producerPartitionId = hicr::json::getNumber(dependency, "Producer"); + // Allocating coordination buffer for internal payload metadata + auto localCoordinationBufferForPayloads = memoryManager.allocateLocalMemorySlot(memorySpace, coordinationBufferSize); - // Getting channel's consumer - const auto &consumerPartitionId = hicr::json::getNumber(dependency, "Consumer"); + // Initializing coordination buffer (sets to zero the counters) + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localCoordinationBufferForSizes); + HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localCoordinationBufferForPayloads); - // Getting buffer capacity (max token count) - const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); + // Adding memory slots to exchange + memorySlotsToExchange.push_back({SIZES_BUFFER_KEY, sizesBufferSlot}); + memorySlotsToExchange.push_back({CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY, localCoordinationBufferForSizes}); + memorySlotsToExchange.push_back({CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, localCoordinationBufferForPayloads}); + memorySlotsToExchange.push_back({CONSUMER_PAYLOAD_KEY, payloadBufferSlot}); - // Getting buffer size (bytes) - const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); + // Store coordination buffers + localCoordinationBufferForSizesMap[channelTag] = std::move(localCoordinationBufferForSizes); + localCoordinationBufferForPayloadsMap[channelTag] = std::move(localCoordinationBufferForPayloads); - // Creating channel object and increase channelId - //TODO: for malleability check the channel type to be used - auto [producer, consumer] = createChannel(channelId++, - name, - producerPartitionId == _partitionId, - consumerPartitionId == _partitionId, - bufferedCommunicationManager, - bufferedMemoryManager, - bufferedMemorySpace, - bufferCapacity, - bufferSize); - - // Adding channel to map, only if defined - if (producer != nullptr) _producers.emplace(name, std::move(producer)); - if (consumer != nullptr) _consumers.emplace(name, std::move(consumer)); - } + return memorySlotsToExchange; } -#define SIZES_BUFFER_KEY 0 -#define CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY 3 -#define CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY 4 -#define CONSUMER_PAYLOAD_KEY 5 -#define _CHANNEL_CREATION_ERROR 1 - __INLINE__ std::unique_ptr createConsumer(HiCR::CommunicationManager &communicationManager, HiCR::GlobalMemorySlot::tag_t channelTag, size_t bufferSize, @@ -212,91 +166,175 @@ __INLINE__ std::unique_ptr createProducer(H bufferCapacity); } -__INLINE__ std::vector exchangeGlobalMemorySlots(HiCR::MemoryManager &memoryManager, - std::shared_ptr &memorySpace, - HiCR::GlobalMemorySlot::tag_t channelTag, - size_t bufferSize, - size_t bufferCapacity, - std::shared_ptr &localCoordinationBufferForSizes, - std::shared_ptr &localCoordinationBufferForPayloads) +__INLINE__ void createChannelBuffers(HiCR::MemoryManager &memoryManager, + std::shared_ptr &memorySpace, + std::map &consumers, + std::map> &memorySlotsToExchange, + std::map> &coordinationBufferSizesSlots, + std::map> &coordinationBufferPayloadsSlots) { - // Collection of memory slots to exchange and their keys - std::vector memorySlotsToExchange; - // Getting required buffer sizes - auto sizesBufferSize = HiCR::channel::variableSize::Base::getTokenBufferSize(sizeof(size_t), bufferCapacity); + // Exchange unbuffered memory slots + for (const auto &[tag, dependency] : consumers) + { + // Getting buffer capacity (max token count) + const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); - // Allocating sizes buffer as a local memory slot - auto sizesBufferSlot = memoryManager.allocateLocalMemorySlot(memorySpace, sizesBufferSize); + // Getting buffer size (bytes) + const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); - // Allocating payload buffer as a local memory slot - auto payloadBufferSlot = memoryManager.allocateLocalMemorySlot(memorySpace, bufferSize); + // Create Global Memory Slots to exchange + memorySlotsToExchange[tag] = allocateChannelBuffers(memoryManager, memorySpace, tag, bufferSize, bufferCapacity, coordinationBufferSizesSlots, coordinationBufferPayloadsSlots); + } +} - // Getting required buffer size - auto coordinationBufferSize = HiCR::channel::variableSize::Base::getCoordinationBufferSize(); +void createChannels(HiCR::MemoryManager &memoryManager, + HiCR::CommunicationManager &communicationManager, + std::shared_ptr &memorySpace, + std::map &consumersData, + std::map &producersData, + HiCR::GlobalMemorySlot::tag_t channelsIds, + std::map &channelTagNameMap, + std::unordered_map> &consumers, + std::unordered_map> &producers) +{ + std::map> memorySlotsToExchange; + std::map> coordinationBufferSizesSlots; + std::map> coordinationBufferPayloadsSlots; - // Allocating coordination buffer for internal message size metadata - localCoordinationBufferForSizes = memoryManager.allocateLocalMemorySlot(memorySpace, coordinationBufferSize); + // Exchange unbuffered memory slots + for (const auto &[tag, dependency] : consumersData) + { + // Getting buffer capacity (max token count) + const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); - // Allocating coordination buffer for internal payload metadata - localCoordinationBufferForPayloads = memoryManager.allocateLocalMemorySlot(memorySpace, coordinationBufferSize); + // Getting buffer size (bytes) + const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); - // Initializing coordination buffer (sets to zero the counters) - HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localCoordinationBufferForSizes); - HiCR::channel::variableSize::Base::initializeCoordinationBuffer(localCoordinationBufferForPayloads); + // Create Global Memory Slots to exchange + memorySlotsToExchange[tag] = allocateChannelBuffers(memoryManager, memorySpace, tag, bufferSize, bufferCapacity, coordinationBufferSizesSlots, coordinationBufferPayloadsSlots); + } - // Adding memory slots to exchange - memorySlotsToExchange.push_back({SIZES_BUFFER_KEY, sizesBufferSlot}); - memorySlotsToExchange.push_back({CONSUMER_COORDINATION_BUFFER_FOR_SIZES_KEY, localCoordinationBufferForSizes}); - memorySlotsToExchange.push_back({CONSUMER_COORDINATION_BUFFER_FOR_PAYLOADS_KEY, localCoordinationBufferForPayloads}); - memorySlotsToExchange.push_back({CONSUMER_PAYLOAD_KEY, payloadBufferSlot}); + // Exchange the Global Memory Slots + for (HiCR::GlobalMemorySlot::tag_t tag = 0; tag < channelsIds; ++tag) + { + if (memorySlotsToExchange.contains(tag)) + communicationManager.exchangeGlobalMemorySlots(tag, memorySlotsToExchange[tag]); + else + communicationManager.exchangeGlobalMemorySlots(tag, {}); + } - return memorySlotsToExchange; -} + // Fence + for (HiCR::GlobalMemorySlot::tag_t tag = 0; tag < channelsIds; ++tag) communicationManager.fence(tag); + + for (const auto &[tag, dependency] : consumersData) + { + // Getting buffer capacity (max token count) + const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); + + // Getting buffer size (bytes) + const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); + consumers.emplace(channelTagNameMap[tag], + createConsumer(communicationManager, tag, bufferSize, bufferCapacity, coordinationBufferSizesSlots[tag], coordinationBufferPayloadsSlots[tag])); + } + for (const auto &[tag, dependency] : producersData) + { + // Getting buffer capacity (max token count) + const auto &bufferCapacity = hicr::json::getNumber(dependency, "Buffer Capacity (Tokens)"); -__INLINE__ std::pair, std::unique_ptr> Engine::createChannel( - const size_t channelTag, - const std::string channelName, - const bool isProducer, - const bool isConsumer, - HiCR::CommunicationManager &communicationManager, - HiCR::MemoryManager &memoryManager, - std::shared_ptr &memorySpace, - const size_t bufferCapacity, - const size_t bufferSize) + // Getting buffer size (bytes) + const auto &bufferSize = hicr::json::getNumber(dependency, "Buffer Size (Bytes)"); + producers.emplace(channelTagNameMap[tag], createProducer(communicationManager, memoryManager, memorySpace, tag, bufferSize, bufferCapacity)); + } +} + +void Engine::createDependencies() { - // Interfaces for the channel - std::unique_ptr consumerInterface = nullptr; - std::unique_ptr producerInterface = nullptr; + // Tags for channels + HiCR::GlobalMemorySlot::tag_t bufferedChannelId = 0; + HiCR::GlobalMemorySlot::tag_t unbufferedChannelId = 0; - // Collection of memory slots to exchange and their keys - std::vector memorySlotsToExchange; + // Get the application dependencies + const auto &dependencies = _config["Dependencies"]; - // Temporary storage for coordination buffer pointers - std::shared_ptr localCoordinationBufferForSizes = nullptr; - std::shared_ptr localCoordinationBufferForPayloads = nullptr; + // Extract dependency informations + std::map bufferedConsumers; + std::map bufferedProducers; + std::map unbufferedConsumers; + std::map unbufferedProducers; + std::map bufferedChannelTagNameMap; + std::map unbufferedChannelTagNameMap; - ////// Pre-Exchange: Create local slots and register them for exchange - if (isConsumer == true) - { - memorySlotsToExchange = - exchangeGlobalMemorySlots(memoryManager, memorySpace, channelTag, bufferSize, bufferCapacity, localCoordinationBufferForSizes, localCoordinationBufferForPayloads); - } - ////// Exchange: local memory slots to become global for them to be used by the remote end - communicationManager.exchangeGlobalMemorySlots(channelTag, memorySlotsToExchange); - // Synchronizing so that all actors have finished registering their global memory slots - communicationManager.fence(channelTag); - - ///// Post exchange: create consumer/producer intefaces - // If I am a consumer, create consumer interface now - if (isConsumer == true) + for (const auto &[name, dependency] : dependencies.items()) { - consumerInterface = createConsumer(communicationManager, channelTag, bufferSize, bufferCapacity, localCoordinationBufferForSizes, localCoordinationBufferForPayloads); - } + // Getting channel's producer + const auto &producerPartitionId = hicr::json::getNumber(dependency, "Producer"); - // If I am producer, create the producer interface for the channel - if (isProducer == true) { producerInterface = createProducer(communicationManager, memoryManager, memorySpace, channelTag, bufferSize, bufferCapacity); } + // Getting channel's consumer + const auto &consumerPartitionId = hicr::json::getNumber(dependency, "Consumer"); + + // Store the producer + if (_partitionId == producerPartitionId) + { + if (hicr::json::getString(dependency, "Type") == "Buffered") + { + // Store the producer + bufferedProducers[bufferedChannelId] = dependency; + + // Store tag name mapping + bufferedChannelTagNameMap[bufferedChannelId] = name; + } + else + { + // Store the producer + unbufferedProducers[unbufferedChannelId] = dependency; + } + } + + if (_partitionId == consumerPartitionId) + { + if (hicr::json::getString(dependency, "Type") == "Buffered") + { + // Store the consumer + bufferedConsumers[bufferedChannelId] = dependency; + + // Store the tag name mapping + bufferedChannelTagNameMap[bufferedChannelId] = name; + } + else + { + // Store the consumer + unbufferedConsumers[unbufferedChannelId] = dependency; + + // Store tag name mapping + unbufferedChannelTagNameMap[unbufferedChannelId] = name; + + // Increase unbuffered channel id locally. The unbuffered are instance local + unbufferedChannelId++; + } + } + + // Increase buffered channel id for everyone. All the instances should participate + bufferedChannelId++; + } - return {std::move(producerInterface), std::move(consumerInterface)}; + createChannels(*_unbufferedMemoryManager, + *_unbufferedCommunicationManager, + _unbufferedMemorySpace, + unbufferedConsumers, + unbufferedProducers, + unbufferedChannelId, + unbufferedChannelTagNameMap, + _consumers, + _producers); + createChannels(*_bufferedMemoryManager, + *_bufferedCommunicationManager, + _bufferedMemorySpace, + bufferedConsumers, + bufferedProducers, + bufferedChannelId, + bufferedChannelTagNameMap, + _consumers, + _producers); } } // namespace hLLM \ No newline at end of file diff --git a/include/hllm/engine.hpp b/include/hllm/engine.hpp index 01376a8..a44d0f2 100644 --- a/include/hllm/engine.hpp +++ b/include/hllm/engine.hpp @@ -168,36 +168,7 @@ class Engine final /** * Function to create all the channels based on the previously passed configuration * */ - void createChannels(HiCR::CommunicationManager &bufferedCommunicationManager, - HiCR::MemoryManager &bufferedMemoryManager, - std::shared_ptr &bufferedMemorySpace, - HiCR::CommunicationManager &unbufferedCommunicationManager, - HiCR::MemoryManager &unbufferedMemoryManager, - std::shared_ptr &unbufferedMemorySpace); - - /** - * Function to create a variable-sized token locking channel between N producers and 1 consumer - * - * @note This is a collective operation. All instances must participate in this call, even if they don't play a producer or consumer role - * - * @param[in] channelTag The unique identifier for the channel. This tag should be unique for each channel - * @param[in] channelName The name of the channel. This will be the identifier used to retrieve the channel - * @param[in] isProducer whether a producer should be created - * @param[in] isConsumer whether a consumer should be created - * @param[in] bufferCapacity The number of tokens that can be simultaneously held in the channel's buffer - * @param[in] bufferSize The size (bytes) of the buffer. - * @return A shared pointer of the newly created channel - */ - __INLINE__ std::pair, std::unique_ptr> createChannel( - const size_t channelTag, - const std::string channelName, - const bool isProducer, - const bool isConsumer, - HiCR::CommunicationManager &communicationManager, - HiCR::MemoryManager &memoryManager, - std::shared_ptr &memorySpace, - const size_t bufferCapacity, - const size_t bufferSize); + __INLINE__ void createDependencies(); __INLINE__ void doLocalTermination() { @@ -350,20 +321,11 @@ class Engine final // If I am the deployer, search for its own partition id for (const auto &[partitionId, runnerId] : _partitionRunnerIdMap) { - if (runnerId == _deployr.getRunnerId()) - { - _partitionId = partitionId; - } + if (runnerId == _deployr.getRunnerId()) { _partitionId = partitionId; } } // Wait for incoming rpc from all the other_ =nner_ - for (size_t _ = 0; _ < _deployment.getRunners().size() - 1; _++) - { - _rpcEngine->listen(); - printf("listened to request %zu\n", _); - } - - printf("finished listening\n"); + for (size_t _ = 0; _ < _deployment.getRunners().size() - 1; _++) { _rpcEngine->listen(); } } else { @@ -377,6 +339,8 @@ class Engine final // Broadcast the partition id to all the instances broadcastPartitionId(); + printf("[hLLM] Partition %lu starts\n", _partitionId); + // Finding partition information on the configuration const auto &partitionsJs = hicr::json::getArray(_config, "Partitions"); nlohmann::json partitionConfig; @@ -389,9 +353,10 @@ class Engine final // Set partition name _partitionName = hicr::json::getString(partitionConfig, "Name"); + printf("[hLLM] Partition %lu starts creating its dependencies...\n", _partitionId); // Create channels - createChannels( - *_bufferedCommunicationManager, *_bufferedMemoryManager, _bufferedMemorySpace, *_unbufferedCommunicationManager, *_unbufferedMemoryManager, _unbufferedMemorySpace); + createDependencies(); + printf("[hLLM] Partition %lu created the dependencies\n", _partitionId); // Starting a new deployment _continueRunning = true;