X-Git-Url: https://git.exim.org/users/jgh/exim.git/blobdiff_plain/d7978c0f8af20ff4c3f770589b1bb81568aecff3..92562f63be6fae2526d68171d60bf87027551f88:/src/src/queue.c diff --git a/src/src/queue.c b/src/src/queue.c index 617c267a3..d472b9851 100644 --- a/src/src/queue.c +++ b/src/src/queue.c @@ -12,39 +12,6 @@ -/* Routines with knowledge of spool layout */ - -#ifndef COMPILE_UTILITY -static void -spool_pname_buf(uschar * buf, int len) -{ -snprintf(CS buf, len, "%s/%s/input", spool_directory, queue_name); -} - -uschar * -spool_dname(const uschar * purpose, uschar * subdir) -{ -return string_sprintf("%s/%s/%s/%s", - spool_directory, queue_name, purpose, subdir); -} -#endif - -uschar * -spool_sname(const uschar * purpose, uschar * subdir) -{ -return string_sprintf("%s%s%s%s%s", - queue_name, *queue_name ? "/" : "", - purpose, - *subdir ? "/" : "", subdir); -} - -uschar * -spool_fname(const uschar * purpose, const uschar * subdir, const uschar * fname, - const uschar * suffix) -{ -return string_sprintf("%s/%s/%s/%s/%s%s", - spool_directory, queue_name, purpose, subdir, fname, suffix); -} @@ -239,7 +206,7 @@ for (; i <= *subcount; i++) Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0) { queue_filename *next = - store_get(sizeof(queue_filename) + Ustrlen(name)); + store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name)); Ustrcpy(next->text, name); next->dir_uschar = subdirchar; @@ -379,6 +346,11 @@ const pcre *selectstring_regex_sender = NULL; uschar *log_detail = NULL; int subcount = 0; uschar subdirs[64]; +pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */ + +#ifdef MEASURE_TIMING +report_time_since(×tamp_startup, US"queue_run start"); +#endif /* Cancel any specific queue domains. Turn off the flag that causes SMTP deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag @@ -457,7 +429,7 @@ for (int i = queue_run_in_order ? -1 : 0; i <= (queue_run_in_order ? -1 : subcount); i++) { - void *reset_point1 = store_get(0); + rmark reset_point1 = store_mark(); DEBUG(D_queue_run) { @@ -497,18 +469,40 @@ for (int i = queue_run_in_order ? -1 : 0; (double)load_average/1000.0, (double)deliver_queue_load_max/1000.0); + /* If initial of a 2-phase run, maintain a set of child procs + to get disk parallelism */ + + if (f.queue_2stage && !queue_run_in_order) + { + int i; + if (qpid[nelem(qpid) - 1]) + { + DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n"); + waitpid(qpid[0], NULL, 0); + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]); + for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1]; + qpid[i] = 0; + } + else + for (i = 0; qpid[i]; ) i++; + DEBUG(D_queue_run) debug_printf("q2stage forking\n"); + if ((qpid[i] = fork())) + continue; /* parent loops around */ + DEBUG(D_queue_run) debug_printf("q2stage child\n"); + } + /* Skip this message unless it's within the ID limits */ if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0) - continue; + goto go_around; if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0) - continue; + goto go_around; /* Check that the message still exists */ message_subdir[0] = fq->dir_uschar; if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0) - continue; + goto go_around; /* There are some tests that require the reading of the header file. Ensure the store used is scavenged afterwards so that this process doesn't keep @@ -521,14 +515,14 @@ for (int i = queue_run_in_order ? -1 : 0; { BOOL wanted = TRUE; BOOL orig_dont_deliver = f.dont_deliver; - void *reset_point2 = store_get(0); + rmark reset_point2 = store_mark(); /* Restore the original setting of dont_deliver after reading the header, so that a setting for a particular message doesn't force it for any that follow. If the message is chosen for delivery, the header is read again in the deliver_message() function, in a subprocess. */ - if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue; + if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around; f.dont_deliver = orig_dont_deliver; /* Now decide if we want to deliver this message. As we have read the @@ -600,7 +594,7 @@ for (int i = queue_run_in_order ? -1 : 0; spool_clear_header_globals(); store_reset(reset_point2); - if (!wanted) continue; /* With next message */ + if (!wanted) goto go_around; /* With next message */ } /* OK, got a message we want to deliver. Create a pipe which will @@ -641,13 +635,17 @@ for (int i = queue_run_in_order ? -1 : 0; set_process_info("running queue: %s", fq->text); fq->text[SPOOL_NAME_LENGTH-2] = 0; +#ifdef MEASURE_TIMING + report_time_since(×tamp_startup, US"queue msg selected"); +#endif + if ((pid = fork()) == 0) { int rc; - if (f.running_in_test_harness) millisleep(100); + testharness_pause_ms(100); (void)close(pfd[pipe_read]); rc = deliver_message(fq->text, force_delivery, FALSE); - _exit(rc == DELIVER_NOT_ATTEMPTED); + exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED); } if (pid < 0) log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from " @@ -663,11 +661,11 @@ for (int i = queue_run_in_order ? -1 : 0; /* A zero return means a delivery was attempted; turn off the force flag for any subsequent calls unless queue_force is set. */ - if ((status & 0xffff) == 0) force_delivery = f.queue_run_force; + if (!(status & 0xffff)) force_delivery = f.queue_run_force; /* If the process crashed, tell somebody */ - else if ((status & 0x00ff) != 0) + else if (status & 0x00ff) log_write(0, LOG_MAIN|LOG_PANIC, "queue run: process %d crashed with signal %d while delivering %s", (int)pid, status & 0x00ff, fq->text); @@ -679,19 +677,32 @@ for (int i = queue_run_in_order ? -1 : 0; set_process_info("running queue: waiting for children of %d", pid); if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0) - log_write(0, LOG_MAIN|LOG_PANIC, "queue run: %s on pipe", - status > 0 ? "unexpected data" : "error"); + log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ? + "queue run: unexpected data on pipe" : "queue run: error on pipe: %s", + strerror(errno)); (void)close(pfd[pipe_read]); set_process_info("running queue"); + /* If initial of a 2-phase run, we are a child - so just exit */ + if (f.queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS, US"2-phase child"); + /* If we are in the test harness, and this is not the first of a 2-stage queue run, update fudged queue times. */ if (f.running_in_test_harness && !f.queue_2stage) { - uschar *fqtnext = Ustrchr(fudged_queue_times, '/'); - if (fqtnext != NULL) fudged_queue_times = fqtnext + 1; + uschar * fqtnext = Ustrchr(fudged_queue_times, '/'); + if (fqtnext) fudged_queue_times = fqtnext + 1; } + + + continue; + + go_around: + /* If initial of a 2-phase run, we are a child - so just exit */ + if (f.queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS, US"2-phase child"); } /* End loop for list of messages */ tree_nonrecipients = NULL; @@ -719,6 +730,19 @@ turned off. */ if (f.queue_2stage) { + + /* wait for last children */ + for (int i = 0; i < nelem(qpid); i++) + if (qpid[i]) + { + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]); + waitpid(qpid[i], NULL, 0); + } + else break; + +#ifdef MEASURE_TIMING + report_time_since(×tamp_startup, US"queue_run 1st phase done"); +#endif f.queue_2stage = FALSE; queue_run(start_id, stop_id, TRUE); } @@ -778,11 +802,12 @@ Argument: points to the tree node Returns: nothing */ -static void queue_list_extras(tree_node *p) +static void +queue_list_extras(tree_node *p) { -if (p->left != NULL) queue_list_extras(p->left); +if (p->left) queue_list_extras(p->left); if (!p->data.val) printf(" +D %s\n", p->name); -if (p->right != NULL) queue_list_extras(p->right); +if (p->right) queue_list_extras(p->right); } @@ -815,7 +840,7 @@ queue_list(int option, uschar **list, int count) { int subcount; int now = (int)time(NULL); -void *reset_point; +rmark reset_point; queue_filename * qf = NULL; uschar subdirs[64]; @@ -827,7 +852,7 @@ if (count > 0) for (int i = 0; i < count; i++) { queue_filename *next = - store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2); + store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, is_tainted(list[i])); sprintf(CS next->text, "%s-H", list[i]); next->dir_uschar = '*'; next->next = NULL; @@ -850,8 +875,8 @@ if (option >= 8) option -= 8; /* Now scan the chain and print information, resetting store used each time. */ -for (reset_point = store_get(0); - qf; +for (; + qf && (reset_point = store_mark()); spool_clear_header_globals(), store_reset(reset_point), qf = qf->next ) { @@ -1033,7 +1058,7 @@ if (action >= MSG_SHOW_BODY) for (int i = 0; i < 2; i++) { - message_subdir[0] = split_spool_directory == (i == 0) ? id[5] : 0; + set_subdir_str(message_subdir, id, i); if ((fd = Uopen(spool_fname(subdirectory, message_subdir, id, suffix), O_RDONLY, 0)) >= 0) break; @@ -1251,7 +1276,7 @@ switch(action) if (removed) { #ifndef DISABLE_EVENT - for (int i = 0; i < recipients_count; i++) + if (event_action) for (int i = 0; i < recipients_count; i++) { tree_node *delivered = tree_search(tree_nonrecipients, recipients_list[i].address); @@ -1289,6 +1314,14 @@ switch(action) } + case MSG_SETQUEUE: + /* The global "queue_name_dest" is used as destination, "queue_name" + as source */ + + spool_move_message(id, message_subdir, US"", US""); + break; + + case MSG_MARK_ALL_DELIVERED: for (int i = 0; i < recipients_count; i++) tree_add_nonrecipient(recipients_list[i].address); @@ -1436,33 +1469,26 @@ queue_check_only(void) { int sep = 0; struct stat statbuf; -const uschar *s; -uschar *ss; -uschar buffer[1024]; - -if (queue_only_file == NULL) return; +const uschar * s = queue_only_file; +uschar * ss; -s = queue_only_file; -while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL) - { - if (Ustrncmp(ss, "smtp", 4) == 0) - { - ss += 4; - if (Ustat(ss, &statbuf) == 0) - { - f.queue_smtp = TRUE; - DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss); - } - } - else - { - if (Ustat(ss, &statbuf) == 0) +if (s) + while ((ss = string_nextinlist(&s, &sep, NULL, 0))) + if (Ustrncmp(ss, "smtp", 4) == 0) { - queue_only = TRUE; - DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss); + ss += 4; + if (Ustat(ss, &statbuf) == 0) + { + f.queue_smtp = TRUE; + DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss); + } } - } - } + else + if (Ustat(ss, &statbuf) == 0) + { + queue_only = TRUE; + DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss); + } } #endif /*!COMPILE_UTILITY*/