Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/brpc/ubshm/shm/shm_ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,41 @@

namespace brpc {
namespace ubring {
namespace {

RETURN_CODE ReserveIpcShm(int fd, const SHM *shm)
{
#if defined(__linux__)
const int rc = posix_fallocate(fd, 0, (off_t)shm->len);
if (rc != 0) {
LOG(ERROR) << "IPC reserve shm=" << shm->name << " length=" << shm->len
<< " failed, ret(" << rc << ").";
return SHM_ERR;
}
#else
UNREFERENCE_PARAM(fd);
UNREFERENCE_PARAM(shm);
#endif
return UBRING_OK;
}

RETURN_CODE CheckIpcShmSize(int fd, const SHM *shm)
{
struct stat st;
if (fstat(fd, &st) != 0) {
LOG(ERROR) << "IPC stat shm=" << shm->name << " failed, ret(" << errno << ").";
return SHM_ERR;
}
if ((uint64_t)st.st_size < (uint64_t)shm->len) {
LOG(ERROR) << "IPC shm=" << shm->name << " actual length=" << st.st_size
<< " is shorter than requested length=" << shm->len << ".";
return SHM_ERR;
}
return UBRING_OK;
}

} // namespace

RETURN_CODE IpcShmLocalMalloc(SHM *shm)
{
int fd = shm_open(shm->name, O_CREAT | O_EXCL | O_RDWR, SHM_IPC_MODE);
Expand All @@ -50,9 +85,16 @@ RETURN_CODE IpcShmLocalMalloc(SHM *shm)
return SHM_ERR;
}

if (ReserveIpcShm(fd, shm) != UBRING_OK) {
close(fd);
shm_unlink(shm->name);
return SHM_ERR;
}

shm->addr = (uint8_t*)mmap(NULL, shm->len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (shm->addr == (uint8_t*)MAP_FAILED) {
LOG(ERROR) << "IPC map shm=" << shm->name << " length=" << shm->len << " failed, ret(" << errno << ").";
shm->addr = NULL;
close(fd);
shm_unlink(shm->name);
return SHM_ERR;
Expand All @@ -75,6 +117,7 @@ RETURN_CODE IpcShmMunmap(SHM *shm)
return SHM_ERR;
}

shm->addr = NULL;
LOG(INFO) << "IPC unmap shm=" << shm->name << " length=" << shm->len << " success.";
return UBRING_OK;
}
Expand Down Expand Up @@ -109,6 +152,8 @@ RETURN_CODE IpcShmLocalFree(SHM *shm)
int ret = munmap(shm->addr, shm->len);
if (ret != UBRING_OK) {
LOG(WARNING) << "IPC unmap shm=" << shm->name << " failed, ret=" << ret;
} else {
shm->addr = NULL;
}

ret = shm_unlink(shm->name);
Expand Down Expand Up @@ -138,9 +183,15 @@ RETURN_CODE IpcShmRemoteMalloc(SHM *shm)
return SHM_ERR;
}

if (CheckIpcShmSize(fd, shm) != UBRING_OK) {
close(fd);
return SHM_ERR;
}

shm->addr = (uint8_t*)mmap(NULL, shm->len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (shm->addr == (uint8_t*)MAP_FAILED) {
LOG(ERROR) << "IPC map shm=" << shm->name << " failed, ret=" << errno;
shm->addr = NULL;
close(fd);
return SHM_ERR;
}
Expand All @@ -157,9 +208,15 @@ RETURN_CODE IpcShmLocalMmap(SHM *shm, int prot)
return SHM_ERR;
}

if (CheckIpcShmSize(fd, shm) != UBRING_OK) {
close(fd);
return SHM_ERR;
}

shm->addr = (uint8_t*)mmap(NULL, shm->len, prot, MAP_SHARED, fd, 0);
if (shm->addr == (uint8_t*)MAP_FAILED) {
LOG(ERROR) << "IPC map shm=" << shm->name << " failed, ret=" << errno;
shm->addr = NULL;
close(fd);
return SHM_ERR;
}
Expand All @@ -182,6 +239,7 @@ RETURN_CODE IpcShmRemoteFree(SHM *shm)
return SHM_ERR;
}

shm->addr = NULL;
LOG(INFO) << "IPC free remote shm=" << shm->name << " success.";
return UBRING_OK;
}
Expand Down
7 changes: 6 additions & 1 deletion src/brpc/ubshm/shm/shm_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ RETURN_CODE ShmLocalCalloc(SHM *shm) {
LOG(ERROR) << "Failed to alloc local shm.";
return rc;
}
if (UNLIKELY(shm->addr == NULL)) {
LOG(ERROR) << "Local shm=" << shm->name << " allocated with NULL address.";
ShmFree(shm);
return SHM_ERR;
}
memset(shm->addr, 0, shm->len);
return UBRING_OK;
}
Expand Down Expand Up @@ -244,4 +249,4 @@ RETURN_CODE ShmFree(SHM *shm) {
return rc;
}
}
}
}
15 changes: 11 additions & 4 deletions src/brpc/ubshm/ub_endpoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#if BRPC_WITH_UBRING

#include <errno.h>

#include <gflags/gflags.h>
#include <array>
#include "butil/fd_utility.h"
Expand Down Expand Up @@ -526,12 +528,12 @@ void* UBShmEndpoint::ProcessHandshakeAtServer(void* arg) {
ub_transport->_ub_state = UBShmTransport::UB_OFF;
} else {
ep->_state = S_ALLOC_SHM;
ubring::SHM remote_trx_shm = {NULL, remote_msg.len, 0, {0}, (uint8_t)ep->_socket->fd()};
ubring::SHM remote_trx_shm = {NULL, remote_msg.len, 0, {0}, (uint32_t)ep->_socket->fd()};
strncpy(remote_trx_shm.name, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN);

size_t local_shm_len = (size_t)(FLAGS_data_queue_size) * MB_TO_BYTE;
// server端共享内存名称
ubring::SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint8_t)ep->_socket->fd()};
ubring::SHM local_trx_shm = {NULL, local_shm_len, 0, {0}, (uint32_t)ep->_socket->fd()};
char clientName[SHM_MAX_NAME_BUFF_LEN];
strncpy(clientName, remote_msg.shm_name, SHM_MAX_NAME_BUFF_LEN);

Expand Down Expand Up @@ -646,10 +648,15 @@ ssize_t UBShmEndpoint::CutFromIOBufList(butil::IOBuf** from, size_t ndata) {
}

ssize_t nw = 0;
errno = 0;
nw = _ub_ring->UbrTrxWritev(vec, nvec);
if (UNLIKELY(nw == -1)) {
LOG(ERROR) << "Non-blocking send msg in failed, connection has been closed.";
errno = EPIPE;
if (errno == EMSGSIZE) {
LOG(ERROR) << "Non-blocking send msg failed, message is larger than ubring capacity.";
} else {
LOG(ERROR) << "Non-blocking send msg in failed, connection has been closed.";
errno = EPIPE;
}
} else if (UNLIKELY(nw == UBRING_RETRY)) {
errno = EAGAIN;
nw = -1;
Expand Down
30 changes: 19 additions & 11 deletions src/brpc/ubshm/ub_ring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <errno.h>
#include <iostream>
#include <gflags/gflags.h>
#include <unistd.h>
Expand Down Expand Up @@ -91,9 +92,6 @@ RETURN_CODE UBRing::UbrTrxClose() {
if (_trx->ubrTx.remoteRxEventQ.addr != nullptr) {
((UbrEventQMsg *)_trx->ubrTx.remoteRxEventQ.addr)->flag = UBR_STATE_CLOSED;
}
if (UNLIKELY(ShmRemoteFree(&_trx->remoteShm) != UBRING_OK)) {
LOG(WARNING) << "Force close, remote shm " << _trx->remoteShm.name << " free failed.";
}
if (UNLIKELY(UbrTrxFreeShm(_trx) != UBRING_OK)) {
LOG(WARNING) << "Force close, local shm " << _trx->localShm.name << " free failed.";
}
Expand Down Expand Up @@ -321,10 +319,6 @@ void *UBRing::UbrAsynClearCallback(void *args)
return NULL;
}

if (UNLIKELY(ShmRemoteFree(&trx->remoteShm) != UBRING_OK)) {
LOG(ERROR) << "Trx close, remote shm " << trx->remoteShm.name << " free failed.";
}

if (UNLIKELY(UbrTrxFreeShm(trx) != UBRING_OK)) {
LOG(ERROR) << "Trx close, wait for local shm " << trx->localShm.name << " free fail.";
}
Expand All @@ -348,6 +342,12 @@ int UBRing::UbrTrxSend(const void *buf, uint32_t bufLen)
uint32_t remainChunkNum =
(_trx->ubrTx.writePos > tail) ? (tail + cap - _trx->ubrTx.writePos) : (tail - _trx->ubrTx.writePos);
uint32_t needMsgChunkNum = CalcUbrMsgChunkCnt(bufLen);
if (needMsgChunkNum >= cap) {
LOG(ERROR) << "Ubr send failed, payload length=" << bufLen
<< " needs " << needMsgChunkNum << " chunks, capacity=" << cap << ".";
errno = EMSGSIZE;
return UBRING_ERR;
}
if (remainChunkNum < needMsgChunkNum) {
return UBRING_RETRY;
}
Expand Down Expand Up @@ -653,7 +653,7 @@ RETURN_CODE UBRing::UbrTrxFreeShm(UbrTrx *trx)

RETURN_CODE remoteRc = UBRING_OK;
if (trx->remoteShm.addr != NULL) {
remoteRc = IpcShmRemoteFree(&trx->remoteShm);
remoteRc = ShmRemoteFree(&trx->remoteShm);
}
if (remoteRc != UBRING_OK) {
LOG(WARNING) << "Free remote shm " << trx->remoteShm.name << " failed, rc=" << remoteRc;
Expand Down Expand Up @@ -795,6 +795,7 @@ int UBRing::UbrAllocateServerShm(SHM* remote_trx_shm, SHM* local_trx_shm) {

if (UNLIKELY((ShmLocalCalloc(local_trx_shm)) != UBRING_OK)) {
LOG(ERROR) << "Trx apply local shared memory failed.";
ShmRemoteFree(remote_trx_shm);
return -1;
}

Expand All @@ -808,9 +809,9 @@ int UBRing::UbrAllocateServerShm(SHM* remote_trx_shm, SHM* local_trx_shm) {
_trx->type = TCP_TRX;
if (UNLIKELY((UbrServerTrxInit(local_trx_shm, remote_trx_shm)) != UBRING_OK)) {
LOG(ERROR) << "Server trx init failed.";
ShmRemoteFree(remote_trx_shm);
UbrTrxFreeShm(_trx);
UBRingManager::ReleaseUbrTrxFromMgr(_trx);
_trx = nullptr;
return -1;
}
return 0;
Expand All @@ -826,6 +827,7 @@ int UBRing::UbrAllocateLocalShm(SHM *local_trx_shm, const char *shm_name)
_trx->type = TCP_TRX;
if (UNLIKELY((ApplyAndMapLocalShm(local_trx_shm, shm_name)) != UBRING_OK)) {
LOG(ERROR) << "Trx apply or map local shared memory failed, localName=" << shm_name;
_trx = nullptr;
return -1;
}
return 0;
Expand Down Expand Up @@ -873,7 +875,7 @@ RETURN_CODE UBRing::UbrMapRemoteShmAddTimer(SHM *localTrxShm, const char *localN

if (UNLIKELY(UbrAddTimer() != UBRING_OK)) {
LOG(ERROR) << "Ubr add timer failed, localName=" << localName;
ShmRemoteFree(&remoteTrxShm);
ShmRemoteFree(&_trx->remoteShm);
return UBRING_ERR;
}

Expand All @@ -884,7 +886,7 @@ RETURN_CODE UBRing::UbrMapRemoteShmAddTimer(SHM *localTrxShm, const char *localN
LOG(ERROR) << "Local shm " << localTrxShm->name << " wait for connect remote map timeout.";
DeleteTimerSafe((uint32_t)_trx->hbTimerFd);
DeleteTimerSafe((uint32_t)_trx->timerFd);
ShmRemoteFree(&remoteTrxShm);
ShmRemoteFree(&_trx->remoteShm);
return UBRING_ERR_TIMEOUT;
}

Expand Down Expand Up @@ -961,6 +963,12 @@ RETURN_CODE UBRing::WritevHasEnoughSpace(size_t bufLen)
uint32_t remainChunkNum =
(_trx->ubrTx.writePos > tail) ? (tail + cap - _trx->ubrTx.writePos) : (tail - _trx->ubrTx.writePos);
uint32_t needMsgChunkNum = CalcUbrMsgChunkCnt((uint32_t)bufLen);
if (needMsgChunkNum >= cap) {
LOG(ERROR) << "Ubr write failed, payload length=" << bufLen
<< " needs " << needMsgChunkNum << " chunks, capacity=" << cap << ".";
errno = EMSGSIZE;
return UBRING_ERR;
}
if (remainChunkNum < needMsgChunkNum) {
return UBRING_RETRY;
}
Expand Down
Loading
Loading