From: Christopher Browne Date: Fri, 3 Dec 2010 22:11:37 +0000 (-0500) Subject: Remove SNMP references from codebase. X-Git-Tag: REL_2_1_0_B1~70 X-Git-Url: http://git.postgresql.org/gitweb/static/gitweb.js?a=commitdiff_plain;h=bf7377d0f7be8df21cfce17e7ac428bea3390475;p=slony1-engine.git Remove SNMP references from codebase. Ran thru test1 successfully after removal/autoconf/configure/build --- diff --git a/Makefile.global.in b/Makefile.global.in index f3d9f22e..ba1884e8 100644 --- a/Makefile.global.in +++ b/Makefile.global.in @@ -6,7 +6,7 @@ # Copyright (c) 2003-2009, PostgreSQL Global Development Group # Author: Jan Wieck, Afilias USA INC. # -# +# # ---------- # PostgreSQL Version @@ -18,7 +18,7 @@ PG_VERSION= @PG_VERSION@ pgincludedir= @PGINCLUDEDIR@ pgincludeserverdir= @PGINCLUDESERVERDIR@ -pglibdir= @PGLIBDIR@ +pglibdir= @PGLIBDIR@ pgpkglibdir= @PGPKGLIBDIR@ pgsharedir= @PGSHAREDIR@ pgbindir= @PGBINDIR@ @@ -28,10 +28,10 @@ distdir= @SLONYPATH@/@PACKAGE_NAME@-@PACKAGE_VERSION@ prefix= @prefix@ sysconfdir= @sysconfdir@ -host_os= @HOST_OS@ +host_os= @HOST_OS@ host_cpu= @host_cpu@ -PORTNAME= @PORTNAME@ +PORTNAME= @PORTNAME@ PACKAGE_NAME= @PACKAGE_NAME@ VERSION= @PACKAGE_VERSION@ @@ -44,7 +44,7 @@ FLEX= @LEX@ #build tool flags CFLAGS= @CFLAGS@ -PTHREAD_CFLAGS= @PTHREAD_CFLAGS@ +PTHREAD_CFLAGS= @PTHREAD_CFLAGS@ PTHREAD_LIBS= @PTHREAD_LIBS@ YFLAGS= @YFLAGS@ FLEXFLAGS= @LEXFLAGS@ $(LFLAGS) @@ -53,11 +53,6 @@ enable_rpath= @enable_rpath@ #libs PTHREAD_LIBS= @PTHREAD_LIBS@ -#netsnmp with_netsnmp -HAVE_NETSNMP= @HAVE_NETSNMP@ -NETSNMP_CFLAGS= @NETSNMP_CFLAGS@ -NETSNMP_AGENTLIBS= @NETSNMP_AGENTLIBS@ - # Documentation JADE= @JADE@ NSGMLS= @NSGMLS@ @@ -87,7 +82,7 @@ override CPPFLAGS := -I${pgincludedir} -I${pgincludeserverdir} $(CPPFLAGS) LDFLAGS = -L${pglibdir} -L${pgpkglibdir} -lpq @NLSLIB@ ifeq ($(GCC), yes) - CFLAGS += -Wall -Wmissing-prototypes -Wmissing-declarations + CFLAGS += -Wall -Wmissing-prototypes -Wmissing-declarations endif # Installation. diff --git a/config.h.in b/config.h.in index 7b04a598..90980291 100644 --- a/config.h.in +++ b/config.h.in @@ -6,7 +6,7 @@ * Copyright (c) 2003-2010, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * - * + * * ---------- */ #ifndef SLONY_I_CONFIG_H @@ -24,46 +24,6 @@ #undef PGSHARE -#undef HAVE_NETSNMP -#ifdef HAVE_NETSNMP - -/* Define to 1 if the system has the type `uint32_t'. */ -#undef HAVE_UINT32_T - -/* Define to 1 if the system has the type `uint64_t'. */ -#undef HAVE_UINT64_T - -/* Define to 1 if the system has the type `ssize_t'. */ -#undef HAVE_SSIZE_T - -/* Define to 1 if the system has the type `int32_t'. */ -#undef HAVE_INT32_T - -/* Define to 1 if the system has the type `int64_t'. */ -#undef HAVE_INT64_T - -/* Define to 1 if you have the header file. */ -#undef HAVE_STDARG_H - -/* The size of a `int', as computed by sizeof. */ -#undef SIZEOF_INT - -/* The size of a `long', as computed by sizeof. */ -#undef SIZEOF_LONG - -/* The size of a `long long', as computed by sizeof. */ -#undef SIZEOF_LONG_LONG - -/* The size of a `short', as computed by sizeof. */ -#undef SIZEOF_SHORT - -#define NETSNMP_IMPORT extern -#define NETSNMP_INLINE -#define RETSIGTYPE void -#define NET_SNMP_CONFIG_H -#endif - - /* Set to 1 if libpq contains PQfreemem() */ #undef HAVE_PQFREEMEM #ifndef HAVE_PQFREEMEM diff --git a/configure.ac b/configure.ac index 86b19484..95f51489 100644 --- a/configure.ac +++ b/configure.ac @@ -1,4 +1,3 @@ - # ---------- # configure.ac # @@ -37,7 +36,7 @@ case $host_os in freebsd*) template=freebsd ;; hpux*) template=hpux ;; irix*) template=irix ;; - linux*|gnu*|k*bsd*-gnu) + linux*|gnu*|k*bsd*-gnu) template=linux ;; mingw*) template=win32 ;; netbsd*) template=netbsd ;; @@ -175,7 +174,7 @@ AC_HEADER_STDC AC_CHECK_HEADERS([fcntl.h]) AC_CHECK_HEADERS([limits.h]) AC_CHECK_HEADERS([stddef.h]) -AC_CHECK_HEADERS([sys/socket.h]) +AC_CHECK_HEADERS([sys/socket.h]) AC_CHECK_HEADERS([sys/time.h]) AC_CHECK_HEADERS([inttypes.h]) @@ -206,7 +205,6 @@ AC_ARG_WITH(pgincludeserverdir, [ --with-pgincludeserverdir= Location of AC_ARG_WITH(pglibdir, [ --with-pglibdir= Location of the PostgreSQL libs. ]) AC_ARG_WITH(pgpkglibdir, [ --with-pgpkglibdir= Location of the PostgreSQL pkglibs. E.g. plpgsql.so ]) AC_ARG_WITH(pgsharedir, [ --with-pgsharedir= Location of the PostgreSQL share dir. E.g. postgresql.conf.sample ]) -AC_ARG_WITH(netsnmp, [ --with-netsnmp= Enable snmp support is the location of net-snmp-config. **EXPERIMENTAL** ]) AC_ARG_WITH(perltools, [ --with-perltools= Location to install the perl management tools. Default $PREFIX/bin. ]) AC_ARG_WITH(perlsharedir, [ --with-perlsharedir= Location to install slon-tools.pm. Default $pglibdir. ]) AC_ARG_WITH(docs, [ --with-docs= Build the sgml documentation [default=no]]) @@ -216,7 +214,7 @@ AC_ARG_WITH(mandir, [ --with-mandir= Location to install the SLON_AC_ARG_BOOL(enable, engine, yes, - [ --disable-engine Don't build slony1-engine source. (Used when building documentation only)]) + [ --disable-engine Don't build slony1-engine source. (Used when building documentation only)]) AC_SUBST(enable_engine) @@ -234,7 +232,6 @@ AC_MSG_RESULT($enable_engine) if test "$enable_engine" = "yes"; then ACX_LIBPQ() -ACX_LIBSNMP() ACX_SLONYTOOLS() AC_SUBST(PG_VERSION_MAJOR, $PG_VERSION_MAJOR) @@ -247,9 +244,6 @@ AC_SUBST(PGLIBDIR, $PG_LIBDIR) AC_SUBST(PGPKGLIBDIR, $PG_PKGLIBDIR) AC_SUBST(PGSHAREDIR, $PG_SHAREDIR) AC_SUBST(PGBINDIR, $PG_BINDIR) -AC_SUBST(HAVE_NETSNMP, $HAVE_NETSNMP) -AC_SUBST(NETSNMP_CFLAGS, $NETSNMP_CFLAGS) -AC_SUBST(NETSNMP_AGENTLIBS, $NETSNMP_AGENTLIBS) AC_SUBST(TOOLSBIN, $TOOLSBIN) AC_SUBST(SLONYPATH) @@ -276,45 +270,45 @@ AC_SUBST(with_docs, $with_docs) AC_MSG_CHECKING(if you have requested documentation building) if test "$with_docs" = "yes"; then - AC_MSG_RESULT($with_docs) - - # ---- - # Tools for building docs - # --- - - SLON_AC_PROG_GROFF - SLON_AC_PROG_PS2PDF - SLON_AC_PROG_DJPEG - SLON_AC_PROG_PNMTOPS - SLON_AC_PROG_CONVERT - SLON_AC_PROG_PGAUTODOC - - # - # Check for DocBook and tools - # - SLON_AC_PROG_NSGMLS - SLON_AC_PROG_SGMLSPL - SLON_AC_PROG_D2M - - SLON_AC_PROG_JADE - SLON_AC_CHECK_DOCBOOK(4.2) - SLON_AC_PATH_DOCBOOK_STYLESHEETS - SLON_AC_PATH_COLLATEINDEX - - AC_SUBST(docdir, $docdir) - AC_SUBST(mandir, $mandir) + AC_MSG_RESULT($with_docs) + + # ---- + # Tools for building docs + # --- + + SLON_AC_PROG_GROFF + SLON_AC_PROG_PS2PDF + SLON_AC_PROG_DJPEG + SLON_AC_PROG_PNMTOPS + SLON_AC_PROG_CONVERT + SLON_AC_PROG_PGAUTODOC + + # + # Check for DocBook and tools + # + SLON_AC_PROG_NSGMLS + SLON_AC_PROG_SGMLSPL + SLON_AC_PROG_D2M + + SLON_AC_PROG_JADE + SLON_AC_CHECK_DOCBOOK(4.2) + SLON_AC_PATH_DOCBOOK_STYLESHEETS + SLON_AC_PATH_COLLATEINDEX + + AC_SUBST(docdir, $docdir) + AC_SUBST(mandir, $mandir) else AC_MSG_RESULT(no) fi if test x"$with_perlsharedir" = x""; then - with_perlsharedir="$PG_LIBDIR" + with_perlsharedir="$PG_LIBDIR" fi AC_SUBST(perlsharedir, $with_perlsharedir) AC_CONFIG_FILES([ - Makefile.global GNUmakefile + Makefile.global GNUmakefile ]) AC_OUTPUT([ diff --git a/src/slon/Makefile b/src/slon/Makefile index 1e79036e..c074c728 100644 --- a/src/slon/Makefile +++ b/src/slon/Makefile @@ -4,7 +4,7 @@ # Copyright (c) 2003-2009, PostgreSQL Global Development Group # Author: Jan Wieck, Afilias USA INC. # -# +# # ---------- slony_subdir = src/slon @@ -13,7 +13,6 @@ SLFILEDESC="Slony replication engine" include $(slony_top_builddir)/Makefile.global SFILES=$(wildcard *.c) -SFILES2=$(filter-out snmp_thread.c, $(SFILES)) CC = $(PTHREAD_CC) override CFLAGS += $(PTHREAD_CFLAGS) -I$(slony_top_builddir) -I$(slony_top_builddir)/$(slony_subdir) @@ -29,24 +28,19 @@ PROG = slon.exe endif OBJS = \ - slon.o \ - runtime_config.o \ - local_listen.o \ - remote_listen.o \ - remote_worker.o \ - sync_thread.o \ - cleanup_thread.o \ - scheduler.o \ - dbutils.o \ - conf-file.o \ - confoptions.o \ - misc.o \ - ../parsestatements/scanner.o - -ifdef HAVE_NETSNMP -OBJS+= snmp_thread.o -override LDFLAGS+= ${NETSNMP_AGENTLIBS} -endif + slon.o \ + runtime_config.o \ + local_listen.o \ + remote_listen.o \ + remote_worker.o \ + sync_thread.o \ + cleanup_thread.o \ + scheduler.o \ + dbutils.o \ + conf-file.o \ + confoptions.o \ + misc.o \ + ../parsestatements/scanner.o ifeq ($(PORTNAME), win32) OBJS += port/pipe.o port/win32service.o $(WIN32RES) @@ -57,12 +51,12 @@ endif DISTFILES = Makefile README $(wildcard *.c) $(wildcard port/*.c) $(wildcard *.h) $(wildcard *.l) $(wildcard *.y) ALL = \ - $(PROG) + $(PROG) all: $(ALL) -$(PROG): $(OBJS) +$(PROG): $(OBJS) $(CC) $(CFLAGS) -o $(PROG) $(OBJS) $(PTHREAD_CFLAGS) $(LDFLAGS) cleanup_thread.o: cleanup_thread.c slon.h @@ -78,15 +72,12 @@ sync_thread.o: sync_thread.c slon.h conf-file.o: conf-file.c slon.h confoptions.h confoptions.o: confoptions.c slon.h confoptions.h -snmp_thread.o: snmp_thread.c slon.h - $(CC) $(PTHREAD_CFLAGS) $(CFLAGS) $(NETSNMP_CFLAGS) -DHAVE_NETSNMP -c $< -o $@ - conf-file.c: conf-file.l ifdef FLEX $(FLEX) $(FLEXFLAGS) -o'$@' $< else - @echo "Missing flex $< $@" - @exit + @echo "Missing flex $< $@" + @exit endif clean distclean: @@ -103,12 +94,11 @@ installdirs: $(mkinstalldirs) $(DESTDIR)$(slonbindir) splint: - splint -I $(pgincludedir) -I $(pgincludeserverdir) +unixlib -preproc +skip-sys-headers $(SFILES2) + splint -I $(pgincludedir) -I $(pgincludeserverdir) +unixlib -preproc +skip-sys-headers $(SFILES) distdir: $(DISTFILES) mkdir $(distdir)/$(subdir) -chmod 777 $(distdir)/$(subdir) for file in $(DISTFILES) ; do \ - cp $$file $(distdir)/$(subdir)/$$file || exit; \ - done - + cp $$file $(distdir)/$(subdir)/$$file || exit; \ + done diff --git a/src/slon/slon.c b/src/slon/slon.c index 6085657d..2e7aa3a3 100644 --- a/src/slon/slon.c +++ b/src/slon/slon.c @@ -6,7 +6,7 @@ * Copyright (c) 2003-2009, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * - * + * *------------------------------------------------------------------------- */ @@ -35,14 +35,14 @@ #include "libpq-fe.h" #include "c.h" - + #include "slon.h" #include "confoptions.h" -/* ---------- - * Global data +/* ---------- + * Global data * ---------- */ #ifndef WIN32 @@ -58,8 +58,8 @@ pthread_mutex_t slon_wait_listen_lock; pthread_cond_t slon_wait_listen_cond; int slon_listen_started=0; -/* ---------- - * Local data +/* ---------- + * Local data * ---------- */ static void slon_exit(int code); @@ -67,10 +67,6 @@ static pthread_t local_event_thread; static pthread_t local_cleanup_thread; static pthread_t local_sync_thread; -#ifdef HAVE_NETSNMP -static pthread_t local_snmp_thread; -#endif - static pthread_t main_thread; static char *const *main_argv; @@ -85,12 +81,12 @@ typedef void (*sighandler_t)(int); static sighandler_t install_signal_handler(int signum, sighandler_t handler); int slon_log_level; -char *pid_file; -char *archive_dir = NULL; +char *pid_file; +char *archive_dir = NULL; static int child_status; /** - * A variable to indicate that the + * A variable to indicate that the * worker has been restarted by the watchdog. */ int worker_restarted=0; @@ -102,1006 +98,991 @@ int worker_restarted=0; void Usage(char *const argv[]) { - fprintf(stderr, "usage: %s [options] clustername conninfo\n", argv[0]); - fprintf(stderr, "\n"); - fprintf(stderr, "Options:\n"); - - fprintf(stderr, " -h print usage message and exit\n"); - fprintf(stderr, " -v print version and exit\n"); - fprintf(stderr, " -d verbosity of logging (1..4)\n"); - fprintf(stderr, " -s SYNC check interval (default 10000)\n"); - fprintf(stderr, " -t SYNC interval timeout (default 60000)\n"); - fprintf(stderr, " -o desired subscriber SYNC processing time\n"); - fprintf(stderr, " -g maximum SYNC group size (default 6)\n"); - fprintf(stderr, " -c how often to vacuum in cleanup cycles\n"); - fprintf(stderr, " -p slon pid file\n"); - fprintf(stderr, " -f slon configuration file\n"); - fprintf(stderr, " -a directory to store SYNC archive files\n"); - fprintf(stderr, " -x program to run after writing archive file\n"); - fprintf(stderr, " -q Terminate when this node reaches # of SYNCs\n"); - fprintf(stderr, " -r # of syncs for -q option\n"); - fprintf(stderr, " -l this node should lag providers by this interval\n"); + fprintf(stderr, "usage: %s [options] clustername conninfo\n", argv[0]); + fprintf(stderr, "\n"); + fprintf(stderr, "Options:\n"); + + fprintf(stderr, " -h print usage message and exit\n"); + fprintf(stderr, " -v print version and exit\n"); + fprintf(stderr, " -d verbosity of logging (1..4)\n"); + fprintf(stderr, " -s SYNC check interval (default 10000)\n"); + fprintf(stderr, " -t SYNC interval timeout (default 60000)\n"); + fprintf(stderr, " -o desired subscriber SYNC processing time\n"); + fprintf(stderr, " -g maximum SYNC group size (default 6)\n"); + fprintf(stderr, " -c how often to vacuum in cleanup cycles\n"); + fprintf(stderr, " -p slon pid file\n"); + fprintf(stderr, " -f slon configuration file\n"); + fprintf(stderr, " -a directory to store SYNC archive files\n"); + fprintf(stderr, " -x program to run after writing archive file\n"); + fprintf(stderr, " -q Terminate when this node reaches # of SYNCs\n"); + fprintf(stderr, " -r # of syncs for -q option\n"); + fprintf(stderr, " -l this node should lag providers by this interval\n"); #ifdef WIN32 - fprintf(stderr, "\nWindows service registration:\n"); - fprintf(stderr, " slon -regservice [servicename]\n"); - fprintf(stderr, " slon -unregservice [servicename]\n"); - fprintf(stderr, " slon -listengines [servicename]\n"); - fprintf(stderr, " slon -addengine [servicename] \n"); - fprintf(stderr, " slon -delengine [servicename] \n"); + fprintf(stderr, "\nWindows service registration:\n"); + fprintf(stderr, " slon -regservice [servicename]\n"); + fprintf(stderr, " slon -unregservice [servicename]\n"); + fprintf(stderr, " slon -listengines [servicename]\n"); + fprintf(stderr, " slon -addengine [servicename] \n"); + fprintf(stderr, " slon -delengine [servicename] \n"); #endif - exit(1); + exit(1); } -/* ---------- - * main +/* ---------- + * main * ---------- */ int main(int argc, char *const argv[]) { - char *cp1; - char *cp2; - int c; - int errors = 0; - extern int optind; - extern char *optarg; + char *cp1; + char *cp2; + int c; + int errors = 0; + extern int optind; + extern char *optarg; #ifdef WIN32 - WSADATA wsaData; - int err; + WSADATA wsaData; + int err; #endif #ifdef WIN32 - if (argc >= 2 && !strcmp(argv[1], "-service")) - { - win32_servicestart(); - exit(0); - } - if (argc >= 2 && !strcmp(argv[1], "-subservice")) - { - win32_isservice = 1; - argc--; - argv++; - } - if (argc >= 2 && argc <= 4 && ( - !strcmp(argv[1], "-regservice") || - !strcmp(argv[1], "-unregservice") || - !strcmp(argv[1], "-addengine") || - !strcmp(argv[1], "-delengine") || - !strcmp(argv[1], "-listengines"))) - { - win32_serviceconfig(argc, argv); - } + if (argc >= 2 && !strcmp(argv[1], "-service")) + { + win32_servicestart(); + exit(0); + } + if (argc >= 2 && !strcmp(argv[1], "-subservice")) + { + win32_isservice = 1; + argc--; + argv++; + } + if (argc >= 2 && argc <= 4 && ( + !strcmp(argv[1], "-regservice") || + !strcmp(argv[1], "-unregservice") || + !strcmp(argv[1], "-addengine") || + !strcmp(argv[1], "-delengine") || + !strcmp(argv[1], "-listengines"))) + { + win32_serviceconfig(argc, argv); + } #endif - InitializeConfOptions(); - - while ((c = getopt(argc, argv, "f:a:d:s:t:g:c:p:o:q:r:l:x:hv?")) != EOF) - { - switch (c) - { - case '?': - Usage(argv); - case 'q': - set_config_option("quit_sync_provider", optarg); - break; - - case 'r': - set_config_option("quit_sync_finalsync", optarg); - break; - - case 'f': - ProcessConfigFile(optarg); - break; - - case 'a': - set_config_option("archive_dir", optarg); - break; - - case 'd': - set_config_option("log_level", optarg); - break; - - case 's': - set_config_option("sync_interval", optarg); - break; - - case 't': - set_config_option("sync_interval_timeout", optarg); - break; - - case 'g': - set_config_option("sync_group_maxsize", optarg); - break; - - case 'c': - set_config_option("vac_frequency", optarg); - break; - - case 'p': - set_config_option("pid_file", optarg); - break; - - case 'o': - set_config_option("desired_sync_time", optarg); - break; - - case 'l': - set_config_option("lag_interval", optarg); - break; - - case 'h': - errors++; - break; - - case 'v': - printf("slon version %s\n", SLONY_I_VERSION_STRING); - exit(0); - break; - - case 'x': - set_config_option("command_on_logarchive", optarg); - break; - - default: - fprintf(stderr, "unknown option '%c'\n", c); - errors++; - break; - } - } - - /* - * Make sure the sync interval timeout isn't too small. - */ - if (sync_interval_timeout != 0 && sync_interval_timeout <= sync_interval) - sync_interval_timeout = sync_interval * 2; - - /* - * Remember the cluster name and build the properly quoted namespace - * identifier - */ - slon_pid = getpid(); + InitializeConfOptions(); + + while ((c = getopt(argc, argv, "f:a:d:s:t:g:c:p:o:q:r:l:x:hv?")) != EOF) + { + switch (c) + { + case '?': + Usage(argv); + case 'q': + set_config_option("quit_sync_provider", optarg); + break; + + case 'r': + set_config_option("quit_sync_finalsync", optarg); + break; + + case 'f': + ProcessConfigFile(optarg); + break; + + case 'a': + set_config_option("archive_dir", optarg); + break; + + case 'd': + set_config_option("log_level", optarg); + break; + + case 's': + set_config_option("sync_interval", optarg); + break; + + case 't': + set_config_option("sync_interval_timeout", optarg); + break; + + case 'g': + set_config_option("sync_group_maxsize", optarg); + break; + + case 'c': + set_config_option("vac_frequency", optarg); + break; + + case 'p': + set_config_option("pid_file", optarg); + break; + + case 'o': + set_config_option("desired_sync_time", optarg); + break; + + case 'l': + set_config_option("lag_interval", optarg); + break; + + case 'h': + errors++; + break; + + case 'v': + printf("slon version %s\n", SLONY_I_VERSION_STRING); + exit(0); + break; + + case 'x': + set_config_option("command_on_logarchive", optarg); + break; + + default: + fprintf(stderr, "unknown option '%c'\n", c); + errors++; + break; + } + } + + /* + * Make sure the sync interval timeout isn't too small. + */ + if (sync_interval_timeout != 0 && sync_interval_timeout <= sync_interval) + sync_interval_timeout = sync_interval * 2; + + /* + * Remember the cluster name and build the properly quoted namespace + * identifier + */ + slon_pid = getpid(); #ifndef WIN32 - if (pthread_mutex_init(&slon_watchdog_lock, NULL) < 0) - { - slon_log(SLON_FATAL, "slon: pthread_mutex_init() - %s\n", - strerror(errno)); - exit(-1); - } - slon_watchdog_pid = slon_pid; - slon_worker_pid = -1; + if (pthread_mutex_init(&slon_watchdog_lock, NULL) < 0) + { + slon_log(SLON_FATAL, "slon: pthread_mutex_init() - %s\n", + strerror(errno)); + exit(-1); + } + slon_watchdog_pid = slon_pid; + slon_worker_pid = -1; #endif - main_argv = argv; - - if ((char *)argv[optind]) - { - set_config_option("cluster_name", (char *)argv[optind]); - set_config_option("conn_info", (char *)argv[++optind]); - } - - if (rtcfg_cluster_name != NULL) - { - rtcfg_namespace = malloc(strlen(rtcfg_cluster_name) * 2 + 4); - cp2 = rtcfg_namespace; - *cp2++ = '"'; - *cp2++ = '_'; - for (cp1 = (char *)rtcfg_cluster_name; *cp1; cp1++) - { - if (*cp1 == '"') - *cp2++ = '"'; - *cp2++ = *cp1; - } - *cp2++ = '"'; - *cp2 = '\0'; - } - else - { - errors++; - } - - slon_log(SLON_CONFIG, "main: slon version %s starting up\n", - SLONY_I_VERSION_STRING); - - /* - * Remember the connection information for the local node. - */ - if (rtcfg_conninfo == NULL) - { - errors++; - } - - if (errors != 0) - { - Usage(argv); - } + main_argv = argv; + + if ((char *)argv[optind]) + { + set_config_option("cluster_name", (char *)argv[optind]); + set_config_option("conn_info", (char *)argv[++optind]); + } + + if (rtcfg_cluster_name != NULL) + { + rtcfg_namespace = malloc(strlen(rtcfg_cluster_name) * 2 + 4); + cp2 = rtcfg_namespace; + *cp2++ = '"'; + *cp2++ = '_'; + for (cp1 = (char *)rtcfg_cluster_name; *cp1; cp1++) + { + if (*cp1 == '"') + *cp2++ = '"'; + *cp2++ = *cp1; + } + *cp2++ = '"'; + *cp2 = '\0'; + } + else + { + errors++; + } + + slon_log(SLON_CONFIG, "main: slon version %s starting up\n", + SLONY_I_VERSION_STRING); + + /* + * Remember the connection information for the local node. + */ + if (rtcfg_conninfo == NULL) + { + errors++; + } + + if (errors != 0) + { + Usage(argv); + } #ifdef WIN32 - /* - * Startup the network subsystem, in case our libpq doesn't - */ - err = WSAStartup(MAKEWORD(1, 1), &wsaData); - if (err != 0) - { - slon_log(SLON_FATAL, "main: Cannot start the network subsystem - %d\n", err); - exit(-1); - } + /* + * Startup the network subsystem, in case our libpq doesn't + */ + err = WSAStartup(MAKEWORD(1, 1), &wsaData); + if (err != 0) + { + slon_log(SLON_FATAL, "main: Cannot start the network subsystem - %d\n", err); + exit(-1); + } #endif - if (pid_file) - { - FILE *pidfile; - - pidfile = fopen(pid_file, "w"); - if (pidfile) - { - fprintf(pidfile, "%d", slon_pid); - fclose(pidfile); - } - else - { - slon_log(SLON_FATAL, "Cannot open pid_file \"%s\"\n", pid_file); - exit(-1); - } - } - - /* - * Create the pipe used to kick the workers scheduler thread - */ - if (pgpipe(sched_wakeuppipe) < 0) - { - slon_log(SLON_FATAL, "slon: sched_wakeuppipe create failed -(%d) %s\n", errno, strerror(errno)); - slon_exit(-1); - } - - if (!PQisthreadsafe()) - { - slon_log(SLON_FATAL,"slon: libpq was not compiled with thread safety enabled (normally: --enable-thread-safety). slon is a multithreaded application requiring thread-safe libpq\n"); - slon_exit(-1); - } - - if (!PQisthreadsafe()) - { - slon_log(SLON_FATAL,"slon: libpq was not compiled with --enable-thread-safety. Slony-I requires a thread enabled libpq\n"); - slon_exit(-1); - } - - /* - * There is no watchdog process on win32. We delegate restarting and other - * such tasks to the Service Control Manager. And win32 doesn't support - * signals, so we don't need to catch them... - */ + if (pid_file) + { + FILE *pidfile; + + pidfile = fopen(pid_file, "w"); + if (pidfile) + { + fprintf(pidfile, "%d", slon_pid); + fclose(pidfile); + } + else + { + slon_log(SLON_FATAL, "Cannot open pid_file \"%s\"\n", pid_file); + exit(-1); + } + } + + /* + * Create the pipe used to kick the workers scheduler thread + */ + if (pgpipe(sched_wakeuppipe) < 0) + { + slon_log(SLON_FATAL, "slon: sched_wakeuppipe create failed -(%d) %s\n", errno, strerror(errno)); + slon_exit(-1); + } + + if (!PQisthreadsafe()) + { + slon_log(SLON_FATAL,"slon: libpq was not compiled with thread safety enabled (normally: --enable-thread-safety). slon is a multithreaded application requiring thread-safe libpq\n"); + slon_exit(-1); + } + + if (!PQisthreadsafe()) + { + slon_log(SLON_FATAL,"slon: libpq was not compiled with --enable-thread-safety. Slony-I requires a thread enabled libpq\n"); + slon_exit(-1); + } + + /* + * There is no watchdog process on win32. We delegate restarting and other + * such tasks to the Service Control Manager. And win32 doesn't support + * signals, so we don't need to catch them... + */ #ifndef WIN32 - SlonWatchdog(); + SlonWatchdog(); #else - SlonMain(); + SlonMain(); #endif - exit(0); + exit(0); } -/* ---------- - * SlonMain +/* ---------- + * SlonMain * ---------- */ static void SlonMain(void) { - PGresult *res; - SlonDString query; - int i, - n; - PGconn *startup_conn; + PGresult *res; + SlonDString query; + int i, + n; + PGconn *startup_conn; - slon_pid = getpid(); + slon_pid = getpid(); #ifndef WIN32 - slon_worker_pid = slon_pid; + slon_worker_pid = slon_pid; #endif - if (pthread_mutex_init(&slon_wait_listen_lock, NULL) < 0) - { - slon_log(SLON_FATAL, "main: pthread_mutex_init() failed - %s\n", - strerror(errno)); - slon_abort(); - } - if (pthread_cond_init(&slon_wait_listen_cond, NULL) < 0) - { - slon_log(SLON_FATAL, "main: pthread_cond_init() failed - %s\n", - strerror(errno)); - slon_abort(); - } - - - /* - * Dump out current configuration - all elements of the various arrays... - */ - dump_configuration(); - /* - * Connect to the local database to read the initial configuration - */ - startup_conn = PQconnectdb(rtcfg_conninfo); - if (startup_conn == NULL) - { - slon_log(SLON_FATAL, "main: PQconnectdb() failed - sleep 10s\n"); - sleep (10); - slon_retry(); - exit(-1); - } - if (PQstatus(startup_conn) != CONNECTION_OK) - { - slon_log(SLON_FATAL, "main: Cannot connect to local database - %s - sleep 10s\n", - PQerrorMessage(startup_conn)); - PQfinish(startup_conn); - sleep(10); - slon_retry(); - exit(-1); - } - - /* - * Get our local node ID - */ - rtcfg_nodeid = db_getLocalNodeId(startup_conn); - if (rtcfg_nodeid < 0) - { - slon_log(SLON_FATAL, "main: Node is not initialized properly - sleep 10s\n"); - sleep(10); - slon_retry(); - exit(-1); - } - if (db_checkSchemaVersion(startup_conn) < 0) - { - slon_log(SLON_FATAL, "main: Node has wrong Slony-I schema or module version loaded\n"); - slon_abort(); - } - slon_log(SLON_CONFIG, "main: local node id = %d\n", rtcfg_nodeid); - - dstring_init(&query); - slon_mkquery(&query, "select %s.slon_node_health_check();", rtcfg_namespace); - res = PQexec(startup_conn, dstring_data(&query)); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - slon_log(SLON_FATAL, "could not call slon_node_health_check() - %", - PQresultErrorMessage(res)); - slon_abort(); - } else { - if (PQntuples(res) != 1) - { - slon_log(SLON_FATAL, - "query '%s' returned %d rows (expected 1)\n", - query, PQntuples(res)); - slon_abort(); - } else { - if (*(PQgetvalue(res, 0, 0)) == 'f') { - slon_log(SLON_FATAL, - "slon_node_health_check() returned false - fatal health problem!\n%s\nREPAIR CONFIG may be helpful to rectify this problem\n", - PQresultErrorMessage(res)); - slon_abort(); - } - } - } - PQclear(res); - dstring_free(&query); + if (pthread_mutex_init(&slon_wait_listen_lock, NULL) < 0) + { + slon_log(SLON_FATAL, "main: pthread_mutex_init() failed - %s\n", + strerror(errno)); + slon_abort(); + } + if (pthread_cond_init(&slon_wait_listen_cond, NULL) < 0) + { + slon_log(SLON_FATAL, "main: pthread_cond_init() failed - %s\n", + strerror(errno)); + slon_abort(); + } + + + /* + * Dump out current configuration - all elements of the various arrays... + */ + dump_configuration(); + /* + * Connect to the local database to read the initial configuration + */ + startup_conn = PQconnectdb(rtcfg_conninfo); + if (startup_conn == NULL) + { + slon_log(SLON_FATAL, "main: PQconnectdb() failed - sleep 10s\n"); + sleep (10); + slon_retry(); + exit(-1); + } + if (PQstatus(startup_conn) != CONNECTION_OK) + { + slon_log(SLON_FATAL, "main: Cannot connect to local database - %s - sleep 10s\n", + PQerrorMessage(startup_conn)); + PQfinish(startup_conn); + sleep(10); + slon_retry(); + exit(-1); + } + + /* + * Get our local node ID + */ + rtcfg_nodeid = db_getLocalNodeId(startup_conn); + if (rtcfg_nodeid < 0) + { + slon_log(SLON_FATAL, "main: Node is not initialized properly - sleep 10s\n"); + sleep(10); + slon_retry(); + exit(-1); + } + if (db_checkSchemaVersion(startup_conn) < 0) + { + slon_log(SLON_FATAL, "main: Node has wrong Slony-I schema or module version loaded\n"); + slon_abort(); + } + slon_log(SLON_CONFIG, "main: local node id = %d\n", rtcfg_nodeid); + + dstring_init(&query); + slon_mkquery(&query, "select %s.slon_node_health_check();", rtcfg_namespace); + res = PQexec(startup_conn, dstring_data(&query)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + slon_log(SLON_FATAL, "could not call slon_node_health_check() - %", + PQresultErrorMessage(res)); + slon_abort(); + } else { + if (PQntuples(res) != 1) + { + slon_log(SLON_FATAL, + "query '%s' returned %d rows (expected 1)\n", + query, PQntuples(res)); + slon_abort(); + } else { + if (*(PQgetvalue(res, 0, 0)) == 'f') { + slon_log(SLON_FATAL, + "slon_node_health_check() returned false - fatal health problem!\n%s\nREPAIR CONFIG may be helpful to rectify this problem\n", + PQresultErrorMessage(res)); + slon_abort(); + } + } + } + PQclear(res); + dstring_free(&query); #ifndef WIN32 - if (signal(SIGHUP, SIG_IGN) == SIG_ERR) - { - slon_log(SLON_FATAL, "main: SIGHUP signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_abort(); - } - if (signal(SIGINT, SIG_IGN) == SIG_ERR) - { - slon_log(SLON_FATAL, "main: SIGINT signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_abort(); - } - if (signal(SIGTERM, SIG_IGN) == SIG_ERR) - { - slon_log(SLON_FATAL, "main: SIGTERM signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_abort(); - } - if (signal(SIGCHLD, SIG_IGN) == SIG_ERR) - { - slon_log(SLON_FATAL, "main: SIGCHLD signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_abort(); - } - if (signal(SIGQUIT, SIG_IGN) == SIG_ERR) - { - slon_log(SLON_FATAL, "main: SIGQUIT signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_abort(); - } -#endif - - slon_log(SLON_INFO, "main: main process started\n"); - - /* - * Start the event scheduling system - */ - slon_log(SLON_CONFIG, "main: launching sched_start_mainloop\n"); - if (sched_start_mainloop() < 0) - slon_retry(); - - slon_log(SLON_CONFIG, "main: loading current cluster configuration\n"); - - /* - * Begin a transaction - */ - res = PQexec(startup_conn, - "start transaction; " - "set transaction isolation level serializable;"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - slon_log(SLON_FATAL, "Cannot start transaction - %s - sleep 10s\n", - PQresultErrorMessage(res)); - sleep (10); - PQclear(res); - slon_retry(); - } - PQclear(res); - - /* - * Read configuration table sl_node - */ - dstring_init(&query); - slon_mkquery(&query, - "select no_id, no_active, no_comment, " - " (select coalesce(max(con_seqno),0) from %s.sl_confirm " - " where con_origin = no_id and con_received = %d) " - " as last_event " - "from %s.sl_node " - "order by no_id; ", - rtcfg_namespace, rtcfg_nodeid, rtcfg_namespace); - res = PQexec(startup_conn, dstring_data(&query)); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - slon_log(SLON_FATAL, "main: Cannot get node list - %s\n", - PQresultErrorMessage(res)); - PQclear(res); - dstring_free(&query); - slon_retry(); - } - for (i = 0, n = PQntuples(res); i < n; i++) - { - int no_id = (int)strtol(PQgetvalue(res, i, 0), NULL, 10); - int no_active = (*PQgetvalue(res, i, 1) == 't') ? 1 : 0; - char *no_comment = PQgetvalue(res, i, 2); - int64 last_event; - - if (no_id == rtcfg_nodeid) - { - /* - * Complete our own local node entry - */ - rtcfg_nodeactive = no_active; - rtcfg_nodecomment = strdup(no_comment); - } - else - { - /* - * Add a remote node - */ - slon_scanint64(PQgetvalue(res, i, 3), &last_event); - rtcfg_storeNode(no_id, no_comment); - rtcfg_setNodeLastEvent(no_id, last_event); - - /* - * If it is active, remember for activation just before we start - * processing events. - */ - if (no_active) - rtcfg_needActivate(no_id); - } - } - PQclear(res); - - /* - * Read configuration table sl_path - the interesting pieces - */ - slon_mkquery(&query, - "select pa_server, pa_conninfo, pa_connretry " - "from %s.sl_path where pa_client = %d", - rtcfg_namespace, rtcfg_nodeid); - res = PQexec(startup_conn, dstring_data(&query)); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - slon_log(SLON_FATAL, "main: Cannot get path config - %s\n", - PQresultErrorMessage(res)); - PQclear(res); - dstring_free(&query); - slon_retry(); - } - for (i = 0, n = PQntuples(res); i < n; i++) - { - int pa_server = (int)strtol(PQgetvalue(res, i, 0), NULL, 10); - char *pa_conninfo = PQgetvalue(res, i, 1); - int pa_connretry = (int)strtol(PQgetvalue(res, i, 2), NULL, 10); - - rtcfg_storePath(pa_server, pa_conninfo, pa_connretry); - } - PQclear(res); - - /* - * Load the initial listen configuration - */ - rtcfg_reloadListen(startup_conn); - - /* - * Read configuration table sl_set - */ - slon_mkquery(&query, - "select set_id, set_origin, set_comment " - "from %s.sl_set", - rtcfg_namespace); - res = PQexec(startup_conn, dstring_data(&query)); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - slon_log(SLON_FATAL, "main: Cannot get set config - %s\n", - PQresultErrorMessage(res)); - PQclear(res); - dstring_free(&query); - slon_retry(); - } - for (i = 0, n = PQntuples(res); i < n; i++) - { - int set_id = (int)strtol(PQgetvalue(res, i, 0), NULL, 10); - int set_origin = (int)strtol(PQgetvalue(res, i, 1), NULL, 10); - char *set_comment = PQgetvalue(res, i, 2); - - rtcfg_storeSet(set_id, set_origin, set_comment); - } - PQclear(res); - - /* - * Read configuration table sl_subscribe - only subscriptions for local - * node - */ - slon_mkquery(&query, - "select sub_set, sub_provider, sub_forward, sub_active " - "from %s.sl_subscribe " - "where sub_receiver = %d", - rtcfg_namespace, rtcfg_nodeid); - res = PQexec(startup_conn, dstring_data(&query)); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - slon_log(SLON_FATAL, "main: Cannot get subscription config - %s\n", - PQresultErrorMessage(res)); - PQclear(res); - dstring_free(&query); - slon_retry(); - } - for (i = 0, n = PQntuples(res); i < n; i++) - { - int sub_set = (int)strtol(PQgetvalue(res, i, 0), NULL, 10); - int sub_provider = (int)strtol(PQgetvalue(res, i, 1), NULL, 10); - char *sub_forward = PQgetvalue(res, i, 2); - char *sub_active = PQgetvalue(res, i, 3); - - rtcfg_storeSubscribe(sub_set, sub_provider, sub_forward); - if (*sub_active == 't') - rtcfg_enableSubscription(sub_set, sub_provider, sub_forward); - } - PQclear(res); - - /* - * Remember the last known local event sequence - */ - slon_mkquery(&query, - "select coalesce(max(ev_seqno), -1) from %s.sl_event " - "where ev_origin = '%d'", - rtcfg_namespace, rtcfg_nodeid); - res = PQexec(startup_conn, dstring_data(&query)); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - slon_log(SLON_FATAL, "main: Cannot get last local eventid - %s\n", - PQresultErrorMessage(res)); - PQclear(res); - dstring_free(&query); - slon_retry(); - } - if (PQntuples(res) == 0) - strcpy(rtcfg_lastevent, "-1"); - else if (PQgetisnull(res, 0, 0)) - strcpy(rtcfg_lastevent, "-1"); - else - strcpy(rtcfg_lastevent, PQgetvalue(res, 0, 0)); - PQclear(res); - dstring_free(&query); - slon_log(SLON_CONFIG, - "main: last local event sequence = %s\n", - rtcfg_lastevent); - - /* - * Rollback the transaction we used to get the config snapshot - */ - res = PQexec(startup_conn, "rollback transaction;"); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - { - slon_log(SLON_FATAL, "main: Cannot rollback transaction - %s\n", - PQresultErrorMessage(res)); - PQclear(res); - slon_retry(); - } - PQclear(res); - - /* - * Done with the startup, don't need the local connection any more. - */ - PQfinish(startup_conn); - - slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n"); - - /* - * Create the local event thread that monitors the local node for - * administrative events to adjust the configuration at runtime. We wait - * here until the local listen thread has checked that there is no other - * slon daemon running. - */ - pthread_mutex_lock(&slon_wait_listen_lock); - if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0) - { - slon_log(SLON_FATAL, "main: cannot create localListenThread - %s\n", - strerror(errno)); - slon_retry(); - } - pthread_cond_wait(&slon_wait_listen_cond, &slon_wait_listen_lock); - if(!slon_listen_started) - { - /** - * The local listen thread did not start up properly. - */ - slon_log(SLON_FATAL,"main: localListenThread did not start\n"); - slon_abort(); - } - pthread_mutex_unlock(&slon_wait_listen_lock); - - /* - * Enable all nodes that are active - */ - rtcfg_doActivate(); - - /* - * Create the local cleanup thread that will remove old events and log - * data. - */ - if (pthread_create(&local_cleanup_thread, NULL, cleanupThread_main, NULL) < 0) - { - slon_log(SLON_FATAL, "main: cannot create cleanupThread - %s\n", - strerror(errno)); - slon_retry(); - } - - /* - * Create the local sync thread that will generate SYNC events if we had - * local database updates. - */ - if (pthread_create(&local_sync_thread, NULL, syncThread_main, NULL) < 0) - { - slon_log(SLON_FATAL, "main: cannot create syncThread - %s\n", - strerror(errno)); - slon_retry(); - } - -#ifdef HAVE_NETSNMP - if (pthread_create(&local_snmp_thread, NULL, snmpThread_main, NULL) < 0) - { - slon_log(SLON_FATAL, "main: cannot create snmpThread -%s\n", - strerror(errno)); - slon_retry(); - } -#endif - - /* - * Wait until the scheduler has shut down all remote connections - */ - slon_log(SLON_INFO, "main: running scheduler mainloop\n"); - if (sched_wait_mainloop() < 0) - { - slon_log(SLON_FATAL, "main: scheduler returned with error\n"); - slon_retry(); - } - slon_log(SLON_INFO, "main: scheduler mainloop returned\n"); - - /* - * Wait for all remote threads to finish - */ - main_thread = pthread_self(); - - slon_log(SLON_CONFIG, "main: wait for remote threads\n"); - rtcfg_joinAllRemoteThreads(); - - /* - * Wait for the local threads to finish - */ - if (pthread_join(local_event_thread, NULL) < 0) - slon_log(SLON_ERROR, "main: cannot join localListenThread - %s\n", - strerror(errno)); - - if (pthread_join(local_cleanup_thread, NULL) < 0) - slon_log(SLON_ERROR, "main: cannot join cleanupThread - %s\n", - strerror(errno)); - - if (pthread_join(local_sync_thread, NULL) < 0) - slon_log(SLON_ERROR, "main: cannot join syncThread - %s\n", - strerror(errno)); - -#ifdef HAVE_NETSNMP - if (pthread_kill(local_snmp_thread, SIGINT) < 0) - slon_log(SLON_ERROR, "main: cannot join snmpThread - %s\n", - strerror(errno)); + if (signal(SIGHUP, SIG_IGN) == SIG_ERR) + { + slon_log(SLON_FATAL, "main: SIGHUP signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_abort(); + } + if (signal(SIGINT, SIG_IGN) == SIG_ERR) + { + slon_log(SLON_FATAL, "main: SIGINT signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_abort(); + } + if (signal(SIGTERM, SIG_IGN) == SIG_ERR) + { + slon_log(SLON_FATAL, "main: SIGTERM signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_abort(); + } + if (signal(SIGCHLD, SIG_IGN) == SIG_ERR) + { + slon_log(SLON_FATAL, "main: SIGCHLD signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_abort(); + } + if (signal(SIGQUIT, SIG_IGN) == SIG_ERR) + { + slon_log(SLON_FATAL, "main: SIGQUIT signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_abort(); + } #endif - slon_log(SLON_CONFIG, "main: done\n"); - - exit(0); + slon_log(SLON_INFO, "main: main process started\n"); + + /* + * Start the event scheduling system + */ + slon_log(SLON_CONFIG, "main: launching sched_start_mainloop\n"); + if (sched_start_mainloop() < 0) + slon_retry(); + + slon_log(SLON_CONFIG, "main: loading current cluster configuration\n"); + + /* + * Begin a transaction + */ + res = PQexec(startup_conn, + "start transaction; " + "set transaction isolation level serializable;"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + slon_log(SLON_FATAL, "Cannot start transaction - %s - sleep 10s\n", + PQresultErrorMessage(res)); + sleep (10); + PQclear(res); + slon_retry(); + } + PQclear(res); + + /* + * Read configuration table sl_node + */ + dstring_init(&query); + slon_mkquery(&query, + "select no_id, no_active, no_comment, " + " (select coalesce(max(con_seqno),0) from %s.sl_confirm " + " where con_origin = no_id and con_received = %d) " + " as last_event " + "from %s.sl_node " + "order by no_id; ", + rtcfg_namespace, rtcfg_nodeid, rtcfg_namespace); + res = PQexec(startup_conn, dstring_data(&query)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + slon_log(SLON_FATAL, "main: Cannot get node list - %s\n", + PQresultErrorMessage(res)); + PQclear(res); + dstring_free(&query); + slon_retry(); + } + for (i = 0, n = PQntuples(res); i < n; i++) + { + int no_id = (int)strtol(PQgetvalue(res, i, 0), NULL, 10); + int no_active = (*PQgetvalue(res, i, 1) == 't') ? 1 : 0; + char *no_comment = PQgetvalue(res, i, 2); + int64 last_event; + + if (no_id == rtcfg_nodeid) + { + /* + * Complete our own local node entry + */ + rtcfg_nodeactive = no_active; + rtcfg_nodecomment = strdup(no_comment); + } + else + { + /* + * Add a remote node + */ + slon_scanint64(PQgetvalue(res, i, 3), &last_event); + rtcfg_storeNode(no_id, no_comment); + rtcfg_setNodeLastEvent(no_id, last_event); + + /* + * If it is active, remember for activation just before we start + * processing events. + */ + if (no_active) + rtcfg_needActivate(no_id); + } + } + PQclear(res); + + /* + * Read configuration table sl_path - the interesting pieces + */ + slon_mkquery(&query, + "select pa_server, pa_conninfo, pa_connretry " + "from %s.sl_path where pa_client = %d", + rtcfg_namespace, rtcfg_nodeid); + res = PQexec(startup_conn, dstring_data(&query)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + slon_log(SLON_FATAL, "main: Cannot get path config - %s\n", + PQresultErrorMessage(res)); + PQclear(res); + dstring_free(&query); + slon_retry(); + } + for (i = 0, n = PQntuples(res); i < n; i++) + { + int pa_server = (int)strtol(PQgetvalue(res, i, 0), NULL, 10); + char *pa_conninfo = PQgetvalue(res, i, 1); + int pa_connretry = (int)strtol(PQgetvalue(res, i, 2), NULL, 10); + + rtcfg_storePath(pa_server, pa_conninfo, pa_connretry); + } + PQclear(res); + + /* + * Load the initial listen configuration + */ + rtcfg_reloadListen(startup_conn); + + /* + * Read configuration table sl_set + */ + slon_mkquery(&query, + "select set_id, set_origin, set_comment " + "from %s.sl_set", + rtcfg_namespace); + res = PQexec(startup_conn, dstring_data(&query)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + slon_log(SLON_FATAL, "main: Cannot get set config - %s\n", + PQresultErrorMessage(res)); + PQclear(res); + dstring_free(&query); + slon_retry(); + } + for (i = 0, n = PQntuples(res); i < n; i++) + { + int set_id = (int)strtol(PQgetvalue(res, i, 0), NULL, 10); + int set_origin = (int)strtol(PQgetvalue(res, i, 1), NULL, 10); + char *set_comment = PQgetvalue(res, i, 2); + + rtcfg_storeSet(set_id, set_origin, set_comment); + } + PQclear(res); + + /* + * Read configuration table sl_subscribe - only subscriptions for local + * node + */ + slon_mkquery(&query, + "select sub_set, sub_provider, sub_forward, sub_active " + "from %s.sl_subscribe " + "where sub_receiver = %d", + rtcfg_namespace, rtcfg_nodeid); + res = PQexec(startup_conn, dstring_data(&query)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + slon_log(SLON_FATAL, "main: Cannot get subscription config - %s\n", + PQresultErrorMessage(res)); + PQclear(res); + dstring_free(&query); + slon_retry(); + } + for (i = 0, n = PQntuples(res); i < n; i++) + { + int sub_set = (int)strtol(PQgetvalue(res, i, 0), NULL, 10); + int sub_provider = (int)strtol(PQgetvalue(res, i, 1), NULL, 10); + char *sub_forward = PQgetvalue(res, i, 2); + char *sub_active = PQgetvalue(res, i, 3); + + rtcfg_storeSubscribe(sub_set, sub_provider, sub_forward); + if (*sub_active == 't') + rtcfg_enableSubscription(sub_set, sub_provider, sub_forward); + } + PQclear(res); + + /* + * Remember the last known local event sequence + */ + slon_mkquery(&query, + "select coalesce(max(ev_seqno), -1) from %s.sl_event " + "where ev_origin = '%d'", + rtcfg_namespace, rtcfg_nodeid); + res = PQexec(startup_conn, dstring_data(&query)); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + slon_log(SLON_FATAL, "main: Cannot get last local eventid - %s\n", + PQresultErrorMessage(res)); + PQclear(res); + dstring_free(&query); + slon_retry(); + } + if (PQntuples(res) == 0) + strcpy(rtcfg_lastevent, "-1"); + else if (PQgetisnull(res, 0, 0)) + strcpy(rtcfg_lastevent, "-1"); + else + strcpy(rtcfg_lastevent, PQgetvalue(res, 0, 0)); + PQclear(res); + dstring_free(&query); + slon_log(SLON_CONFIG, + "main: last local event sequence = %s\n", + rtcfg_lastevent); + + /* + * Rollback the transaction we used to get the config snapshot + */ + res = PQexec(startup_conn, "rollback transaction;"); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + slon_log(SLON_FATAL, "main: Cannot rollback transaction - %s\n", + PQresultErrorMessage(res)); + PQclear(res); + slon_retry(); + } + PQclear(res); + + /* + * Done with the startup, don't need the local connection any more. + */ + PQfinish(startup_conn); + + slon_log(SLON_CONFIG, "main: configuration complete - starting threads\n"); + + /* + * Create the local event thread that monitors the local node for + * administrative events to adjust the configuration at runtime. We wait + * here until the local listen thread has checked that there is no other + * slon daemon running. + */ + pthread_mutex_lock(&slon_wait_listen_lock); + if (pthread_create(&local_event_thread, NULL, localListenThread_main, NULL) < 0) + { + slon_log(SLON_FATAL, "main: cannot create localListenThread - %s\n", + strerror(errno)); + slon_retry(); + } + pthread_cond_wait(&slon_wait_listen_cond, &slon_wait_listen_lock); + if(!slon_listen_started) + { + /** + * The local listen thread did not start up properly. + */ + slon_log(SLON_FATAL,"main: localListenThread did not start\n"); + slon_abort(); + } + pthread_mutex_unlock(&slon_wait_listen_lock); + + /* + * Enable all nodes that are active + */ + rtcfg_doActivate(); + + /* + * Create the local cleanup thread that will remove old events and log + * data. + */ + if (pthread_create(&local_cleanup_thread, NULL, cleanupThread_main, NULL) < 0) + { + slon_log(SLON_FATAL, "main: cannot create cleanupThread - %s\n", + strerror(errno)); + slon_retry(); + } + + /* + * Create the local sync thread that will generate SYNC events if we had + * local database updates. + */ + if (pthread_create(&local_sync_thread, NULL, syncThread_main, NULL) < 0) + { + slon_log(SLON_FATAL, "main: cannot create syncThread - %s\n", + strerror(errno)); + slon_retry(); + } + + /* + * Wait until the scheduler has shut down all remote connections + */ + slon_log(SLON_INFO, "main: running scheduler mainloop\n"); + if (sched_wait_mainloop() < 0) + { + slon_log(SLON_FATAL, "main: scheduler returned with error\n"); + slon_retry(); + } + slon_log(SLON_INFO, "main: scheduler mainloop returned\n"); + + /* + * Wait for all remote threads to finish + */ + main_thread = pthread_self(); + + slon_log(SLON_CONFIG, "main: wait for remote threads\n"); + rtcfg_joinAllRemoteThreads(); + + /* + * Wait for the local threads to finish + */ + if (pthread_join(local_event_thread, NULL) < 0) + slon_log(SLON_ERROR, "main: cannot join localListenThread - %s\n", + strerror(errno)); + + if (pthread_join(local_cleanup_thread, NULL) < 0) + slon_log(SLON_ERROR, "main: cannot join cleanupThread - %s\n", + strerror(errno)); + + if (pthread_join(local_sync_thread, NULL) < 0) + slon_log(SLON_ERROR, "main: cannot join syncThread - %s\n", + strerror(errno)); + + slon_log(SLON_CONFIG, "main: done\n"); + + exit(0); } #ifndef WIN32 -/* ---------- - * SlonWatchdog +/* ---------- + * SlonWatchdog * ---------- */ static void SlonWatchdog(void) { - pid_t pid; - int shutdown=0; - int return_code=-99; - char * termination_reason="unknown"; - slon_log(SLON_INFO, "slon: watchdog process started\n"); - - - - slon_log(SLON_CONFIG, "slon: watchdog ready - pid = %d\n", slon_watchdog_pid); - - slon_worker_pid = fork(); - if (slon_worker_pid == 0) - { - SlonMain(); - exit(-1); - } - else if (slon_worker_pid < 0) - { - slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n", - errno,strerror(errno)); - slon_exit(-1); - - } - /* - * Install signal handlers - */ - - if (install_signal_handler(SIGHUP, sighandler) == SIG_ERR) - { - slon_log(SLON_FATAL, "slon: SIGHUP signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_exit(-1); - } - - if (install_signal_handler(SIGUSR1,sighandler) == SIG_ERR) - { - slon_log(SLON_FATAL, "slon: SIGUSR1 signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_exit(-1); - } - if (install_signal_handler(SIGALRM,sighandler) == SIG_ERR) - { - slon_log(SLON_FATAL, "slon: SIGALRM signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_exit(-1); - } - if (install_signal_handler(SIGINT,sighandler) == SIG_ERR) - { - slon_log(SLON_FATAL, "slon: SIGINT signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_exit(-1); - } - if (install_signal_handler(SIGTERM,sighandler) == SIG_ERR) - { - slon_log(SLON_FATAL, "slon: SIGTERM signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_exit(-1); - } - - - if (install_signal_handler(SIGQUIT,sighandler) == SIG_ERR) - { - slon_log(SLON_FATAL, "slon: SIGQUIT signal handler setup failed -(%d) %s\n", errno, strerror(errno)); - slon_exit(-1); - } - - slon_log(SLON_CONFIG, "slon: worker process created - pid = %d\n", - slon_worker_pid); - while(!shutdown) - { - while ((pid = wait(&child_status)) != slon_worker_pid) - { - if (pid < 0 && errno == EINTR) - continue; - - slon_log(SLON_CONFIG, "slon: child terminated status: %d; pid: %d, current worker pid: %d errno: %d\n", child_status, pid, slon_worker_pid,errno); - - if(pid < 0 ) - { - /** - * if errno is not EINTR and pid<0 we have - * a problem. - * looping on wait() isn't a good idea. - */ - slon_log(SLON_FATAL,"slon: wait returned an error pid:%d errno:%d\n", - pid,errno); - exit(-1); - } - } - if( WIFSIGNALED(child_status) ) - { - return_code=WTERMSIG(child_status); - termination_reason="signal"; - } - else if ( WIFEXITED(child_status) ) - { - return_code=WEXITSTATUS(child_status); - termination_reason="exit code"; - } - slon_log(SLON_CONFIG, "slon: child terminated %s: %d; pid: %d, current worker pid: %d\n", termination_reason,return_code, pid, slon_worker_pid); - - - switch (watchdog_status) - { - case SLON_WATCHDOG_RESTART: - slon_log(SLON_CONFIG,"slon: restart of worker in 20 seconds\n"); - sleep(20); - slon_worker_pid = fork(); - if(slon_worker_pid==0) - { - worker_restarted=1; - SlonMain(); - exit(-1); - } - else if (slon_worker_pid < 0) - { - slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n", - errno,strerror(errno)); - slon_exit(-1); - - } - watchdog_status=SLON_WATCHDOG_NORMAL; - continue; - - case SLON_WATCHDOG_NORMAL: - case SLON_WATCHDOG_RETRY: - watchdog_status = SLON_WATCHDOG_RETRY; - if (child_status != 0) - { - slon_log(SLON_CONFIG, "slon: restart of worker in 10 seconds\n"); - (void) sleep(10); - } - else - { - slon_log(SLON_CONFIG, "slon: restart of worker\n"); - } - if (watchdog_status == SLON_WATCHDOG_RETRY) - { - slon_worker_pid=fork(); - if(slon_worker_pid == 0) - { - worker_restarted=1; - SlonMain(); - exit(-1); - } - else if (slon_worker_pid < 0) - { - slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n", - errno,strerror(errno)); - slon_exit(-1); - - } - watchdog_status=SLON_WATCHDOG_NORMAL; - continue; - } - break; - - default: - shutdown=1; - break; - } /*switch*/ - }/*while*/ - - slon_log(SLON_INFO, "slon: done\n"); - - /* - * That's it. - */ - slon_exit(0); + pid_t pid; + int shutdown=0; + int return_code=-99; + char * termination_reason="unknown"; + slon_log(SLON_INFO, "slon: watchdog process started\n"); + + + + slon_log(SLON_CONFIG, "slon: watchdog ready - pid = %d\n", slon_watchdog_pid); + + slon_worker_pid = fork(); + if (slon_worker_pid == 0) + { + SlonMain(); + exit(-1); + } + else if (slon_worker_pid < 0) + { + slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n", + errno,strerror(errno)); + slon_exit(-1); + + } + /* + * Install signal handlers + */ + + if (install_signal_handler(SIGHUP, sighandler) == SIG_ERR) + { + slon_log(SLON_FATAL, "slon: SIGHUP signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_exit(-1); + } + + if (install_signal_handler(SIGUSR1,sighandler) == SIG_ERR) + { + slon_log(SLON_FATAL, "slon: SIGUSR1 signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_exit(-1); + } + if (install_signal_handler(SIGALRM,sighandler) == SIG_ERR) + { + slon_log(SLON_FATAL, "slon: SIGALRM signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_exit(-1); + } + if (install_signal_handler(SIGINT,sighandler) == SIG_ERR) + { + slon_log(SLON_FATAL, "slon: SIGINT signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_exit(-1); + } + if (install_signal_handler(SIGTERM,sighandler) == SIG_ERR) + { + slon_log(SLON_FATAL, "slon: SIGTERM signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_exit(-1); + } + + + if (install_signal_handler(SIGQUIT,sighandler) == SIG_ERR) + { + slon_log(SLON_FATAL, "slon: SIGQUIT signal handler setup failed -(%d) %s\n", errno, strerror(errno)); + slon_exit(-1); + } + + slon_log(SLON_CONFIG, "slon: worker process created - pid = %d\n", + slon_worker_pid); + while(!shutdown) + { + while ((pid = wait(&child_status)) != slon_worker_pid) + { + if (pid < 0 && errno == EINTR) + continue; + + slon_log(SLON_CONFIG, "slon: child terminated status: %d; pid: %d, current worker pid: %d errno: %d\n", child_status, pid, slon_worker_pid,errno); + + if(pid < 0 ) + { + /** + * if errno is not EINTR and pid<0 we have + * a problem. + * looping on wait() isn't a good idea. + */ + slon_log(SLON_FATAL,"slon: wait returned an error pid:%d errno:%d\n", + pid,errno); + exit(-1); + } + } + if( WIFSIGNALED(child_status) ) + { + return_code=WTERMSIG(child_status); + termination_reason="signal"; + } + else if ( WIFEXITED(child_status) ) + { + return_code=WEXITSTATUS(child_status); + termination_reason="exit code"; + } + slon_log(SLON_CONFIG, "slon: child terminated %s: %d; pid: %d, current worker pid: %d\n", termination_reason,return_code, pid, slon_worker_pid); + + + switch (watchdog_status) + { + case SLON_WATCHDOG_RESTART: + slon_log(SLON_CONFIG,"slon: restart of worker in 20 seconds\n"); + sleep(20); + slon_worker_pid = fork(); + if(slon_worker_pid==0) + { + worker_restarted=1; + SlonMain(); + exit(-1); + } + else if (slon_worker_pid < 0) + { + slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n", + errno,strerror(errno)); + slon_exit(-1); + + } + watchdog_status=SLON_WATCHDOG_NORMAL; + continue; + + case SLON_WATCHDOG_NORMAL: + case SLON_WATCHDOG_RETRY: + watchdog_status = SLON_WATCHDOG_RETRY; + if (child_status != 0) + { + slon_log(SLON_CONFIG, "slon: restart of worker in 10 seconds\n"); + (void) sleep(10); + } + else + { + slon_log(SLON_CONFIG, "slon: restart of worker\n"); + } + if (watchdog_status == SLON_WATCHDOG_RETRY) + { + slon_worker_pid=fork(); + if(slon_worker_pid == 0) + { + worker_restarted=1; + SlonMain(); + exit(-1); + } + else if (slon_worker_pid < 0) + { + slon_log(SLON_FATAL, "slon: failed to fork child: %d %s\n", + errno,strerror(errno)); + slon_exit(-1); + + } + watchdog_status=SLON_WATCHDOG_NORMAL; + continue; + } + break; + + default: + shutdown=1; + break; + } /*switch*/ + }/*while*/ + + slon_log(SLON_INFO, "slon: done\n"); + + /* + * That's it. + */ + slon_exit(0); } -/* ---------- - * sighandler +/* ---------- + * sighandler * ---------- */ static void sighandler(int signo) { - switch (signo) - { - case SIGALRM: - kill(slon_worker_pid, SIGKILL); - break; - - case SIGCHLD: - break; - - case SIGHUP: - watchdog_status = SLON_WATCHDOG_RESTART; - slon_terminate_worker(); - break; - - case SIGUSR1: - watchdog_status = SLON_WATCHDOG_RETRY; - slon_terminate_worker(); - break; - - case SIGINT: - case SIGTERM: - watchdog_status = SLON_WATCHDOG_SHUTDOWN; - slon_terminate_worker(); - break; - - case SIGQUIT: - kill(slon_worker_pid, SIGKILL); - slon_exit(-1); - break; - } + switch (signo) + { + case SIGALRM: + kill(slon_worker_pid, SIGKILL); + break; + + case SIGCHLD: + break; + + case SIGHUP: + watchdog_status = SLON_WATCHDOG_RESTART; + slon_terminate_worker(); + break; + + case SIGUSR1: + watchdog_status = SLON_WATCHDOG_RETRY; + slon_terminate_worker(); + break; + + case SIGINT: + case SIGTERM: + watchdog_status = SLON_WATCHDOG_SHUTDOWN; + slon_terminate_worker(); + break; + + case SIGQUIT: + kill(slon_worker_pid, SIGKILL); + slon_exit(-1); + break; + } } -/* ---------- - * slon_terminate_worker +/* ---------- + * slon_terminate_worker * ---------- */ void slon_terminate_worker() { - (void) kill(slon_worker_pid, SIGKILL); + (void) kill(slon_worker_pid, SIGKILL); } #endif -/* ---------- - * slon_exit +/* ---------- + * slon_exit * ---------- */ static void slon_exit(int code) { #ifdef WIN32 - /* Cleanup winsock */ - WSACleanup(); + /* Cleanup winsock */ + WSACleanup(); #endif - if (pid_file) - { - slon_log(SLON_INFO, "slon: remove pid file\n"); - (void) unlink(pid_file); - } + if (pid_file) + { + slon_log(SLON_INFO, "slon: remove pid file\n"); + (void) unlink(pid_file); + } - slon_log(SLON_INFO, "slon: exit(%d)\n", code); + slon_log(SLON_INFO, "slon: exit(%d)\n", code); - exit(code); + exit(code); } static sighandler_t install_signal_handler(int signo, sighandler_t handler) { - + #ifndef CYGWIN - struct sigaction act; - act.sa_handler = handler; - (void) sigemptyset(&act.sa_mask); - act.sa_flags = SA_NODEFER; + struct sigaction act; + act.sa_handler = handler; + (void) sigemptyset(&act.sa_mask); + act.sa_flags = SA_NODEFER; - if(sigaction(signo, &act, NULL) < 0) - { - return SIG_ERR; - } - return handler; + if(sigaction(signo, &act, NULL) < 0) + { + return SIG_ERR; + } + return handler; #else - return signal(signo,handler); + return signal(signo,handler); #endif } diff --git a/src/slon/slon.h b/src/slon/slon.h index b88827cf..caf19bc4 100644 --- a/src/slon/slon.h +++ b/src/slon/slon.h @@ -6,7 +6,7 @@ * Copyright (c) 2003-2009, PostgreSQL Global Development Group * Author: Jan Wieck, Afilias USA INC. * - * + * *------------------------------------------------------------------------- */ @@ -57,11 +57,11 @@ typedef enum { - SLON_TSTAT_NONE, - SLON_TSTAT_RUNNING, - SLON_TSTAT_SHUTDOWN, - SLON_TSTAT_RESTART, - SLON_TSTAT_DONE + SLON_TSTAT_NONE, + SLON_TSTAT_RUNNING, + SLON_TSTAT_SHUTDOWN, + SLON_TSTAT_RESTART, + SLON_TSTAT_DONE } SlonThreadStatus; @@ -84,38 +84,38 @@ typedef struct SlonWorkMsg_s SlonWorkMsg; */ struct SlonNode_s { - int no_id; /* node ID */ - int no_active; /* it's active state */ - char *no_comment; /* comment field */ + int no_id; /* node ID */ + int no_active; /* it's active state */ + char *no_comment; /* comment field */ #if 0 - pthread_mutex_t node_lock; /* mutex for node */ + pthread_mutex_t node_lock; /* mutex for node */ #endif - char *pa_conninfo; /* path to the node */ - int pa_connretry; /* connection retry interval */ + char *pa_conninfo; /* path to the node */ + int pa_connretry; /* connection retry interval */ - int64 last_event; /* last event we have received */ + int64 last_event; /* last event we have received */ - SlonThreadStatus listen_status; /* status of the listen thread */ - pthread_t listen_thread; /* thread id of listen thread */ - SlonListen *listen_head; /* list of origins we listen for */ - SlonListen *listen_tail; + SlonThreadStatus listen_status; /* status of the listen thread */ + pthread_t listen_thread; /* thread id of listen thread */ + SlonListen *listen_head; /* list of origins we listen for */ + SlonListen *listen_tail; - SlonThreadStatus worker_status; /* status of the worker thread */ - pthread_t worker_thread; /* thread id of worker thread */ - pthread_mutex_t message_lock; /* mutex for the message queue */ - pthread_cond_t message_cond; /* condition variable for queue */ - SlonWorkMsg *message_head; - SlonWorkMsg *message_tail; + SlonThreadStatus worker_status; /* status of the worker thread */ + pthread_t worker_thread; /* thread id of worker thread */ + pthread_mutex_t message_lock; /* mutex for the message queue */ + pthread_cond_t message_cond; /* condition variable for queue */ + SlonWorkMsg *message_head; + SlonWorkMsg *message_tail; - char *archive_name; - char *archive_temp; - char *archive_counter; - char *archive_timestamp; - FILE *archive_fp; + char *archive_name; + char *archive_temp; + char *archive_counter; + char *archive_timestamp; + FILE *archive_fp; - SlonNode *prev; - SlonNode *next; + SlonNode *prev; + SlonNode *next; }; /* ---------- @@ -124,10 +124,10 @@ struct SlonNode_s */ struct SlonListen_s { - int li_origin; /* origin of events */ + int li_origin; /* origin of events */ - SlonListen *prev; - SlonListen *next; + SlonListen *prev; + SlonListen *next; }; /* ---------- @@ -136,17 +136,17 @@ struct SlonListen_s */ struct SlonSet_s { - int set_id; /* set ID */ - int set_origin; /* set origin */ - char *set_comment; /* set comment */ + int set_id; /* set ID */ + int set_origin; /* set origin */ + char *set_comment; /* set comment */ - int sub_provider; /* from where this node receives */ - /* data (if subscribed) */ - int sub_forward; /* if we need to forward data */ - int sub_active; /* if the subscription is active */ + int sub_provider; /* from where this node receives */ + /* data (if subscribed) */ + int sub_forward; /* if we need to forward data */ + int sub_active; /* if the subscription is active */ - SlonSet *prev; - SlonSet *next; + SlonSet *prev; + SlonSet *next; }; /* ---------- @@ -155,18 +155,18 @@ struct SlonSet_s */ struct SlonConn_s { - char *symname; /* Symbolic name of connection */ - struct SlonNode_s *node; /* remote node this belongs to */ - PGconn *dbconn; /* database connection */ - pthread_mutex_t conn_lock; /* mutex for conn */ - pthread_cond_t conn_cond; /* condition variable for conn */ - - int condition; /* what are we waiting for? */ - struct timeval timeout; /* timeofday for timeout */ - int pg_version; /* PostgreSQL version */ - - SlonConn *prev; - SlonConn *next; + char *symname; /* Symbolic name of connection */ + struct SlonNode_s *node; /* remote node this belongs to */ + PGconn *dbconn; /* database connection */ + pthread_mutex_t conn_lock; /* mutex for conn */ + pthread_cond_t conn_cond; /* condition variable for conn */ + + int condition; /* what are we waiting for? */ + struct timeval timeout; /* timeofday for timeout */ + int pg_version; /* PostgreSQL version */ + + SlonConn *prev; + SlonConn *next; }; /* ---------- @@ -178,73 +178,73 @@ struct SlonConn_s typedef struct { - size_t n_alloc; - size_t n_used; - char *data; + size_t n_alloc; + size_t n_used; + char *data; } SlonDString; #define dstring_init(__ds) \ do { \ - (__ds)->n_alloc = SLON_DSTRING_SIZE_INIT; \ - (__ds)->n_used = 0; \ - (__ds)->data = malloc(SLON_DSTRING_SIZE_INIT); \ - if ((__ds)->data == NULL) { \ - slon_log(SLON_FATAL, "dstring_init: malloc() - %s", \ - strerror(errno)); \ - slon_abort(); \ - } \ + (__ds)->n_alloc = SLON_DSTRING_SIZE_INIT; \ + (__ds)->n_used = 0; \ + (__ds)->data = malloc(SLON_DSTRING_SIZE_INIT); \ + if ((__ds)->data == NULL) { \ + slon_log(SLON_FATAL, "dstring_init: malloc() - %s", \ + strerror(errno)); \ + slon_abort(); \ + } \ } while (0) #define dstring_reset(__ds) \ do { \ - (__ds)->n_used = 0; \ - (__ds)->data[0] = '\0'; \ + (__ds)->n_used = 0; \ + (__ds)->data[0] = '\0'; \ } while (0) #define dstring_free(__ds) \ do { \ - free((__ds)->data); \ - (__ds)->n_used = 0; \ - (__ds)->data = NULL; \ + free((__ds)->data); \ + (__ds)->n_used = 0; \ + (__ds)->data = NULL; \ } while (0) #define dstring_nappend(__ds,__s,__n) \ do { \ - if ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \ - { \ - while ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \ - (__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \ - (__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \ - if ((__ds)->data == NULL) \ - { \ - slon_log(SLON_FATAL, "dstring_nappend: realloc() - %s", \ - strerror(errno)); \ - slon_abort(); \ - } \ - } \ - memcpy(&((__ds)->data[(__ds)->n_used]), (__s), (__n)); \ - (__ds)->n_used += (__n); \ + if ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \ + { \ + while ((__ds)->n_used + (__n) >= (__ds)->n_alloc) \ + (__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \ + (__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \ + if ((__ds)->data == NULL) \ + { \ + slon_log(SLON_FATAL, "dstring_nappend: realloc() - %s", \ + strerror(errno)); \ + slon_abort(); \ + } \ + } \ + memcpy(&((__ds)->data[(__ds)->n_used]), (__s), (__n)); \ + (__ds)->n_used += (__n); \ } while (0) #define dstring_append(___ds,___s) \ do { \ - register int ___n = strlen((___s)); \ - dstring_nappend((___ds),(___s),___n); \ + register int ___n = strlen((___s)); \ + dstring_nappend((___ds),(___s),___n); \ } while (0) #define dstring_addchar(__ds,__c) \ do { \ - if ((__ds)->n_used + 1 >= (__ds)->n_alloc) \ - { \ - (__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \ - (__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \ - if ((__ds)->data == NULL) \ - { \ - slon_log(SLON_FATAL, "dstring_addchar: realloc() - %s", \ - strerror(errno)); \ - slon_abort(); \ - } \ - } \ - (__ds)->data[(__ds)->n_used++] = (__c); \ + if ((__ds)->n_used + 1 >= (__ds)->n_alloc) \ + { \ + (__ds)->n_alloc *= SLON_DSTRING_SIZE_INC; \ + (__ds)->data = realloc((__ds)->data, (__ds)->n_alloc); \ + if ((__ds)->data == NULL) \ + { \ + slon_log(SLON_FATAL, "dstring_addchar: realloc() - %s", \ + strerror(errno)); \ + slon_abort(); \ + } \ + } \ + (__ds)->data[(__ds)->n_used++] = (__c); \ } while (0) #define dstring_terminate(__ds) \ do { \ - (__ds)->data[(__ds)->n_used] = '\0'; \ + (__ds)->data[(__ds)->n_used] = '\0'; \ } while (0) #define dstring_data(__ds) ((__ds)->data) @@ -255,43 +255,43 @@ do { \ */ #define DLLIST_ADD_TAIL(_pf,_pl,_obj) \ do { \ - if ((_pl) == NULL) { \ - (_obj)->prev = (_obj)->next = NULL; \ - (_pf) = (_pl) = (_obj); \ - } else { \ - (_obj)->prev = (_pl); \ - (_obj)->next = NULL; \ - (_pl)->next = (_obj); \ - (_pl) = (_obj); \ - } \ + if ((_pl) == NULL) { \ + (_obj)->prev = (_obj)->next = NULL; \ + (_pf) = (_pl) = (_obj); \ + } else { \ + (_obj)->prev = (_pl); \ + (_obj)->next = NULL; \ + (_pl)->next = (_obj); \ + (_pl) = (_obj); \ + } \ } while (0) #define DLLIST_ADD_HEAD(_pf,_pl,_obj) \ do { \ - if ((_pf) == NULL) { \ - (_obj)->prev = (_obj)->next = NULL; \ - (_pf) = (_pl) = (_obj); \ - } else { \ - (_obj)->prev = NULL; \ - (_obj)->next = (_pf); \ - (_pf)->prev = (_obj); \ - (_pf) = (_obj); \ - } \ + if ((_pf) == NULL) { \ + (_obj)->prev = (_obj)->next = NULL; \ + (_pf) = (_pl) = (_obj); \ + } else { \ + (_obj)->prev = NULL; \ + (_obj)->next = (_pf); \ + (_pf)->prev = (_obj); \ + (_pf) = (_obj); \ + } \ } while (0) #define DLLIST_REMOVE(_pf,_pl,_obj) \ do { \ - if ((_obj)->prev == NULL) { \ - (_pf) = (_obj)->next; \ - } else { \ - (_obj)->prev->next = (_obj)->next; \ - } \ - if ((_obj)->next == NULL) { \ - (_pl) = (_obj)->prev; \ - } else { \ - (_obj)->next->prev = (_obj)->prev; \ - } \ - (_obj)->prev = (_obj)->next = NULL; \ + if ((_obj)->prev == NULL) { \ + (_pf) = (_obj)->next; \ + } else { \ + (_obj)->prev->next = (_obj)->next; \ + } \ + if ((_obj)->next == NULL) { \ + (_pl) = (_obj)->prev; \ + } else { \ + (_obj)->next->prev = (_obj)->prev; \ + } \ + (_obj)->prev = (_obj)->next = NULL; \ } while (0) @@ -303,9 +303,9 @@ do { \ * ---------- */ #define TIMEVAL_DIFF(_t1,_t2) \ - (((_t1)->tv_usec <= (_t2)->tv_usec) ? \ - (double)((_t2)->tv_sec - (_t1)->tv_sec) + (double)((_t2)->tv_usec - (_t1)->tv_usec) / 1000000.0 : \ - (double)((_t2)->tv_sec - (_t1)->tv_sec - 1) + (double)((_t2)->tv_usec + 1000000 - (_t1)->tv_usec) / 1000000.0) + (((_t1)->tv_usec <= (_t2)->tv_usec) ? \ + (double)((_t2)->tv_sec - (_t1)->tv_sec) + (double)((_t2)->tv_usec - (_t1)->tv_usec) / 1000000.0 : \ + (double)((_t2)->tv_sec - (_t1)->tv_sec - 1) + (double)((_t2)->tv_usec + 1000000 - (_t1)->tv_usec) / 1000000.0) /* ---------- @@ -314,11 +314,11 @@ do { \ */ typedef enum { - SCHED_STATUS_OK, - SCHED_STATUS_SHUTDOWN, - SCHED_STATUS_DONE, - SCHED_STATUS_CANCEL, - SCHED_STATUS_ERROR + SCHED_STATUS_OK, + SCHED_STATUS_SHUTDOWN, + SCHED_STATUS_DONE, + SCHED_STATUS_CANCEL, + SCHED_STATUS_ERROR } ScheduleStatus; /* ---------- @@ -363,54 +363,54 @@ extern SlonSet *rtcfg_set_list_tail; #ifndef WIN32 #define slon_abort() \ do { \ - pthread_mutex_lock(&slon_watchdog_lock); \ - if (slon_watchdog_pid >= 0) { \ - slon_log(SLON_DEBUG2, "slon_abort() from pid=%d\n", slon_pid); \ - (void) kill(slon_watchdog_pid, SIGTERM); \ - slon_watchdog_pid = -1; \ - } \ - pthread_mutex_unlock(&slon_watchdog_lock); \ - pthread_exit(NULL); \ + pthread_mutex_lock(&slon_watchdog_lock); \ + if (slon_watchdog_pid >= 0) { \ + slon_log(SLON_DEBUG2, "slon_abort() from pid=%d\n", slon_pid); \ + (void) kill(slon_watchdog_pid, SIGTERM); \ + slon_watchdog_pid = -1; \ + } \ + pthread_mutex_unlock(&slon_watchdog_lock); \ + pthread_exit(NULL); \ } while (0) #define slon_restart() \ do { \ - pthread_mutex_lock(&slon_watchdog_lock); \ - if (slon_watchdog_pid >= 0) { \ - slon_log(SLON_DEBUG2, "slon_restart() from pid=%d\n", slon_pid); \ - (void) kill(slon_watchdog_pid, SIGHUP); \ - slon_watchdog_pid = -1; \ - } \ - pthread_mutex_unlock(&slon_watchdog_lock); \ - pthread_exit(NULL); \ + pthread_mutex_lock(&slon_watchdog_lock); \ + if (slon_watchdog_pid >= 0) { \ + slon_log(SLON_DEBUG2, "slon_restart() from pid=%d\n", slon_pid); \ + (void) kill(slon_watchdog_pid, SIGHUP); \ + slon_watchdog_pid = -1; \ + } \ + pthread_mutex_unlock(&slon_watchdog_lock); \ + pthread_exit(NULL); \ } while (0) #define slon_retry() \ do { \ - pthread_mutex_lock(&slon_watchdog_lock); \ - if (slon_watchdog_pid >= 0) { \ - slon_log(SLON_DEBUG2, "slon_retry() from pid=%d\n", slon_pid); \ - (void) kill(slon_watchdog_pid, SIGUSR1); \ - slon_watchdog_pid = -1; \ - } \ - pthread_mutex_unlock(&slon_watchdog_lock); \ - pthread_exit(NULL); \ + pthread_mutex_lock(&slon_watchdog_lock); \ + if (slon_watchdog_pid >= 0) { \ + slon_log(SLON_DEBUG2, "slon_retry() from pid=%d\n", slon_pid); \ + (void) kill(slon_watchdog_pid, SIGUSR1); \ + slon_watchdog_pid = -1; \ + } \ + pthread_mutex_unlock(&slon_watchdog_lock); \ + pthread_exit(NULL); \ } while (0) #else /* WIN32 */ /* On win32, we currently just bail out and let the service control manager * deal with possible restarts */ #define slon_abort() \ do { \ - WSACleanup(); \ - exit(1); \ + WSACleanup(); \ + exit(1); \ } while (0) #define slon_restart() \ do { \ - WSACleanup(); \ - exit(1); \ + WSACleanup(); \ + exit(1); \ } while (0) #define slon_retry() \ do { \ - WSACleanup(); \ - exit(1); \ + WSACleanup(); \ + exit(1); \ } while (0) #endif @@ -436,7 +436,7 @@ extern int64 rtcfg_setNodeLastEvent(int no_id, int64 event_seq); extern int64 rtcfg_getNodeLastEvent(int no_id); extern void rtcfg_storePath(int pa_server, char *pa_conninfo, - int pa_connretry); + int pa_connretry); extern void rtcfg_dropPath(int pa_server); extern void rtcfg_reloadListen(PGconn *db); @@ -446,12 +446,12 @@ extern void rtcfg_dropListen(int li_origin, int li_provider); extern void rtcfg_storeSet(int set_id, int set_origin, char *set_comment); extern void rtcfg_dropSet(int set_id); extern void rtcfg_moveSet(int set_id, int old_origin, int new_origin, - int sub_provider); + int sub_provider); extern void rtcfg_storeSubscribe(int sub_set, int sub_provider, - char *sub_forward); + char *sub_forward); extern void rtcfg_enableSubscription(int sub_set, int sub_provider, - char *sub_forward); + char *sub_forward); extern void rtcfg_unsubscribeSet(int sub_set); extern void rtcfg_needActivate(int no_id); @@ -497,13 +497,6 @@ extern int sync_interval_timeout; */ extern void *syncThread_main(void *dummy); -/* ---------- - * Functions in snmp_thread.c - * ---------- - */ -extern void *snmpThread_main(void *dummy); - - /* ---------- * Functions in local_listen.c * ---------- @@ -533,18 +526,18 @@ extern int sync_max_largemem; */ extern void *remoteWorkerThread_main(void *cdata); extern void remoteWorker_event(int event_provider, - int ev_origin, int64 ev_seqno, - char *ev_timestamp, - char *ev_snapshot, char *ev_mintxid, char *ev_maxtxid, - char *ev_type, - char *ev_data1, char *ev_data2, - char *ev_data3, char *ev_data4, - char *ev_data5, char *ev_data6, - char *ev_data7, char *ev_data8); + int ev_origin, int64 ev_seqno, + char *ev_timestamp, + char *ev_snapshot, char *ev_mintxid, char *ev_maxtxid, + char *ev_type, + char *ev_data1, char *ev_data2, + char *ev_data3, char *ev_data4, + char *ev_data5, char *ev_data6, + char *ev_data7, char *ev_data8); extern void remoteWorker_wakeup(int no_id); extern void remoteWorker_confirm(int no_id, - char *con_origin_c, char *con_received_c, - char *con_seqno_c, char *con_timestamp_c); + char *con_origin_c, char *con_received_c, + char *con_seqno_c, char *con_timestamp_c); /* ---------- diff --git a/src/slon/snmp_thread.c b/src/slon/snmp_thread.c deleted file mode 100644 index 711e1b34..00000000 --- a/src/slon/snmp_thread.c +++ /dev/null @@ -1,107 +0,0 @@ -#include - -#include -#include -#include -#include -#include - -#include "slon.h" - -#include -#include - - -extern int slon_log_level; - -void -init_nstAgentSubagentObject(void) -{ - static oid nstAgentSubagentObject_oid[] = - {1, 3, 6, 1, 4, 1, 20366, 32, 2, 3, 32, 1}; - - netsnmp_register_int_instance("nstAgentSubagentObject", - nstAgentSubagentObject_oid, - OID_LENGTH(nstAgentSubagentObject_oid), - &slon_log_level, NULL); -} - - -void * -snmpThread_main(void *dummy) -{ - int agentx_subagent = 1; - - - /************************************************************************* - * Check configuration to see if have been asked to run as a - * master agent (0) the default is to run as a subagent (1) - ************************************************************************/ - - - /************************************************************************ - * we really should make sure that we point snmp_log to slony_log, this - * work is being saved for a later date once I better understand how - * snmp_log works - ************************************************************************/ - - if (agentx_subagent) - { - /* make us a agentx client. */ - netsnmp_ds_set_boolean(NETSNMP_DS_APPLICATION_ID, - NETSNMP_DS_AGENT_ROLE, 1); - - /* - * ******************************************************************** - * If we are running slon as root allow the snmp agent to have full - * access to it's internals, this is required to run as a master agent - * (from my understanding) ******************************************************************** - */ - - if (getuid() != 0) - { - netsnmp_ds_set_boolean(NETSNMP_DS_APPLICATION_ID, - NETSNMP_DS_AGENT_NO_ROOT_ACCESS, 1); - } - } - - init_agent("slon-demon"); - - - /* - * initialize the mib code found in: init_nstAgentSubagentObject from - * nstAgentSubagentObject.C - */ - init_nstAgentSubagentObject(); - - /* initialize vacm/usm access control */ -/* - if (!agentx_subagent) - { - init_vacm_vars(); - init_usmUser(); - } -*/ - init_snmp("slon-demon"); - - /* If we're going to be a snmp master agent, initial the ports */ - if (!agentx_subagent) - { - init_master_agent(); /* open the port to listen on (defaults to - * udp:161) */ - } - - while (true) - { - /*********************************************************************** - * Begin processing snmp requests we can pass in 0 if we rather do this - * in a non blocking method - ***********************************************************************/ - - agent_check_and_process(1); /* 0 == don't block */ - - } - - snmp_shutdown("slon-demon"); - pthread_exit(NULL); -}