diff --git a/.clang-format b/.clang-format index 7e4a8374..c7e91cf5 100644 --- a/.clang-format +++ b/.clang-format @@ -12,7 +12,7 @@ AllowShortLoopsOnASingleLine: true AllowShortFunctionsOnASingleLine: All AlwaysBreakTemplateDeclarations: true AlwaysBreakBeforeMultilineStrings: false -BreakBeforeBinaryOperators: true +BreakBeforeBinaryOperators: true BreakBeforeTernaryOperators: true BreakConstructorInitializersBeforeComma: false BinPackParameters: true @@ -53,4 +53,3 @@ ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] SpaceBeforeParens: Never DisableFormat: false ... - diff --git a/scripts/util/cascade_runner b/scripts/util/cascade_runner new file mode 100755 index 00000000..1d148cc2 --- /dev/null +++ b/scripts/util/cascade_runner @@ -0,0 +1,196 @@ +#!/bin/bash + +set -euo pipefail + +SCRIPT_DIR=$(dirname "${BASH_SOURCE[0]}") +SCRIPT_NAME=$(basename "${BASH_SOURCE[0]}") +HELP_STRING="A helper script to set up a local cascade instance +Usage: $SCRIPT_NAME [options] + -h Print this message and exit. + -d D CFG directory location (defaults to cfg). + -n N Sets a range from 0..N-1 for range commands commands. + + -u K update key in range. + -v V update value in range. + (ex. -n 7 -u default_log_level -v info) + + -k Kill running cascade nodes. + -r Remove old logs in range. + -s Start server nodes in range. + -f D directory to mount (requires -c flag to start and -k to end). + -c I Start client node at folder cfg/nI." + +CFG_DIR="$SCRIPT_DIR/cfg" +FUSE_DIR="" +HELP=false +KILL=false +CLEAN=false +SERVERS=false +NODES=0 +CLIENT=-1 + +UPDATE_KEY="" +UPDATE_VALUE="" + +# TODO get from env? or as arg +BASE_PATH="/root/workspace/cascade/build-Release/src" + +export PATH="$PATH:$BASE_PATH/service" +export PATH="$PATH:$BASE_PATH/service/fuse" + +FUSE_COMMAND="cascade_fuse_client" +CLIENT_COMMAND="cascade_client" +SERVER_COMMAND="cascade_server" +LOG_FILE="node_info.log" + +if ! command -v $SERVER_COMMAND &>/dev/null; then + echo "$SERVER_COMMAND could not be found" + exit_abnormal +fi + +usage() { + echo "$HELP_STRING" 1>&2 +} + +exit_abnormal() { + usage + exit 1 +} + +check_num() { + # $1: string to check + RE_ISNUM='^[0-9]+$' + if ! [[ $1 =~ $RE_ISNUM ]]; then + exit_abnormal + fi +} + +foreach() { + # $1: command to do (accepts count as arg) + COUNT=0 + while [ $COUNT -lt "$NODES" ]; do + DIR="$CFG_DIR/n$COUNT" + cd "$DIR" + + $1 $COUNT + + cd - &>/dev/null + ((COUNT += 1)) + done +} + +main() { + while getopts 'hd:n:f:krsc:u:v:' opt; do + case "$opt" in + h) HELP=true ;; + d) CFG_DIR="$OPTARG" ;; + n) + NODES="$OPTARG" + check_num "$NODES" + ;; + f) FUSE_DIR="$OPTARG" ;; + k) KILL=true ;; + r) CLEAN=true ;; + s) SERVERS=true ;; + c) + CLIENT="$OPTARG" + check_num "$CLIENT" + ;; + u) UPDATE_KEY="$OPTARG" ;; + v) UPDATE_VALUE="$OPTARG" ;; + *) exit_abnormal ;; + esac + done + + if [[ "$HELP" = true ]]; then + usage + exit + fi + + if [[ -n "$FUSE_DIR" ]]; then + MOUNT="$(realpath "$FUSE_DIR")" + + if [[ ! "$CLIENT" -eq "-1" ]]; then + DIR="$CFG_DIR/n$CLIENT" + cd "$DIR" + + echo "fuse client mounting at $MOUNT" + mkdir -p "$MOUNT" + "$FUSE_COMMAND" -s -f "$MOUNT" &>"$LOG_FILE" & + + cd - &>/dev/null + exit + fi + + if [[ "$KILL" = true ]]; then + echo "unmounting $MOUNT and ending fuses client" + umount "$MOUNT" + + pkill -SIGINT "$FUSE_COMMAND" || true + + exit + fi + fi + + if [[ "$KILL" = true ]]; then + # TODO better idea: save PIDs? + + pkill -SIGINT "$CLIENT_COMMAND" || true + pkill -SIGINT "$SERVER_COMMAND" || true # kill cascade_server + + # kill old cascade_runner and subprocesses (excluding itself) + # pgrep "${SCRIPT_NAME%.*}" | grep -v "$$" | while read -r LINE; do + # SUB_SLEEP=$(pgrep -P "$LINE") + # kill "$LINE" # kill cascade_runner while loop + # kill "$SUB_SLEEP" # kill sleep + # done + fi + + if [[ "$CLEAN" = true ]]; then + x() { + echo "$1 > removing .plog and logs" + rm -rf .plog derecho_debug.log "$LOG_FILE" + } + foreach x + fi + + if [[ -n "$UPDATE_KEY" ]]; then + x() { + CONFIG_FILE="derecho.cfg" + echo "$1 > updating $UPDATE_KEY with $UPDATE_VALUE in $CONFIG_FILE" + sed -i "s/\($UPDATE_KEY *= *\)\(.*\)/\1$UPDATE_VALUE/" "$CONFIG_FILE" + } + foreach x + fi + + if [[ ! "$CLIENT" -eq "-1" ]]; then + DIR="$CFG_DIR/n$CLIENT" + + if [[ -n "$FUSE_DIR" ]]; then + mkdir -p "$MOUNT" + + cd "$DIR" + "$FUSE_COMMAND" -s -f "$MOUNT" &>"$LOG_FILE" & + else + cd "$DIR" + "$CLIENT_COMMAND" + fi + + cd - &>/dev/null + exit + fi + set +e + + if [[ "$SERVERS" = true ]]; then + x() { + echo "$1 > starting server node" + "$SERVER_COMMAND" --signal &>"$LOG_FILE" & + # (while true; do sleep 10000; done) | + # "$SERVER_COMMAND" &>"$LOG_FILE" & + } + foreach x + + fi +} + +main "$@" diff --git a/src/service/fuse/CMakeLists.txt b/src/service/fuse/CMakeLists.txt index 0412104d..c329ced8 100644 --- a/src/service/fuse/CMakeLists.txt +++ b/src/service/fuse/CMakeLists.txt @@ -15,7 +15,7 @@ if (${HAS_FUSE}) ) target_link_libraries(fuse_client cascade readline fuse3) set_target_properties(fuse_client PROPERTIES OUTPUT_NAME cascade_fuse_client) - + add_custom_command(TARGET fuse_client POST_BUILD COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/testfuse.py ${CMAKE_CURRENT_BINARY_DIR}/. COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/util.py ${CMAKE_CURRENT_BINARY_DIR}/. @@ -29,4 +29,3 @@ if (${HAS_FUSE}) ) endif() - diff --git a/src/service/fuse/README.md b/src/service/fuse/README.md index a5da1b86..8fcb79ba 100644 --- a/src/service/fuse/README.md +++ b/src/service/fuse/README.md @@ -1,12 +1,24 @@ # Fuse API -Cascade service supports FUSE(Filesystem in Userspace) API to access data. It is a standalone client application that links with [`libfuse`](https://github.com/libfuse/libfuse). Cascade fuse api mount the file system to interact with Cascade K/V stored objects. +Cascade service supports FUSE(Filesystem in Userspace) API to access data. It is +a standalone client application that links with +[`libfuse`](https://github.com/libfuse/libfuse). Cascade fuse api mount the file +system to interact with Cascade K/V stored objects. -If you didn't see `-- Looking for include files fuse3/fuse.h, fuse3/fuse_lowlevel.h - found`, it is probably because the libfuse support is not installed. FUSE application shall be built as an executable in the built directory `src/service/fuse/cascade_fuse_client` +If you didn't see +`-- Looking for include files fuse3/fuse.h, fuse3/fuse_lowlevel.h - found`, it +is probably because the libfuse support is not installed. FUSE application shall +be built as an executable in the built directory +`src/service/fuse/cascade_fuse_client` -Running FUSE POSIX shell application is similar to run cascade cmd client. Once the cascade service is configured and started, in the client node, running `../../fuse/cascade_fuse_client [directory_to_mount]` will mount cascade file systsem to the dedicated directory. +Running FUSE POSIX shell application is similar to run cascade cmd client. Once +the cascade service is configured and started, in the client node, running +`../../fuse/cascade_fuse_client [directory_to_mount]` will mount cascade file +systsem to the dedicated directory. -Once fuse applicatino is mounted to the directory, you can access the K/V object stored in cascade using linux shell command. The structured of the mounted file system is as following +Once fuse application is mounted to the directory, you can access the K/V object +stored in cascade using linux shell command. The structured of the mounted file +system is as following ```bash . @@ -14,36 +26,39 @@ Once fuse applicatino is mounted to the directory, you can access the K/V object |-- PersistentCascadeStoreWithStringKey | |-- subgroup-0 | |-- shard-0 -| |-- .cascade +| |-- .cascade | |-- key-0 | ... |-- VolatileCascadeStoreWithStringKey | |-- subgroup-0 | |-- shard-0 -| |-- .cascade +| |-- .cascade | ... |-- TriggerCascadeStoreWithStringKey |-- ObjectPools | |-- objectpoolpathname-a -| |-- a1 -| |-- objectpool object 0 -| ... -| |-- .cascade +| |-- a1 +| |-- objectpool object 0 +| ... +| |-- .cascade | |-- .cascade |-- .cascade ``` Support READ commands: + ``` -cd [dir] open directory -ls [dir] list directory -ls -a list directory with attributes -cat [file] read file -cat .cascade read directory metadata information +cd [dir] open directory +ls [dir] list directory +ls -a list directory with attributes +cat [file] read file +cat .cascade read directory metadata information ``` Limitation: + - Support only single-threaded fuse client -- Current read_file keeps a buffer of file_bytes in memory, needs further optimization to read large file +- Current read_file keeps a buffer of file_bytes in memory, needs further + optimization to read large file - New features to come: WRITE commands to have fuse client interact with cascade - File write and editing commands, managing directories commands \ No newline at end of file + File write and editing commands, managing directories commands diff --git a/src/service/fuse/fuse_client.cpp b/src/service/fuse/fuse_client.cpp index b0dc45d8..b8bfbd4a 100644 --- a/src/service/fuse/fuse_client.cpp +++ b/src/service/fuse/fuse_client.cpp @@ -1,24 +1,22 @@ #define FUSE_USE_VERSION 31 -#include -#include -#include -#include -#include -#include -#include #include "fuse_client_context.hpp" #include #include #include - +#include +#include +#include +#include +#include +#include #include -#define FUSE_CLIENT_DEV_ID (0xCA7CADE) +#define FUSE_CLIENT_DEV_ID (0xCA7CADE) /** * fuse_client mount the cascade service to file system. This allows users to access cascade data with normal POSIX * filesystem API. - * + * * The data in cascade is organized this way: * ///// * "mount-point" is where the cascade data is mounted. @@ -36,34 +34,34 @@ using FuseClientContextType = FuseClientContext(p) #define FCC_REQ(req) FCC(fuse_req_userdata(req)) -static void fs_init(void* userdata, struct fuse_conn_info *conn) { - dbg_default_trace("entering {}.",__func__); - if (derecho::hasCustomizedConfKey(CONF_LAYOUT_JSON_LAYOUT)) { - FCC(userdata)->initialize(json::parse(derecho::getConfString(CONF_LAYOUT_JSON_LAYOUT))); - } else if (derecho::hasCustomizedConfKey(CONF_LAYOUT_JSON_LAYOUT_FILE)){ +static void fs_init(void* userdata, struct fuse_conn_info* conn) { + dbg_default_trace("entering {}.", __func__); + if(derecho::hasCustomizedConfKey(CONF_LAYOUT_JSON_LAYOUT)) { + FCC(userdata)->initialize(json::parse(derecho::getConfString(CONF_LAYOUT_JSON_LAYOUT))); + } else if(derecho::hasCustomizedConfKey(CONF_LAYOUT_JSON_LAYOUT_FILE)) { nlohmann::json layout_array; std::ifstream json_file(derecho::getAbsoluteFilePath(derecho::getConfString(CONF_LAYOUT_JSON_LAYOUT_FILE))); - if (!json_file) { - dbg_default_error("Cannot load json configuration from file: {}", derecho::getAbsoluteFilePath(derecho::getConfString(CONF_LAYOUT_JSON_LAYOUT_FILE))); + if(!json_file) { + dbg_default_error("Cannot load json configuration from file: {}", derecho::getAbsoluteFilePath(derecho::getConfString(CONF_LAYOUT_JSON_LAYOUT_FILE))); throw derecho::derecho_exception("Cannot load json configuration from file."); } json_file >> layout_array; FCC(userdata)->initialize(layout_array); } - dbg_default_trace("leaving {}.",__func__); + dbg_default_trace("leaving {}.", __func__); } static void fs_destroy(void* userdata) { - dbg_default_trace("entering {}.",__func__); - dbg_default_trace("leaving {}.",__func__); + dbg_default_trace("entering {}.", __func__); + dbg_default_trace("leaving {}.", __func__); } static void fs_lookup(fuse_req_t req, fuse_ino_t parent, const char* name) { - dbg_default_trace("entering {}.",__func__); + dbg_default_trace("entering {}.", __func__); struct fuse_entry_param e; // TODO: make this more efficient by implement a dedicated call in FCC. auto name_to_ino = FCC_REQ(req)->get_dir_entries(parent); - if (name_to_ino.find(name) == name_to_ino.end()) { + if(name_to_ino.find(name) == name_to_ino.end()) { fuse_reply_err(req, ENOENT); } else { // TODO: change timeout settings. @@ -75,79 +73,79 @@ static void fs_lookup(fuse_req_t req, fuse_ino_t parent, const char* name) { fuse_reply_entry(req, &e); } - dbg_default_trace("leaving {}.",__func__); + dbg_default_trace("leaving {}.", __func__); } static void fs_getattr(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - dbg_default_trace("entering {}.",__func__); + dbg_default_trace("entering {}.", __func__); struct stat stbuf; - (void) fi; + (void)fi; std::memset(&stbuf, 0, sizeof(stbuf)); stbuf.st_ino = ino; FCC_REQ(req)->fill_stbuf_by_ino(stbuf); fuse_reply_attr(req, &stbuf, 10000.0); - dbg_default_trace("leaving {}.",__func__); + dbg_default_trace("leaving {}.", __func__); } // borrowed from libfuse \a hello_ll.c struct dirbuf { - char *p; + char* p; size_t size; }; -static void dirbuf_add(fuse_req_t req, struct dirbuf *b, const char *name, fuse_ino_t ino) { +static void dirbuf_add(fuse_req_t req, struct dirbuf* b, const char* name, fuse_ino_t ino) { struct stat stbuf; size_t oldsize = b->size; b->size += fuse_add_direntry(req, nullptr, 0, name, nullptr, 0); - b->p = (char*)realloc(b->p,b->size); - std::memset(&stbuf,0,sizeof(stbuf)); + b->p = (char*)realloc(b->p, b->size); + std::memset(&stbuf, 0, sizeof(stbuf)); stbuf.st_ino = ino; FCC_REQ(req)->fill_stbuf_by_ino(stbuf); - fuse_add_direntry(req, b->p+oldsize, b->size-oldsize, name, &stbuf, b->size); + fuse_add_direntry(req, b->p + oldsize, b->size - oldsize, name, &stbuf, b->size); } #define min(x, y) ((x) < (y) ? (x) : (y)) static void fs_readdir(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) { - dbg_default_trace("entering {}.",__func__); + dbg_default_trace("entering {}.", __func__); struct dirbuf b; std::memset(&b, 0, sizeof(b)); dirbuf_add(req, &b, ".", 1); dirbuf_add(req, &b, "..", 1); - for(auto kv:FCC_REQ(req)->get_dir_entries(ino)) { + for(auto kv : FCC_REQ(req)->get_dir_entries(ino)) { dirbuf_add(req, &b, kv.first.c_str(), kv.second); } - if (static_cast(off) < b.size) { - fuse_reply_buf(req, b.p + off, min(b.size - off, size)); + if(static_cast(off) < b.size) { + fuse_reply_buf(req, b.p + off, min(b.size - off, size)); } else { fuse_reply_buf(req, NULL, 0); } free(b.p); - dbg_default_trace("leaving {}.",__func__); + dbg_default_trace("leaving {}.", __func__); } static void fs_open(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - dbg_default_trace("entering {}.",__func__); + dbg_default_trace("entering {}.", __func__); int err; - if ((fi->flags & O_ACCMODE) != O_RDONLY) { + if((fi->flags & O_ACCMODE) != O_RDONLY) { fuse_reply_err(req, EACCES); - } else if ((err = FCC_REQ(req)->open_file(ino, fi)) != 0) { + } else if((err = FCC_REQ(req)->open_file(ino, fi)) != 0) { fuse_reply_err(req, err); } else { - dbg_default_debug("fi({:x})->fh={:x}",reinterpret_cast(fi),fi->fh); + dbg_default_debug("fi({:x})->fh={:x}", reinterpret_cast(fi), fi->fh); fuse_reply_open(req, fi); } - dbg_default_trace("leaving {}.",__func__); + dbg_default_trace("leaving {}.", __func__); } static void fs_read(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, struct fuse_file_info* fi) { dbg_default_trace("entering {}.", __func__); FileBytes* pfb = reinterpret_cast(fi->fh); - if (static_cast(off) < pfb->size) { - fuse_reply_buf(req, reinterpret_cast(pfb->bytes+off), min(pfb->size - off, size)); + if(static_cast(off) < pfb->size) { + fuse_reply_buf(req, reinterpret_cast(pfb->bytes + off), min(pfb->size - off, size)); } else { fuse_reply_buf(req, nullptr, 0); } @@ -155,38 +153,37 @@ static void fs_read(fuse_req_t req, fuse_ino_t ino, size_t size, off_t off, stru } static void fs_release(fuse_req_t req, fuse_ino_t ino, struct fuse_file_info* fi) { - dbg_default_trace("entering {}.",__func__); + dbg_default_trace("entering {}.", __func__); FCC_REQ(req)->close_file(ino, fi); - fuse_reply_err(req,0); - dbg_default_trace("leaving {}.",__func__); + fuse_reply_err(req, 0); + dbg_default_trace("leaving {}.", __func__); } static const struct fuse_lowlevel_ops fs_ops = { - .init = fs_init, - .destroy = fs_destroy, - .lookup = fs_lookup, - .forget = NULL, - .getattr = fs_getattr, - .setattr = NULL, - .readlink = NULL, - .mknod = NULL, - .mkdir = NULL, - .unlink = NULL, - .rmdir = NULL, - .symlink = NULL, - .rename = NULL, - .link = NULL, - .open = fs_open, - .read = fs_read, - .write = NULL, - .flush = NULL, - .release = fs_release, - .fsync = NULL, - .opendir = NULL, - .readdir = fs_readdir, + .init = fs_init, + .destroy = fs_destroy, + .lookup = fs_lookup, + .forget = NULL, + .getattr = fs_getattr, + .setattr = NULL, + .readlink = NULL, + .mknod = NULL, + .mkdir = NULL, + .unlink = NULL, + .rmdir = NULL, + .symlink = NULL, + .rename = NULL, + .link = NULL, + .open = fs_open, + .read = fs_read, + .write = NULL, + .flush = NULL, + .release = fs_release, + .fsync = NULL, + .opendir = NULL, + .readdir = fs_readdir, }; - /** * According to our experiment as well as recorded in * [this](https://www.cs.hmc.edu/~geoff/classes/hmc.cs135.201109/homework/fuse/fuse_doc.html) document as following: @@ -201,39 +198,37 @@ static const struct fuse_lowlevel_ops fs_ops = { void prepare_derecho_conf_file() { char cwd[4096]; #pragma GCC diagnostic ignored "-Wunused-result" - getcwd(cwd,4096); + getcwd(cwd, 4096); #pragma GCC diagnostic pop - sprintf(cwd+strlen(cwd), "/derecho.cfg"); + sprintf(cwd + strlen(cwd), "/derecho.cfg"); setenv("DERECHO_CONF_FILE", cwd, false); dbg_default_debug("Using derecho config file:{}.", getenv("DERECHO_CONF_FILE")); } void log_current_dir(bool foreground) { - char buff[FILENAME_MAX]; //create string buffer to hold path - auto working = GetCurrentDir( buff, FILENAME_MAX ); - if(working){ - std::string current_working_dir(buff); - - current_working_dir = (foreground)? current_working_dir + " (running in foreground)": current_working_dir + " (running in background)"; - auto derecho_conf_file = getenv("DERECHO_CONF_FILE"); - - std::ofstream out("/home/yy354/fuse_log.txt"); - out << current_working_dir; - out << " derecho conf file: [" <" << std::endl; fuse_cmdline_help(); fuse_lowlevel_help(); ret = 0; throw 1; - } else if (opts.show_version) { + } else if(opts.show_version) { std::cout << "FUSE library version " << fuse_pkgversion() << std::endl; fuse_lowlevel_version(); ret = 0; throw 1; } - if (opts.mountpoint == nullptr) { + if(opts.mountpoint == nullptr) { std::cout << "usage: " << argv[0] << " [options] " << std::endl; ret = 1; throw 1; } - fuse_daemonize(opts.foreground); + fuse_daemonize(opts.foreground); // start session FuseClientContextType fcc; se = fuse_session_new(&args, &fs_ops, sizeof(fs_ops), &fcc); - if (se == nullptr) { + if(se == nullptr) { throw 1; } - if (fuse_set_signal_handlers(se) != 0) { + if(fuse_set_signal_handlers(se) != 0) { throw 2; } - if (fuse_session_mount(se, opts.mountpoint) != 0) { + if(fuse_session_mount(se, opts.mountpoint) != 0) { throw 3; } - //log_current_dir(opts.foreground); + // log_current_dir(opts.foreground); - /* Block until ctrl+c or fuserount -u */ - if (opts.singlethread) { + /* Block until ctrl+c or fusermount -u */ + if(opts.singlethread) { ret = fuse_session_loop(se); } else { ret = fuse_session_loop_mt(se, opts.clone_fd); } - + fuse_session_unmount(se); - } - catch (int& ex) { + } catch(int& ex) { switch(ex) { - case 3: - fuse_remove_signal_handlers(se); - case 2: - fuse_session_destroy(se); - case 1: - free(opts.mountpoint); - fuse_opt_free_args(&args); + case 3: + fuse_remove_signal_handlers(se); + case 2: + fuse_session_destroy(se); + case 1: + free(opts.mountpoint); + fuse_opt_free_args(&args); } } diff --git a/src/service/fuse/fuse_client_context.hpp b/src/service/fuse/fuse_client_context.hpp index 96f9aecd..0328468b 100644 --- a/src/service/fuse/fuse_client_context.hpp +++ b/src/service/fuse/fuse_client_context.hpp @@ -1,30 +1,30 @@ #pragma once -#include +#include "cascade/object_pool_metadata.hpp" #include -#include -#include -#include -#include #include +#include #include +#include +#include +#include +#include #include -#include +#include #include -#include "cascade/object_pool_metadata.hpp" #include #define GetCurrentDir getcwd namespace derecho { -namespace cascade{ +namespace cascade { using json = nlohmann::json; -#define FUSE_CLIENT_DEV_ID (0xCA7CADE) -#define FUSE_CLIENT_BLK_SIZE (4096) +#define FUSE_CLIENT_DEV_ID (0xCA7CADE) +#define FUSE_CLIENT_BLK_SIZE (4096) -#define META_FILE_NAME ".cascade" +#define META_FILE_NAME ".cascade" -#define TO_FOREVER (std::numeric_limits::max()) +#define TO_FOREVER (std::numeric_limits::max()) typedef enum { SITE = 0, @@ -43,15 +43,15 @@ class FileBytes { public: size_t size; uint8_t* bytes; - FileBytes():size(0),bytes(nullptr){} - FileBytes(size_t s):size(s) { + FileBytes() : size(0), bytes(nullptr) {} + FileBytes(size_t s) : size(s) { bytes = nullptr; - if (s > 0) { - bytes = (uint8_t*)malloc(s); + if(s > 0) { + bytes = (uint8_t*)malloc(s); } } virtual ~FileBytes() { - if (bytes){ + if(bytes) { free(bytes); } } @@ -71,10 +71,10 @@ class FuseClientINode { * get directory entries. This is the default implementation. * Override it as required. */ - virtual std::map get_dir_entries() { - std::map ret_map; - for (auto& child: this->children) { - ret_map.emplace(child->display_name,reinterpret_cast(child.get())); + virtual std::map get_dir_entries() { + std::map ret_map; + for(auto& child : this->children) { + ret_map.emplace(child->display_name, reinterpret_cast(child.get())); } return ret_map; } @@ -84,51 +84,49 @@ class FuseClientINode { } virtual uint64_t read_file(FileBytes* fb) { - (void) fb; + (void)fb; return 0; } - virtual void initialize(){ + virtual void initialize() { } - + // Helper function for get_dir_entries() and read_file() - void check_update(){ - struct timespec now; + void check_update() { + struct timespec now; clock_gettime(CLOCK_REALTIME, &now); - if (now.tv_sec > (last_update_sec + update_interval)){ + if(now.tv_sec > (last_update_sec + update_interval)) { clock_gettime(CLOCK_REALTIME, &now); - if (now.tv_sec > (last_update_sec + update_interval)){ - last_update_sec = now.tv_sec; + if(now.tv_sec > (last_update_sec + update_interval)) { + last_update_sec = now.tv_sec; update_contents(); } } } private: - // Helper functions for check_update() + // Helper functions for check_update() virtual void update_contents() { return; } }; - template -struct TypeName { static const char *name; }; +struct TypeName { static const char* name; }; template -const char *TypeName::name = "unknown"; +const char* TypeName::name = "unknown"; template <> -const char *TypeName::name = "VolatileCascadeStoreWithStringKey"; +const char* TypeName::name = "VolatileCascadeStoreWithStringKey"; template <> -const char *TypeName::name = "PersistentCascadeStoreWithStringKey"; +const char* TypeName::name = "PersistentCascadeStoreWithStringKey"; template <> -const char *TypeName::name = "TriggerCascadeNoStoreWithStringKey"; +const char* TypeName::name = "TriggerCascadeNoStoreWithStringKey"; -#define CONF_LAYOUT "layout" - +#define CONF_LAYOUT "layout" template class SubgroupINode; @@ -163,25 +161,24 @@ template class CascadeTypeINode : public FuseClientINode { public: CascadeTypeINode() { - this->type = INodeType::CASCADE_TYPE; - this->display_name = std::string(TypeName::name); - this->parent = FUSE_ROOT_ID; + this->type = INodeType::CASCADE_TYPE; + this->display_name = std::string(TypeName::name); + this->parent = FUSE_ROOT_ID; } /** initialize */ void initialize(const json& group_layout, ServiceClientAPI& capi) { this->display_name = group_layout["type_alias"]; - uint32_t sidx=0; - for (auto subgroup_it:group_layout[CONF_LAYOUT]) { - - children.emplace_back(std::make_unique>(sidx,reinterpret_cast(this))); + uint32_t sidx = 0; + for(auto subgroup_it : group_layout[CONF_LAYOUT]) { + children.emplace_back(std::make_unique>(sidx, reinterpret_cast(this))); size_t num_shards = subgroup_it[MIN_NODES_BY_SHARD].size(); - for (uint32_t shidx = 0; shidx < num_shards; shidx ++) { + for(uint32_t shidx = 0; shidx < num_shards; shidx++) { this->children[sidx]->children.emplace_back( - std::make_unique>( - shidx,reinterpret_cast(this->children[sidx].get()),capi)); + std::make_unique>( + shidx, reinterpret_cast(this->children[sidx].get()), capi)); } - sidx ++; + sidx++; } } }; @@ -189,10 +186,9 @@ class CascadeTypeINode : public FuseClientINode { class RootMetaINode : public FuseClientINode { ServiceClientAPI& capi; std::string contents; - + public: - RootMetaINode (ServiceClientAPI& _capi) : - capi(_capi) { + RootMetaINode(ServiceClientAPI& _capi) : capi(_capi) { this->update_interval = 2; this->last_update_sec = 0; this->type = INodeType::META; @@ -200,24 +196,24 @@ class RootMetaINode : public FuseClientINode { } virtual uint64_t get_file_size() override { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); - check_update(); + dbg_default_trace("[{}]entering {}.", gettid(), __func__); + check_update(); return contents.size(); } private: - virtual void update_contents () override { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); + virtual void update_contents() override { + dbg_default_trace("[{}]entering {}.", gettid(), __func__); auto members = capi.get_members(); contents = "number of nodes in cascade service: " + std::to_string(members.size()) + ".\nnode IDs: "; - for (auto& nid : members) { + for(auto& nid : members) { contents += std::to_string(nid) + ","; } contents += "\n"; - + auto objectpools = capi.list_object_pools(true); contents += "number of objectpool in cascade service: " + std::to_string(objectpools.size()) + ".\nObjectpool paths: "; - for (auto& objectpool_path : objectpools) { + for(auto& objectpool_path : objectpools) { contents += objectpool_path + ","; } contents += "\n"; @@ -236,7 +232,7 @@ class SubgroupINode : public FuseClientINode { public: const uint32_t subgroup_index; - SubgroupINode (uint32_t sidx, fuse_ino_t pino) : subgroup_index(sidx) { + SubgroupINode(uint32_t sidx, fuse_ino_t pino) : subgroup_index(sidx) { this->type = INodeType::SUBGROUP; this->display_name = "subgroup-" + std::to_string(sidx); this->parent = pino; @@ -250,41 +246,40 @@ class ShardINode : public FuseClientINode { ServiceClientAPI& capi; std::map key_to_ino; - ShardINode (uint32_t shidx, fuse_ino_t pino, ServiceClientAPI& _capi) : - shard_index (shidx), capi(_capi) { + ShardINode(uint32_t shidx, fuse_ino_t pino, ServiceClientAPI& _capi) : shard_index(shidx), capi(_capi) { this->type = INodeType::SHARD; this->display_name = "shard-" + std::to_string(shidx); this->parent = pino; SubgroupINode* p_subgroup_inode = reinterpret_cast*>(pino); - this->children.emplace_back(std::make_unique>(shidx,p_subgroup_inode->subgroup_index,capi)); + this->children.emplace_back(std::make_unique>(shidx, p_subgroup_inode->subgroup_index, capi)); } // TODO: rethinking about the consistency - virtual std::map get_dir_entries() override { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); - //std::map ret_map; + virtual std::map get_dir_entries() override { + dbg_default_trace("[{}]entering {}.", gettid(), __func__); + // std::map ret_map; /** we always retrieve the key list for a shard inode because the data is highly dynamic */ uint32_t subgroup_index = reinterpret_cast*>(this->parent)->subgroup_index; - auto result = capi.template list_keys(CURRENT_VERSION, true, subgroup_index, this->shard_index); - for (auto& reply_future:result.get()) { + auto result = capi.template list_keys(CURRENT_VERSION, true, subgroup_index, this->shard_index); + for(auto& reply_future : result.get()) { auto reply = reply_future.second.get(); std::unique_lock wlck(this->children_mutex); - for (auto& key: reply) { - if (key_to_ino.find(key) == key_to_ino.end()) { - this->children.emplace_back(std::make_unique>(key,reinterpret_cast(this),capi)); - // To solve the issue of '/' in display name , which will cause: reading directory '.': input/output error - // TODO: if there are better replacement of /, instead of - - key = std::string("key-") + key; - std::replace( key.begin(), key.end(), '/', '-'); + for(auto& key : reply) { + if(key_to_ino.find(key) == key_to_ino.end()) { + this->children.emplace_back(std::make_unique>(key, reinterpret_cast(this), capi)); + // To solve the issue of '/' in display name , which will cause: reading directory '.': input/output error + // TODO: if there are better replacement of /, instead of - + key = std::string("key-") + key; + std::replace(key.begin(), key.end(), '/', '-'); key_to_ino[key] = reinterpret_cast(this->children.back().get()); } } } - dbg_default_trace("[{}]leaving {}.",gettid(),__func__); - return FuseClientINode::get_dir_entries(); + dbg_default_trace("[{}]leaving {}.", gettid(), __func__); + return FuseClientINode::get_dir_entries(); } }; - + template <> class ShardINode : public FuseClientINode { public: @@ -292,19 +287,18 @@ class ShardINode : public FuseClientINode { ServiceClientAPI& capi; std::map key_to_ino; - ShardINode (uint32_t shidx, fuse_ino_t pino, ServiceClientAPI& _capi) : - shard_index (shidx), capi(_capi) { + ShardINode(uint32_t shidx, fuse_ino_t pino, ServiceClientAPI& _capi) : shard_index(shidx), capi(_capi) { this->type = INodeType::SHARD; this->display_name = "shard-" + std::to_string(shidx); this->parent = pino; SubgroupINode* p_subgroup_inode = reinterpret_cast*>(pino); - this->children.emplace_back(std::make_unique>(shidx,p_subgroup_inode->subgroup_index,capi)); + this->children.emplace_back(std::make_unique>(shidx, p_subgroup_inode->subgroup_index, capi)); } - virtual std::map get_dir_entries() override { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); - std::map ret_map; - dbg_default_trace("[{}]leaving {}.",gettid(),__func__); + virtual std::map get_dir_entries() override { + dbg_default_trace("[{}]entering {}.", gettid(), __func__); + std::map ret_map; + dbg_default_trace("[{}]leaving {}.", gettid(), __func__); return ret_map; } }; @@ -320,52 +314,52 @@ class ShardMetaINode : public FuseClientINode { /** * update the metadata. need write lock. */ - virtual void update_contents () override { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); - auto members = capi.template get_shard_members(subgroup_index,shard_index); + virtual void update_contents() override { + dbg_default_trace("[{}]entering {}.", gettid(), __func__); + auto members = capi.template get_shard_members(subgroup_index, shard_index); contents = "number of nodes shard: " + std::to_string(members.size()) + ".\nnode IDs: "; - for (auto& nid : members) { + for(auto& nid : members) { contents += std::to_string(nid) + ","; } contents += "\n"; - auto policy = capi.template get_member_selection_policy(subgroup_index,shard_index); + auto policy = capi.template get_member_selection_policy(subgroup_index, shard_index); contents += "member selection policy:"; switch(std::get<0>(policy)) { - case FirstMember: - contents += "FirstMember\n"; - break; - case LastMember: - contents += "LastMember\n"; - break; - case Random: - contents += "Random\n"; - break; - case FixedRandom: - contents += "FixedRandom("; - contents += std::to_string(std::get<1>(policy)); - contents += ")\n"; - break; - case RoundRobin: - contents += "RoundRobin\n"; - break; - case UserSpecified: - contents += "UserSpecified("; - contents += std::to_string(std::get<1>(policy)); - contents += ")\n"; - break; - default: - contents += "Unknown\n"; - break; + case FirstMember: + contents += "FirstMember\n"; + break; + case LastMember: + contents += "LastMember\n"; + break; + case Random: + contents += "Random\n"; + break; + case FixedRandom: + contents += "FixedRandom("; + contents += std::to_string(std::get<1>(policy)); + contents += ")\n"; + break; + case RoundRobin: + contents += "RoundRobin\n"; + break; + case UserSpecified: + contents += "UserSpecified("; + contents += std::to_string(std::get<1>(policy)); + contents += ")\n"; + break; + default: + contents += "Unknown\n"; + break; } - dbg_default_trace("[{}]leaving {}.",gettid(),__func__); + dbg_default_trace("[{}]leaving {}.", gettid(), __func__); } + public: - ShardMetaINode (const uint32_t _shard_index, const uint32_t _subgroup_index, ServiceClientAPI& _capi) : - shard_index(_shard_index), - subgroup_index(_subgroup_index), - capi (_capi) { + ShardMetaINode(const uint32_t _shard_index, const uint32_t _subgroup_index, ServiceClientAPI& _capi) : shard_index(_shard_index), + subgroup_index(_subgroup_index), + capi(_capi) { this->update_interval = 2; - this->last_update_sec = 0; + this->last_update_sec = 0; this->type = INodeType::META; this->display_name = META_FILE_NAME; } @@ -376,55 +370,52 @@ class ShardMetaINode : public FuseClientINode { } virtual uint64_t read_file(FileBytes* file_bytes) override { - check_update(); - file_bytes->size = contents.size(); - file_bytes->bytes = reinterpret_cast(strdup(contents.c_str())); - return 0; + check_update(); + file_bytes->size = contents.size(); + file_bytes->bytes = reinterpret_cast(strdup(contents.c_str())); + return 0; } }; - template class KeyINode : public FuseClientINode { public: typename CascadeType::KeyType key; std::unique_ptr file_bytes; uint64_t file_size; - persistent::version_t version; - uint64_t timestamp_us; - persistent::version_t previous_version; - persistent::version_t previous_version_by_key; // previous version by key, INVALID_VERSION for the first value of the key. - ServiceClientAPI& capi; - - KeyINode(typename CascadeType::KeyType& k, fuse_ino_t pino, ServiceClientAPI& _capi) : - key(k), - file_bytes(std::make_unique()), - capi(_capi){ + persistent::version_t version; + uint64_t timestamp_us; + persistent::version_t previous_version; + persistent::version_t previous_version_by_key; // previous version by key, INVALID_VERSION for the first value of the key. + ServiceClientAPI& capi; + + KeyINode(typename CascadeType::KeyType& k, fuse_ino_t pino, ServiceClientAPI& _capi) : key(k), + file_bytes(std::make_unique()), + capi(_capi) { dbg_default_trace("[{}]entering {}.", gettid(), __func__); - this->update_interval = 2; + this->update_interval = 2; this->last_update_sec = 0; this->type = INodeType::KEY; - if constexpr (std::is_same, char*>::value || - std::is_same, std::string>::value) { + if constexpr(std::is_same, char*>::value || std::is_same, std::string>::value) { this->display_name = std::string("key-") + k; - } else if constexpr (std::is_arithmetic>::value) { + } else if constexpr(std::is_arithmetic>::value) { this->display_name = std::string("key-") + std::to_string(k); } else { // KeyType is required to implement to_string() for types other than type string/arithmetic. this->display_name = std::string("key-") + key.to_string(); } // '/' in display name, will cause: reading directory '.': input/output error - std::replace( this->display_name.begin(), this->display_name.end(), '/', '\\'); - this->parent = pino; + std::replace(this->display_name.begin(), this->display_name.end(), '/', '\\'); + this->parent = pino; dbg_default_trace("[{}]leaving {}.", gettid(), __func__); } virtual uint64_t read_file(FileBytes* _file_bytes) override { dbg_default_trace("[{}]entering {}.", gettid(), __func__); check_update(); - _file_bytes->size = this->file_bytes.get()->size; - _file_bytes->bytes = static_cast(malloc(this->file_bytes.get()->size)); - memcpy(_file_bytes->bytes,this->file_bytes.get()->bytes, this->file_bytes.get()->size); + _file_bytes->size = this->file_bytes.get()->size; + _file_bytes->bytes = static_cast(malloc(this->file_bytes.get()->size)); + memcpy(_file_bytes->bytes, this->file_bytes.get()->bytes, this->file_bytes.get()->size); dbg_default_trace("[{}]leaving {}.", gettid(), __func__); return 0; } @@ -434,148 +425,147 @@ class KeyINode : public FuseClientINode { return this->file_bytes.get()->size; } - KeyINode(KeyINode&& fci){ + KeyINode(KeyINode&& fci) { this->type = std::move(fci.type); this->display_name = std::move(fci.display_name); this->parent = std::move(fci.parent); - this->key = std::move(fci.key ); + this->key = std::move(fci.key); this->capi = std::move(fci.capi); } virtual ~KeyINode() { dbg_default_info("[{}] entering {}.", gettid(), __func__); - file_bytes.reset(nullptr); + file_bytes.reset(nullptr); dbg_default_info("[{}] leaving {}.", gettid(), __func__); } + private: - virtual void update_contents () override{ - ShardINode *pino_shard = reinterpret_cast*>(this->parent); - SubgroupINode *pino_subgroup = reinterpret_cast*>(pino_shard->parent); + virtual void update_contents() override { + ShardINode* pino_shard = reinterpret_cast*>(this->parent); + SubgroupINode* pino_subgroup = reinterpret_cast*>(pino_shard->parent); auto result = capi.template get( - key,CURRENT_VERSION,true,pino_subgroup->subgroup_index,pino_shard->shard_index); - Blob blob; - for(auto& reply_future:result.get()){ - auto reply = reply_future.second.get(); - dbg_default_trace("------ KEY INODE reply {}.", reply); - if(std::is_base_of::value){ - ObjectWithStringKey *access_ptr = reinterpret_cast(&reply); - this->version = access_ptr->version; - this->timestamp_us = access_ptr->timestamp_us; - this->previous_version = access_ptr->previous_version; - this->previous_version_by_key = access_ptr->previous_version_by_key; - blob = access_ptr->blob; - }else if(std::is_base_of::value){ - ObjectWithUInt64Key *access_ptr = reinterpret_cast(&reply); - this->version = access_ptr->version; - this->timestamp_us = access_ptr->timestamp_us; - this->previous_version = access_ptr->previous_version; - this->previous_version_by_key = access_ptr->previous_version_by_key; - blob = access_ptr->blob; - } - else{ - this->file_bytes.get()->size = mutils::bytes_size(reply); - this->file_bytes.get()->bytes = static_cast(malloc(this->file_bytes.get()->size)); - reply.to_bytes(this->file_bytes.get()->bytes); - return; - } - file_bytes.get()->size = blob.size; - file_bytes.get()->bytes = static_cast(malloc(file_bytes.get()->size)); - memcpy(file_bytes.get()->bytes, blob.bytes, file_bytes.get()->size); - return; + key, CURRENT_VERSION, true, pino_subgroup->subgroup_index, pino_shard->shard_index); + Blob blob; + for(auto& reply_future : result.get()) { + auto reply = reply_future.second.get(); + dbg_default_trace("------ KEY INODE reply {}.", reply); + if(std::is_base_of::value) { + ObjectWithStringKey* access_ptr = reinterpret_cast(&reply); + this->version = access_ptr->version; + this->timestamp_us = access_ptr->timestamp_us; + this->previous_version = access_ptr->previous_version; + this->previous_version_by_key = access_ptr->previous_version_by_key; + blob = access_ptr->blob; + } else if(std::is_base_of::value) { + ObjectWithUInt64Key* access_ptr = reinterpret_cast(&reply); + this->version = access_ptr->version; + this->timestamp_us = access_ptr->timestamp_us; + this->previous_version = access_ptr->previous_version; + this->previous_version_by_key = access_ptr->previous_version_by_key; + blob = access_ptr->blob; + } else { + this->file_bytes.get()->size = mutils::bytes_size(reply); + this->file_bytes.get()->bytes = static_cast(malloc(this->file_bytes.get()->size)); + reply.to_bytes(this->file_bytes.get()->bytes); + return; + } + file_bytes.get()->size = blob.size; + file_bytes.get()->bytes = static_cast(malloc(file_bytes.get()->size)); + memcpy(file_bytes.get()->bytes, blob.bytes, file_bytes.get()->size); + return; } } }; - class ObjectPoolMetaINode : public FuseClientINode { private: std::string cur_pathname; /** objp_collection contains the all the objp with the same cur_pathname prefix - * i.e. if cur_path name is "/a", + * i.e. if cur_path name is "/a", * object pools "/a/b1", "/a/b2" share this level of ObjectPoolPathINode*/ std::vector objp_collection; - bool is_object_pool; - uint32_t subgroup_type_index; - uint32_t subgroup_index; + bool is_object_pool; + uint32_t subgroup_type_index; + uint32_t subgroup_index; sharding_policy_t sharding_policy; - bool deleted; + bool deleted; ServiceClientAPI& capi; std::string contents; - virtual void update_contents () override{ + virtual void update_contents() override { this->contents = "Current Directory Pathname: "; - this->contents += (cur_pathname=="") ? "objectPoolRoot" : cur_pathname; - this->contents += "\n"; - this->objp_collection.clear(); - this->contents += "contains the below object pools in its subdirs:\n"; - std::string objp_contents; - // Check the objectPools under cur_pathname directory - size_t cur_len = cur_pathname.length(); - for (std::string pathname : capi.list_object_pools(true)) { - if(cur_pathname.length() == 0 || (pathname.length() > cur_len - 1 && (pathname.substr(0,cur_len) == cur_pathname) )){ - if(pathname.length() == cur_len){ - this->objp_collection.emplace_back(pathname); - this->contents += " " + pathname + ",\n"; - this->is_object_pool = true; - this->objectPool_contents( objp_contents); - }else if(pathname[cur_len] == '/'){ - this->objp_collection.emplace_back(pathname); - this->contents += " " + pathname + ",\n"; - } - } - } - this->contents += objp_contents; - } - + this->contents += (cur_pathname == "") ? "objectPoolRoot" : cur_pathname; + this->contents += "\n"; + this->objp_collection.clear(); + this->contents += "contains the below object pools in its subdirs:\n"; + std::string objp_contents; + // Check the objectPools under cur_pathname directory + size_t cur_len = cur_pathname.length(); + for(std::string pathname : capi.list_object_pools(true)) { + if(cur_pathname.length() == 0 || (pathname.length() > cur_len - 1 && (pathname.substr(0, cur_len) == cur_pathname))) { + if(pathname.length() == cur_len) { + this->objp_collection.emplace_back(pathname); + this->contents += " " + pathname + ",\n"; + this->is_object_pool = true; + this->objectPool_contents(objp_contents); + } else if(pathname[cur_len] == '/') { + this->objp_collection.emplace_back(pathname); + this->contents += " " + pathname + ",\n"; + } + } + } + this->contents += objp_contents; + } + /** * Only fill the object pool contents when cur_pathname is an object pool */ - void objectPool_contents ( std::string& objp_contents) { + void objectPool_contents(std::string& objp_contents) { auto op_metadata = capi.find_object_pool(this->cur_pathname); - objp_contents += "Current Object Pool Pathname: "; - objp_contents += cur_pathname + "\n"; + objp_contents += "Current Object Pool Pathname: "; + objp_contents += cur_pathname + "\n"; this->deleted = op_metadata.deleted; - objp_contents += "- is deleted: "; - objp_contents += this->deleted? "true": "false"; - objp_contents += "\n"; + objp_contents += "- is deleted: "; + objp_contents += this->deleted ? "true" : "false"; + objp_contents += "\n"; this->subgroup_type_index = op_metadata.subgroup_type_index; - objp_contents += "- subgroup type index: "; - objp_contents += std::to_string(this->subgroup_type_index) + "\n"; + objp_contents += "- subgroup type index: "; + objp_contents += std::to_string(this->subgroup_type_index) + "\n"; this->subgroup_index = op_metadata.subgroup_index; - objp_contents += "- subgroup index: "; - objp_contents += std::to_string(this->subgroup_index) + "\n"; + objp_contents += "- subgroup index: "; + objp_contents += std::to_string(this->subgroup_index) + "\n"; auto policy = op_metadata.sharding_policy; objp_contents += "- sharding policy: "; switch(policy) { - case HASH: - objp_contents += "Hashing\n"; - break; - case RANGE: - objp_contents += "Range\n"; - break; - default: - objp_contents += "Unknown\n"; - break; + case HASH: + objp_contents += "Hashing\n"; + break; + case RANGE: + objp_contents += "Range\n"; + break; + default: + objp_contents += "Unknown\n"; + break; } } + public: - ObjectPoolMetaINode (const std::string _cur_pathname, ServiceClientAPI& _capi) : - cur_pathname(_cur_pathname), - capi(_capi) { + ObjectPoolMetaINode(const std::string _cur_pathname, ServiceClientAPI& _capi) : cur_pathname(_cur_pathname), + capi(_capi) { this->update_interval = 2; this->last_update_sec = 0; this->type = INodeType::META; this->display_name = META_FILE_NAME; } - void add_objp(std::string new_objp_pathname){ - for(auto &pathname : objp_collection){ - if(pathname == new_objp_pathname){ - return; - } - } - objp_collection.emplace_back(new_objp_pathname); - } + void add_objp(std::string new_objp_pathname) { + for(auto& pathname : objp_collection) { + if(pathname == new_objp_pathname) { + return; + } + } + objp_collection.emplace_back(new_objp_pathname); + } virtual uint64_t get_file_size() override { check_update(); @@ -583,13 +573,12 @@ class ObjectPoolMetaINode : public FuseClientINode { } virtual uint64_t read_file(FileBytes* file_bytes) override { - check_update(); - file_bytes->size = contents.size(); - file_bytes->bytes = reinterpret_cast(strdup(contents.c_str())); - return 0; + check_update(); + file_bytes->size = contents.size(); + file_bytes->bytes = reinterpret_cast(strdup(contents.c_str())); + return 0; } }; - class ObjectPoolPathINode : public FuseClientINode { public: @@ -599,227 +588,221 @@ class ObjectPoolPathINode : public FuseClientINode { std::set key_children; std::set objp_children; - - ObjectPoolPathINode (fuse_ino_t pino, ServiceClientAPI& _capi): - capi(_capi){ - this->update_interval = 10; - this->last_update_sec = 0; + ObjectPoolPathINode(fuse_ino_t pino, ServiceClientAPI& _capi) : capi(_capi) { + this->update_interval = 10; + this->last_update_sec = 0; this->type = INodeType::OBJECTPOOL_PATH; this->parent = pino; - this->is_object_pool = false; - this->cur_pathname = ""; - this->children.emplace_back(std::make_unique(cur_pathname, capi)); + this->is_object_pool = false; + this->cur_pathname = ""; + this->children.emplace_back(std::make_unique(cur_pathname, capi)); } - ObjectPoolPathINode (std::string _cur_pathname,fuse_ino_t pino, ServiceClientAPI& _capi) : - capi(_capi), - cur_pathname(_cur_pathname){ + ObjectPoolPathINode(std::string _cur_pathname, fuse_ino_t pino, ServiceClientAPI& _capi) : capi(_capi), + cur_pathname(_cur_pathname) { this->update_interval = 10; this->last_update_sec = 10; this->type = INodeType::OBJECTPOOL_PATH; - std::size_t pos = cur_pathname.find_last_of('/'); + std::size_t pos = cur_pathname.find_last_of('/'); this->display_name = cur_pathname.substr(pos + 1); this->parent = pino; - this->is_object_pool = false; - //this->objp_collection.emplace_back(objp_pathname); - this->children.emplace_back(std::make_unique(cur_pathname, capi)); + this->is_object_pool = false; + // this->objp_collection.emplace_back(objp_pathname); + this->children.emplace_back(std::make_unique(cur_pathname, capi)); } - /** Helper function: + /** Helper function: * @object_pool_pathname: the object pool pathname to parse * @return: next level pathname * e.g. if cur_pathname is "/a", for object_pool_pathname "/a/b/c" this function returns "/a/b" */ - std::string get_next_level_pathname(std::string &object_pool_pathname){ - size_t cur_len = cur_pathname.length(); - std::string remain_pathname = object_pool_pathname.substr(cur_len); + std::string get_next_level_pathname(std::string& object_pool_pathname) { + size_t cur_len = cur_pathname.length(); + std::string remain_pathname = object_pool_pathname.substr(cur_len); auto start_pos = remain_pathname.find("/"); - if (start_pos == std::string::npos){ + if(start_pos == std::string::npos) { return ""; } - // get the next level of directory pathname. + // get the next level of directory pathname. auto end_pos = remain_pathname.substr(start_pos + 1).find("/"); std::string next_level_pathname; - if (end_pos == std::string::npos){ - next_level_pathname = cur_pathname + remain_pathname.substr(start_pos ); - }else{ - next_level_pathname = cur_pathname + remain_pathname.substr(start_pos , end_pos + 1); + if(end_pos == std::string::npos) { + next_level_pathname = cur_pathname + remain_pathname.substr(start_pos); + } else { + next_level_pathname = cur_pathname + remain_pathname.substr(start_pos, end_pos + 1); } - return next_level_pathname; - } + return next_level_pathname; + } - /** Construct the next level objectPoolINodes, starting from remain pathname - *e.g./a/b/c should have three layers: /a, /a/b, /a/b/c - *if this->cur_pathname is "/a", whose child INode's cur_pathname is "/a/b", whose child INode "/a/b/c" + /** Construct the next level objectPoolINodes, starting from remain pathname + *e.g./a/b/c should have three layers: /a, /a/b, /a/b/c + *if this->cur_pathname is "/a", whose child INode's cur_pathname is "/a/b", whose child INode "/a/b/c" */ - void construct_nextlevel_objectpool_path(std::string &object_pool_pathname){ + void construct_nextlevel_objectpool_path(std::string& object_pool_pathname) { // check path_name - std::string next_level_pathname = get_next_level_pathname(object_pool_pathname); - if(next_level_pathname.length() == 0){ - return; - } - // Check if this objectPoolPathInode with the same pathname exists - // case 1: If this level pathname directory exists - for(auto& inode : this->children){ - if (inode->type == INodeType::OBJECTPOOL_PATH){ - if(static_cast(inode.get())->cur_pathname == next_level_pathname){ - return; - } - } + std::string next_level_pathname = get_next_level_pathname(object_pool_pathname); + if(next_level_pathname.length() == 0) { + return; } - // case2: If this level ObjectPoolPathInode doesn't exists, create one + // Check if this objectPoolPathInode with the same pathname exists + // case 1: If this level pathname directory exists + for(auto& inode : this->children) { + if(inode->type == INodeType::OBJECTPOOL_PATH) { + if(static_cast(inode.get())->cur_pathname == next_level_pathname) { + return; + } + } + } + // case2: If this level ObjectPoolPathInode doesn't exists, create one // std::unique_lock wlck(this->children_mutex); - this->children.emplace_back(std::make_unique(next_level_pathname,reinterpret_cast(this),capi)); - objp_children.insert(cur_pathname); + this->children.emplace_back(std::make_unique(next_level_pathname, reinterpret_cast(this), capi)); + objp_children.insert(cur_pathname); } - - void update_objpINodes(){ + void update_objpINodes() { size_t cur_len = cur_pathname.length(); - std::vector reply = capi.list_object_pools(true); - std::vector valid_subdirs; + std::vector reply = capi.list_object_pools(true); + std::vector valid_subdirs; // Check if need to add new ObjectPoolPathINode to children inodes - for (std::string &object_pool : reply) { - if(object_pool.length() < cur_len){ - continue; - } - if(object_pool == cur_pathname){ - this->is_object_pool = true; - continue; - } - bool is_subdir = cur_pathname.length() == 0 || (object_pool.substr(0,cur_len) == cur_pathname && object_pool[cur_len] == '/') ; - if(!is_subdir){ - continue; - } - std::string next_level_pathname(get_next_level_pathname(object_pool)); - valid_subdirs.emplace_back(next_level_pathname); - if( objp_children.find(next_level_pathname) == objp_children.end() ){ - construct_nextlevel_objectpool_path(object_pool); - } + for(std::string& object_pool : reply) { + if(object_pool.length() < cur_len) { + continue; + } + if(object_pool == cur_pathname) { + this->is_object_pool = true; + continue; + } + bool is_subdir = cur_pathname.length() == 0 || (object_pool.substr(0, cur_len) == cur_pathname && object_pool[cur_len] == '/'); + if(!is_subdir) { + continue; + } + std::string next_level_pathname(get_next_level_pathname(object_pool)); + valid_subdirs.emplace_back(next_level_pathname); + if(objp_children.find(next_level_pathname) == objp_children.end()) { + construct_nextlevel_objectpool_path(object_pool); + } + } + // Check if need to remove existing ObjectPoolPathINode from children inodes + if(std::find(reply.begin(), reply.end(), this->cur_pathname) == reply.end()) { + this->is_object_pool = false; } - // Check if need to remove existing ObjectPoolPathINode from children inodes - if(std::find(reply.begin(), reply.end(), this->cur_pathname) == reply.end()){ - this->is_object_pool = false; - } auto it = this->children.begin(); - while(it != this->children.end()){ - std::string name = cur_pathname + "/" + (*it)->display_name; - if ((*it)->type == INodeType::OBJECTPOOL_PATH && (std::find(valid_subdirs.begin(),valid_subdirs.end(), name ) == valid_subdirs.end())){ - objp_children.erase(objp_children.find(name)); - it = this->children.erase(it); - }else{ - ++it; - } - } - } - - void update_keyINodes(){ - // case1. if object pool of cur_pathname donesn't exists anymore, remove all the keyINode from children - if(!this->is_object_pool){ - auto it = this->children.begin(); - while(it != this->children.end()){ - if ((*it)->type == INodeType::KEY){ - it = this->children.erase(it); - }else{ - ++it; - } - } - return; - } - // case2. if cur_pathname is a valid object pool, refetch keys in this object pool - persistent::version_t version = CURRENT_VERSION; - std::vector>>> future_result = capi.list_keys(version, true, cur_pathname); - std::vector reply = capi.wait_list_keys(future_result); - // Check if need to add new keyINode to children inodes - for (auto& key: reply) { - if(key_children.find(key) == key_children.end() ){ - this->children.emplace_back(std::make_unique(key,reinterpret_cast(this),capi)); - key_children.insert(key); - } - } - // Check if need to remove existing keyINode from children inodes - auto it = this->children.begin(); - while(it != this->children.end()){ - std::string name = cur_pathname + "/" + (*it)->display_name; - if ((*it)->type == INodeType::KEY && (std::find(reply.begin(),reply.end(), name ) == reply.end())){ - key_children.erase(key_children.find(name)); - it = this->children.erase(it); - - }else{ - ++it; - } - } - } - - - virtual std::map get_dir_entries() override { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); - check_update(); - dbg_default_trace("[{}]leaving {}.",gettid(),__func__); - return FuseClientINode::get_dir_entries(); + while(it != this->children.end()) { + std::string name = cur_pathname + "/" + (*it)->display_name; + if((*it)->type == INodeType::OBJECTPOOL_PATH && (std::find(valid_subdirs.begin(), valid_subdirs.end(), name) == valid_subdirs.end())) { + objp_children.erase(objp_children.find(name)); + it = this->children.erase(it); + } else { + ++it; + } + } } + + void update_keyINodes() { + // case1. if object pool of cur_pathname donesn't exists anymore, remove all the keyINode from children + if(!this->is_object_pool) { + auto it = this->children.begin(); + while(it != this->children.end()) { + if((*it)->type == INodeType::KEY) { + it = this->children.erase(it); + } else { + ++it; + } + } + return; + } + // case2. if cur_pathname is a valid object pool, refetch keys in this object pool + persistent::version_t version = CURRENT_VERSION; + std::vector>>> future_result = capi.list_keys(version, true, cur_pathname); + std::vector reply = capi.wait_list_keys(future_result); + // Check if need to add new keyINode to children inodes + for(auto& key : reply) { + if(key_children.find(key) == key_children.end()) { + this->children.emplace_back(std::make_unique(key, reinterpret_cast(this), capi)); + key_children.insert(key); + } + } + // Check if need to remove existing keyINode from children inodes + auto it = this->children.begin(); + while(it != this->children.end()) { + std::string name = cur_pathname + "/" + (*it)->display_name; + if((*it)->type == INodeType::KEY && (std::find(reply.begin(), reply.end(), name) == reply.end())) { + key_children.erase(key_children.find(name)); + it = this->children.erase(it); + + } else { + ++it; + } + } + } + + virtual std::map get_dir_entries() override { + dbg_default_trace("[{}]entering {}.", gettid(), __func__); + check_update(); + dbg_default_trace("[{}]leaving {}.", gettid(), __func__); + return FuseClientINode::get_dir_entries(); + } + private: - virtual void update_contents () override{ - update_objpINodes(); - update_keyINodes(); - } + virtual void update_contents() override { + update_objpINodes(); + update_keyINodes(); + } }; - + class ObjectPoolRootINode : public ObjectPoolPathINode { public: - ObjectPoolRootINode (ServiceClientAPI& _capi, fuse_ino_t pino=FUSE_ROOT_ID) : - ObjectPoolPathINode(pino, _capi) { + ObjectPoolRootINode(ServiceClientAPI& _capi, fuse_ino_t pino = FUSE_ROOT_ID) : ObjectPoolPathINode(pino, _capi) { this->display_name = "ObjectPools"; this->parent = pino; } - /** this function constructs the whole object pool directories at the beginning - * for all the object pools stored in metadata service - */ - virtual std::map get_dir_entries() override { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); - check_update(); - dbg_default_trace("[{}]leaving {}.",gettid(),__func__); - return FuseClientINode::get_dir_entries(); + /** this function constructs the whole object pool directories at the beginning + * for all the object pools stored in metadata service + */ + virtual std::map get_dir_entries() override { + dbg_default_trace("[{}]entering {}.", gettid(), __func__); + check_update(); + dbg_default_trace("[{}]leaving {}.", gettid(), __func__); + return FuseClientINode::get_dir_entries(); } + private: - virtual void update_contents () override{ - update_objpINodes(); - update_keyINodes(); - } + virtual void update_contents() override { + update_objpINodes(); + update_keyINodes(); + } }; - -class ObjectPoolKeyINode : public FuseClientINode{ +class ObjectPoolKeyINode : public FuseClientINode { public: std::string key; - std::unique_ptr file_bytes; - persistent::version_t version; - uint64_t timestamp_us; - persistent::version_t previous_version; - persistent::version_t previous_version_by_key; // previous version by key, INVALID_VERSION for the first value of the key. + std::unique_ptr file_bytes; + persistent::version_t version; + uint64_t timestamp_us; + persistent::version_t previous_version; + persistent::version_t previous_version_by_key; // previous version by key, INVALID_VERSION for the first value of the key. ServiceClientAPI& capi; - ObjectPoolKeyINode(std::string k, fuse_ino_t pino, ServiceClientAPI& _capi) : - key(k), - file_bytes(std::make_unique()), - capi(_capi){ + ObjectPoolKeyINode(std::string k, fuse_ino_t pino, ServiceClientAPI& _capi) : key(k), + file_bytes(std::make_unique()), + capi(_capi) { dbg_default_trace("[{}]entering {}.", gettid(), __func__); this->update_interval = 2; this->last_update_sec = 0; std::size_t pos = k.find_last_of('/'); this->display_name = k.substr(pos + 1); this->parent = pino; - this->type = INodeType::KEY; + this->type = INodeType::KEY; dbg_default_trace("[{}]leaving {}.", gettid(), __func__); } virtual uint64_t read_file(FileBytes* _file_bytes) override { - dbg_default_debug("-- READ FILE of key:[{}], [{}]entering {}.",this->key ,gettid(), __func__); - check_update(); - _file_bytes->size = this->file_bytes.get()->size; - _file_bytes->bytes = static_cast(malloc(this->file_bytes.get()->size)); - memcpy(_file_bytes->bytes,this->file_bytes.get()->bytes, this->file_bytes.get()->size); + dbg_default_debug("-- READ FILE of key:[{}], [{}]entering {}.", this->key, gettid(), __func__); + check_update(); + _file_bytes->size = this->file_bytes.get()->size; + _file_bytes->bytes = static_cast(malloc(this->file_bytes.get()->size)); + memcpy(_file_bytes->bytes, this->file_bytes.get()->bytes, this->file_bytes.get()->size); dbg_default_debug("[{}]leaving {}.", gettid(), __func__); return 0; } @@ -827,33 +810,34 @@ class ObjectPoolKeyINode : public FuseClientINode{ virtual uint64_t get_file_size() override { dbg_default_debug("----GET FILE SIZE key is [{}].", this->key); check_update(); - return this->file_bytes.get()->size; + return this->file_bytes.get()->size; } virtual ~ObjectPoolKeyINode() { dbg_default_info("[{}] entering {}.", gettid(), __func__); - file_bytes.reset(nullptr); + file_bytes.reset(nullptr); dbg_default_info("[{}] leaving {}.", gettid(), __func__); } + private: - virtual void update_contents () override{ - dbg_default_debug("\n \n ----OBJP keyInode key is:[{}] - update content [{}] entering {}.", this->key ,gettid(), __func__); + virtual void update_contents() override { + dbg_default_debug("\n \n ----OBJP keyInode key is:[{}] - update content [{}] entering {}.", this->key, gettid(), __func__); Blob blob; - auto result = capi.get(key,CURRENT_VERSION,true); - for (auto& reply_future:result.get()) { + auto result = capi.get(key, CURRENT_VERSION, true); + for(auto& reply_future : result.get()) { auto reply = reply_future.second.get(); - ObjectWithStringKey *access_ptr = reinterpret_cast(&reply); - this->version = access_ptr->version; - this->timestamp_us = access_ptr->timestamp_us; - this->previous_version = access_ptr->previous_version; - this->previous_version_by_key = access_ptr->previous_version_by_key; - blob = access_ptr->blob; - this->file_bytes.get()->size = blob.size; - this->file_bytes.get()->bytes = static_cast(malloc(blob.size)); - memcpy(this->file_bytes.get()->bytes, blob.bytes, blob.size); - return; + ObjectWithStringKey* access_ptr = reinterpret_cast(&reply); + this->version = access_ptr->version; + this->timestamp_us = access_ptr->timestamp_us; + this->previous_version = access_ptr->previous_version; + this->previous_version_by_key = access_ptr->previous_version_by_key; + blob = access_ptr->blob; + this->file_bytes.get()->size = blob.size; + this->file_bytes.get()->bytes = static_cast(malloc(blob.size)); + memcpy(this->file_bytes.get()->bytes, blob.bytes, blob.size); + return; } - dbg_default_trace("\n \n ----OBJP keyInode update content [{}] leaving {}.", gettid(), __func__); + dbg_default_trace("\n \n ----OBJP keyInode update content [{}] leaving {}.", gettid(), __func__); } }; @@ -861,32 +845,29 @@ class MetadataServiceRootINode : public FuseClientINode { ServiceClientAPI& capi; public: - MetadataServiceRootINode(ServiceClientAPI& _capi) : - capi(_capi) { + MetadataServiceRootINode(ServiceClientAPI& _capi) : capi(_capi) { this->type = INodeType::METADATA_SERVICE; this->display_name = "MetadataService"; this->parent = FUSE_ROOT_ID; } void initialize() { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); + dbg_default_trace("[{}]entering {}.", gettid(), __func__); children.emplace_back(std::make_unique(capi, reinterpret_cast(this))); this->children.back().get()->initialize(); children.emplace_back(std::make_unique(capi, reinterpret_cast(this))); this->children.back().get()->initialize(); } - }; - /** - * TODO: add DLL FUSE support - */ +/** + * TODO: add DLL FUSE support + */ class DataPathLogicRootINode : public FuseClientINode { ServiceClientAPI& capi; public: - DataPathLogicRootINode (ServiceClientAPI& _capi, fuse_ino_t pino) : - capi(_capi) { + DataPathLogicRootINode(ServiceClientAPI& _capi, fuse_ino_t pino) : capi(_capi) { this->type = INodeType::DATAPATH_LOGIC; this->display_name = "DataPathLogic"; this->parent = FUSE_ROOT_ID; @@ -895,28 +876,26 @@ class DataPathLogicRootINode : public FuseClientINode { /** initialize */ void initialize() { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); + dbg_default_trace("[{}]entering {}.", gettid(), __func__); } }; - - /** - * TODO: add DLL FUSE support - */ +/** + * TODO: add DLL FUSE support + */ template class DLLINode : public FuseClientINode { public: - std::string file_name; // DLL id? + std::string file_name; // DLL id? ServiceClientAPI& capi; - DLLINode(std::string& _filename, fuse_ino_t pino, ServiceClientAPI& _capi) : - file_name(_filename), capi(_capi) { + DLLINode(std::string& _filename, fuse_ino_t pino, ServiceClientAPI& _capi) : file_name(_filename), capi(_capi) { dbg_default_trace("[{}]entering {}.", gettid(), __func__); this->type = INodeType::DLL; this->display_name = std::string("dllfile") + _filename; this->parent = pino; dbg_default_trace("[{}]leaving {}.", gettid(), __func__); } - + virtual uint64_t read_file(FileBytes* file_bytes) override { dbg_default_trace("[{}]entering {}.", gettid(), __func__); dbg_default_trace("[{}]leaving {}.", gettid(), __func__); @@ -930,11 +909,11 @@ class DLLINode : public FuseClientINode { return fsize; } - DLLINode(DLLINode&& fci){ + DLLINode(DLLINode&& fci) { this->type = std::move(fci.type); this->display_name = std::move(fci.display_name); this->parent = std::move(fci.parent); - this->file_name = std::move(fci.file_name ); + this->file_name = std::move(fci.file_name); this->capi = std::move(fci.capi); } @@ -944,14 +923,15 @@ class DLLINode : public FuseClientINode { } }; - /** * The fuse filesystem context for fuse_client. This context will be used as 'userdata' on starting a fuse session. */ template class FuseClientContext { - template using _CascadeTypeINode = CascadeTypeINode; + template + using _CascadeTypeINode = CascadeTypeINode; + private: /** initialization flag */ std::atomic is_initialized; @@ -963,7 +943,7 @@ class FuseClientContext { ServiceClientAPI& capi; /** The inodes are stored in \a inodes. */ - mutils::KindMap<_CascadeTypeINode,CascadeTypes...> inodes; + mutils::KindMap<_CascadeTypeINode, CascadeTypes...> inodes; /** Metadata */ RootMetaINode metadata_inode; @@ -976,12 +956,12 @@ class FuseClientContext { /** fill inodes */ void populate_inodes(const json& group_layout) { - if (!group_layout.is_array()) { + if(!group_layout.is_array()) { dbg_default_error("JSON group layout is invalid (array expected): {}.", group_layout.get()); throw std::runtime_error("JSON group layout is invalid."); } // populate from the second layout, since the first one is for metadata service - do_populate_inodes(group_layout,1); + do_populate_inodes(group_layout, 1); } template @@ -991,41 +971,40 @@ class FuseClientContext { template void do_populate_inodes(const json& group_layout, int type_idx) { this->do_populate_inodes(group_layout, type_idx); - this->do_populate_inodes(group_layout, type_idx+1); + this->do_populate_inodes(group_layout, type_idx + 1); } public: - FuseClientContext() : - capi(ServiceClientAPI::get_service_client()), - metadata_inode(capi), - objectpool_inode(capi), - admin_metadata_inode(capi){} + FuseClientContext() : capi(ServiceClientAPI::get_service_client()), + metadata_inode(capi), + objectpool_inode(capi), + admin_metadata_inode(capi) {} /** initialize */ void initialize(const json& group_layout) { dbg_default_trace("[{}]entering {} .", gettid(), __func__); populate_inodes(group_layout); this->admin_metadata_inode.initialize(); - clock_gettime(CLOCK_REALTIME,&this->init_timestamp); + clock_gettime(CLOCK_REALTIME, &this->init_timestamp); this->is_initialized.store(true); dbg_default_trace("[{}]leaving {}.", gettid(), __func__); } /** read directory entries by ino */ - std::map get_dir_entries(fuse_ino_t ino) { + std::map get_dir_entries(fuse_ino_t ino) { dbg_default_trace("[{}]entering {} with ino ={:x}.", gettid(), __func__, ino); std::map ret_map; - if (ino == FUSE_ROOT_ID) { + if(ino == FUSE_ROOT_ID) { this->inodes.for_each( - [&ret_map](auto k,auto& v){ - // CAUTION: only works up to 64bit virtual address CPU architectures. - ret_map.emplace(v.display_name,reinterpret_cast(&v)); - }); - ret_map.emplace(metadata_inode.display_name,reinterpret_cast(&this->metadata_inode)); - ret_map.emplace(objectpool_inode.display_name,reinterpret_cast(&this->objectpool_inode)); - ret_map.emplace(admin_metadata_inode.display_name,reinterpret_cast(&this->admin_metadata_inode)); + [&ret_map](auto k, auto& v) { + // CAUTION: only works up to 64bit virtual address CPU architectures. + ret_map.emplace(v.display_name, reinterpret_cast(&v)); + }); + ret_map.emplace(metadata_inode.display_name, reinterpret_cast(&this->metadata_inode)); + ret_map.emplace(objectpool_inode.display_name, reinterpret_cast(&this->objectpool_inode)); + ret_map.emplace(admin_metadata_inode.display_name, reinterpret_cast(&this->admin_metadata_inode)); } else { FuseClientINode* pfci = reinterpret_cast(ino); - ret_map = pfci->get_dir_entries(); // RVO + ret_map = pfci->get_dir_entries(); // RVO } dbg_default_trace(" [{}]leaving {}.", gettid(), __func__); return ret_map; @@ -1037,8 +1016,8 @@ class FuseClientContext { * */ double fill_stbuf_by_ino(struct stat& stbuf) { - dbg_default_trace("[{}]entering {}.",gettid(),__func__); - double timeout_sec = 1.0; //TO_FOREVER; + dbg_default_trace("[{}]entering {}.", gettid(), __func__); + double timeout_sec = 1.0; // TO_FOREVER; // 1 - common attributes stbuf.st_dev = FUSE_CLIENT_DEV_ID; stbuf.st_nlink = 1; @@ -1048,76 +1027,75 @@ class FuseClientContext { stbuf.st_mtim = this->init_timestamp; stbuf.st_ctim = this->init_timestamp; // 2 - special attributes - if (stbuf.st_ino == FUSE_ROOT_ID) { + if(stbuf.st_ino == FUSE_ROOT_ID) { stbuf.st_mode = S_IFDIR | 0755; stbuf.st_size = FUSE_CLIENT_BLK_SIZE; stbuf.st_blocks = 1; stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; } else { FuseClientINode* pfci = reinterpret_cast(stbuf.st_ino); - switch (pfci->type) { - case INodeType::SITE: - // TODO: - break; - case INodeType::CASCADE_TYPE: - stbuf.st_mode = S_IFDIR | 0755; - stbuf.st_size = pfci->get_file_size(); - stbuf.st_blocks = (stbuf.st_size+FUSE_CLIENT_BLK_SIZE-1)/FUSE_CLIENT_BLK_SIZE; - stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; - break; - case INodeType::METADATA_SERVICE: - stbuf.st_mode = S_IFDIR | 0755; - stbuf.st_size = pfci->get_file_size(); - stbuf.st_blocks = (stbuf.st_size+FUSE_CLIENT_BLK_SIZE-1)/FUSE_CLIENT_BLK_SIZE; - stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; - break; - case INodeType::SUBGROUP: - stbuf.st_mode = S_IFDIR | 0755; - stbuf.st_size = pfci->get_file_size(); - stbuf.st_blocks = (stbuf.st_size+FUSE_CLIENT_BLK_SIZE-1)/FUSE_CLIENT_BLK_SIZE; - stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; - break; - case INodeType::SHARD: - stbuf.st_mode = S_IFDIR | 0755; - stbuf.st_size = pfci->get_file_size(); - stbuf.st_blocks = (stbuf.st_size+FUSE_CLIENT_BLK_SIZE-1)/FUSE_CLIENT_BLK_SIZE; - stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; - break; - case INodeType::KEY: - stbuf.st_mode = S_IFREG| 0444; - stbuf.st_size = pfci->get_file_size(); - stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1)/FUSE_CLIENT_BLK_SIZE; - stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; - break; - case INodeType::META: - stbuf.st_mode = S_IFREG| 0444; - stbuf.st_size = pfci->get_file_size(); - stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1)/FUSE_CLIENT_BLK_SIZE; - stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; - break; - case INodeType::OBJECTPOOL_PATH: - stbuf.st_mode = S_IFDIR | 0755; - stbuf.st_size = pfci->get_file_size(); - stbuf.st_blocks = (stbuf.st_size+FUSE_CLIENT_BLK_SIZE-1)/FUSE_CLIENT_BLK_SIZE; - stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; - break; - case INodeType::DATAPATH_LOGIC: - stbuf.st_mode = S_IFDIR | 0755; - stbuf.st_size = pfci->get_file_size(); - stbuf.st_blocks = (stbuf.st_size+FUSE_CLIENT_BLK_SIZE-1)/FUSE_CLIENT_BLK_SIZE; - stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; - break; - case INodeType::DLL: - stbuf.st_mode = S_IFDIR | 0755; - stbuf.st_size = pfci->get_file_size(); - stbuf.st_blocks = (stbuf.st_size+FUSE_CLIENT_BLK_SIZE-1)/FUSE_CLIENT_BLK_SIZE; - stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; - break; - default: - ; + switch(pfci->type) { + case INodeType::SITE: + // TODO: + break; + case INodeType::CASCADE_TYPE: + stbuf.st_mode = S_IFDIR | 0755; + stbuf.st_size = pfci->get_file_size(); + stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1) / FUSE_CLIENT_BLK_SIZE; + stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; + break; + case INodeType::METADATA_SERVICE: + stbuf.st_mode = S_IFDIR | 0755; + stbuf.st_size = pfci->get_file_size(); + stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1) / FUSE_CLIENT_BLK_SIZE; + stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; + break; + case INodeType::SUBGROUP: + stbuf.st_mode = S_IFDIR | 0755; + stbuf.st_size = pfci->get_file_size(); + stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1) / FUSE_CLIENT_BLK_SIZE; + stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; + break; + case INodeType::SHARD: + stbuf.st_mode = S_IFDIR | 0755; + stbuf.st_size = pfci->get_file_size(); + stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1) / FUSE_CLIENT_BLK_SIZE; + stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; + break; + case INodeType::KEY: + stbuf.st_mode = S_IFREG | 0444; + stbuf.st_size = pfci->get_file_size(); + stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1) / FUSE_CLIENT_BLK_SIZE; + stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; + break; + case INodeType::META: + stbuf.st_mode = S_IFREG | 0444; + stbuf.st_size = pfci->get_file_size(); + stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1) / FUSE_CLIENT_BLK_SIZE; + stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; + break; + case INodeType::OBJECTPOOL_PATH: + stbuf.st_mode = S_IFDIR | 0755; + stbuf.st_size = pfci->get_file_size(); + stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1) / FUSE_CLIENT_BLK_SIZE; + stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; + break; + case INodeType::DATAPATH_LOGIC: + stbuf.st_mode = S_IFDIR | 0755; + stbuf.st_size = pfci->get_file_size(); + stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1) / FUSE_CLIENT_BLK_SIZE; + stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; + break; + case INodeType::DLL: + stbuf.st_mode = S_IFDIR | 0755; + stbuf.st_size = pfci->get_file_size(); + stbuf.st_blocks = (stbuf.st_size + FUSE_CLIENT_BLK_SIZE - 1) / FUSE_CLIENT_BLK_SIZE; + stbuf.st_blksize = FUSE_CLIENT_BLK_SIZE; + break; + default:; } } - dbg_default_trace("[{}]leaving {}.",gettid(),__func__); + dbg_default_trace("[{}]leaving {}.", gettid(), __func__); return timeout_sec; } @@ -1127,17 +1105,16 @@ class FuseClientContext { * @param fi file structure shared among processes opening this file. * @return error code. 0 for success. */ - int open_file(fuse_ino_t ino, struct fuse_file_info *fi) { + int open_file(fuse_ino_t ino, struct fuse_file_info* fi) { dbg_default_trace("[{}]entering {} with ino={:x}.", gettid(), __func__, ino); FuseClientINode* pfci = reinterpret_cast(ino); - if (pfci->type != INodeType::KEY && - pfci->type != INodeType::META) { + if(pfci->type != INodeType::KEY && pfci->type != INodeType::META) { return EISDIR; } FileBytes* fb = new FileBytes(); pfci->read_file(fb); fi->fh = reinterpret_cast(fb); - dbg_default_trace("[{}]leaving {}.",gettid(),__func__); + dbg_default_trace("[{}]leaving {}.", gettid(), __func__); return 0; } @@ -1147,16 +1124,16 @@ class FuseClientContext { * @param fi file structure shared among processes opening this file. * @return error code. 0 for success. */ - int close_file(fuse_ino_t ino, struct fuse_file_info *fi) { + int close_file(fuse_ino_t ino, struct fuse_file_info* fi) { dbg_default_trace("[{}]entering {} with ino={:x}.", gettid(), __func__, ino); void* pfb = reinterpret_cast(fi->fh); - if (pfb!=nullptr) { + if(pfb != nullptr) { delete static_cast(pfb); } return 0; - dbg_default_trace("[{}]leaving {}.",gettid(),__func__); + dbg_default_trace("[{}]leaving {}.", gettid(), __func__); } }; -} // namespace cascade -} // namespace derecho +} // namespace cascade +} // namespace derecho diff --git a/src/service/fuse/testfuse.py b/src/service/fuse/testfuse.py index d1f0860a..d957b45b 100644 --- a/src/service/fuse/testfuse.py +++ b/src/service/fuse/testfuse.py @@ -1,176 +1,279 @@ #!/usr/bin/env python3 -from derecho.cascade.client import ServiceClientAPI -import subprocess +import errno +import filecmp +import getopt import multiprocessing -import threading import os -import sys,getopt -import stat +import queue import shutil -import filecmp +import stat +import subprocess +import sys import tempfile +import threading import time -import errno -import sys -import queue -from tempfile import NamedTemporaryFile from contextlib import contextmanager -from util import (wait_for_mount, compare_dirs, umount, cleanup, base_cmdline, - safe_sleep, basename, test_printcap, - fuse_proto, powerset) from os.path import join as pjoin +from tempfile import NamedTemporaryFile + +from derecho.cascade.client import ServiceClientAPI +from util import ( + base_cmdline, + basename, + cleanup, + compare_dirs, + fuse_proto, + powerset, + safe_sleep, + test_printcap, + umount, + wait_for_mount, +) TEST_FILE = __file__ -with open(TEST_FILE, 'rb') as fh: +with open(TEST_FILE, "rb") as fh: TEST_DATA = fh.read() + def name_generator(__ctr=[0]): __ctr[0] += 1 - return 'testfile_%d' % __ctr[0] + return "testfile_%d" % __ctr[0] + options = [] -if sys.platform == 'linux': - options.append('clone_fd') +if sys.platform == "linux": + options.append("clone_fd") def invoke_directly(mnt_dir, name, options): - cmdline = base_cmdline + [ pjoin(basename, 'example', name), - '-f', mnt_dir, '-o', ','.join(options) ] - if name == 'cascade_fuse_client': + cmdline = base_cmdline + [ + pjoin(basename, "example", name), + "-f", + mnt_dir, + "-o", + ",".join(options), + ] + if name == "cascade_fuse_client": # supports single-threading only - cmdline.append('-s') + cmdline.append("-s") return cmdline def readdir_inode(dir): - cmd = base_cmdline + [ pjoin(basename, 'test', 'readdir_inode'), dir ] - with subprocess.Popen(cmd, stdout=subprocess.PIPE, - universal_newlines=True) as proc: + cmd = base_cmdline + [pjoin(basename, "test", "readdir_inode"), dir] + with subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True) as proc: lines = proc.communicate()[0].splitlines() lines.sort() return lines -def error_msg( run_id, testcase, output, expected): +def error_msg(run_id, testcase, output, expected): msg = "[" + run_id + "]" + testcase + "(output--expected):" - msg += "(" + ','.join(output) - msg += " -- " + ','.join(expected) + ")" + msg += "(" + ",".join(output) + msg += " -- " + ",".join(expected) + ")" return msg - -put_keys = ['key0', '/a/a1/key0', '/b/b1/key1', '/c/c1/c2/key2'] -put_values = ['00', '00000000', '11111111', '22222222'] -vcss_expected_keys = [".cascade","key-key0", "key-\\a\\a1\\key0"] -pcss_expected_keys = [".cascade","key-\\b\\b1\\key1","key-\\c\\c1\\c2\\key2"] +put_keys = ["key0", "/a/a1/key0", "/b/b1/key1", "/c/c1/c2/key2"] +put_values = ["00", "00000000", "11111111", "22222222"] + +vcss_expected_keys = [".cascade", "key-key0", "key-\\a\\a1\\key0"] +pcss_expected_keys = [".cascade", "key-\\b\\b1\\key1", "key-\\c\\c1\\c2\\key2"] objp_expected_second_level = [".cascade", "a", "b", "c"] -objp_expected_third_level = [[], [".cascade", "a1"], [".cascade", "b1"], [".cascade", "c1"]] -objp_expected_keys = [[], [".cascade", "key0"], [".cascade", "key1"], [".cascade", "c2"]] +objp_expected_third_level = [ + [], + [".cascade", "a1"], + [".cascade", "b1"], + [".cascade", "c1"], +] +objp_expected_keys = [ + [], + [".cascade", "key0"], + [".cascade", "key1"], + [".cascade", "c2"], +] objp_expected_keys2 = [".cascade", "key2"] - def initial_setup(): capi = ServiceClientAPI() print("----------- CASCADE INITIAL SETUP -----------") - capi.put(put_keys[0], bytes(put_values[0],'utf-8'), subgroup_type='VolatileCascadeStoreWithStringKey', subgroup_index=0, shard_index=0) - capi.create_object_pool('/a/a1', 'VolatileCascadeStoreWithStringKey', 0) - capi.create_object_pool("/b/b1",'PersistentCascadeStoreWithStringKey', 0) - capi.create_object_pool("/c/c1/c2",'PersistentCascadeStoreWithStringKey', 0) + capi.put( + put_keys[0], + bytes(put_values[0], "utf-8"), + subgroup_type="VolatileCascadeStoreWithStringKey", + subgroup_index=0, + shard_index=0, + ) + capi.create_object_pool("/a/a1", "VolatileCascadeStoreWithStringKey", 0) + capi.create_object_pool("/b/b1", "PersistentCascadeStoreWithStringKey", 0) + capi.create_object_pool("/c/c1/c2", "PersistentCascadeStoreWithStringKey", 0) time.sleep(5) - - capi.put(put_keys[1], bytes(put_values[1],'utf-8'),blocking=True) - capi.put(put_keys[2],bytes(put_values[2],'utf-8'),blocking=True) - res = capi.put(put_keys[3], bytes(put_values[3],'utf-8'),blocking=True) + + capi.put(put_keys[1], bytes(put_values[1], "utf-8"), blocking=True) + capi.put(put_keys[2], bytes(put_values[2], "utf-8"), blocking=True) + res = capi.put(put_keys[3], bytes(put_values[3], "utf-8"), blocking=True) if res: odict = res.get_result() # print("------- THRID put result: " + f"{str(odict)}") - + # Test 1. check the directory for Cascade subgroups def tst_sg_dirs(mnt_dir, run_id=""): # First-level(subgroup_type) check mnt_first_level = os.listdir(mnt_dir) - expected_first_level = ['.cascade', 'MetadataService', 'PersistentCascadeStoreWithStringKey', 'VolatileCascadeStoreWithStringKey', 'TriggerCascadeNoStoreWithStringKey','ObjectPools' ] - error_m = error_msg(run_id, "first level", mnt_first_level, expected_first_level) + expected_first_level = [ + ".cascade", + "MetadataService", + "PersistentCascadeStoreWithStringKey", + "VolatileCascadeStoreWithStringKey", + "TriggerCascadeNoStoreWithStringKey", + "ObjectPools", + ] + error_m = error_msg(run_id, "first level", mnt_first_level, expected_first_level) assert compare_dirs(mnt_first_level, expected_first_level), error_m sg_expected_second_level = ["subgroup-0"] sg_expected_third_level = ["shard-0"] # Second-level(subgroup_index) check for subgroup_type in expected_first_level: - if(subgroup_type=='.cascade' or subgroup_type=='MetadataService' or subgroup_type=='ObjectPools'): + if ( + subgroup_type == ".cascade" + or subgroup_type == "MetadataService" + or subgroup_type == "ObjectPools" + ): continue mnt_second_level = os.listdir(mnt_dir + "/" + subgroup_type) assert compare_dirs(mnt_second_level, sg_expected_second_level) # Third-level(shard) check - mnt_third_level = os.listdir(mnt_dir + "/" + subgroup_type + "/" + sg_expected_second_level[0]) + mnt_third_level = os.listdir( + mnt_dir + "/" + subgroup_type + "/" + sg_expected_second_level[0] + ) assert compare_dirs(mnt_third_level, sg_expected_third_level) # Forth-level(key) check - if(subgroup_type == 'PersistentCascadeStoreWithStringKey'): - mnt_forth_level = os.listdir(mnt_dir + "/" + subgroup_type + "/" + sg_expected_second_level[0] + "/" + sg_expected_third_level[0]) - error_m = error_msg(run_id, "PCSS forthlevel", mnt_forth_level, pcss_expected_keys) + if subgroup_type == "PersistentCascadeStoreWithStringKey": + mnt_forth_level = os.listdir( + mnt_dir + + "/" + + subgroup_type + + "/" + + sg_expected_second_level[0] + + "/" + + sg_expected_third_level[0] + ) + error_m = error_msg( + run_id, "PCSS forthlevel", mnt_forth_level, pcss_expected_keys + ) assert compare_dirs(mnt_forth_level, pcss_expected_keys), error_m - if(subgroup_type == 'VolatileCascadeStoreWithStringKey'): - mnt_forth_level = os.listdir(mnt_dir + "/" + subgroup_type + "/" + sg_expected_second_level[0] + "/" + sg_expected_third_level[0]) - assert compare_dirs(mnt_forth_level, vcss_expected_keys) - print("["+ run_id +"]"+ "--- Test1 SubgroupTypes directories pass! ---") + if subgroup_type == "VolatileCascadeStoreWithStringKey": + mnt_forth_level = os.listdir( + mnt_dir + + "/" + + subgroup_type + + "/" + + sg_expected_second_level[0] + + "/" + + sg_expected_third_level[0] + ) + assert compare_dirs(mnt_forth_level, vcss_expected_keys) + print("[" + run_id + "]" + "--- Test1 SubgroupTypes directories pass! ---") + - - # Test 2. check the directory for Cascade object_pools +# Test 2. check the directory for Cascade object_pools def tst_objp_dirs(mnt_dir, run_id=""): mnt_objp_root = os.listdir(mnt_dir + "/" + "ObjectPools") - error_m = error_msg(run_id, "object pool test", mnt_objp_root, objp_expected_second_level) + error_m = error_msg( + run_id, "object pool test", mnt_objp_root, objp_expected_second_level + ) assert compare_dirs(mnt_objp_root, objp_expected_second_level), error_m - for i in range(1,4): - mnt_objp_dir = os.listdir(mnt_dir + "/" + "ObjectPools" + "/" + objp_expected_second_level[i]) - error_m = error_msg(run_id, f"object pool test{str(i)}", mnt_objp_dir, objp_expected_third_level[i]) + for i in range(1, 4): + mnt_objp_dir = os.listdir( + mnt_dir + "/" + "ObjectPools" + "/" + objp_expected_second_level[i] + ) + error_m = error_msg( + run_id, + f"object pool test{str(i)}", + mnt_objp_dir, + objp_expected_third_level[i], + ) assert compare_dirs(mnt_objp_dir, objp_expected_third_level[i]) - mnt_objp_key = os.listdir(mnt_dir + "/" + "ObjectPools" + "/" + objp_expected_second_level[i] + "/" + objp_expected_third_level[i][1]) + mnt_objp_key = os.listdir( + mnt_dir + + "/" + + "ObjectPools" + + "/" + + objp_expected_second_level[i] + + "/" + + objp_expected_third_level[i][1] + ) assert compare_dirs(mnt_objp_key, objp_expected_keys[i]) - if(i == 3): - mnt_objp_subdir = os.listdir(mnt_dir + "/" + "ObjectPools" + "/" + objp_expected_second_level[i] + "/" + objp_expected_third_level[i][1] + "/" + objp_expected_keys[i][1]) + if i == 3: + mnt_objp_subdir = os.listdir( + mnt_dir + + "/" + + "ObjectPools" + + "/" + + objp_expected_second_level[i] + + "/" + + objp_expected_third_level[i][1] + + "/" + + objp_expected_keys[i][1] + ) assert compare_dirs(mnt_objp_subdir, objp_expected_keys2) print("[" + run_id + "]" + "--- Test2 ObjectPools directories pass! ---") - + # Test 3. check the open and read content of files def tst_content(mnt_dir, run_id=""): # 1. subgroup content test subgroup_type = "/VolatileCascadeStoreWithStringKey/" - for i in range(1,2): - sg_mnt_key = mnt_dir + subgroup_type + "subgroup-0/shard-0/" +vcss_expected_keys[i+1] - with open(sg_mnt_key, 'r') as fh: + for i in range(1, 2): + sg_mnt_key = ( + mnt_dir + subgroup_type + "subgroup-0/shard-0/" + vcss_expected_keys[i + 1] + ) + with open(sg_mnt_key, "r") as fh: output = fh.read() - error_m = error_msg(run_id, f"sg content{sg_mnt_key}", output, put_values[i]) + error_m = error_msg( + run_id, f"sg content{sg_mnt_key}", output, put_values[i] + ) assert output == put_values[i], error_m subgroup_type = "/PersistentCascadeStoreWithStringKey/" - for i in range(2,4): - sg_mnt_key = mnt_dir + subgroup_type + "subgroup-0/shard-0/" +pcss_expected_keys[i-1] - with open(sg_mnt_key, 'r') as fh: + for i in range(2, 4): + sg_mnt_key = ( + mnt_dir + subgroup_type + "subgroup-0/shard-0/" + pcss_expected_keys[i - 1] + ) + with open(sg_mnt_key, "r") as fh: output = fh.read() - error_m = error_msg(run_id, f"sg content{sg_mnt_key}", output, put_values[i]) + error_m = error_msg( + run_id, f"sg content{sg_mnt_key}", output, put_values[i] + ) assert output == put_values[i], error_m # 2. Objectpool content test object_pool_dir = "/ObjectPools" - for i in range(1,4): + for i in range(1, 4): objp_mnt_key = mnt_dir + object_pool_dir + put_keys[i] - with open(objp_mnt_key, 'r') as fh: + with open(objp_mnt_key, "r") as fh: output = fh.read() - error_m = error_msg(run_id, f"objp content{objp_mnt_key}", output, put_values[i]) + error_m = error_msg( + run_id, f"objp content{objp_mnt_key}", output, put_values[i] + ) assert output == put_values[i], error_m - print("["+ run_id +"]" + "--- Test3 read content pass! ---") + print("[" + run_id + "]" + "--- Test3 read content pass! ---") + # def tst_attr(mnt_dir): - + + def cascade_fuse_mount(mnt_dir): # 1. create mounting piont subprocess.Popen(["mkdir", mnt_dir]) - cmd_input = base_cmdline + [ pjoin(basename, "cascade_fuse_client"), mnt_dir, "-f"] + cmd_input = base_cmdline + [pjoin(basename, "cascade_fuse_client"), mnt_dir, "-f"] # 2. run cascade_fuse_client - mount_process = subprocess.Popen(cmd_input, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + mount_process = subprocess.Popen( + cmd_input, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) # try: wait_for_mount(mount_process, mnt_dir) # except: @@ -179,18 +282,19 @@ def cascade_fuse_mount(mnt_dir): # else: # umount(mnt_dir) + def cascade_fuse_umount(mnt_dir): # 1. umount the mounting process at mnt_dir umount(mnt_dir) # 2. delete the mounted directory - subprocess.Popen(["rm", "-r",mnt_dir]) + subprocess.Popen(["rm", "-r", mnt_dir]) print(" ~~~~~~~~~~~~~ FINISHED cascade_fuse_client unmount ~~~~~~~~~~~\n") def fuse_test_cases(mnt_dir, run_id): - tst_sg_dirs(mnt_dir,run_id) - tst_objp_dirs(mnt_dir,run_id) - tst_content(mnt_dir,run_id) + tst_sg_dirs(mnt_dir, run_id) + tst_objp_dirs(mnt_dir, run_id) + tst_content(mnt_dir, run_id) # tst_attr(mnt) @@ -199,33 +303,41 @@ def main(argv): process_num = 1 thread_num = 1 try: - opts, args = getopt.getopt(argv,"ht:p:",["threadN=","processN="]) + opts, args = getopt.getopt(argv, "ht:p:", ["threadN=", "processN="]) except getopt.GetoptError: - print('fusetest.py -t -p ') + print("fusetest.py -t -p ") sys.exit(2) for opt, arg in opts: - if opt == '-h': - print ('fusetest.py -t -p ') + if opt == "-h": + print("fusetest.py -t -p ") sys.exit() elif opt in ("-t", "--ifile"): thread_num = int(arg) elif opt in ("-p", "--ofile"): process_num = int(arg) - + cascade_process = multiprocessing.Process(target=initial_setup) cascade_process.start() cascade_process.join() - mnt_dir = pjoin(basename,"testdir") # use build/src/service/fuse/testdir as mounting point + mnt_dir = pjoin( + basename, "testdir" + ) # use build/src/service/fuse/testdir as mounting point fuse_process = multiprocessing.Process(target=cascade_fuse_mount, args=(mnt_dir,)) fuse_process.start() - fuse_process.join() # used when fuse not run in foreground in fuse_client.cpp - + fuse_process.join() # used when fuse not run in foreground in fuse_client.cpp + # multi-processing test cases test_processes = [] for i in range(process_num): run_id = "process_" + str(i) - proc = multiprocessing.Process(target=fuse_test_cases, args=(mnt_dir,run_id,)) + proc = multiprocessing.Process( + target=fuse_test_cases, + args=( + mnt_dir, + run_id, + ), + ) test_processes.append(proc) proc.start() for proc in test_processes: @@ -235,7 +347,13 @@ def main(argv): test_threads = [] for i in range(thread_num): run_id = "thread_" + str(i) - thread = threading.Thread(target=fuse_test_cases, args=(mnt_dir,run_id,)) + thread = threading.Thread( + target=fuse_test_cases, + args=( + mnt_dir, + run_id, + ), + ) test_threads.append(thread) thread.start() for thread in test_threads: @@ -244,5 +362,6 @@ def main(argv): # fuse_process.terminate() cascade_fuse_umount(mnt_dir) + if __name__ == "__main__": - main(sys.argv[1:]) + main(sys.argv[1:]) diff --git a/src/service/fuse/util.py b/src/service/fuse/util.py index 45b239a8..1548b870 100644 --- a/src/service/fuse/util.py +++ b/src/service/fuse/util.py @@ -1,48 +1,47 @@ #!/usr/bin/env python3 -import subprocess -import pytest +import itertools import os +import re import stat +import subprocess +import sys import time from os.path import join as pjoin -import sys -import re -import itertools + +import pytest basename = os.path.dirname(os.path.abspath(__file__)) fusermount3_dir = "/home/yy354/opt-dev/bin" def test_printcap(): - cmdline = base_cmdline + [ pjoin(basename, 'example', 'printcap') ] - proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE, - universal_newlines=True) + cmdline = base_cmdline + [pjoin(basename, "example", "printcap")] + proc = subprocess.Popen(cmdline, stdout=subprocess.PIPE, universal_newlines=True) (stdout, _) = proc.communicate(30) assert proc.returncode == 0 proto = None caps = set() - for line in stdout.split('\n'): - if line.startswith('\t'): + for line in stdout.split("\n"): + if line.startswith("\t"): caps.add(line.strip()) continue - hit = re.match(r'Protocol version: (\d+)\.(\d+)$', line) + hit = re.match(r"Protocol version: (\d+)\.(\d+)$", line) if hit: proto = (int(hit.group(1)), int(hit.group(2))) return (proto, caps) -def wait_for_mount(mount_process, mnt_dir, - test_fn=os.path.ismount): +def wait_for_mount(mount_process, mnt_dir, test_fn=os.path.ismount): elapsed = 0 while elapsed < 20: if test_fn(mnt_dir): print(" -------- Fuse mount succeed -------") return True if mount_process.poll() is not None: - pytest.fail('file system process terminated prematurely') + pytest.fail("file system process terminated prematurely") time.sleep(0.1) elapsed += 0.1 pytest.fail("mountpoint failed to come up") @@ -56,28 +55,27 @@ def compare_dirs(output, expected): if not dir in output: return False return True - + def cleanup(mount_process, mnt_dir): # Don't bother trying Valgrind if things already went wrong - if 'bsd' in sys.platform or 'dragonfly' in sys.platform: - cmd = [ 'umount', '-f', mnt_dir ] + if "bsd" in sys.platform or "dragonfly" in sys.platform: + cmd = ["umount", "-f", mnt_dir] else: - cmd = [pjoin(basename, 'util', 'fusermount3'), - '-z', '-u', mnt_dir] - subprocess.call(cmd, stdout=subprocess.DEVNULL, - stderr=subprocess.STDOUT) + cmd = [pjoin(basename, "util", "fusermount3"), "-z", "-u", mnt_dir] + subprocess.call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) mount_process.terminate() try: mount_process.wait(1) except subprocess.TimeoutExpired: mount_process.kill() + def umount(mnt_dir): - if 'bsd' in sys.platform or 'dragonfly' in sys.platform: - cmdline = [ 'umount', mnt_dir ] + if "bsd" in sys.platform or "dragonfly" in sys.platform: + cmdline = ["umount", mnt_dir] else: # fusermount3 will be setuid root, so we can only trace it with # valgrind if we're root @@ -85,21 +83,17 @@ def umount(mnt_dir): cmdline = base_cmdline else: cmdline = [] - cmdline = cmdline + [ pjoin(fusermount3_dir, 'fusermount3'), - '-z', '-u', mnt_dir ] + cmdline = cmdline + [pjoin(fusermount3_dir, "fusermount3"), "-z", "-u", mnt_dir] subprocess.check_call(cmdline) assert not os.path.ismount(mnt_dir) - - - def safe_sleep(secs): - '''Like time.sleep(), but sleep for at least *secs* + """Like time.sleep(), but sleep for at least *secs* `time.sleep` may sleep less than the given period if a signal is received. This function ensures that we sleep for at least the desired time. - ''' + """ now = time.time() end = now + secs @@ -109,26 +103,30 @@ def safe_sleep(secs): def powerset(iterable): - s = list(iterable) - return itertools.chain.from_iterable( - itertools.combinations(s, r) for r in range(len(s)+1)) + s = list(iterable) + return itertools.chain.from_iterable( + itertools.combinations(s, r) for r in range(len(s) + 1) + ) # Use valgrind if requested -if os.environ.get('TEST_WITH_VALGRIND', 'no').lower().strip() \ - not in ('no', 'false', '0'): - base_cmdline = [ 'valgrind', '-q', '--' ] +if os.environ.get("TEST_WITH_VALGRIND", "no").lower().strip() not in ( + "no", + "false", + "0", +): + base_cmdline = ["valgrind", "-q", "--"] else: base_cmdline = [] # Try to use local fusermount3 -os.environ['PATH'] = '%s:%s' % (pjoin(basename, 'util'), os.environ['PATH']) +os.environ["PATH"] = "%s:%s" % (pjoin(basename, "util"), os.environ["PATH"]) # Put example binaries on PATH -os.environ['PATH'] = '%s:%s' % (pjoin(basename, 'example'), os.environ['PATH']) +os.environ["PATH"] = "%s:%s" % (pjoin(basename, "example"), os.environ["PATH"]) try: (fuse_proto, fuse_caps) = test_printcap() except: # Rely on test to raise error - fuse_proto = (0,0) + fuse_proto = (0, 0) fuse_caps = set() diff --git a/src/service/server.cpp b/src/service/server.cpp index c7850d45..fb1c4960 100644 --- a/src/service/server.cpp +++ b/src/service/server.cpp @@ -1,10 +1,11 @@ #include "server.hpp" #include "cascade/cascade.hpp" +#include "cascade/object.hpp" #include "cascade/service.hpp" #include "cascade/service_types.hpp" -#include "cascade/object.hpp" +#include #include #include #include @@ -15,7 +16,36 @@ using namespace derecho::cascade; +void terminate() { + // wait for service to quit. + Service::shutdown(false); + dbg_default_trace("shutdown service gracefully"); + // you can do something here to parallel the destructing process. + Service::wait(); + dbg_default_trace("Finish shutdown."); +} + +void signal_handler(int signum) { + dbg_default_trace("received interrupt signal {}", signum); + + terminate(); + exit(signum); +} + int main(int argc, char** argv) { + // check for signal_arg + bool use_signal = false; + for(int i = 0; i < argc; ++i) { + printf("Argument %d : %s\n", i, argv[i]); + if(strcmp(argv[i], "--signal") == 0) { + use_signal = true; + } + } + // set proc name if(prctl(PR_SET_NAME, PROC_NAME, 0, 0, 0) != 0) { dbg_default_warn("Cannot set proc name to {}.", PROC_NAME); @@ -47,18 +77,18 @@ int main(int argc, char** argv) { meta_factory, vcss_factory, pcss_factory, tcss_factory); dbg_default_trace("started service, waiting till it ends."); - std::cout << "Press Enter to Shutdown." << std::endl; - std::cin.get(); - // wait for service to quit. - Service::shutdown(false); - dbg_default_trace("shutdown service gracefully"); - // you can do something here to parallel the destructing process. - Service::wait(); - dbg_default_trace("Finish shutdown."); + + if(use_signal) { + printf("Send SIGINT (Ctrl+C) to Shutdown.\n"); + signal(SIGINT, signal_handler); + while(true) { + sleep(60); + } + } else { + printf("Press Enter to Shutdown.\n"); + std::cin.get(); + terminate(); + } return 0; }