X-Git-Url: https://git.exim.org/exim.git/blobdiff_plain/fc7bae7f2fa3668ada9bd173e9f24c7166e1dd13..ff9663026d1a318d385730c4a2c3e85508b4b00b:/src/src/queue.c diff --git a/src/src/queue.c b/src/src/queue.c index d9ff13375..3c72eade6 100644 --- a/src/src/queue.c +++ b/src/src/queue.c @@ -346,6 +346,7 @@ const pcre *selectstring_regex_sender = NULL; uschar *log_detail = NULL; int subcount = 0; uschar subdirs[64]; +pid_t qpid[1] = {0}; /* Parallelism factor for q2stage 1st phase */ #ifdef MEASURE_TIMING report_time_since(×tamp_startup, US"queue_run start"); @@ -468,18 +469,40 @@ for (int i = queue_run_in_order ? -1 : 0; (double)load_average/1000.0, (double)deliver_queue_load_max/1000.0); + /* If initial of a 2-phase run, maintain a set of child procs + to get disk parallelism */ + + if (f.queue_2stage && !queue_run_in_order) + { + int i; + if (qpid[nelem(qpid) - 1]) + { + DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n"); + waitpid(qpid[0], NULL, 0); + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]); + for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1]; + qpid[i] = 0; + } + else + for (i = 0; qpid[i]; ) i++; + DEBUG(D_queue_run) debug_printf("q2stage forking\n"); + if ((qpid[i] = fork())) + continue; /* parent loops around */ + DEBUG(D_queue_run) debug_printf("q2stage child\n"); + } + /* Skip this message unless it's within the ID limits */ if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0) - continue; + goto go_around; if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0) - continue; + goto go_around; /* Check that the message still exists */ message_subdir[0] = fq->dir_uschar; if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0) - continue; + goto go_around; /* There are some tests that require the reading of the header file. Ensure the store used is scavenged afterwards so that this process doesn't keep @@ -499,7 +522,7 @@ for (int i = queue_run_in_order ? -1 : 0; follow. If the message is chosen for delivery, the header is read again in the deliver_message() function, in a subprocess. */ - if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue; + if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around; f.dont_deliver = orig_dont_deliver; /* Now decide if we want to deliver this message. As we have read the @@ -571,7 +594,7 @@ for (int i = queue_run_in_order ? -1 : 0; spool_clear_header_globals(); store_reset(reset_point2); - if (!wanted) continue; /* With next message */ + if (!wanted) goto go_around; /* With next message */ } /* OK, got a message we want to deliver. Create a pipe which will @@ -638,11 +661,11 @@ for (int i = queue_run_in_order ? -1 : 0; /* A zero return means a delivery was attempted; turn off the force flag for any subsequent calls unless queue_force is set. */ - if ((status & 0xffff) == 0) force_delivery = f.queue_run_force; + if (!(status & 0xffff)) force_delivery = f.queue_run_force; /* If the process crashed, tell somebody */ - else if ((status & 0x00ff) != 0) + else if (status & 0x00ff) log_write(0, LOG_MAIN|LOG_PANIC, "queue run: process %d crashed with signal %d while delivering %s", (int)pid, status & 0x00ff, fq->text); @@ -654,11 +677,16 @@ for (int i = queue_run_in_order ? -1 : 0; set_process_info("running queue: waiting for children of %d", pid); if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0) - log_write(0, LOG_MAIN|LOG_PANIC, "queue run: %s on pipe", - status > 0 ? "unexpected data" : "error"); + log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ? + "queue run: unexpected data on pipe" : "queue run: error on pipe: %s", + strerror(errno)); (void)close(pfd[pipe_read]); set_process_info("running queue"); + /* If initial of a 2-phase run, we are a child - so just exit */ + if (f.queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS, US"2-phase child"); + /* If we are in the test harness, and this is not the first of a 2-stage queue run, update fudged queue times. */ @@ -667,6 +695,14 @@ for (int i = queue_run_in_order ? -1 : 0; uschar * fqtnext = Ustrchr(fudged_queue_times, '/'); if (fqtnext) fudged_queue_times = fqtnext + 1; } + + + continue; + + go_around: + /* If initial of a 2-phase run, we are a child - so just exit */ + if (f.queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS, US"2-phase child"); } /* End loop for list of messages */ tree_nonrecipients = NULL; @@ -694,6 +730,19 @@ turned off. */ if (f.queue_2stage) { + + /* wait for last children */ + for (int i = 0; i < nelem(qpid); i++) + if (qpid[i]) + { + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]); + waitpid(qpid[i], NULL, 0); + } + else break; + +#ifdef MEASURE_TIMING + report_time_since(×tamp_startup, US"queue_run 1st phase done"); +#endif f.queue_2stage = FALSE; queue_run(start_id, stop_id, TRUE); } @@ -1420,34 +1469,60 @@ queue_check_only(void) { int sep = 0; struct stat statbuf; -const uschar *s; -uschar *ss; -uschar buffer[1024]; - -if (queue_only_file == NULL) return; +const uschar * s = queue_only_file; +uschar * ss; -s = queue_only_file; -while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL) - { - if (Ustrncmp(ss, "smtp", 4) == 0) - { - ss += 4; - if (Ustat(ss, &statbuf) == 0) +if (s) + while ((ss = string_nextinlist(&s, &sep, NULL, 0))) + if (Ustrncmp(ss, "smtp", 4) == 0) { - f.queue_smtp = TRUE; - DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss); - } - } - else - { - if (Ustat(ss, &statbuf) == 0) - { - queue_only = TRUE; - DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss); + ss += 4; + if (Ustat(ss, &statbuf) == 0) + { + f.queue_smtp = TRUE; + DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss); + } } - } + else + if (Ustat(ss, &statbuf) == 0) + { + queue_only = TRUE; + DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss); + } +} + + + +/******************************************************************************/ +/******************************************************************************/ + +#ifdef EXPERIMENTAL_QUEUE_RAMP +void +queue_notify_daemon(const uschar * msgid) +{ +uschar buf[MESSAGE_ID_LENGTH + 2]; +int fd; + +DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid); + +buf[0] = NOTIFY_MSG_QRUN; +memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1); + +if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0) + { + struct sockaddr_un sun = {.sun_family = AF_UNIX}; + + snprintf(sun.sun_path, sizeof(sun.sun_path), "%s/%s", + spool_directory, NOTIFIER_SOCKET_NAME); + + if (sendto(fd, buf, sizeof(buf), 0, &sun, sizeof(sun)) < 0) + DEBUG(D_queue_run) + debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno)); + close(fd); } +else DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno)); } +#endif #endif /*!COMPILE_UTILITY*/