From 7c66e87592f0d29c93ae6ef4cc04e4cf2c0b0dcc Mon Sep 17 00:00:00 2001 From: Dave Dykstra Date: Wed, 20 Oct 2021 15:15:27 -0500 Subject: [PATCH 1/4] add thread safety --- client/Makefile | 4 +- client/fn-internal.h | 6 +- client/fn-zlib.c | 27 +++- client/fn-zlib.h | 2 + client/frontier-cpp.cc | 4 + client/frontier.c | 105 ++++++++++---- client/frontier_config.c | 9 ++ client/http/fn-socket.c | 9 ++ client/include/frontier_client/frontier-cpp.h | 2 + client/include/frontier_client/frontier.h | 13 ++ client/test-req.cc | 132 +++++++++++++----- 11 files changed, 250 insertions(+), 63 deletions(-) diff --git a/client/Makefile b/client/Makefile index fb74278..5366961 100644 --- a/client/Makefile +++ b/client/Makefile @@ -54,8 +54,8 @@ DYLIBTYPE := $(shell if [ -f /usr/lib/libc.dylib ]; then echo dylib; else echo s # GCC settings DEBUG_OPTIM = -g -O2 -CFLAGS = $(DEBUG_OPTIM) -CXXFLAGS = $(DEBUG_OPTIM) +CFLAGS = $(DEBUG_OPTIM) -pthread +CXXFLAGS = $(DEBUG_OPTIM) -pthread FNAPI_VERSION = $(FN_VER_MAJOR).$(FN_VER_MINOR) CC = gcc CXX = c++ diff --git a/client/fn-internal.h b/client/fn-internal.h index b249041..c3dba43 100644 --- a/client/fn-internal.h +++ b/client/fn-internal.h @@ -144,6 +144,7 @@ struct s_Channel char *ttllong_suffix; char *ttlforever_suffix; int client_cache_maxsize; + void *zsave; void *serverrsakey[FRONTIER_MAX_SERVERN]; fn_query_stat query_stat; unsigned int query_bytes; @@ -163,7 +164,10 @@ struct s_RSBlob }; typedef struct s_RSBlob RSBlob; - +void frontier_init_lock(); +void frontier_init_unlock(); +void frontier_lock(); +int frontier_unlock(); char *frontier_str_now(char *); diff --git a/client/fn-zlib.c b/client/fn-zlib.c index ca7cda0..3d51b12 100644 --- a/client/fn-zlib.c +++ b/client/fn-zlib.c @@ -21,6 +21,24 @@ static z_stream *dezstream=0; static z_stream *inzstream=0; +// Since inflates happen across multiple operations, for multithread +// support use fn_save to save the inflate stream when exiting the +// protected section and use fn_zrestore to restore it when entering +// the protected section. Initialize the value by calling +// fn_gunzip_init followed by fn_zsave. + +void *fn_zsave() + { + void *savep = (void *)inzstream; + inzstream=0; + return savep; + } + +void fn_zrestore(void *savep) + { + inzstream=(z_stream *)savep; + } + static void *fn_zalloc(void *opaque,uInt items,uInt size) { return frontier_mem_alloc(items*size); @@ -118,6 +136,8 @@ int fn_gzip_str2urlenc(const char *str,int size,char **out) unsigned char *abuf=0; if(size>MAX_STR2URL_SIZE) return FN_ZLIB_E_TOOBIG; + + frontier_init_lock(); if(str[size-1]=='\n') size--; // don't include trailing newline @@ -125,7 +145,11 @@ int fn_gzip_str2urlenc(const char *str,int size,char **out) zsize=(int)(((double)size)*1.001+12); zbuf=frontier_mem_alloc(zsize); - if(!zbuf) return FN_ZLIB_E_NOMEM; + if(!zbuf) + { + frontier_init_unlock(); + return FN_ZLIB_E_NOMEM; + } zret=fn_gzip_str(str,size,(char *)zbuf,zsize); if(zret<0) {ret=zret; goto end;} @@ -143,6 +167,7 @@ int fn_gzip_str2urlenc(const char *str,int size,char **out) end: if(abuf) frontier_mem_free(abuf); if(zbuf) frontier_mem_free(zbuf); + frontier_init_unlock(); return ret; } diff --git a/client/fn-zlib.h b/client/fn-zlib.h index 2b02c83..1dda8a3 100644 --- a/client/fn-zlib.h +++ b/client/fn-zlib.h @@ -27,5 +27,7 @@ void fn_gzip_cleanup(); int fn_gunzip_init(); int fn_gunzip_update(unsigned char *src,int *src_size,const unsigned char *dest,int *dest_size,int final); +void *fn_zsave(); +void fn_zrestore(void *savep); #endif //__H__FN_ZLIB_H diff --git a/client/frontier-cpp.cc b/client/frontier-cpp.cc index 9b98ac4..6dfc3e2 100644 --- a/client/frontier-cpp.cc +++ b/client/frontier-cpp.cc @@ -103,6 +103,10 @@ int frontier::init(const std::string& logfilename, const std::string& loglevel) return ret; } +void frontier::setThreadSafe() + { + frontier_setThreadSafe(); + } Connection::Connection(const std::string& server_url,const std::string* proxy_url) { diff --git a/client/frontier.c b/client/frontier.c index 018a2fc..9c32efb 100644 --- a/client/frontier.c +++ b/client/frontier.c @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -51,6 +52,36 @@ static void channel_delete(Channel *chn); static fn_client_cache_list *client_cache_list=0; +// See comments in frontier.h about thread safety +static pthread_mutex_t mutex_lock = PTHREAD_MUTEX_INITIALIZER; +static int threadsafe = 0; + +void frontier_init_lock() + { + pthread_mutex_lock(&mutex_lock); + } + +void frontier_init_unlock() + { + pthread_mutex_unlock(&mutex_lock); + } + +void frontier_lock() + { + if(threadsafe) pthread_mutex_lock(&mutex_lock); + } + +int frontier_unlock() + { + if(threadsafe) return(pthread_mutex_unlock(&mutex_lock)); + return 0; + } + +void frontier_setThreadSafe() + { + frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"set threadsafe"); + threadsafe = 1; + } // our own implementation of strndup char *frontier_str_ncopy(const char *str, size_t len) @@ -155,7 +186,12 @@ int frontier_init(void *(*f_mem_alloc)(size_t size),void (*f_mem_free)(void *ptr int frontier_initdebug(void *(*f_mem_alloc)(size_t size),void (*f_mem_free)(void *ptr), const char *logfilename, const char *loglevel) { - if(initialized) return FRONTIER_OK; + frontier_init_lock(); + if(initialized) + { + frontier_init_unlock(); + return FRONTIER_OK; + } if(!f_mem_alloc) {f_mem_alloc=malloc; f_mem_free=free;} if(!f_mem_free) {f_mem_alloc=malloc; f_mem_free=free;} @@ -203,6 +239,7 @@ int frontier_initdebug(void *(*f_mem_alloc)(size_t size),void (*f_mem_free)(void set_frontier_id(); initialized=1; + frontier_init_unlock(); return FRONTIER_OK; } @@ -220,13 +257,14 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) int n,s; int longfresh=0; + frontier_lock(); chn=frontier_mem_alloc(sizeof(Channel)); if(!chn) { *ec=FRONTIER_EMEM; FRONTIER_MSG(*ec); if(config)frontierConfig_delete(config); - return (void*)0; + goto createfail; } bzero(chn,sizeof(Channel)); @@ -239,22 +277,19 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) { *ec=FRONTIER_EMEM; FRONTIER_MSG(*ec); - channel_delete(chn); - return (void*)0; + goto createfail; } if(!chn->cfg->server_num) { *ec=FRONTIER_ECFG; frontier_setErrorMsg(__FILE__,__LINE__,"no servers configured"); - channel_delete(chn); - return (void*)0; + goto createfail; } chn->ht_clnt=frontierHttpClnt_create(ec); if(!chn->ht_clnt||*ec) { - channel_delete(chn); - return (void*)0; + goto createfail; } n=0; @@ -266,8 +301,7 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) if(ret) { *ec=ret; - channel_delete(chn); - return (void*)0; + goto createfail; } n++; }while(frontierConfig_nextServer(chn->cfg)==0); @@ -276,8 +310,7 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) { *ec=FRONTIER_ECFG; frontier_setErrorMsg(__FILE__,__LINE__,"no server configured"); - channel_delete(chn); - return (void*)0; + goto createfail; } if(frontierConfig_getBalancedServers(chn->cfg)) @@ -291,8 +324,7 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) if(ret) { *ec=ret; - channel_delete(chn); - return (void*)0; + goto createfail; } }while(frontierConfig_nextProxy(chn->cfg)==0); @@ -307,8 +339,7 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) { *ec=FRONTIER_EMEM; FRONTIER_MSG(*ec); - channel_delete(chn); - return (void*)0; + goto createfail; } // get the path component of one of the servers (they're all the same) @@ -337,18 +368,16 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) { *ec=FRONTIER_EMEM; FRONTIER_MSG(*ec); - channel_delete(chn); frontier_mem_free(cache_listp); - return (void*)0; + goto createfail; } cache_listp->table=fn_inithashtable(); if(!cache_listp->table) { *ec=FRONTIER_EMEM; FRONTIER_MSG(*ec); - channel_delete(chn); frontier_mem_free(cache_listp); - return (void*)0; + goto createfail; } // tack the servlet name on the end, space was allocated above cache_listp->servlet=((char *)cache_listp)+sizeof(*cache_listp); @@ -382,8 +411,7 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) { *ec=FRONTIER_EMEM; FRONTIER_MSG(*ec); - channel_delete(chn); - return (void*)0; + goto createfail; } *chn->ttlshort_suffix='\0'; *chn->ttllong_suffix='\0'; @@ -416,9 +444,19 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) frontierHttpClnt_setWriteTimeoutSecs(chn->ht_clnt, frontierConfig_getWriteTimeoutSecs(chn->cfg)); + // initialize and save a gunzip stream for this thread + fn_gunzip_init(); + chn->zsave=fn_zsave(); + chn->ttl=2; // default time-to-live is "long" *ec=FRONTIER_OK; + frontier_unlock(); return chn; + +createfail: + if (chn) channel_delete(chn); + frontier_unlock(); + return (void*)0; } static Channel *channel_create(const char *srv,const char *proxy,int *ec) @@ -447,8 +485,9 @@ static void channel_delete(Channel *chn) RSA_free((RSA *)chn->serverrsakey[i]); if(chn->seqnum==chan_seqnum) frontier_statistics_stop_debug(); - frontier_mem_free(chn); + fn_zrestore(chn->zsave); fn_gzip_cleanup(); + frontier_mem_free(chn); frontier_log_close(); } @@ -476,7 +515,9 @@ FrontierChannel frontier_createChannel2(FrontierConfig* config, int *ec) { void frontier_closeChannel(FrontierChannel fchn) { + frontier_lock(); channel_delete((Channel*)fchn); + frontier_unlock(); } @@ -835,7 +876,11 @@ static int get_data(Channel *chn,const char *uri,const char *body,int curserver) chn->query_bytes=0; while(1) { + // frontierHttpClnt_read() can allow other threads to come in, so + // save & restore the unzip state around it. + chn->zsave=fn_zsave(); ret=frontierHttpClnt_read(chn->ht_clnt,buf,8192); + fn_zrestore(chn->zsave); if(ret<0) goto end; if(ret==0) break; #if 0 @@ -936,6 +981,8 @@ int frontier_postRawData(FrontierChannel u_channel,const char *uri,const char *b pid_t pid; char nowbuf[26]; + frontier_lock(); + fn_zrestore(chn->zsave); if((pid=getpid())!=frontier_pid) { pid_t oldpid; @@ -960,6 +1007,8 @@ int frontier_postRawData(FrontierChannel u_channel,const char *uri,const char *b if(!chn) { frontier_setErrorMsg(__FILE__,__LINE__,"wrong channel"); + chn->zsave=fn_zsave(); + frontier_unlock(); return FRONTIER_EIARG; } @@ -969,8 +1018,11 @@ int frontier_postRawData(FrontierChannel u_channel,const char *uri,const char *b { frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"HIT in %s client cache, skipping contacting server",chn->client_cache->servlet); ret=prepare_channel(chn,-1,0,0); - if(ret) return ret; - return write_data(chn->resp,hashval->data,hashval->len); + if (!ret) + ret=write_data(chn->resp,hashval->data,hashval->len); + chn->zsave=fn_zsave(); + frontier_unlock(); + return ret; } } @@ -1001,6 +1053,7 @@ int frontier_postRawData(FrontierChannel u_channel,const char *uri,const char *b ret=get_data(chn,uri,body,curserver); if(ret==FRONTIER_OK) { + frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"finalizing response on chan %d at %s",chn->seqnum,frontier_str_now(nowbuf)); ret=frontierResponse_finalize(chn->resp); frontier_turnErrorsIntoDebugs(0); frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"chan %d response %d finished at %s", @@ -1173,6 +1226,8 @@ int frontier_postRawData(FrontierChannel u_channel,const char *uri,const char *b if(ret!=FRONTIER_OK) frontierHttpClnt_clear(clnt); + chn->zsave=fn_zsave(); + frontier_unlock(); return ret; } diff --git a/client/frontier_config.c b/client/frontier_config.c index b63d393..6118a9a 100644 --- a/client/frontier_config.c +++ b/client/frontier_config.c @@ -49,6 +49,8 @@ static char *default_physical_servers=0; #define ENV_BUF_SIZE 1024 int frontier_pacparser_init(void); +void frontier_init_lock(); +void frontier_init_unlock(); static int getNumNonBackupProxies(FrontierConfig *cfg) { @@ -72,6 +74,8 @@ FrontierConfig *frontierConfig_get(const char *server_url,const char *proxy_url, bzero(cfg,sizeof(FrontierConfig)); + frontier_init_lock(); + // Set initial retrieve zip level first because it may be overridden // by a complex server string next. frontierConfig_setRetrieveZipLevel(cfg,frontierConfig_getDefaultRetrieveZipLevel()); @@ -217,10 +221,13 @@ FrontierConfig *frontierConfig_get(const char *server_url,const char *proxy_url, frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"Client cache max result size is %d",cfg->client_cache_max_result_size); frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"Failover to server is %s",cfg->failover_to_server?"yes":"no"); + frontier_init_unlock(); + return cfg; cleanup: frontierConfig_delete(cfg); + frontier_init_unlock(); return 0; } @@ -609,6 +616,8 @@ static int frontierConfig_parseComplexServerSpec(FrontierConfig *cfg,const char* frontierConfig_setCAPath(cfg,valp); else if(strcmp(keyp,"clientcachemaxresultsize")==0) frontierConfig_setClientCacheMaxResultSize(cfg,atoi(valp)); + else if((strcmp(keyp,"threadsafe")==0)&&(strcmp(valp,"yes")==0)) + frontier_setThreadSafe(); else { /* else ignore unrecognized keys */ diff --git a/client/http/fn-socket.c b/client/http/fn-socket.c index 44005fe..9354fce 100644 --- a/client/http/fn-socket.c +++ b/client/http/fn-socket.c @@ -14,6 +14,7 @@ */ #include +#include "../fn-internal.h" #include #include @@ -242,15 +243,23 @@ int frontier_write(int s,const char *buf, int len, int timeoutsecs,struct addrin int frontier_read(int s, char *buf, int size, int timeoutsecs,struct addrinfo *addr) { int ret; + int lockret,saveerrno; struct pollfd pfd; pfd.fd=s; pfd.events=POLLIN; + lockret=frontier_unlock(); do { pfd.revents=0; ret=poll(&pfd,1,timeoutsecs*1000); }while((ret<0)&&(errno==EINTR)); /*this loop is to support profiling*/ + if(lockret==0) + { + saveerrno=errno; + frontier_lock(); + errno=saveerrno; + } if(ret<0) { frontier_setErrorMsg(__FILE__,__LINE__,"system error %d on poll: %s",errno,strerror(errno)); diff --git a/client/include/frontier_client/frontier-cpp.h b/client/include/frontier_client/frontier-cpp.h index 8611fc7..f92fb74 100644 --- a/client/include/frontier_client/frontier-cpp.h +++ b/client/include/frontier_client/frontier-cpp.h @@ -74,6 +74,8 @@ int init(); // each level includes all messages at lower levels int init(const std::string& logfilename, const std::string& loglevel); +void setThreadSafe(); + // Enum sucks typedef unsigned char BLOB_TYPE; const BLOB_TYPE BLOB_BIT_NULL=(1<<7); diff --git a/client/include/frontier_client/frontier.h b/client/include/frontier_client/frontier.h index 9648e8f..b7c1cc1 100644 --- a/client/include/frontier_client/frontier.h +++ b/client/include/frontier_client/frontier.h @@ -12,6 +12,17 @@ * http://fermitools.fnal.gov/about/terms.html * */ +/* + * NOTEs on thread safety: + * frontier_init(), frontier_initdebug(), and frontier_createChannel() + * are always thread-safe. The other functions might not be unless + * the configuration option threadsafe=yes is set on any channel or + * the function frontier_setThreadSafe() is called. From then on all + * functions should be safe. Parallelism is limited however, and is + * primarily only allowed while waiting for input. + * With multiple threads, frontier_getErrorMsg() is not reliable outside + * of the client and may return a value from a different thread. + */ #ifndef __HEADER_H_FRONTIER_H #define __HEADER_H_FRONTIER_H @@ -74,6 +85,8 @@ char *frontier_str_copy(const char *str); void *frontier_malloc(size_t size); void frontier_free(void *ptr); +void frontier_setThreadSafe(); + // GZip and base64URL encode int fn_gzip_str2urlenc(const char *str,int size,char **out); diff --git a/client/test-req.cc b/client/test-req.cc index aac3439..cada415 100644 --- a/client/test-req.cc +++ b/client/test-req.cc @@ -22,6 +22,7 @@ #include #include #include +#include int do_main(int argc, char **argv); static std::string escape_list="\\\'"; @@ -64,27 +65,34 @@ static void print_usage(char **argv) std::cout<<" -n: do not print data\n"; std::cout<<" -c N: repeat the query N count times\n"; std::cout<<" -F N: fork after Nth repetition\n"; + std::cout<<" -t N: do the same query(ies) in N threads\n"; } + +static struct { + int ttl; + int do_print; + int repeat_count; + int fork_count; + int num_threads; + std::string sql; +} opts; + +static void *do_queries(void *param); int do_main(int argc, char **argv) { - //char vc; - int vi; - long long vl; - float vf; - double vd; - std::string *vs=0; - frontier::AnyData ad; char *file_name=0; int arg_ind; - int ttl=2; - int do_print=1; - int repeat_count=1; - int fork_count=0; - int idx; + int i; std::string sql(""); FrontierStatistics fstats; + opts.ttl=2; + opts.do_print=1; + opts.repeat_count=1; + opts.fork_count=0; + opts.num_threads=1; + try { frontier::init(); @@ -99,19 +107,25 @@ int do_main(int argc, char **argv) exit(0); } if(strcmp(argv[arg_ind],"-r")==0) - ttl=1; + opts.ttl=1; else if(strcmp(argv[arg_ind],"-R")==0) - ttl=3; + opts.ttl=3; else if(strcmp(argv[arg_ind],"-n")==0) - do_print=0; + opts.do_print=0; else if(argc>(arg_ind+1) && strcmp(argv[arg_ind],"-c")==0) { - repeat_count=atoi(argv[arg_ind+1]); + opts.repeat_count=atoi(argv[arg_ind+1]); arg_ind++; } else if(argc>(arg_ind+1) && strcmp(argv[arg_ind],"-F")==0) { - fork_count=atoi(argv[arg_ind+1]); + opts.fork_count=atoi(argv[arg_ind+1]); + arg_ind++; + } + else if(argc>(arg_ind+1) && strcmp(argv[arg_ind],"-t")==0) + { + frontier::setThreadSafe(); + opts.num_threads=atoi(argv[arg_ind+1]); arg_ind++; } else if(argc>(arg_ind+1) && strcmp(argv[arg_ind],"-f")==0) @@ -150,9 +164,68 @@ int do_main(int argc, char **argv) sql+=tmp; } if(file_name) {in_file.close();} - std::cout<<"Entered:\n"<1) + threads=new pthread_t[opts.num_threads-1]; + for(i=0; i 0) + { + std::cout << std::endl << "Read rate: " << std::endl << std::setprecision(4) + << fstats.bytes_per_query.avg / (float) fstats.msecs_per_query.avg << + " kbytes/sec" << std::endl; + } + frontier_statistics_stop(); + + return ret; + } + +void *do_queries(void *param) + { + //char vc; + int vi; + long long vl; + float vf; + double vd; + std::string *vs=0; + frontier::AnyData ad; + int idx; + + try + { + frontier::init(); + + std::cout<<"Entered:\n"< serverList; @@ -161,13 +234,13 @@ int do_main(int argc, char **argv) //frontier::DataSource ds(serverList, proxyList); frontier::Connection con(serverList, proxyList); - for(idx=0;idx0)&&(idx==fork_count)) + if((opts.fork_count>0)&&(idx==opts.fork_count)) fork(); frontier::Session ses(&con); - con.setTimeToLive(ttl); + con.setTimeToLive(opts.ttl); frontier::Request req(req_data,frontier::BLOB); req.addKey("p1",param); @@ -198,7 +271,7 @@ int do_main(int argc, char **argv) std::cout<<"\nResult contains "<< nrec<<" objects.\n"; while(ses.next()) { - if(!do_print)continue; + if(!opts.do_print)continue; for(int k=0;k 0) - { - std::cout << std::endl << "Read rate: " << std::endl << std::setprecision(4) - << fstats.bytes_per_query.avg / (float) fstats.msecs_per_query.avg << - " kbytes/sec" << std::endl; - } - frontier_statistics_stop(); - - return 0; + return (void *) 0; errexit: frontier_statistics_stop(); - return 1; + return (void *) 1; } From 146460fc152943046be9ad66d309726b720e3cec Mon Sep 17 00:00:00 2001 From: Dave Dykstra Date: Tue, 9 Jan 2018 16:38:51 -0600 Subject: [PATCH 2/4] move gunzip state into memdata structure --- client/fn-internal.h | 2 +- client/fn-zlib.c | 68 ++++++++++++++++++-------------------------- client/fn-zlib.h | 8 ++---- client/frontier.c | 14 --------- client/memdata.c | 6 ++-- 5 files changed, 35 insertions(+), 63 deletions(-) diff --git a/client/fn-internal.h b/client/fn-internal.h index c3dba43..19d8a2f 100644 --- a/client/fn-internal.h +++ b/client/fn-internal.h @@ -51,6 +51,7 @@ struct s_FrontierMemData int binzipped; unsigned char zipbuf[4096]; int zipbuflen; + void *zstate; }; typedef struct s_FrontierMemData FrontierMemData; FrontierMemData *frontierMemData_create(int zipped,int secured,const char *params1,const char *params2); @@ -144,7 +145,6 @@ struct s_Channel char *ttllong_suffix; char *ttlforever_suffix; int client_cache_maxsize; - void *zsave; void *serverrsakey[FRONTIER_MAX_SERVERN]; fn_query_stat query_stat; unsigned int query_bytes; diff --git a/client/fn-zlib.c b/client/fn-zlib.c index 3d51b12..f3aa71a 100644 --- a/client/fn-zlib.c +++ b/client/fn-zlib.c @@ -19,25 +19,6 @@ #include "zlib.h" static z_stream *dezstream=0; -static z_stream *inzstream=0; - -// Since inflates happen across multiple operations, for multithread -// support use fn_save to save the inflate stream when exiting the -// protected section and use fn_zrestore to restore it when entering -// the protected section. Initialize the value by calling -// fn_gunzip_init followed by fn_zsave. - -void *fn_zsave() - { - void *savep = (void *)inzstream; - inzstream=0; - return savep; - } - -void fn_zrestore(void *savep) - { - inzstream=(z_stream *)savep; - } static void *fn_zalloc(void *opaque,uInt items,uInt size) { @@ -49,7 +30,7 @@ static void fn_zfree(void *opaque,void *address) frontier_mem_free(address); } -static void fn_decleanup() +void fn_gzip_cleanup() { if(dezstream!=0) { @@ -59,22 +40,16 @@ static void fn_decleanup() } } -static void fn_incleanup() +void fn_gunzip_cleanup(void **inzstreamp) { - if(inzstream!=0) + if((inzstreamp != 0) && (*inzstreamp!=0)) { - inflateEnd(inzstream); - frontier_mem_free(inzstream); - inzstream=0; + inflateEnd((z_stream *)(*inzstreamp)); + frontier_mem_free(*inzstreamp); + *inzstreamp=0; } } -void fn_gzip_cleanup() - { - fn_decleanup(); - fn_incleanup(); - } - long fn_gzip_str(const char *src,long src_size,char *dest,long dest_size) { int ret; @@ -92,7 +67,7 @@ long fn_gzip_str(const char *src,long src_size,char *dest,long dest_size) ret=deflateInit(dezstream,9); if(ret!=Z_OK) { - fn_decleanup(); + fn_gzip_cleanup(); if(ret==Z_MEM_ERROR) return FN_ZLIB_E_NOMEM; return FN_ZLIB_E_OTHER; @@ -104,7 +79,7 @@ long fn_gzip_str(const char *src,long src_size,char *dest,long dest_size) ret=deflateReset(dezstream); if(ret!=Z_OK) { - fn_decleanup(); + fn_gzip_cleanup(); return FN_ZLIB_E_OTHER; } } @@ -120,7 +95,7 @@ long fn_gzip_str(const char *src,long src_size,char *dest,long dest_size) return dest_size-(long)dezstream->avail_out; } - fn_decleanup(); + fn_gzip_cleanup(); if(ret==Z_BUF_ERROR) return FN_ZLIB_E_SMALLBUF; return FN_ZLIB_E_OTHER; @@ -171,12 +146,16 @@ int fn_gzip_str2urlenc(const char *str,int size,char **out) return ret; } -int fn_gunzip_init() +int fn_gunzip_init(void **inzstreamp) { int ret=Z_OK; - if(inzstream==0) + if(inzstreamp==0) + return Z_MEM_ERROR; + + if(*inzstreamp==0) { + z_stream *inzstream; // open a stream and leave it open just like with deflate above inzstream=frontier_mem_alloc(sizeof(*inzstream)); if(inzstream==0) @@ -189,17 +168,18 @@ int fn_gunzip_init() ret=inflateInit(inzstream); if(ret!=Z_OK) { - fn_incleanup(); + fn_gunzip_cleanup((void **)&inzstream); return ret; } + *inzstreamp=inzstream; } else { // reuse existing stream - ret=inflateReset(inzstream); + ret=inflateReset((z_stream *)(*inzstreamp)); if(ret!=Z_OK) { - fn_incleanup(); + fn_gunzip_cleanup(inzstreamp); return ret; } } @@ -207,9 +187,15 @@ int fn_gunzip_init() return ret; } -int fn_gunzip_update(unsigned char *src,int *src_size,const unsigned char *dest,int *dest_size,int final) +int fn_gunzip_update(void **inzstreamp,unsigned char *src,int *src_size,const unsigned char *dest,int *dest_size,int final) { int ret; + z_stream *inzstream; + + if(inzstreamp==0) + return Z_MEM_ERROR; + inzstream=*inzstreamp; + inzstream->next_in=(Bytef *)src; inzstream->avail_in=(uLongf)*src_size; inzstream->next_out=(Bytef *)dest; @@ -231,7 +217,7 @@ int fn_gunzip_update(unsigned char *src,int *src_size,const unsigned char *dest, } // unsuccessful finish, clean up stream frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"final inflate error message: %s",inzstream->msg); - fn_incleanup(); + fn_gunzip_cleanup(inzstreamp); } else if(ret==Z_STREAM_END) { diff --git a/client/fn-zlib.h b/client/fn-zlib.h index 1dda8a3..239dfaf 100644 --- a/client/fn-zlib.h +++ b/client/fn-zlib.h @@ -24,10 +24,8 @@ long fn_gzip_str(const char *src,long src_size,char *dest,long dest_size); void fn_gzip_cleanup(); -int fn_gunzip_init(); -int fn_gunzip_update(unsigned char *src,int *src_size,const unsigned char *dest,int *dest_size,int final); - -void *fn_zsave(); -void fn_zrestore(void *savep); +int fn_gunzip_init(void **inzstreamp); +int fn_gunzip_update(void **inzstreamp,unsigned char *src,int *src_size,const unsigned char *dest,int *dest_size,int final); +void fn_gunzip_cleanup(void **inzstreamp); #endif //__H__FN_ZLIB_H diff --git a/client/frontier.c b/client/frontier.c index 9c32efb..c4de3c8 100644 --- a/client/frontier.c +++ b/client/frontier.c @@ -444,10 +444,6 @@ static Channel *channel_create2(FrontierConfig *config, int *ec) frontierHttpClnt_setWriteTimeoutSecs(chn->ht_clnt, frontierConfig_getWriteTimeoutSecs(chn->cfg)); - // initialize and save a gunzip stream for this thread - fn_gunzip_init(); - chn->zsave=fn_zsave(); - chn->ttl=2; // default time-to-live is "long" *ec=FRONTIER_OK; frontier_unlock(); @@ -485,8 +481,6 @@ static void channel_delete(Channel *chn) RSA_free((RSA *)chn->serverrsakey[i]); if(chn->seqnum==chan_seqnum) frontier_statistics_stop_debug(); - fn_zrestore(chn->zsave); - fn_gzip_cleanup(); frontier_mem_free(chn); frontier_log_close(); } @@ -876,11 +870,7 @@ static int get_data(Channel *chn,const char *uri,const char *body,int curserver) chn->query_bytes=0; while(1) { - // frontierHttpClnt_read() can allow other threads to come in, so - // save & restore the unzip state around it. - chn->zsave=fn_zsave(); ret=frontierHttpClnt_read(chn->ht_clnt,buf,8192); - fn_zrestore(chn->zsave); if(ret<0) goto end; if(ret==0) break; #if 0 @@ -982,7 +972,6 @@ int frontier_postRawData(FrontierChannel u_channel,const char *uri,const char *b char nowbuf[26]; frontier_lock(); - fn_zrestore(chn->zsave); if((pid=getpid())!=frontier_pid) { pid_t oldpid; @@ -1007,7 +996,6 @@ int frontier_postRawData(FrontierChannel u_channel,const char *uri,const char *b if(!chn) { frontier_setErrorMsg(__FILE__,__LINE__,"wrong channel"); - chn->zsave=fn_zsave(); frontier_unlock(); return FRONTIER_EIARG; } @@ -1020,7 +1008,6 @@ int frontier_postRawData(FrontierChannel u_channel,const char *uri,const char *b ret=prepare_channel(chn,-1,0,0); if (!ret) ret=write_data(chn->resp,hashval->data,hashval->len); - chn->zsave=fn_zsave(); frontier_unlock(); return ret; } @@ -1226,7 +1213,6 @@ int frontier_postRawData(FrontierChannel u_channel,const char *uri,const char *b if(ret!=FRONTIER_OK) frontierHttpClnt_clear(clnt); - chn->zsave=fn_zsave(); frontier_unlock(); return ret; } diff --git a/client/memdata.c b/client/memdata.c index 0594ead..80ba0a4 100644 --- a/client/memdata.c +++ b/client/memdata.c @@ -118,7 +118,8 @@ FrontierMemData *frontierMemData_create(int zipped,int secured,const char *param md->zipped_total=0; md->binzipped=zipped; md->zipbuflen=0; - fn_gunzip_init(); + md->zstate=0; + fn_gunzip_init(&md->zstate); return md; err: frontier_mem_free(md); @@ -141,6 +142,7 @@ void frontierMemData_delete(FrontierMemData *md) mb=nextmb; } + fn_gunzip_cleanup(&md->zstate); frontier_mem_free(md); } @@ -215,7 +217,7 @@ static int frontierMemData_append(FrontierMemData *md, int ret; sizeused=size2; spaceused=spaceleft2; - ret=fn_gunzip_update(p,&size2,p2,&spaceleft2,final); + ret=fn_gunzip_update(&md->zstate,p,&size2,p2,&spaceleft2,final); switch(ret) { case Z_OK: From 1a68aea7d77bb8dedaca6360d42c86c77314d7d0 Mon Sep 17 00:00:00 2001 From: Dave Dykstra Date: Wed, 20 Oct 2021 17:18:20 -0500 Subject: [PATCH 3/4] enable pthread error checking and FRONTIER_LOG_LEVEL=thread --- client/frontier.c | 47 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/client/frontier.c b/client/frontier.c index c4de3c8..69f5118 100644 --- a/client/frontier.c +++ b/client/frontier.c @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -53,11 +54,22 @@ static void channel_delete(Channel *chn); static fn_client_cache_list *client_cache_list=0; // See comments in frontier.h about thread safety -static pthread_mutex_t mutex_lock = PTHREAD_MUTEX_INITIALIZER; -static int threadsafe = 0; +static pthread_mutex_t mutex_lock; +static pthread_once_t mutex_init_control=PTHREAD_ONCE_INIT; +static pthread_mutexattr_t mutex_attr; +static int threadsafe=0; +static int thread_debug=0; + +static void init_mutex() + { + pthread_mutexattr_init(&mutex_attr); + pthread_mutexattr_settype(&mutex_attr,PTHREAD_MUTEX_ERRORCHECK); + pthread_mutex_init(&mutex_lock,&mutex_attr); + } void frontier_init_lock() { + pthread_once(&mutex_init_control,init_mutex); pthread_mutex_lock(&mutex_lock); } @@ -68,13 +80,31 @@ void frontier_init_unlock() void frontier_lock() { - if(threadsafe) pthread_mutex_lock(&mutex_lock); + if(!threadsafe) + return; + int lockret=pthread_mutex_lock(&mutex_lock); + if(lockret==0) + { + if(thread_debug) + frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"thread %d acquired lock",syscall(SYS_gettid)); + } + else + frontier_log(FRONTIER_LOGLEVEL_WARNING,__FILE__,__LINE__,"thread %d failed to acquire thread lock, error %d",syscall(SYS_gettid),lockret); } int frontier_unlock() { - if(threadsafe) return(pthread_mutex_unlock(&mutex_lock)); - return 0; + if(!threadsafe) + return 0; + int unlockret=pthread_mutex_unlock(&mutex_lock); + if(thread_debug) + { + if(unlockret==0) + frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"thread %d released lock",syscall(SYS_gettid)); + else + frontier_log(FRONTIER_LOGLEVEL_DEBUG,__FILE__,__LINE__,"thread %d failed to release thread lock, error %d",syscall(SYS_gettid),unlockret); + } + return unlockret; } void frontier_setThreadSafe() @@ -211,7 +241,12 @@ int frontier_initdebug(void *(*f_mem_alloc)(size_t size),void (*f_mem_free)(void if(strcasecmp(loglevel,"warning")==0 || strcasecmp(loglevel,"info")==0) frontier_log_level=FRONTIER_LOGLEVEL_WARNING; else if(strcasecmp(loglevel,"error")==0) frontier_log_level=FRONTIER_LOGLEVEL_ERROR; else if(strcasecmp(loglevel,"nolog")==0) frontier_log_level=FRONTIER_LOGLEVEL_NOLOG; - else frontier_log_level=FRONTIER_LOGLEVEL_DEBUG; + else + { + frontier_log_level=FRONTIER_LOGLEVEL_DEBUG; + if(strcasecmp(loglevel,"thread")==0) + thread_debug=1; + } if(!logfilename) { From 180c9d580b5a16f0f260e214a6d3f6cc83130883 Mon Sep 17 00:00:00 2001 From: Dave Dykstra Date: Wed, 20 Oct 2021 17:43:15 -0500 Subject: [PATCH 4/4] allow parallel threads while unpacking data --- client/http/fn-socket.c | 2 ++ client/include/frontier_client/frontier.h | 3 ++- client/memdata.c | 22 +++++++++++++++++++--- client/payload.c | 4 ++++ 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/client/http/fn-socket.c b/client/http/fn-socket.c index 9354fce..b9a1dd1 100644 --- a/client/http/fn-socket.c +++ b/client/http/fn-socket.c @@ -278,7 +278,9 @@ int frontier_read(int s, char *buf, int size, int timeoutsecs,struct addrinfo *a return FRONTIER_EUNKNOWN; } + lockret=frontier_unlock(); ret=recv(s,buf,size,0); + if(lockret==0)frontier_lock(); if(ret>=0) return ret; frontier_setErrorMsg(__FILE__,__LINE__,"system error %d: %s",errno,strerror(errno)); diff --git a/client/include/frontier_client/frontier.h b/client/include/frontier_client/frontier.h index b7c1cc1..fab6f81 100644 --- a/client/include/frontier_client/frontier.h +++ b/client/include/frontier_client/frontier.h @@ -19,7 +19,8 @@ * the configuration option threadsafe=yes is set on any channel or * the function frontier_setThreadSafe() is called. From then on all * functions should be safe. Parallelism is limited however, and is - * primarily only allowed while waiting for input. + * primarily only allowed while waiting for input and while doing + * the cpu-intensive unpacking of data. * With multiple threads, frontier_getErrorMsg() is not reliable outside * of the client and may return a value from a different thread. */ diff --git a/client/memdata.c b/client/memdata.c index 80ba0a4..94bcc3b 100644 --- a/client/memdata.c +++ b/client/memdata.c @@ -161,6 +161,10 @@ static FrontierMemBuf *frontierMemData_allocbuffer(FrontierMemData *md) // append new base64-encoded data: decode it, calculate the message digest // of the decoded data, and unzip it if it was zipped. Put the result in // membufs. If it is a final update, finalize the message digest & unzipping. +// This is the most cpu-intensive function in the frontier client so +// allow threading by cautiously releasing the lock. Be careful not to +// call anything that is thread-unsafe, including functions that can +// print or set an error message, while unlocked. static int frontierMemData_append(FrontierMemData *md, const unsigned char *buf,int size,int final) { @@ -170,6 +174,7 @@ static int frontierMemData_append(FrontierMemData *md, int sizeused,spaceused; unsigned char *p2=0; int size2,spaceleft2; + int lockret; // gets set to zero if the the lock was set, else -1 if(md->error!=FRONTIER_OK) { @@ -186,6 +191,7 @@ static int frontierMemData_append(FrontierMemData *md, spaceleft=sizeof(md->zipbuf)-md->zipbuflen; } + lockret=frontier_unlock(); while(1) { sizeused=size; @@ -201,6 +207,7 @@ static int frontierMemData_append(FrontierMemData *md, if((size==0)&&(!final)) { // all the incoming buffer was copied to the zipbuf + if(lockret==0) frontier_lock(); return FRONTIER_OK; } // else finished filling the zipbuf, update message digest @@ -227,14 +234,17 @@ static int frontierMemData_append(FrontierMemData *md, // so allocate another buffer and try again break; case Z_MEM_ERROR: + if(lockret==0) frontier_lock(); frontier_setErrorMsg(__FILE__,__LINE__,"unzip memory error"); md->error=FRONTIER_EMEM; return md->error; case Z_DATA_ERROR: + if(lockret==0) frontier_lock(); frontier_setErrorMsg(__FILE__,__LINE__,"unzip data error"); md->error=FRONTIER_EPROTO; return md->error; default: + if(lockret==0) frontier_lock(); frontier_setErrorMsg(__FILE__,__LINE__,"unzip unknown error"); md->error=FRONTIER_EUNKNOWN; return md->error; @@ -265,6 +275,7 @@ static int frontierMemData_append(FrontierMemData *md, mb=frontierMemData_allocbuffer(md); if(!mb) { + if(lockret==0) frontier_lock(); FRONTIER_MSG(FRONTIER_EMEM); return FRONTIER_EMEM; } @@ -277,7 +288,10 @@ static int frontierMemData_append(FrontierMemData *md, mb->len+=spaceused; md->total+=spaceused; if((size==0)&&!final) + { + if(lockret==0) frontier_lock(); return FRONTIER_OK; + } //else finished with this membuf, update message digest if(md->secured) (void)SHA256_Update(&md->sha256_ctx,((unsigned char *)mb)+sizeof(*mb),mb->len); @@ -289,9 +303,10 @@ static int frontierMemData_append(FrontierMemData *md, mb=frontierMemData_allocbuffer(md); if(!mb) { - md->error=FRONTIER_EMEM; - FRONTIER_MSG(md->error); - return md->error; + if(lockret==0) frontier_lock(); + md->error=FRONTIER_EMEM; + FRONTIER_MSG(md->error); + return md->error; } p=((unsigned char *)mb)+sizeof(*mb); spaceleft=MEMBUF_SIZE; @@ -303,6 +318,7 @@ static int frontierMemData_append(FrontierMemData *md, (void)SHA256_Final(md->sha256,&md->sha256_ctx); else (void)MD5_Final(md->md5,&md->md5_ctx); + if(lockret==0) frontier_lock(); return FRONTIER_OK; } } diff --git a/client/payload.c b/client/payload.c index 3047019..c61d4a5 100644 --- a/client/payload.c +++ b/client/payload.c @@ -92,6 +92,7 @@ int frontierPayload_finalize(FrontierPayload *fpl) int zipped_size=0; unsigned char *p; FrontierMemBuf *mb; + int lockret; fpl->blob = 0; fpl->error = FRONTIER_OK; @@ -147,6 +148,8 @@ int frontierPayload_finalize(FrontierPayload *fpl) fpl->error=FRONTIER_EMEM; goto errcleanup; } + // This is a time-consuming loop so release a thread lock + lockret=frontier_unlock(); mb=fpl->md->firstbuf; p=fpl->blob; while(mb!=0) @@ -155,6 +158,7 @@ int frontierPayload_finalize(FrontierPayload *fpl) p+=mb->len; mb=mb->nextbuf; } + if(lockret==0)frontier_lock(); zipped_size=fpl->md->zipped_total; frontierMemData_delete(fpl->md);