Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Convert `WORKER_CLOSE` into a notification ([PR #1729](https://github.com/versatica/mediasoup/pull/1729).
- Node tests: Replace `sctp` unmaintained library with `werift-sctp` ([PR #1732](https://github.com/versatica/mediasoup/pull/1732), thanks to @shinyoshiaki for his help with `werift-sctp`.
- Worker: Require C++20 ([PR #1741](https://github.com/versatica/mediasoup/pull/1741).
- Fix "SCTP failed" if no DataChannel is created on a Transport with `enableSctp: true` ([PR #1749](https://github.com/versatica/mediasoup/pull/1749).

### 3.19.17

Expand Down
59 changes: 41 additions & 18 deletions node/src/test/test-werift-sctp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,19 @@ beforeEach(async () => {
numSctpStreams: { OS: 256, MIS: 256 },
});

// Create an explicit SCTP outgoing stream id.
ctx.sctpSendStreamId = 123;

ctx.sctpClient = SCTP.client(
createSctpUdpTransport(createSocket('udp4'), {
port: ctx.plainTransport.tuple.localPort,
address: ctx.plainTransport.tuple.localAddress,
})
);

await ctx.sctpClient.start(5000);

let connectionTimeoutTimer: NodeJS.Timeout | undefined;

await Promise.race([
ctx.sctpClient.stateChanged.connected.asPromise(),
new Promise<void>((resolve, reject) => {
connectionTimeoutTimer = setTimeout(
() => reject(new Error('SCTP connection timeout')),
3000
);
}),
]);

clearTimeout(connectionTimeoutTimer);

// Create an explicit SCTP outgoing stream id.
ctx.sctpSendStreamId = 123;
// NOTE: We don't await it on purpose since we don't want to block here until
// SCTP connects.
void ctx.sctpClient.start(5000);

// Create a DataProducer with the corresponding SCTP stream id.
ctx.dataProducer = await ctx.plainTransport.produceData({
Expand All @@ -76,6 +64,41 @@ beforeEach(async () => {
ctx.dataConsumer = await ctx.plainTransport.consumeData({
dataProducerId: ctx.dataProducer.id,
});

let connectionTimeoutTimer: NodeJS.Timeout | undefined;

await Promise.race([
// Wait for SCTP to become connected in both the PlainTransport and in the
// werift-sctp client.
Promise.all([
ctx.sctpClient.stateChanged.connected.asPromise(),
new Promise<void>((resolve, reject) => {
if (ctx.plainTransport?.sctpState === 'connected') {
resolve();
} else {
ctx.plainTransport?.on('sctpstatechange', state => {
if (state === 'connected') {
resolve();
} else if (state === 'failed' || state === 'closed') {
reject(
new Error(
'SCTP connection in PlainTransport failed or was closed'
)
);
}
});
}
}),
]),
new Promise<void>((resolve, reject) => {
connectionTimeoutTimer = setTimeout(
() => reject(new Error('SCTP connection timeout')),
3000
);
}),
]);

clearTimeout(connectionTimeoutTimer);
});

afterEach(async () => {
Expand Down
11 changes: 10 additions & 1 deletion worker/include/RTC/SctpAssociation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ namespace RTC
flatbuffers::Offset<FBS::SctpParameters::SctpParameters> FillBuffer(
flatbuffers::FlatBufferBuilder& builder) const;
void TransportConnected();
void TransportDisconnected();
SctpState GetState() const
{
return this->state;
Expand All @@ -89,18 +90,20 @@ namespace RTC
{
return this->sctpBufferedAmount;
}
void ProcessSctpData(const uint8_t* data, size_t len) const;
void ProcessSctpData(const uint8_t* data, size_t len);
void SendSctpMessage(
RTC::DataConsumer* dataConsumer,
const uint8_t* msg,
size_t len,
uint32_t ppid,
onQueuedCallback* cb = nullptr);
void HandleDataProducer(RTC::DataProducer* dataProducer);
void HandleDataConsumer(RTC::DataConsumer* dataConsumer);
void DataProducerClosed(RTC::DataProducer* dataProducer);
void DataConsumerClosed(RTC::DataConsumer* dataConsumer);

private:
void MayConnect();
void ResetSctpStream(uint16_t streamId, StreamDirection direction) const;
void AddOutgoingStreams(bool force = false);

Expand Down Expand Up @@ -131,6 +134,12 @@ namespace RTC
struct socket* socket{ nullptr };
uint16_t desiredOs{ 0u };
size_t messageBufferLen{ 0u };
bool transportConnected{ false };
// Whether at least one SCTP stream (AKA DataProducer or DataConsumer) has
// already been created, no matter it's still alive.
bool firstStreamCreated{ false };
// Whether we have received STCP data from the remote peer.
bool sctpDataReceived{ false };
uint16_t lastSsnReceived{ 0u }; // Valid for us since no SCTP I-DATA support.
};
} // namespace RTC
Expand Down
2 changes: 1 addition & 1 deletion worker/src/RTC/PipeTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ namespace RTC

inline bool PipeTransport::IsConnected() const
{
return this->tuple;
return this->tuple ? true : false;
}

inline bool PipeTransport::HasSrtp() const
Expand Down
44 changes: 43 additions & 1 deletion worker/src/RTC/PlainTransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
#include "MediaSoupErrors.hpp"
#include "Settings.hpp"
#include "Utils.hpp"
// TODO: For testing purposes. Must be removed.
#ifdef MS_SCTP_STACK
#include "RTC/SCTP/packet/Packet.hpp"
#endif

namespace RTC
{
Expand Down Expand Up @@ -788,7 +792,7 @@ namespace RTC

inline bool PlainTransport::IsConnected() const
{
return this->tuple;
return this->tuple ? true : false;
}

inline bool PlainTransport::HasSrtp() const
Expand Down Expand Up @@ -913,9 +917,29 @@ namespace RTC

if (!IsConnected())
{
MS_WARN_TAG(sctp, "not connected, cannot send SCTP data");

return;
}

// TODO: For testing purposes. Must be removed.
#ifdef MS_SCTP_STACK
MS_DUMP(">>> sending SCTP packet...");

const auto* packet = RTC::SCTP::Packet::Parse(data, len);

if (packet)
{
packet->Dump();

delete packet;
}
else
{
MS_ABORT("RTC::SCTP::Packet::Parse() failed to parse sent SCTP data");
}
#endif

this->tuple->Send(data, len);

// Increase send transmission.
Expand Down Expand Up @@ -1209,6 +1233,24 @@ namespace RTC
return;
}

// TODO: For testing purposes. Must be removed.
#ifdef MS_SCTP_STACK
MS_DUMP("<<< received SCTP packet...");

const auto* packet = RTC::SCTP::Packet::Parse(data, len);

if (packet)
{
packet->Dump();

delete packet;
}
else
{
MS_ABORT("RTC::SCTP::Packet::Parse() failed to parse received SCTP data");
}
#endif

// Pass it to the parent transport.
RTC::Transport::ReceiveSctpData(data, len);
}
Expand Down
Loading
Loading