From 040379dbd188d3485eee011483fcc20386d54fde Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Wed, 28 Mar 2012 13:47:30 +0300 Subject: [PATCH] Per-user mapping for non-sqlmed clusters --- src/cluster.c | 191 +++++++++++++++++++++++++++++++++++++------------- src/execute.c | 26 +++++-- src/plproxy.h | 22 +++++- 3 files changed, 184 insertions(+), 55 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 257d855..5519dcf 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -108,14 +108,46 @@ static void conn_free(struct AANode *node, void *arg) { ProxyConnection *conn = (ProxyConnection *)node; + aatree_destroy(&conn->userstate_tree); if (conn->res) PQclear(conn->res); - if (conn->cur->db) - PQfinish(conn->cur->db); - pfree(conn->cur); pfree(conn); } +static int state_user_cmp(uintptr_t val, struct AANode *node) +{ + const char *name = (const char *)val; + const ProxyConnectionState *state = (ProxyConnectionState *)node; + + return strcmp(name, state->userinfo->username); +} + +static void state_free(struct AANode *node, void *arg) +{ + ProxyConnectionState *state = (ProxyConnectionState *)node; + if (state->db) + PQfinish(state->db); + memset(state, 0, sizeof(*state)); + pfree(state); +} + +static int userinfo_cmp(uintptr_t val, struct AANode *node) +{ + const char *name = (const char *)val; + const ConnUserInfo *info = (ConnUserInfo *)node; + + return strcmp(name, info->username); +} + +static void userinfo_free(struct AANode *node, void *arg) +{ + ConnUserInfo *info = (ConnUserInfo *)node; + pfree((void*)info->username); + pfree((void*)info->connstr); + memset(info, 0, sizeof(*info)); + pfree(info); +} + /* * Create cache memory area and prepare plans */ @@ -178,10 +210,12 @@ plproxy_cluster_plan_init(void) /* * Drop partition and connection data from cluster. */ + static void free_connlist(ProxyCluster *cluster) { aatree_destroy(&cluster->conn_tree); + aatree_destroy(&cluster->userinfo_tree); pfree(cluster->part_map); pfree(cluster->active_list); @@ -195,24 +229,11 @@ free_connlist(ProxyCluster *cluster) /* * Add new database connection if it does not exists. */ -static ProxyConnection * -add_connection(ProxyCluster *cluster, char *connstr, int part_num) +static void +add_connection(ProxyCluster *cluster, const char *connstr, int part_num) { struct AANode *node; ProxyConnection *conn = NULL; - char *username; - StringInfo final; - - final = makeStringInfo(); - appendStringInfoString(final, connstr); - - /* append current user if not specified in connstr */ - if (strstr(connstr, "user=") == NULL) - { - username = GetUserNameFromId(GetSessionUserId()); - appendStringInfo(final, " user=%s", username); - } - connstr = final->data; /* check if already have it */ node = aatree_search(&cluster->conn_tree, (uintptr_t)connstr); @@ -225,14 +246,13 @@ add_connection(ProxyCluster *cluster, char *connstr, int part_num) conn = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnection)); conn->connstr = MemoryContextStrdup(cluster_mem, connstr); conn->cluster = cluster; - conn->cur = MemoryContextAllocZero(cluster_mem, sizeof(ProxyConnectionState)); + + aatree_init(&conn->userstate_tree, state_user_cmp, state_free); aatree_insert(&cluster->conn_tree, (uintptr_t)connstr, &conn->node); } cluster->part_map[part_num] = conn; - - return conn; } /* @@ -709,6 +729,7 @@ static void inval_fserver(struct AANode *n, void *arg) cluster->needs_reload = true; } +static void inval_one_umap(struct AANode *n, void *arg) static void inval_umapping(struct AANode *n, void *arg) { ProxyCluster *cluster = (ProxyCluster *)n; @@ -787,21 +808,47 @@ new_cluster(const char *name) cluster = palloc0(sizeof(*cluster)); cluster->name = pstrdup(name); - cluster->needs_reload = true; aatree_init(&cluster->conn_tree, conn_cstr_cmp, conn_free); + aatree_init(&cluster->userinfo_tree, userinfo_cmp, userinfo_free); MemoryContextSwitchTo(old_ctx); return cluster; } +static void init_cluster_user(ProxyCluster *cluster, const char *username) +{ + ConnUserInfo *userinfo; + StringInfo tmp; + struct AANode *node; + + node = aatree_search(&cluster->userinfo_tree, (uintptr_t)username); + if (node) { + userinfo = (ConnUserInfo *)node; + } else { + tmp = makeStringInfo(); + appendStringInfo(tmp, "user=%s", username); + userinfo = MemoryContextAllocZero(cluster_mem, sizeof(*userinfo)); + userinfo->username = MemoryContextStrdup(cluster_mem, username); + userinfo->connstr = MemoryContextStrdup(cluster_mem, tmp->data); + } + + cluster->cur_userinfo = userinfo; +} + /* * Refresh the cluster. */ static void refresh_cluster(ProxyFunction *func, ProxyCluster *cluster) { + const char *username; + Oid user_oid; + + user_oid = GetSessionUserId(); + username = GetUserNameFromId(user_oid); + #ifdef PLPROXY_USE_SQLMED if (cluster->needs_reload) { @@ -823,10 +870,13 @@ refresh_cluster(ProxyFunction *func, ProxyCluster *cluster) #endif /* Either no SQL/MED support or no such foreign server */ - if (!cluster->sqlmed_cluster) + if (!cluster->sqlmed_cluster && !cluster->fake_cluster) reload_plproxy_cluster(func, cluster); + init_cluster_user(cluster, username); + cluster->needs_reload = false; + pfree((void*)username); } /* @@ -836,40 +886,37 @@ static ProxyCluster * fake_cluster(ProxyFunction *func, const char *connect_str) { ProxyCluster *cluster; - ProxyConnection *conn; MemoryContext old_ctx; struct AANode *n; /* search if cached */ n = aatree_search(&fake_cluster_tree, (uintptr_t)connect_str); if (n) - return (ProxyCluster *)n; + { + cluster = (ProxyCluster *)n; + goto done; + } /* create if not */ cluster = new_cluster(connect_str); old_ctx = MemoryContextSwitchTo(cluster_mem); - cluster->needs_reload = 0; + cluster->fake_cluster = true; cluster->version = 1; cluster->part_count = 1; cluster->part_mask = 0; cluster->part_map = palloc(cluster->part_count * sizeof(ProxyConnection *)); cluster->active_list = palloc(cluster->part_count * sizeof(ProxyConnection *)); - conn = palloc0(sizeof(ProxyConnection)); - conn->cluster = cluster; - conn->connstr = pstrdup(cluster->name); - conn->cur = palloc0(sizeof(ProxyConnectionState)); - conn->cur->state = C_NONE; - - aatree_insert(&cluster->conn_tree, (uintptr_t)conn->connstr, &conn->node); - cluster->part_map[0] = conn; - MemoryContextSwitchTo(old_ctx); + add_connection(cluster, connect_str, 0); + aatree_insert(&fake_cluster_tree, (uintptr_t)connect_str, &cluster->node); +done: + refresh_cluster(func, cluster); return cluster; } @@ -942,6 +989,7 @@ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo) if (!cluster) { cluster = new_cluster(name); + cluster->needs_reload = true; aatree_insert(&cluster_tree, (uintptr_t)name, &cluster->node); } @@ -951,26 +999,53 @@ plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo) return cluster; } +/* + * Move connection to active list and init current + * connection state. + */ +void plproxy_activate_connection(struct ProxyConnection *conn) +{ + ProxyCluster *cluster = conn->cluster; + ConnUserInfo *userinfo = cluster->cur_userinfo; + const char *username = userinfo->username; + struct AANode *node; + ProxyConnectionState *cur; + + /* move connection to active_list */ + cluster->active_list[cluster->active_count] = conn; + cluster->active_count++; + + /* fill ->cur pointer */ + + node = aatree_search(&conn->userstate_tree, (uintptr_t)username); + if (node) { + cur = (ProxyConnectionState *)node; + } else { + cur = MemoryContextAlloc(cluster_mem, sizeof(*cur)); + cur->userinfo = userinfo; + aatree_insert(&conn->userstate_tree, (uintptr_t)username, &cur->node); + } + conn->cur = cur; +} + /* * Clean old connections and results from all clusters. */ -static void clean_conn(struct AANode *node, void *arg) +struct MaintInfo { + struct ProxyConfig *cf; + struct timeval *now; +}; + +static void clean_state(struct AANode *node, void *arg) { - ProxyConnection *conn = (ProxyConnection *)node; - ProxyConnectionState *cur; - ProxyConfig *cf = &conn->cluster->config; - struct timeval *now = arg; + ProxyConnectionState *cur = (ProxyConnectionState *)node; + struct MaintInfo *maint = arg; + ProxyConfig *cf = maint->cf; + struct timeval *now = maint->now; time_t age; bool drop; - if (conn->res) - { - PQclear(conn->res); - conn->res = NULL; - } - - cur = conn->cur; if (!cur->db) return; @@ -998,12 +1073,29 @@ static void clean_conn(struct AANode *node, void *arg) } } +static void clean_conn(struct AANode *node, void *arg) +{ + ProxyConnection *conn = (ProxyConnection *)node; + struct MaintInfo *maint = arg; + + if (conn->res) + { + PQclear(conn->res); + conn->res = NULL; + } + + aatree_walk(&conn->userstate_tree, AA_WALK_IN_ORDER, clean_state, maint); +} + static void clean_cluster(struct AANode *n, void *arg) { ProxyCluster *cluster = (ProxyCluster *)n; - struct timeval *now = arg; + struct MaintInfo maint; - aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, clean_conn, now); + maint.cf = &cluster->config; + maint.now = arg; + + aatree_walk(&cluster->conn_tree, AA_WALK_IN_ORDER, clean_conn, &maint); } void @@ -1012,3 +1104,4 @@ plproxy_cluster_maint(struct timeval * now) aatree_walk(&cluster_tree, AA_WALK_IN_ORDER, clean_cluster, now); aatree_walk(&fake_cluster_tree, AA_WALK_IN_ORDER, clean_cluster, now); } + diff --git a/src/execute.c b/src/execute.c index e332a09..320536f 100644 --- a/src/execute.c +++ b/src/execute.c @@ -375,11 +375,26 @@ handle_notice(void *arg, const PGresult *res) plproxy_remote_error(cluster->cur_func, conn, res, false); } +static const char * +get_connstr(ProxyConnection *conn) +{ + StringInfoData cstr; + ConnUserInfo *info = conn->cluster->cur_userinfo; + + if (strstr(conn->connstr, "user=") != NULL) + return pstrdup(conn->connstr); + + initStringInfo(&cstr); + appendStringInfo(&cstr, "%s %s", conn->connstr, info->connstr); + return cstr.data; +} + /* check existing conn status or launch new conn */ static void prepare_conn(ProxyFunction *func, ProxyConnection *conn) { struct timeval now; + const char *connstr; gettimeofday(&now, NULL); @@ -409,7 +424,8 @@ prepare_conn(ProxyFunction *func, ProxyConnection *conn) conn->cur->connect_time = now.tv_sec; /* launch new connection */ - conn->cur->db = PQconnectStart(conn->connstr); + connstr = get_connstr(conn); + conn->cur->db = PQconnectStart(connstr); if (conn->cur->db == NULL) plproxy_error(func, "No memory for PGconn"); @@ -805,10 +821,8 @@ static void tag_part(struct ProxyCluster *cluster, int i, int tag) ProxyConnection *conn = cluster->part_map[i]; if (!conn->run_tag) - { - cluster->active_list[cluster->active_count] = conn; - cluster->active_count++; - } + plproxy_activate_connection(conn); + conn->run_tag = tag; } @@ -1130,6 +1144,8 @@ plproxy_clean_results(ProxyCluster *cluster) conn->pos = 0; conn->run_tag = 0; conn->bstate = NULL; + conn->cur = NULL; + cluster->active_list[i] = NULL; } /* reset active_list */ diff --git a/src/plproxy.h b/src/plproxy.h index f9e4573..a2c7e95 100644 --- a/src/plproxy.h +++ b/src/plproxy.h @@ -136,7 +136,21 @@ typedef struct ProxyConfig int keepcnt; } ProxyConfig; +typedef struct ConnUserInfo { + struct AANode node; + + const char *username; + const char *connstr; + + SysCacheStamp umStamp; + bool needs_reload; +} ConnUserInfo; + typedef struct ProxyConnectionState { + struct AANode node; /* node head in user->state tree */ + + ConnUserInfo *userinfo; + PGconn *db; /* libpq connection handle */ ConnState state; /* Connection state */ time_t connect_time; /* When connection was started */ @@ -153,6 +167,8 @@ typedef struct ProxyConnection struct ProxyCluster *cluster; const char *connstr; /* Connection string for libpq */ + struct AATree userstate_tree; /* user->state tree */ + /* state */ PGresult *res; /* last resultset */ int pos; /* Current position inside res */ @@ -194,10 +210,14 @@ typedef struct ProxyCluster struct AATree conn_tree; /* connstr -> ProxyConnection */ + struct AATree userinfo_tree; /* username->userinfo tree */ + ConnUserInfo *cur_userinfo; /* userinfo struct for current request */ + int ret_cur_conn; /* Result walking: index of current conn */ int ret_cur_pos; /* Result walking: index of current row */ int ret_total; /* Result walking: total rows left */ + bool fake_cluster; /* single connect-string cluster */ bool sqlmed_cluster; /* True if the cluster is defined using SQL/MED */ bool needs_reload; /* True if the cluster partition list should be reloaded */ bool busy; /* True if the cluster is already involved in execution */ @@ -207,7 +227,6 @@ typedef struct ProxyCluster * Used in to perform cluster invalidation in syscache callbacks. */ SysCacheStamp clusterStamp; - SysCacheStamp umStamp; /* notice processing: provide info about currently executing function */ struct ProxyFunction *cur_func; @@ -398,6 +417,7 @@ void plproxy_cluster_cache_init(void); void plproxy_syscache_callback_init(void); ProxyCluster *plproxy_find_cluster(ProxyFunction *func, FunctionCallInfo fcinfo); void plproxy_cluster_maint(struct timeval * now); +void plproxy_activate_connection(struct ProxyConnection *conn); /* result.c */ Datum plproxy_result(ProxyFunction *func, FunctionCallInfo fcinfo); -- 2.39.5