Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 111 additions & 22 deletions src/Api/PubnubApi/EndPoint/PubSub/PublishOperation.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
Expand All @@ -15,8 +15,12 @@ public class PublishOperation : PubnubCoreBase
private readonly PNConfiguration config;
private readonly IJsonPluggableLibrary jsonLibrary;
private readonly IPubnubUnitTest unit;


private const int MaxPublishRequestSizeBytes = 32768;
private const int MaxMessageContentSizeBytes = 2097152;
private const int PostBodyFramingOverheadBytes = 2;
private object publishContent;
private string preparedMessageContent;
private string channelName = "";
private bool storeInHistory = true;
private bool httpPost;
Expand All @@ -39,6 +43,7 @@ public PublishOperation(PNConfiguration pubnubConfig, IJsonPluggableLibrary json
public PublishOperation Message(object message)
{
publishContent = message;
preparedMessageContent = PrepareContent(message);
return this;
}

Expand Down Expand Up @@ -112,13 +117,16 @@ public void Execute(PNCallback<PNPublishResult> callback)
throw new ArgumentException("Missing userCallback");
}

ValidateMessageContentSize();

savedCallback = callback;
logger?.Trace($"{GetType().Name} Execute invoked");
Publish(channelName, publishContent, storeInHistory, ttl, userMetadata, queryParam, callback);
}

public async Task<PNResult<PNPublishResult>> ExecuteAsync()
{
ValidateMessageContentSize();
syncRequest = false;
logger?.Trace($"{GetType().Name} ExecuteAsync invoked.");
return await Publish(channelName, publishContent, storeInHistory, ttl, userMetadata, queryParam)
Expand All @@ -137,6 +145,8 @@ public PNPublishResult Sync()
throw new MissingMemberException("publish key is required");
}

ValidateMessageContentSize();

logger?.Trace($"{GetType().Name} parameter validated.");
ManualResetEvent syncEvent = new ManualResetEvent(false);
Task<PNPublishResult> task = Task<PNPublishResult>.Factory.StartNew(() =>
Expand Down Expand Up @@ -396,6 +406,15 @@ internal void CurrentPubnubInstance(Pubnub instance)
}
}

private void ValidateMessageContentSize()
{
if (preparedMessageContent != null
&& Encoding.UTF8.GetByteCount(preparedMessageContent) > MaxMessageContentSizeBytes)
{
throw new ArgumentException("Message content size exceeds the maximum permissible size of 2 MiB.");
}
}

private string PrepareContent(object originalMessage)
{
string message = jsonLibrary.SerializeToJsonString(originalMessage);
Expand All @@ -412,20 +431,7 @@ private string PrepareContent(object originalMessage)

private RequestParameter CreateRequestParameter()
{
List<string> urlSegments =
[
"publish",
config.PublishKey ?? "",
config.SubscribeKey ?? "",
"0",
channelName,
"0"
];
if (!httpPost)
{
urlSegments.Add(PrepareContent(publishContent));
}

var messageContent = preparedMessageContent;
Dictionary<string, string> requestQueryStringParams = new Dictionary<string, string>();

if (userMetadata != null)
Expand Down Expand Up @@ -463,20 +469,103 @@ private RequestParameter CreateRequestParameter()
}
}

var requestParam = new RequestParameter()
// Determine whether to use v2/publish endpoint and HTTP method.
var endpointInfo = ResolvePublishEndpoint(messageContent, requestQueryStringParams);
bool useV2Endpoint = endpointInfo.UseV2Endpoint;
bool usePost = endpointInfo.UsePost;

// Build URL path segments
var pathSegments = new List<string>();
if (useV2Endpoint)
{
pathSegments.Add("v2");
}
pathSegments.AddRange(["publish", config.PublishKey ?? "", config.SubscribeKey ?? "", "0", channelName, "0"]);

if (!usePost)
{
pathSegments.Add(messageContent);
}

var requestParam = new RequestParameter
{
RequestType = httpPost ? Constants.POST : Constants.GET,
PathSegment = urlSegments,
RequestType = usePost ? Constants.POST : Constants.GET,
PathSegment = pathSegments,
Query = requestQueryStringParams
};
if (httpPost)

if (useV2Endpoint)
{
requestParam.Headers.Add("Expect", "100-continue");
}

if (usePost)
{
string postMessage = PrepareContent(publishContent);
requestParam.BodyContentString = postMessage;
requestParam.BodyContentString = messageContent;
}

return requestParam;
}

private PublishEndpointInfo ResolvePublishEndpoint(
string messageContent,
Dictionary<string, string> queryParams)
{
int messageSizeBytes = Encoding.UTF8.GetByteCount(messageContent);

if (httpPost)
{
bool exceedsPostLimit = messageSizeBytes >= MaxPublishRequestSizeBytes - PostBodyFramingOverheadBytes;
return new PublishEndpointInfo { UseV2Endpoint = exceedsPostLimit, UsePost = true };
}
int urlSizeWithoutMessage = EstimateUrlSizeWithoutMessage(queryParams);
int availableForMessage = MaxPublishRequestSizeBytes - urlSizeWithoutMessage;

if (messageSizeBytes >= availableForMessage)
{
// Message too large for URL; switch to v2/publish with POST body
return new PublishEndpointInfo { UseV2Endpoint = true, UsePost = true };
}

return new PublishEndpointInfo { UseV2Endpoint = false, UsePost = false };
}
private struct PublishEndpointInfo
{
public bool UseV2Endpoint;
public bool UsePost;
}

/// Estimates the total URL byte size excluding the message content.
/// Accounts for base URL components, path segments, user-specified query parameters,
/// and parameters injected later by the transport middleware
/// (uuid, pnsdk, requestid, instanceid, timestamp, auth, signature).
private int EstimateUrlSizeWithoutMessage(Dictionary<string, string> queryParams)
{
// 155 bytes covers the fixed overhead from base URL components and
// middleware-injected query params: scheme, origin, path separators,
// uuid, pnsdk, requestid, instanceid, and timestamp.
int estimatedSize = 155;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a chance this could change in the future and should be dynamically calculated instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to design of a common API execution flow, In the Endpoint class only that endpoint related path and query segments added. The common query param values are being added in later stage of API execution.
If we try to get all those params here then It would be good amount of code duplication.
Yes, In future, the design can be better to have those values dynamically calculated.


// Auth key adds "&auth={value}" — 5 bytes for the key portion plus the value length
if (config.AuthKey?.Length > 0)
{
estimatedSize += 5 + config.AuthKey.Length;
}

// Secret key triggers HMAC signature: "&signature=v2.{base64}" ≈ 80 bytes
if (!string.IsNullOrEmpty(config.SecretKey))
{
estimatedSize += 80;
}

// Add encoded size of the user-specified query parameters
estimatedSize += UriUtil.EncodeUriComponent(
UriUtil.BuildQueryString(queryParams),
PNOperationType.PNPublishOperation, false, false, false
).Length;

return estimatedSize;
}

private void CleanUp()
{
Expand Down
9 changes: 8 additions & 1 deletion src/Api/PubnubApi/Transport/HttpClientService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,17 @@ public async Task<TransportResponse> PostRequest(TransportRequest transportReque
postData.Headers.Add(transportRequestHeader.Key, transportRequestHeader.Value);
}
}

HttpRequestMessage requestMessage =
new HttpRequestMessage(method: HttpMethod.Post, requestUri: transportRequest.RequestUrl)
{ Content = postData };
// Set Http Request header, When the header is not a payload content header.
if (transportRequest.Headers.Keys.Count > 0 && transportRequest.BodyContentBytes == null)
{
foreach (var kvp in transportRequest.Headers)
{
requestMessage.Headers.Add(kvp.Key, kvp.Value);
}
}
logger?.Debug(
$"HttpClient Service:Sending http request {transportRequest.RequestType} to {transportRequest.RequestUrl}" +
(requestMessage.Headers.Any()
Expand Down
14 changes: 12 additions & 2 deletions src/Api/PubnubApi/Transport/Middleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,16 @@ public TransportRequest PreapareTransportRequest(RequestParameter requestParamet
{
string signature = string.Empty;
StringBuilder stringToSign = new StringBuilder();
stringToSign.AppendFormat(CultureInfo.InvariantCulture, "{0}\n", operationType == PNOperationType.PNPublishOperation ? "GET" : requestParameter.RequestType);
stringToSign.AppendFormat(CultureInfo.InvariantCulture, "{0}\n", operationType == PNOperationType.PNPublishOperation
&& !requestParameter.PathSegment.Contains("v2")? "GET": requestParameter.RequestType);
stringToSign.AppendFormat(CultureInfo.InvariantCulture, "{0}\n", configuration.PublishKey);
stringToSign.AppendFormat(CultureInfo.InvariantCulture, "{0}\n", pathString);
stringToSign.AppendFormat(CultureInfo.InvariantCulture, "{0}\n", queryString);
if (!string.IsNullOrEmpty(requestParameter.BodyContentString) && operationType != PNOperationType.PNPublishOperation) stringToSign.Append(requestParameter.BodyContentString);
if (!string.IsNullOrEmpty(requestParameter.BodyContentString) &&
!isPublishGET(requestParameter.PathSegment))
{
stringToSign.Append(requestParameter.BodyContentString);
}
signature = Util.PubnubAccessManagerSign(configuration.SecretKey, stringToSign.ToString());
signature = string.Format(CultureInfo.InvariantCulture, "v2.{0}", signature.TrimEnd(new[] { '=' }));
requestParameter.Query.Add("signature", signature);
Expand Down Expand Up @@ -153,5 +158,10 @@ private long TranslateUtcDateTimeToSeconds(DateTime dotNetUTCDateTime)
long timeStamp = Convert.ToInt64(timeSpan.TotalSeconds);
return timeStamp;
}

private bool isPublishGET(List<string> pathSegments)
{
return pathSegments.Contains("publish") && !pathSegments.Contains("v2");
}
}
}
Loading
Loading