diff --git a/CHANGELOG.md b/CHANGELOG.md index 28d4da024f..83f6ee4e39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Convert `WORKER_CLOSE` into a notification ([PR #1729](https://github.com/versatica/mediasoup/pull/1729). - Node tests: Replace `sctp` unmaintained library with `werift-sctp` ([PR #1732](https://github.com/versatica/mediasoup/pull/1732), thanks to @shinyoshiaki for his help with `werift-sctp`. - Worker: Require C++20 ([PR #1741](https://github.com/versatica/mediasoup/pull/1741). +- Fix "SCTP failed" if no DataChannel is created on a Transport with `enableSctp: true` ([PR #1749](https://github.com/versatica/mediasoup/pull/1749). ### 3.19.17 diff --git a/node/src/test/test-werift-sctp.ts b/node/src/test/test-werift-sctp.ts index df0183ca91..62725673cf 100644 --- a/node/src/test/test-werift-sctp.ts +++ b/node/src/test/test-werift-sctp.ts @@ -35,6 +35,9 @@ beforeEach(async () => { numSctpStreams: { OS: 256, MIS: 256 }, }); + // Create an explicit SCTP outgoing stream id. + ctx.sctpSendStreamId = 123; + ctx.sctpClient = SCTP.client( createSctpUdpTransport(createSocket('udp4'), { port: ctx.plainTransport.tuple.localPort, @@ -42,24 +45,9 @@ beforeEach(async () => { }) ); - await ctx.sctpClient.start(5000); - - let connectionTimeoutTimer: NodeJS.Timeout | undefined; - - await Promise.race([ - ctx.sctpClient.stateChanged.connected.asPromise(), - new Promise((resolve, reject) => { - connectionTimeoutTimer = setTimeout( - () => reject(new Error('SCTP connection timeout')), - 3000 - ); - }), - ]); - - clearTimeout(connectionTimeoutTimer); - - // Create an explicit SCTP outgoing stream id. - ctx.sctpSendStreamId = 123; + // NOTE: We don't await it on purpose since we don't want to block here until + // SCTP connects. + void ctx.sctpClient.start(5000); // Create a DataProducer with the corresponding SCTP stream id. ctx.dataProducer = await ctx.plainTransport.produceData({ @@ -76,6 +64,41 @@ beforeEach(async () => { ctx.dataConsumer = await ctx.plainTransport.consumeData({ dataProducerId: ctx.dataProducer.id, }); + + let connectionTimeoutTimer: NodeJS.Timeout | undefined; + + await Promise.race([ + // Wait for SCTP to become connected in both the PlainTransport and in the + // werift-sctp client. + Promise.all([ + ctx.sctpClient.stateChanged.connected.asPromise(), + new Promise((resolve, reject) => { + if (ctx.plainTransport?.sctpState === 'connected') { + resolve(); + } else { + ctx.plainTransport?.on('sctpstatechange', state => { + if (state === 'connected') { + resolve(); + } else if (state === 'failed' || state === 'closed') { + reject( + new Error( + 'SCTP connection in PlainTransport failed or was closed' + ) + ); + } + }); + } + }), + ]), + new Promise((resolve, reject) => { + connectionTimeoutTimer = setTimeout( + () => reject(new Error('SCTP connection timeout')), + 3000 + ); + }), + ]); + + clearTimeout(connectionTimeoutTimer); }); afterEach(async () => { diff --git a/worker/include/RTC/SctpAssociation.hpp b/worker/include/RTC/SctpAssociation.hpp index 68c9844133..7892e64c54 100644 --- a/worker/include/RTC/SctpAssociation.hpp +++ b/worker/include/RTC/SctpAssociation.hpp @@ -81,6 +81,7 @@ namespace RTC flatbuffers::Offset FillBuffer( flatbuffers::FlatBufferBuilder& builder) const; void TransportConnected(); + void TransportDisconnected(); SctpState GetState() const { return this->state; @@ -89,18 +90,20 @@ namespace RTC { return this->sctpBufferedAmount; } - void ProcessSctpData(const uint8_t* data, size_t len) const; + void ProcessSctpData(const uint8_t* data, size_t len); void SendSctpMessage( RTC::DataConsumer* dataConsumer, const uint8_t* msg, size_t len, uint32_t ppid, onQueuedCallback* cb = nullptr); + void HandleDataProducer(RTC::DataProducer* dataProducer); void HandleDataConsumer(RTC::DataConsumer* dataConsumer); void DataProducerClosed(RTC::DataProducer* dataProducer); void DataConsumerClosed(RTC::DataConsumer* dataConsumer); private: + void MayConnect(); void ResetSctpStream(uint16_t streamId, StreamDirection direction) const; void AddOutgoingStreams(bool force = false); @@ -131,6 +134,12 @@ namespace RTC struct socket* socket{ nullptr }; uint16_t desiredOs{ 0u }; size_t messageBufferLen{ 0u }; + bool transportConnected{ false }; + // Whether at least one SCTP stream (AKA DataProducer or DataConsumer) has + // already been created, no matter it's still alive. + bool firstStreamCreated{ false }; + // Whether we have received STCP data from the remote peer. + bool sctpDataReceived{ false }; uint16_t lastSsnReceived{ 0u }; // Valid for us since no SCTP I-DATA support. }; } // namespace RTC diff --git a/worker/src/RTC/PipeTransport.cpp b/worker/src/RTC/PipeTransport.cpp index bae392fa52..689b816237 100644 --- a/worker/src/RTC/PipeTransport.cpp +++ b/worker/src/RTC/PipeTransport.cpp @@ -486,7 +486,7 @@ namespace RTC inline bool PipeTransport::IsConnected() const { - return this->tuple; + return this->tuple ? true : false; } inline bool PipeTransport::HasSrtp() const diff --git a/worker/src/RTC/PlainTransport.cpp b/worker/src/RTC/PlainTransport.cpp index cd59a082d5..943ccc4389 100644 --- a/worker/src/RTC/PlainTransport.cpp +++ b/worker/src/RTC/PlainTransport.cpp @@ -6,6 +6,10 @@ #include "MediaSoupErrors.hpp" #include "Settings.hpp" #include "Utils.hpp" +// TODO: For testing purposes. Must be removed. +#ifdef MS_SCTP_STACK +#include "RTC/SCTP/packet/Packet.hpp" +#endif namespace RTC { @@ -788,7 +792,7 @@ namespace RTC inline bool PlainTransport::IsConnected() const { - return this->tuple; + return this->tuple ? true : false; } inline bool PlainTransport::HasSrtp() const @@ -913,9 +917,29 @@ namespace RTC if (!IsConnected()) { + MS_WARN_TAG(sctp, "not connected, cannot send SCTP data"); + return; } +// TODO: For testing purposes. Must be removed. +#ifdef MS_SCTP_STACK + MS_DUMP(">>> sending SCTP packet..."); + + const auto* packet = RTC::SCTP::Packet::Parse(data, len); + + if (packet) + { + packet->Dump(); + + delete packet; + } + else + { + MS_ABORT("RTC::SCTP::Packet::Parse() failed to parse sent SCTP data"); + } +#endif + this->tuple->Send(data, len); // Increase send transmission. @@ -1209,6 +1233,24 @@ namespace RTC return; } +// TODO: For testing purposes. Must be removed. +#ifdef MS_SCTP_STACK + MS_DUMP("<<< received SCTP packet..."); + + const auto* packet = RTC::SCTP::Packet::Parse(data, len); + + if (packet) + { + packet->Dump(); + + delete packet; + } + else + { + MS_ABORT("RTC::SCTP::Packet::Parse() failed to parse received SCTP data"); + } +#endif + // Pass it to the parent transport. RTC::Transport::ReceiveSctpData(data, len); } diff --git a/worker/src/RTC/SctpAssociation.cpp b/worker/src/RTC/SctpAssociation.cpp index dfb55bdf83..0c9a0e4b13 100644 --- a/worker/src/RTC/SctpAssociation.cpp +++ b/worker/src/RTC/SctpAssociation.cpp @@ -290,60 +290,16 @@ namespace RTC { MS_TRACE(); - // Just run the SCTP stack if our state is 'new'. - if (this->state != SctpState::NEW) - { - return; - } - - try - { - int ret; - struct sockaddr_conn rconn{}; // NOLINT(cppcoreguidelines-pro-type-member-init) - - std::memset(&rconn, 0, sizeof(rconn)); - rconn.sconn_family = AF_CONN; - rconn.sconn_port = htons(5000); - rconn.sconn_addr = reinterpret_cast(this->id); -#ifdef HAVE_SCONN_LEN - rconn.sconn_len = sizeof(rconn); -#endif - - ret = usrsctp_connect(this->socket, reinterpret_cast(&rconn), sizeof(rconn)); - - if (ret < 0 && errno != EINPROGRESS) - { - MS_THROW_ERROR("usrsctp_connect() failed: %s", std::strerror(errno)); - } - - // Disable MTU discovery. - sctp_paddrparams peerAddrParams{}; // NOLINT(cppcoreguidelines-pro-type-member-init) - - std::memset(&peerAddrParams, 0, sizeof(peerAddrParams)); - std::memcpy(&peerAddrParams.spp_address, &rconn, sizeof(rconn)); - peerAddrParams.spp_flags = SPP_PMTUD_DISABLE; - - // The MTU value provided specifies the space available for chunks in the - // packet, so let's subtract the SCTP header size. - peerAddrParams.spp_pathmtu = SctpMtu - sizeof(struct sctp_common_header); + this->transportConnected = true; - ret = usrsctp_setsockopt( - this->socket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &peerAddrParams, sizeof(peerAddrParams)); + MayConnect(); + } - if (ret < 0) - { - MS_THROW_ERROR("usrsctp_setsockopt(SCTP_PEER_ADDR_PARAMS) failed: %s", std::strerror(errno)); - } + void SctpAssociation::TransportDisconnected() + { + MS_TRACE(); - // Announce connecting state. - this->state = SctpState::CONNECTING; - this->listener->OnSctpAssociationConnecting(this); - } - catch (const MediaSoupError& /*error*/) - { - this->state = SctpState::FAILED; - this->listener->OnSctpAssociationFailed(this); - } + this->transportConnected = false; } flatbuffers::Offset SctpAssociation::FillBuffer( @@ -369,12 +325,17 @@ namespace RTC this->isDataChannel); } - void SctpAssociation::ProcessSctpData(const uint8_t* data, size_t len) const + void SctpAssociation::ProcessSctpData(const uint8_t* data, size_t len) { MS_TRACE(); + this->sctpDataReceived = true; + + MayConnect(); + #if MS_LOG_DEV_LEVEL == 3 - MS_DUMP_DATA(data, len); + // NOTE: Only uncomment this during local debugging if needed. + // MS_DUMP_DATA(data, len); #endif usrsctp_conninput(reinterpret_cast(this->id), data, len, 0); @@ -445,7 +406,7 @@ namespace RTC // SCTP send buffer being full is legit, not an error. if (sctpSendBufferFull) { - MS_DEBUG_DEV( + MS_WARN_DEV( "error sending SCTP message [sid:%" PRIu16 ", ppid:%" PRIu32 ", message size:%zu]: %s", parameters.streamId, ppid, @@ -481,10 +442,23 @@ namespace RTC } } + void SctpAssociation::HandleDataProducer(RTC::DataProducer* /*dataProducer*/) + { + MS_TRACE(); + + this->firstStreamCreated = true; + + MayConnect(); + } + void SctpAssociation::HandleDataConsumer(RTC::DataConsumer* dataConsumer) { MS_TRACE(); + this->firstStreamCreated = true; + + MayConnect(); + auto streamId = dataConsumer->GetSctpStreamParameters().streamId; // We need more OS. @@ -522,6 +496,96 @@ namespace RTC ResetSctpStream(streamId, StreamDirection::OUTGOING); } + void SctpAssociation::MayConnect() + { + MS_TRACE(); + + // Just run the SCTP stack if our state is 'new'. + // Notice that once MayConnect() is called (and the code below is executed), + // SCTP state will no longer be "NEW". + if (this->state != SctpState::NEW) + { + MS_DEBUG_DEV("SCTP state is not NEW, ignoring"); + + return; + } + // If the transport is not connected and has never been connected, don't do + // anything. + else if (!this->transportConnected) + { + MS_DEBUG_DEV("transport is not connected, ignoring"); + + return; + } + // If there are no SCTP streams yet and no SCTP data has been yet received + // from the remote peer, don't do anything. + // This is because the peer may never create a DataChannel so we shouldn't + // try to connect SCTP (SCTP INIT chunk, etc) since it will timeout and + // trigger "SCTP failed". + else if (!this->firstStreamCreated && !this->sctpDataReceived) + { + MS_DEBUG_DEV( + "no SCTP stream has been created yet and no SCTP data has been received yet, ignoring"); + + return; + } + + MS_DEBUG_TAG(sctp, "connecting SCTP"); + + try + { + int ret; + struct sockaddr_conn rconn{}; // NOLINT(cppcoreguidelines-pro-type-member-init) + + std::memset(&rconn, 0, sizeof(rconn)); + rconn.sconn_family = AF_CONN; + rconn.sconn_port = htons(5000); + rconn.sconn_addr = reinterpret_cast(this->id); +#ifdef HAVE_SCONN_LEN + rconn.sconn_len = sizeof(rconn); +#endif + + ret = usrsctp_connect(this->socket, reinterpret_cast(&rconn), sizeof(rconn)); + + if (ret < 0 && errno != EINPROGRESS) + { + MS_THROW_ERROR("usrsctp_connect() failed: %s", std::strerror(errno)); + } + + // Disable MTU discovery. + sctp_paddrparams peerAddrParams{}; // NOLINT(cppcoreguidelines-pro-type-member-init) + + std::memset(&peerAddrParams, 0, sizeof(peerAddrParams)); + std::memcpy(&peerAddrParams.spp_address, &rconn, sizeof(rconn)); + peerAddrParams.spp_flags = SPP_PMTUD_DISABLE; + + // The MTU value provided specifies the space available for chunks in the + // packet, so let's subtract the SCTP header size. + peerAddrParams.spp_pathmtu = SctpMtu - sizeof(struct sctp_common_header); + + ret = usrsctp_setsockopt( + this->socket, IPPROTO_SCTP, SCTP_PEER_ADDR_PARAMS, &peerAddrParams, sizeof(peerAddrParams)); + + if (ret < 0) + { + MS_THROW_ERROR("usrsctp_setsockopt(SCTP_PEER_ADDR_PARAMS) failed: %s", std::strerror(errno)); + } + + // Announce connecting state. + MS_DEBUG_DEV("SCTP state switched to CONNECTING (in MayConnect())"); + + this->state = SctpState::CONNECTING; + this->listener->OnSctpAssociationConnecting(this); + } + catch (const MediaSoupError& /*error*/) + { + MS_DEBUG_DEV("SCTP state switched to FAILED (in MayConnect())"); + + this->state = SctpState::FAILED; + this->listener->OnSctpAssociationFailed(this); + } + } + void SctpAssociation::ResetSctpStream(uint16_t streamId, StreamDirection direction) const { MS_TRACE(); @@ -655,7 +719,8 @@ namespace RTC const uint8_t* data = static_cast(buffer); #if MS_LOG_DEV_LEVEL == 3 - MS_DUMP_DATA(data, len); + // NOTE: Only uncomment this during local debugging if needed. + // MS_DUMP_DATA(data, len); #endif this->listener->OnSctpAssociationSendData(this, data, len); @@ -781,6 +846,8 @@ namespace RTC if (this->state != SctpState::CONNECTED) { + MS_DEBUG_DEV("SCTP state switched to CONNECTED (in SCTP_ASSOC_CHANGE)"); + this->state = SctpState::CONNECTED; this->listener->OnSctpAssociationConnected(this); } @@ -813,6 +880,8 @@ namespace RTC if (this->state != SctpState::CLOSED) { + MS_DEBUG_DEV("SCTP state switched to CLOSED (in SCTP_COMM_LOST)"); + this->state = SctpState::CLOSED; this->listener->OnSctpAssociationClosed(this); } @@ -839,6 +908,8 @@ namespace RTC if (this->state != SctpState::CONNECTED) { + MS_DEBUG_DEV("SCTP state switched to CONNECTED (in SCTP_RESTART)"); + this->state = SctpState::CONNECTED; this->listener->OnSctpAssociationConnected(this); } @@ -852,6 +923,8 @@ namespace RTC if (this->state != SctpState::CLOSED) { + MS_DEBUG_DEV("SCTP state switched to CLOSED (in SCTP_SHUTDOWN_COMP)"); + this->state = SctpState::CLOSED; this->listener->OnSctpAssociationClosed(this); } @@ -880,6 +953,8 @@ namespace RTC if (this->state != SctpState::FAILED) { + MS_DEBUG_DEV("SCTP state switched to FAILED (in SCTP_CANT_STR_ASSOC)"); + this->state = SctpState::FAILED; this->listener->OnSctpAssociationFailed(this); } @@ -933,6 +1008,8 @@ namespace RTC if (this->state != SctpState::CLOSED) { + MS_DEBUG_DEV("SCTP state switched to CLOSED (in SCTP_SHUTDOWN_EVENT)"); + this->state = SctpState::CLOSED; this->listener->OnSctpAssociationClosed(this); } diff --git a/worker/src/RTC/Transport.cpp b/worker/src/RTC/Transport.cpp index c5041924cb..7564742b30 100644 --- a/worker/src/RTC/Transport.cpp +++ b/worker/src/RTC/Transport.cpp @@ -1139,6 +1139,12 @@ namespace RTC request->Accept(FBS::Response::Body::DataProducer_DumpResponse, dumpOffset); + if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP) + { + // Tell to the SCTP association. + this->sctpAssociation->HandleDataProducer(dataProducer); + } + break; } @@ -1534,6 +1540,12 @@ namespace RTC dataConsumer->TransportDisconnected(); } + // Tell the SctpAssociation. + if (this->sctpAssociation) + { + this->sctpAssociation->TransportDisconnected(); + } + // Stop the RTCP timer. this->rtcpTimer->Stop(); diff --git a/worker/src/RTC/WebRtcTransport.cpp b/worker/src/RTC/WebRtcTransport.cpp index 7914856089..4cacd371af 100644 --- a/worker/src/RTC/WebRtcTransport.cpp +++ b/worker/src/RTC/WebRtcTransport.cpp @@ -667,6 +667,12 @@ namespace RTC { MS_TRACE(); + // Dont' start DTLS handshake if ICE is not connected/completed. + if (!this->iceServer->GetSelectedTuple()) + { + return; + } + // Do nothing if we have the same local DTLS role as the DTLS transport. // NOTE: local role in DTLS transport can be NONE, but not ours. if (this->dtlsTransport->GetLocalRole() == this->dtlsRole)