Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/nameserver/cluster_info.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,14 @@ int ClusterInfo::Init(std::string& msg) {
PDLOG(WARNING, "connect ns failed, replica cluster ns");
return 403;
}
zk_client_->WatchNodes(boost::bind(&ClusterInfo::UpdateNSClient, this, _1));
zk_client_->WatchNodes([this](const std::vector<std::string>& endpoints) {
this->UpdateNSClient(endpoints);
});
zk_client_->WatchNodes();
if (FLAGS_use_name) {
UpdateRemoteRealEpMap();
bool ok = zk_client_->WatchItem(cluster_add_.zk_path() + "/nodes",
boost::bind(&ClusterInfo::UpdateRemoteRealEpMap, this));
[this]() { this->UpdateRemoteRealEpMap(); });
if (!ok) {
zk_client_->CloseZK();
msg = "zk watch nodes failed";
Expand Down
4 changes: 3 additions & 1 deletion src/nameserver/name_server_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1509,7 +1509,9 @@ bool NameServerImpl::Init(const std::string& zk_cluster, const std::string& zk_p
} else {
UpdateTablets(endpoints);
}
zk_client_->WatchNodes(boost::bind(&NameServerImpl::UpdateTabletsLocked, this, _1));
zk_client_->WatchNodes([this](const std::vector<std::string>& endpoints) {
this->UpdateTabletsLocked(endpoints);
});
bool ok = zk_client_->WatchNodes();
if (!ok) {
PDLOG(WARNING, "fail to watch nodes");
Expand Down
4 changes: 2 additions & 2 deletions src/tablet/tablet_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,11 @@ bool TabletImpl::RegisterZK() {
zk_client_->CreateNode(globalvar_changed_notify_path_, "1");
}
if (!zk_client_->WatchItem(globalvar_changed_notify_path_,
boost::bind(&TabletImpl::UpdateGlobalVarTable, this))) {
[this]() { this->UpdateGlobalVarTable(); })) {
LOG(WARNING) << "add global var changed watcher failed";
return false;
}
if (!zk_client_->WatchItem(notify_path_, boost::bind(&TabletImpl::RefreshTableInfo, this))) {
if (!zk_client_->WatchItem(notify_path_, [this]() { this->RefreshTableInfo(); })) {
LOG(WARNING) << "add notify watcher failed";
return false;
}
Expand Down
3 changes: 1 addition & 2 deletions src/tablet/tablet_impl_keep_alive_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include <unistd.h>
#include <string>

#include "boost/bind.hpp"
#include "gflags/gflags.h"
#include "gtest/gtest.h"
#include "tablet/tablet_impl.h"
Expand Down Expand Up @@ -71,7 +70,7 @@ TEST_F(TabletImplTest, KeepAlive) {
ASSERT_TRUE(ok);
ok = zk_client.Mkdir("/rtidb2/nodes");
ASSERT_TRUE(ok);
zk_client.WatchNodes(boost::bind(&WatchCallback, _1));
zk_client.WatchNodes(&WatchCallback);
ok = zk_client.WatchNodes();
ASSERT_TRUE(ok);
TabletImpl tablet;
Expand Down
11 changes: 6 additions & 5 deletions src/zk/dist_lock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

#include "zk/dist_lock.h"

#include "absl/strings/str_join.h"
#include "base/glog_wrapper.h"
#include "boost/algorithm/string/join.hpp"
#include "boost/bind.hpp"
extern "C" {
#include "zookeeper/zookeeper.h"
}
Expand All @@ -43,7 +42,7 @@ DistLock::DistLock(const std::string& root_path, ZkClient* zk_client, NotifyCall

DistLock::~DistLock() {}

void DistLock::Lock() { pool_.AddTask(boost::bind(&DistLock::InternalLock, this)); }
void DistLock::Lock() { pool_.AddTask([this]() { this->InternalLock(); }); }

void DistLock::Stop() {
running_.store(false, std::memory_order_relaxed);
Expand Down Expand Up @@ -74,7 +73,9 @@ void DistLock::InternalLock() {
lock_state_.store(kTryLock, std::memory_order_relaxed);
client_session_term_ = cur_session_term;
HandleChildrenChanged(children);
zk_client_->WatchChildren(root_path_, boost::bind(&DistLock::HandleChildrenChangedLocked, this, _1));
zk_client_->WatchChildren(root_path_, [this](const std::vector<std::string>& children) {
this->HandleChildrenChangedLocked(children);
});
}
}
}
Expand Down Expand Up @@ -117,7 +118,7 @@ void DistLock::HandleChildrenChanged(const std::vector<std::string>& children) {
}
PDLOG(INFO, "my path %s , first child %s , lock value %s", assigned_path_.c_str(), current_lock_node_.c_str(),
current_lock_value_.c_str());
PDLOG(INFO, "all child: %s", boost::algorithm::join(children, ", ").c_str());
PDLOG(INFO, "all child: %s", absl::StrJoin(children, ",").c_str());
}

void DistLock::HandleChildrenChangedLocked(const std::vector<std::string>& children) {
Expand Down
4 changes: 2 additions & 2 deletions src/zk/dist_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
#define SRC_ZK_DIST_LOCK_H_

#include <atomic>
#include <functional>
#include <mutex> // NOLINT
#include <string>
#include <vector>

#include "boost/function.hpp"
#include "common/thread_pool.h"
#include "zk/zk_client.h"

Expand All @@ -32,7 +32,7 @@ using ::baidu::common::ThreadPool;

enum LockState { kLocked, kLostLock, kTryLock };

typedef boost::function<void()> NotifyCallback;
typedef std::function<void()> NotifyCallback;
class DistLock {
public:
DistLock(const std::string& root_path, ZkClient* zk_client, NotifyCallback on_locked_cl,
Expand Down
10 changes: 4 additions & 6 deletions src/zk/dist_lock_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
#include <sched.h>
#include <unistd.h>

#include <boost/bind.hpp>

#include "zk/zk_client.h"
extern "C" {
#include "zookeeper/zookeeper.h"
Expand All @@ -46,8 +44,8 @@ TEST_F(DistLockTest, Lock) {
ZkClient client("127.0.0.1:6181", "", 10000, "127.0.0.1:9527", "/openmldb_lock", "", "");
bool ok = client.Init();
ASSERT_TRUE(ok);
DistLock lock("/openmldb_lock/nameserver_lock", &client, boost::bind(&OnLockedCallback),
boost::bind(&OnLostCallback), "endpoint1");
DistLock lock("/openmldb_lock/nameserver_lock", &client, &OnLockedCallback,
&OnLostCallback, "endpoint1");
lock.Lock();
sleep(5);
if (!call_invoked) {
Expand All @@ -65,8 +63,8 @@ TEST_F(DistLockTest, Lock) {
lock.Stop();
ASSERT_TRUE(false);
}
DistLock lock2("/openmldb_lock/nameserver_lock", &client2, boost::bind(&OnLockedCallback),
boost::bind(&OnLostCallback), "endpoint2");
DistLock lock2("/openmldb_lock/nameserver_lock", &client2, &OnLockedCallback,
&OnLostCallback, "endpoint2");
lock2.Lock();
sleep(5);
ASSERT_FALSE(call_invoked);
Expand Down
7 changes: 3 additions & 4 deletions src/zk/zk_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,22 @@
#include <atomic>
#include <condition_variable> // NOLINT
#include <cstdio>
#include <functional>
#include <map>
#include <mutex> // NOLINT
#include <string>
#include <vector>

#include "boost/function.hpp"

extern "C" {
#include "zookeeper/zookeeper.h"
}

namespace openmldb {
namespace zk {

typedef boost::function<void(const std::vector<std::string>& endpoint)> NodesChangedCallback;
typedef std::function<void(const std::vector<std::string>& endpoint)> NodesChangedCallback;

typedef boost::function<void(void)> ItemChangedCallback;
typedef std::function<void(void)> ItemChangedCallback;

const uint32_t ZK_MAX_BUFFER_SIZE = 1024 * 1024;

Expand Down
4 changes: 1 addition & 3 deletions src/zk/zk_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
#include <sched.h>
#include <unistd.h>

#include <boost/bind.hpp>

#include "base/glog_wrapper.h" // NOLINT
extern "C" {
#include "zookeeper/zookeeper.h"
Expand Down Expand Up @@ -66,7 +64,7 @@ TEST_F(ZkClientTest, Init) {
uint32_t size = 1;
ASSERT_EQ(size, endpoints.size());
ASSERT_EQ("127.0.0.1:9527", endpoints[0]);
client.WatchNodes(boost::bind(&WatchCallback, _1));
client.WatchNodes(&WatchCallback);
// trigger watch
ok = client.WatchNodes();
ASSERT_TRUE(ok);
Expand Down
Loading