X-Git-Url: https://git.exim.org/exim.git/blobdiff_plain/ae96393855f6ea395a53a1767cc049d1f6ae9683..f168d6a2c93ee92d87744ce09c45a0ec666f889c:/src/src/queue.c diff --git a/src/src/queue.c b/src/src/queue.c index 8876e09be..2c3d014f5 100644 --- a/src/src/queue.c +++ b/src/src/queue.c @@ -2,8 +2,10 @@ * Exim - an Internet mail transport agent * *************************************************/ -/* Copyright (c) University of Cambridge 1995 - 2009 */ +/* Copyright (c) The Exim Maintainers 2020 - 2022 */ +/* Copyright (c) University of Cambridge 1995 - 2018 */ /* See the file NOTICE for conditions of use and distribution. */ +/* SPDX-License-Identifier: GPL-2.0-or-later */ /* Functions that operate on the input queue. */ @@ -12,6 +14,12 @@ + + + + +#ifndef COMPILE_UTILITY + /* The number of nodes to use for the bottom-up merge sort when a list of queue items is to be ordered. The code for this sort was contributed as a patch by Michael Haardt. */ @@ -19,6 +27,9 @@ Michael Haardt. */ #define LOG2_MAXNODES 32 +#ifndef DISABLE_TLS +static BOOL queue_tls_init = FALSE; +#endif /************************************************* * Helper sort function for queue_get_spool_list * @@ -40,9 +51,17 @@ merge_queue_lists(queue_filename *a, queue_filename *b) queue_filename *first = NULL; queue_filename **append = &first; -while (a != NULL && b != NULL) +while (a && b) { - if (Ustrcmp(a->text, b->text) < 0) + int d; + if ((d = Ustrncmp(a->text, b->text, MESSAGE_ID_TIME_LEN)) == 0) + { + BOOL a_old = is_old_message_id(a->text), b_old = is_old_message_id(b->text); + /* Do not worry over the sub-second sorting wrt. old vs. new */ + d = Ustrcmp(a->text + (a_old ? 6+1+6+1 : MESSAGE_ID_TIME_LEN + 1 + MESSAGE_ID_PID_LEN + 1), + b->text + (b_old ? 6+1+6+1 : MESSAGE_ID_TIME_LEN + 1 + MESSAGE_ID_PID_LEN + 1)); + } + if (d < 0) { *append = a; append= &a->next; @@ -56,7 +75,7 @@ while (a != NULL && b != NULL) } } -*append=((a != NULL)? a : b); +*append = a ? 
a : b; return first; } @@ -101,13 +120,14 @@ Arguments: subdirs vector to store list of subdirchars subcount pointer to int in which to store count of subdirs randomize TRUE if the order of the list is to be unpredictable + pcount If not NULL, fill in with count of files and do not return list Returns: pointer to a chain of queue name items */ static queue_filename * queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount, - BOOL randomize) + BOOL randomize, unsigned * pcount) { int i; int flags = 0; @@ -115,8 +135,6 @@ int resetflags = -1; int subptr; queue_filename *yield = NULL; queue_filename *last = NULL; -struct dirent *ent; -DIR *dd; uschar buffer[256]; queue_filename *root[LOG2_MAXNODES]; @@ -125,8 +143,13 @@ according to the bits of the flags variable. Get a collection of bits from the current time. Use the bottom 16 and just keep re-using them if necessary. When not randomizing, initialize the sublists for the bottom-up merge sort. */ -if (randomize) resetflags = time(NULL) & 0xFFFF; - else for (i = 0; i < LOG2_MAXNODES; i++) root[i] = NULL; +if (pcount) + *pcount = 0; +else if (randomize) + resetflags = time(NULL) & 0xFFFF; +else + for (i = 0; i < LOG2_MAXNODES; i++) + root[i] = NULL; /* If processing the full queue, or just the top-level, start at the base directory, and initialize the first subdirectory name (as none). Otherwise, @@ -138,11 +161,13 @@ if (subdiroffset <= 0) subdirs[0] = 0; *subcount = 0; } -else i = subdiroffset; +else + i = subdiroffset; /* Set up prototype for the directory name. 
*/ -sprintf(CS buffer, "%s/input", spool_directory); +spool_pname_buf(buffer, sizeof(buffer)); +buffer[sizeof(buffer) - 3] = 0; subptr = Ustrlen(buffer); buffer[subptr+2] = 0; /* terminator for lengthened name */ @@ -154,6 +179,7 @@ for (; i <= *subcount; i++) { int count = 0; int subdirchar = subdirs[i]; /* 0 for main directory */ + DIR *dd; if (subdirchar != 0) { @@ -161,14 +187,15 @@ for (; i <= *subcount; i++) buffer[subptr+1] = subdirchar; } - dd = opendir(CS buffer); - if (dd == NULL) continue; + DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer); + if (!(dd = exim_opendir(buffer))) + continue; /* Now scan the directory. */ - while ((ent = readdir(dd)) != NULL) + for (struct dirent * ent; ent = readdir(dd); ) { - uschar *name = US ent->d_name; + uschar * name = US ent->d_name; int len = Ustrlen(name); /* Count entries */ @@ -187,66 +214,66 @@ for (; i <= *subcount; i++) /* Otherwise, if it is a header spool file, add it to the list */ - if (len == SPOOL_NAME_LENGTH && - Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0) - { - queue_filename *next = - store_get(sizeof(queue_filename) + Ustrlen(name)); - Ustrcpy(next->text, name); - next->dir_uschar = subdirchar; - - /* Handle the creation of a randomized list. The first item becomes both - the top and bottom of the list. Subsequent items are inserted either at - the top or the bottom, randomly. This is, I argue, faster than doing a - sort by allocating a random number to each item, and it also saves having - to store the number with each item. */ - - if (randomize) - { - if (yield == NULL) - { - next->next = NULL; - yield = last = next; - } - else - { - if (flags == 0) flags = resetflags; - if ((flags & 1) == 0) - { - next->next = yield; - yield = next; - } - else - { - next->next = NULL; - last->next = next; - last = next; - } - flags = flags >> 1; - } - } - - /* Otherwise do a bottom-up merge sort based on the name. 
*/ - + if ( (len == SPOOL_NAME_LENGTH || len == SPOOL_NAME_LENGTH_OLD) + && Ustrcmp(name + len - 2, "-H") == 0 + ) + if (pcount) + (*pcount)++; else - { - int j; - next->next = NULL; - for (j = 0; j < LOG2_MAXNODES; j++) - { - if (root[j] != NULL) - { - next = merge_queue_lists(next, root[j]); - root[j] = (j == LOG2_MAXNODES - 1)? next : NULL; - } - else - { - root[j] = next; - break; - } - } - } - } + { + queue_filename * next = + store_get(sizeof(queue_filename) + len, name); + Ustrcpy(next->text, name); + next->dir_uschar = subdirchar; + + /* Handle the creation of a randomized list. The first item becomes both + the top and bottom of the list. Subsequent items are inserted either at + the top or the bottom, randomly. This is, I argue, faster than doing a + sort by allocating a random number to each item, and it also saves having + to store the number with each item. */ + + if (randomize) + if (!yield) + { + next->next = NULL; + yield = last = next; + } + else + { + if (flags == 0) + flags = resetflags; + if ((flags & 1) == 0) + { + next->next = yield; + yield = next; + } + else + { + next->next = NULL; + last->next = next; + last = next; + } + flags = flags >> 1; + } + + /* Otherwise do a bottom-up merge sort based on the name. */ + + else + { + next->next = NULL; + for (int j = 0; j < LOG2_MAXNODES; j++) + if (root[j]) + { + next = merge_queue_lists(next, root[j]); + root[j] = j == LOG2_MAXNODES - 1 ? 
next : NULL; + } + else + { + root[j] = next; + break; + } + } + } } /* Finished with this directory */ @@ -264,9 +291,11 @@ for (; i <= *subcount; i++) { if (!split_spool_directory && count <= 2) { + uschar subdir[2]; + rmdir(CS buffer); - sprintf(CS big_buffer, "%s/msglog/%c", spool_directory, subdirchar); - rmdir(CS big_buffer); + subdir[0] = subdirchar; subdir[1] = 0; + rmdir(CS spool_dname(US"msglog", subdir)); } if (subdiroffset > 0) break; /* Single sub-directory */ } @@ -274,16 +303,14 @@ for (; i <= *subcount; i++) /* If we have just scanned the base directory, and subdiroffset is 0, we do not want to continue scanning the sub-directories. */ - else - { - if (subdiroffset == 0) break; - } + else if (subdiroffset == 0) + break; } /* Loop for multiple subdirectories */ /* When using a bottom-up merge sort, do the final merging of the sublists. Then pass back the final list of file items. */ -if (!randomize) +if (!pcount && !randomize) for (i = 0; i < LOG2_MAXNODES; ++i) yield = merge_queue_lists(yield, root[i]); @@ -304,8 +331,8 @@ previous lexically lesser one if the given stop message doesn't exist. Because a queue run can take some time, stat each file before forking, in case it has been delivered in the meantime by some other means. -The global variables queue_run_force and queue_run_local may be set to cause -forced deliveries or local-only deliveries, respectively. +The qrun descriptor variables queue_run_force and queue_run_local may be set to +cause forced deliveries or local-only deliveries, respectively. If deliver_selectstring[_sender] is not NULL, skip messages whose recipients do not contain the string. As this option is typically used when a machine comes @@ -318,6 +345,7 @@ is set so that routing is done for all messages. Thus in the second run those that are routed to the same host should go down the same SMTP connection. 
Arguments: + q queue-runner descriptor start_id message id to start at, or NULL for all stop_id message id to end at, or NULL for all recurse TRUE if recursing for 2-stage run @@ -326,16 +354,28 @@ Returns: nothing */ void -queue_run(uschar *start_id, uschar *stop_id, BOOL recurse) +queue_run(qrunner * q, const uschar * start_id, const uschar * stop_id, BOOL recurse) { -BOOL force_delivery = queue_run_force || deliver_selectstring != NULL || - deliver_selectstring_sender != NULL; -const pcre *selectstring_regex = NULL; -const pcre *selectstring_regex_sender = NULL; +BOOL force_delivery = q->queue_run_force + || deliver_selectstring || deliver_selectstring_sender; +const pcre2_code *selectstring_regex = NULL; +const pcre2_code *selectstring_regex_sender = NULL; uschar *log_detail = NULL; int subcount = 0; -int i; uschar subdirs[64]; +pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */ +BOOL single_id = FALSE; + +#ifdef MEASURE_TIMING +report_time_since(&timestamp_startup, US"queue_run start"); +#endif + +/* Copy the legacy globals from the newer per-qrunner-desc */ + +queue_name = q->name ? q->name : US""; +f.queue_2stage = q->queue_2stage; +f.deliver_force_thaw = q->deliver_force_thaw; +f.queue_run_local = q->queue_run_local; /* Cancel any specific queue domains. Turn off the flag that causes SMTP deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag @@ -345,10 +385,10 @@ on TCP/IP channels have queue_run_pid set, but not queue_running. 
*/ queue_domains = NULL; queue_smtp_domains = NULL; -queue_smtp = queue_2stage; +f.queue_smtp = q->queue_2stage; queue_run_pid = getpid(); -queue_running = TRUE; +f.queue_running = TRUE; /* Log the true start of a queue run, and fancy options */ @@ -357,49 +397,52 @@ if (!recurse) uschar extras[8]; uschar *p = extras; - if (queue_2stage) *p++ = 'q'; - if (queue_run_first_delivery) *p++ = 'i'; - if (queue_run_force) *p++ = 'f'; - if (deliver_force_thaw) *p++ = 'f'; - if (queue_run_local) *p++ = 'l'; + if (q->queue_2stage) *p++ = 'q'; + if (q->queue_run_first_delivery) *p++ = 'i'; + if (q->queue_run_force) *p++ = 'f'; + if (q->deliver_force_thaw) *p++ = 'f'; + if (q->queue_run_local) *p++ = 'l'; *p = 0; p = big_buffer; - sprintf(CS p, "pid=%d", (int)queue_run_pid); - while (*p != 0) p++; + p += sprintf(CS p, "pid=%d", (int)queue_run_pid); if (extras[0] != 0) - { - sprintf(CS p, " -q%s", extras); - while (*p != 0) p++; - } + p += sprintf(CS p, " -q%s", extras); - if (deliver_selectstring != NULL) + if (deliver_selectstring) { - sprintf(CS p, " -R%s %s", deliver_selectstring_regex? "r" : "", - deliver_selectstring); - while (*p != 0) p++; + snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s", + f.deliver_selectstring_regex ? "r" : "", deliver_selectstring); + p += Ustrlen(CCS p); } - if (deliver_selectstring_sender != NULL) + if (deliver_selectstring_sender) { - sprintf(CS p, " -S%s %s", deliver_selectstring_sender_regex? "r" : "", - deliver_selectstring_sender); - while (*p != 0) p++; + snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s", + f.deliver_selectstring_sender_regex ? 
"r" : "", deliver_selectstring_sender); + p += Ustrlen(CCS p); } log_detail = string_copy(big_buffer); - log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail); + if (q->name) + log_write(L_queue_run, LOG_MAIN, "Start '%s' queue run: %s", + q->name, log_detail); + else + log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail); + + single_id = start_id && stop_id && !q->queue_2stage + && Ustrcmp(start_id, stop_id) == 0; } /* If deliver_selectstring is a regex, compile it. */ -if (deliver_selectstring != NULL && deliver_selectstring_regex) - selectstring_regex = regex_must_compile(deliver_selectstring, TRUE, FALSE); +if (deliver_selectstring && f.deliver_selectstring_regex) + selectstring_regex = regex_must_compile(deliver_selectstring, MCS_CASELESS, FALSE); -if (deliver_selectstring_sender != NULL && deliver_selectstring_sender_regex) +if (deliver_selectstring_sender && f.deliver_selectstring_sender_regex) selectstring_regex_sender = - regex_must_compile(deliver_selectstring_sender, TRUE, FALSE); + regex_must_compile(deliver_selectstring_sender, MCS_CASELESS, FALSE); /* If the spool is split into subdirectories, we want to process it one directory at a time, so as to spread out the directory scanning and the @@ -411,17 +454,16 @@ any messages therein), and then repeats for any subdirectories that were found. When the first argument of queue_get_spool_list() is 0, it scans the top directory, fills in subdirs, and sets subcount. The order of the directories is then randomized after the first time through, before they are scanned in -subsqeuent iterations. +subsequent iterations. When the first argument of queue_get_spool_list() is -1 (for queue_run_in_ order), it scans all directories and makes a single message list. */ -for (i = (queue_run_in_order? -1 : 0); - i <= (queue_run_in_order? -1 : subcount); +for (int i = queue_run_in_order ? -1 : 0; + i <= (queue_run_in_order ? 
-1 : subcount); i++) { - queue_filename *f; - void *reset_point1 = store_get(0); + rmark reset_point1 = store_mark(); DEBUG(D_queue_run) { @@ -433,9 +475,9 @@ for (i = (queue_run_in_order? -1 : 0); debug_printf("queue running subdirectory '%c'\n", subdirs[i]); } - for (f = queue_get_spool_list(i, subdirs, &subcount, !queue_run_in_order); - f != NULL; - f = f->next) + for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount, + !queue_run_in_order, NULL); + fq; fq = fq->next) { pid_t pid; int status; @@ -446,10 +488,8 @@ for (i = (queue_run_in_order? -1 : 0); /* Unless deliveries are forced, if deliver_queue_load_max is non-negative, check that the load average is low enough to permit deliveries. */ - if (!queue_run_force && deliver_queue_load_max >= 0) - { - load_average = os_getloadavg(); - if (load_average > deliver_queue_load_max) + if (!q->queue_run_force && deliver_queue_load_max >= 0) + if ((load_average = os_getloadavg()) > deliver_queue_load_max) { log_write(L_queue_run, LOG_MAIN, "Abandon queue run: %s (load %.2f, max %.2f)", log_detail, @@ -459,26 +499,43 @@ for (i = (queue_run_in_order? -1 : 0); break; } else - { DEBUG(D_load) debug_printf("load average = %.2f max = %.2f\n", (double)load_average/1000.0, (double)deliver_queue_load_max/1000.0); - } + + /* If initial of a 2-phase run, maintain a set of child procs + to get disk parallelism */ + + if (q->queue_2stage && !queue_run_in_order) + { + int i; + if (qpid[f.running_in_test_harness ? 
0 : nelem(qpid) - 1]) + { + DEBUG(D_queue_run) debug_printf("q2stage waiting for child %d\n", (int)qpid[0]); + waitpid(qpid[0], NULL, 0); + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]); + if (f.running_in_test_harness) i = 0; + else for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1]; + qpid[i] = 0; + } + else + for (i = 0; qpid[i]; ) i++; + if ((qpid[i] = exim_fork(US"qrun-phase-one"))) + continue; /* parent loops around */ } /* Skip this message unless it's within the ID limits */ - if (stop_id != NULL && Ustrncmp(f->text, stop_id, MESSAGE_ID_LENGTH) > 0) - continue; - if (start_id != NULL && Ustrncmp(f->text, start_id, MESSAGE_ID_LENGTH) < 0) - continue; + if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0) + goto go_around; + if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0) + goto go_around; /* Check that the message still exists */ - message_subdir[0] = f->dir_uschar; - sprintf(CS buffer, "%s/input/%s/%s", spool_directory, message_subdir, - f->text); - if (Ustat(buffer, &statbuf) < 0) continue; + message_subdir[0] = fq->dir_uschar; + if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0) + goto go_around; /* There are some tests that require the reading of the header file. Ensure the store used is scavenged afterwards so that this process doesn't keep @@ -486,26 +543,26 @@ for (i = (queue_run_in_order? -1 : 0); delivering, but it's cheaper than forking a delivery process for each message when many are not going to be delivered. 
*/ - if (deliver_selectstring != NULL || deliver_selectstring_sender != NULL || - queue_run_first_delivery) + if (deliver_selectstring || deliver_selectstring_sender || + q->queue_run_first_delivery) { BOOL wanted = TRUE; - BOOL orig_dont_deliver = dont_deliver; - void *reset_point2 = store_get(0); + BOOL orig_dont_deliver = f.dont_deliver; + rmark reset_point2 = store_mark(); /* Restore the original setting of dont_deliver after reading the header, so that a setting for a particular message doesn't force it for any that follow. If the message is chosen for delivery, the header is read again in the deliver_message() function, in a subprocess. */ - if (spool_read_header(f->text, FALSE, TRUE) != spool_read_OK) continue; - dont_deliver = orig_dont_deliver; + if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around; + f.dont_deliver = orig_dont_deliver; /* Now decide if we want to deliver this message. As we have read the header file, we might as well do the freeze test now, and save forking another process. */ - if (deliver_freeze && !deliver_force_thaw) + if (f.deliver_freeze && !q->deliver_force_thaw) { log_write(L_skip_delivery, LOG_MAIN, "Message is frozen"); wanted = FALSE; @@ -513,46 +570,44 @@ for (i = (queue_run_in_order? -1 : 0); /* Check first_delivery in the case when there are no message logs. */ - else if (queue_run_first_delivery && !deliver_firsttime) + else if (q->queue_run_first_delivery && !f.deliver_firsttime) { - DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", f->text); + DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", fq->text); wanted = FALSE; } - /* Check for a matching address if deliver_selectstring[_sender} is set. + /* Check for a matching address if deliver_selectstring[_sender] is set. If so, we do a fully delivery - don't want to omit other addresses since their routing might trigger re-writing etc. 
*/ /* Sender matching */ - else if (deliver_selectstring_sender != NULL && - !(deliver_selectstring_sender_regex? - (pcre_exec(selectstring_regex_sender, NULL, CS sender_address, - Ustrlen(sender_address), 0, PCRE_EOPT, NULL, 0) >= 0) - : - (strstric(sender_address, deliver_selectstring_sender, FALSE) - != NULL))) + else if ( deliver_selectstring_sender + && !(f.deliver_selectstring_sender_regex + ? regex_match(selectstring_regex_sender, sender_address, -1, NULL) + : (strstric_c(sender_address, deliver_selectstring_sender, FALSE) + != NULL) + ) ) { DEBUG(D_queue_run) debug_printf("%s: sender address did not match %s\n", - f->text, deliver_selectstring_sender); + fq->text, deliver_selectstring_sender); wanted = FALSE; } /* Recipient matching */ - else if (deliver_selectstring != NULL) + else if (deliver_selectstring) { int i; for (i = 0; i < recipients_count; i++) { - uschar *address = recipients_list[i].address; - if ((deliver_selectstring_regex? - (pcre_exec(selectstring_regex, NULL, CS address, - Ustrlen(address), 0, PCRE_EOPT, NULL, 0) >= 0) - : - (strstric(address, deliver_selectstring, FALSE) != NULL)) - && - tree_search(tree_nonrecipients, address) == NULL) + const uschar * address = recipients_list[i].address; + if ( (f.deliver_selectstring_regex + ? regex_match(selectstring_regex, address, -1, NULL) + : (strstric_c(address, deliver_selectstring, FALSE) != NULL) + ) + && tree_search(tree_nonrecipients, address) == NULL + ) break; } @@ -560,15 +615,16 @@ for (i = (queue_run_in_order? -1 : 0); { DEBUG(D_queue_run) debug_printf("%s: no recipient address matched %s\n", - f->text, deliver_selectstring); + fq->text, deliver_selectstring); wanted = FALSE; } } /* Recover store used when reading the header */ + spool_clear_header_globals(); store_reset(reset_point2); - if (!wanted) continue; /* With next message */ + if (!wanted) goto go_around; /* With next message */ } /* OK, got a message we want to deliver. 
Create a pipe which will @@ -581,10 +637,8 @@ for (i = (queue_run_in_order? -1 : 0); pretty cheap. */ if (pipe(pfd) < 0) - { log_write(0, LOG_MAIN|LOG_PANIC_DIE, "failed to create pipe in queue " "runner process %d: %s", queue_run_pid, strerror(errno)); - } queue_run_pipe = pfd[pipe_write]; /* To ensure it gets passed on. */ /* Make sure it isn't stdin. This seems unlikely, but just to be on the @@ -609,15 +663,30 @@ for (i = (queue_run_in_order? -1 : 0); /* Now deliver the message; get the id by cutting the -H off the file name. The return of the process is zero if a delivery was attempted. */ - set_process_info("running queue: %s", f->text); - f->text[SPOOL_NAME_LENGTH-2] = 0; - if ((pid = fork()) == 0) + fq->text[Ustrlen(fq->text)-2] = 0; + set_process_info("running queue: %s", fq->text); +#ifdef MEASURE_TIMING + report_time_since(&timestamp_startup, US"queue msg selected"); +#endif + +#ifndef DISABLE_TLS + if (!queue_tls_init) + { + queue_tls_init = TRUE; + /* Preload TLS library info for smtp transports. Once, and only if we + have a delivery to do. */ + tls_client_creds_reload(FALSE); + } +#endif + +single_item_retry: + if ((pid = exim_fork(US"qrun-delivery")) == 0) { int rc; - if (running_in_test_harness) millisleep(100); (void)close(pfd[pipe_read]); - rc = deliver_message(f->text, force_delivery, FALSE); - _exit(rc == DELIVER_NOT_ATTEMPTED); + rc = deliver_message(fq->text, force_delivery, FALSE); + exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED + ? EXIT_FAILURE : EXIT_SUCCESS); } if (pid < 0) log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from " @@ -627,21 +696,31 @@ for (i = (queue_run_in_order? -1 : 0); then wait for the first level process to terminate. 
*/ (void)close(pfd[pipe_write]); - set_process_info("running queue: waiting for %s (%d)", f->text, pid); + set_process_info("running queue: waiting for %s (%d)", fq->text, pid); while (wait(&status) != pid); /* A zero return means a delivery was attempted; turn off the force flag for any subsequent calls unless queue_force is set. */ - if ((status & 0xffff) == 0) force_delivery = queue_run_force; + if (!(status & 0xffff)) force_delivery = q->queue_run_force; /* If the process crashed, tell somebody */ - else if ((status & 0x00ff) != 0) - { + else if (status & 0x00ff) log_write(0, LOG_MAIN|LOG_PANIC, "queue run: process %d crashed with signal %d while delivering %s", - (int)pid, status & 0x00ff, f->text); + (int)pid, status & 0x00ff, fq->text); + + /* If single-item delivery was untried (likely due to locking) + retry once after a delay */ + + if (status & 0xff00 && single_id) + { + single_id = FALSE; + DEBUG(D_queue_run) debug_printf("qrun single-item pause before retry\n"); + millisleep(500); + DEBUG(D_queue_run) debug_printf("qrun single-item retry after pause\n"); + goto single_item_retry; } /* Before continuing, wait till the pipe gets closed at the far end. This @@ -650,33 +729,46 @@ for (i = (queue_run_in_order? -1 : 0); the mere fact that read() unblocks is enough. */ set_process_info("running queue: waiting for children of %d", pid); - if (read(pfd[pipe_read], buffer, sizeof(buffer)) > 0) - log_write(0, LOG_MAIN|LOG_PANIC, "queue run: unexpected data on pipe"); + if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0) + log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ? 
+ "queue run: unexpected data on pipe" : "queue run: error on pipe: %s", + strerror(errno)); (void)close(pfd[pipe_read]); set_process_info("running queue"); + /* If initial of a 2-phase run, we are a child - so just exit */ + if (q->queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS); + /* If we are in the test harness, and this is not the first of a 2-stage queue run, update fudged queue times. */ - if (running_in_test_harness && !queue_2stage) + if (f.running_in_test_harness && !q->queue_2stage) { - uschar *fqtnext = Ustrchr(fudged_queue_times, '/'); - if (fqtnext != NULL) fudged_queue_times = fqtnext + 1; + uschar * fqtnext = Ustrchr(fudged_queue_times, '/'); + if (fqtnext) fudged_queue_times = fqtnext + 1; } + + + continue; + + go_around: + /* If initial of a 2-phase run, we are a child - so just exit */ + if (q->queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS); } /* End loop for list of messages */ + tree_nonrecipients = NULL; store_reset(reset_point1); /* Scavenge list of messages */ /* If this was the first time through for random order processing, and sub-directories have been found, randomize their order if necessary. */ if (i == 0 && subcount > 1 && !queue_run_in_order) - { - int j; - for (j = 1; j <= subcount; j++) + for (int j = 1; j <= subcount; j++) { - int r = random_number(100); - if (r >= 50) + int r; + if ((r = random_number(100)) >= 50) { int k = (r % subcount) + 1; int x = subdirs[j]; @@ -684,21 +776,56 @@ for (i = (queue_run_in_order? -1 : 0); subdirs[k] = x; } } - } } /* End loop for multiple directories */ /* If queue_2stage is true, we do it all again, with the 2stage flag turned off. 
*/ -if (queue_2stage) +if (q->queue_2stage) { - queue_2stage = FALSE; - queue_run(start_id, stop_id, TRUE); + + /* wait for last children */ + for (int i = 0; i < nelem(qpid); i++) + if (qpid[i]) + { + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]); + waitpid(qpid[i], NULL, 0); + } + else break; + +#ifdef MEASURE_TIMING + report_time_since(&timestamp_startup, US"queue_run 1st phase done"); +#endif + q->queue_2stage = f.queue_2stage = FALSE; + queue_run(q, start_id, stop_id, TRUE); } /* At top level, log the end of the run. */ -if (!recurse) log_write(L_queue_run, LOG_MAIN, "End queue run: %s", log_detail); +if (!recurse) + if (q->name) + log_write(L_queue_run, LOG_MAIN, "End '%s' queue run: %s", + q->name, log_detail); + else + log_write(L_queue_run, LOG_MAIN, "End queue run: %s", log_detail); +} + + + +void +single_queue_run(qrunner * q, const uschar * start_id, const uschar * stop_id) +{ +DEBUG(D_queue_run) debug_printf("Single queue run%s%s%s%s\n", + start_id ? US" starting at " : US"", + start_id ? start_id: US"", + stop_id ? US" stopping at " : US"", + stop_id ? 
stop_id : US""); + +if (*queue_name) + set_process_info("running the '%s' queue (single queue run)", queue_name); +else + set_process_info("running the queue (single queue run)"); +queue_run(q, start_id, stop_id, FALSE); } @@ -711,26 +838,38 @@ if (!recurse) log_write(L_queue_run, LOG_MAIN, "End queue run: %s", log_detail); /* Called as a result of -bpc Arguments: none -Returns: nothing +Returns: count */ -void +unsigned queue_count(void) { int subcount; -int count = 0; -queue_filename *f = NULL; +unsigned count = 0; uschar subdirs[64]; -f = queue_get_spool_list( - -1, /* entire queue */ - subdirs, /* for holding sub list */ - &subcount, /* for subcount */ - FALSE); /* not random */ -for (; f != NULL; f = f->next) count++; -fprintf(stdout, "%d\n", count); + +(void) queue_get_spool_list(-1, /* entire queue */ + subdirs, /* for holding sub list */ + &subcount, /* for subcount */ + FALSE, /* not random */ + &count); /* just get the count */ +return count; } +#define QUEUE_SIZE_AGE 60 /* update rate for queue_size */ + +unsigned +queue_count_cached(void) +{ +time_t now; +if ((now = time(NULL)) >= queue_size_next) + { + queue_size = queue_count(); + queue_size_next = now + (f.running_in_test_harness ? 
3 : QUEUE_SIZE_AGE); + } +return queue_size; +} /************************************************ * List extra deliveries * @@ -746,11 +885,12 @@ Argument: points to the tree node Returns: nothing */ -static void queue_list_extras(tree_node *p) +static void +queue_list_extras(tree_node *p) { -if (p->left != NULL) queue_list_extras(p->left); +if (p->left) queue_list_extras(p->left); if (!p->data.val) printf(" +D %s\n", p->name); -if (p->right != NULL) queue_list_extras(p->right); +if (p->right) queue_list_extras(p->right); } @@ -779,13 +919,12 @@ Returns: nothing */ void -queue_list(int option, uschar **list, int count) +queue_list(int option, const uschar ** list, int count) { -int i; int subcount; int now = (int)time(NULL); -void *reset_point; -queue_filename *f = NULL; +rmark reset_point; +queue_filename * qf = NULL; uschar subdirs[64]; /* If given a list of messages, build a chain containing their ids. */ @@ -793,14 +932,14 @@ uschar subdirs[64]; if (count > 0) { queue_filename *last = NULL; - for (i = 0; i < count; i++) + for (int i = 0; i < count; i++) { - queue_filename *next = - store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2); + queue_filename * next = + store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, list[i]); sprintf(CS next->text, "%s-H", list[i]); next->dir_uschar = '*'; next->next = NULL; - if (i == 0) f = next; else last->next = next; + if (i == 0) qf = next; else last->next = next; last = next; } } @@ -808,52 +947,59 @@ if (count > 0) /* Otherwise get a list of the entire queue, in order if necessary. 
*/ else - f = queue_get_spool_list( - -1, /* entire queue */ - subdirs, /* for holding sub list */ - &subcount, /* for subcount */ - option >= 8); /* randomize if required */ + qf = queue_get_spool_list( + -1, /* entire queue */ + subdirs, /* for holding sub list */ + &subcount, /* for subcount */ + option >= QL_UNSORTED, /* randomize if required */ + NULL); /* don't just count */ -if (option >= 8) option -= 8; +option &= ~QL_UNSORTED; /* Now scan the chain and print information, resetting store used each time. */ -reset_point = store_get(0); +if (option == QL_MSGID_ONLY) /* Print only the message IDs from the chain */ + for (; qf; qf = qf->next) + fprintf(stdout, "%.*s\n", + is_old_message_id(qf->text) ? MESSAGE_ID_LENGTH_OLD : MESSAGE_ID_LENGTH, + qf->text); -for (; f != NULL; f = f->next) +else for (; + qf && (reset_point = store_mark()); + spool_clear_header_globals(), store_reset(reset_point), qf = qf->next + ) { int rc, save_errno; int size = 0; BOOL env_read; - store_reset(reset_point); message_size = 0; - message_subdir[0] = f->dir_uschar; - rc = spool_read_header(f->text, FALSE, count <= 0); - if (rc == spool_read_notopen && errno == ENOENT && count <= 0) continue; + message_subdir[0] = qf->dir_uschar; + rc = spool_read_header(qf->text, FALSE, count <= 0); + if (rc == spool_read_notopen && errno == ENOENT && count <= 0) + continue; save_errno = errno; env_read = (rc == spool_read_OK || rc == spool_read_hdrerror); if (env_read) { - int ptr; + int i, ptr; FILE *jread; struct stat statbuf; + uschar * fname = spool_fname(US"input", message_subdir, qf->text, US""); - sprintf(CS big_buffer, "%s/input/%s/%s", spool_directory, message_subdir, - f->text); - ptr = Ustrlen(big_buffer)-1; - big_buffer[ptr] = 'D'; + ptr = Ustrlen(fname)-1; + fname[ptr] = 'D'; /* Add the data size to the header size; don't count the file name at the start of the data file, but add one for the notional blank line that precedes the data. 
*/ - if (Ustat(big_buffer, &statbuf) == 0) - size = message_size + statbuf.st_size - SPOOL_DATA_START_OFFSET + 1; - i = (now - received_time)/60; /* minutes on queue */ + if (Ustat(fname, &statbuf) == 0) + size = message_size + statbuf.st_size - spool_data_start_offset(qf->text) + 1; + i = (now - received_time.tv_sec)/60; /* minutes on queue */ if (i > 90) { i = (i + 30)/60; @@ -863,9 +1009,8 @@ for (; f != NULL; f = f->next) /* Collect delivered addresses from any J file */ - big_buffer[ptr] = 'J'; - jread = Ufopen(big_buffer, "rb"); - if (jread != NULL) + fname[ptr] = 'J'; + if ((jread = Ufopen(fname, "rb"))) { while (Ufgets(big_buffer, big_buffer_size, jread) != NULL) { @@ -877,13 +1022,15 @@ for (; f != NULL; f = f->next) } } - fprintf(stdout, "%s ", string_format_size(size, big_buffer)); - for (i = 0; i < 16; i++) fputc(f->text[i], stdout); + fprintf(stdout, "%s %.*s", + string_format_size(size, big_buffer), + is_old_message_id(qf->text) ? MESSAGE_ID_LENGTH_OLD : MESSAGE_ID_LENGTH, + qf->text); - if (env_read && sender_address != NULL) + if (env_read && sender_address) { printf(" <%s>", sender_address); - if (sender_set_untrusted) printf(" (%s)", originator_login); + if (f.sender_set_untrusted) printf(" (%s)", originator_login); } if (rc != spool_read_OK) @@ -892,9 +1039,9 @@ for (; f != NULL; f = f->next) if (save_errno == ERRNO_SPOOLFORMAT) { struct stat statbuf; - sprintf(CS big_buffer, "%s/input/%s/%s", spool_directory, message_subdir, - f->text); - if (Ustat(big_buffer, &statbuf) == 0) + uschar * fname = spool_fname(US"input", message_subdir, qf->text, US""); + + if (Ustat(fname, &statbuf) == 0) printf("*** spool format error: size=" OFF_T_FMT " ***", statbuf.st_size); else printf("*** spool format error ***"); @@ -907,22 +1054,22 @@ for (; f != NULL; f = f->next) } } - if (deliver_freeze) printf(" *** frozen ***"); + if (f.deliver_freeze) printf(" *** frozen ***"); printf("\n"); - if (recipients_list != NULL) + if (recipients_list) { - for (i = 0; i < 
recipients_count; i++) + for (int i = 0; i < recipients_count; i++) { - tree_node *delivered = + tree_node * delivered = tree_search(tree_nonrecipients, recipients_list[i].address); - if (!delivered || option != 1) - printf(" %s %s\n", (delivered != NULL)? "D":" ", - recipients_list[i].address); - if (delivered != NULL) delivered->data.val = TRUE; + if (!delivered || option != QL_UNDELIVERED_ONLY) + printf(" %s %s\n", + delivered ? "D" : " ", recipients_list[i].address); + if (delivered) delivered->data.val = TRUE; } - if (option == 2 && tree_nonrecipients != NULL) + if (option == QL_PLUS_GENERATED && tree_nonrecipients) queue_list_extras(tree_nonrecipients); printf("\n"); } @@ -950,16 +1097,16 @@ Returns: FALSE if there was any problem */ BOOL -queue_action(uschar *id, int action, uschar **argv, int argc, int recipients_arg) +queue_action(const uschar * id, int action, const uschar ** argv, int argc, + int recipients_arg) { -int i, j; BOOL yield = TRUE; BOOL removed = FALSE; struct passwd *pw; uschar *doing = NULL; uschar *username; uschar *errmsg; -uschar spoolname[256]; +uschar spoolname[32]; /* Set the global message_id variable, used when re-writing spool files. This also causes message ids to be added to log messages. */ @@ -971,10 +1118,10 @@ done. Only admin users may read the spool files. */ if (action >= MSG_SHOW_BODY) { - int fd, i, rc; + int fd, rc; uschar *subdirectory, *suffix; - if (!admin_user) + if (!f.admin_user) { printf("Permission denied\n"); return FALSE; @@ -1002,14 +1149,15 @@ if (action >= MSG_SHOW_BODY) suffix = US""; } - for (i = 0; i < 2; i++) + for (int i = 0; i < 2; i++) { - message_subdir[0] = (split_spool_directory == (i == 0))? 
id[5] : 0; - sprintf(CS spoolname, "%s/%s/%s/%s%s", spool_directory, subdirectory, - message_subdir, id, suffix); - fd = Uopen(spoolname, O_RDONLY, 0); - if (fd >= 0) break; - if (i == 0) continue; + set_subdir_str(message_subdir, id, i); + if ((fd = Uopen(spool_fname(subdirectory, message_subdir, id, suffix), + O_RDONLY, 0)) >= 0) + break; + if (i == 0) + continue; + printf("Failed to open %s file for %s%s: %s\n", subdirectory, id, suffix, strerror(errno)); if (action == MSG_SHOW_LOG && !message_logs) @@ -1019,7 +1167,7 @@ if (action >= MSG_SHOW_BODY) } while((rc = read(fd, big_buffer, big_buffer_size)) > 0) - rc = write(fileno(stdout), big_buffer, rc); + rc = write(fileno(stdout), big_buffer, rc); /*XXX why not fwrite() ? */ (void)close(fd); return TRUE; @@ -1030,13 +1178,12 @@ other process is working on this message. If the file does not exist, continue only if the action is remove and the user is an admin user, to allow for tidying up broken states. */ -if (!spool_open_datafile(id)) - { +if ((deliver_datafile = spool_open_datafile(id)) < 0) if (errno == ENOENT) { yield = FALSE; printf("Spool data file for %s does not exist\n", id); - if (action != MSG_REMOVE || !admin_user) return FALSE; + if (action != MSG_REMOVE || !f.admin_user) return FALSE; printf("Continuing, to ensure all files removed\n"); } else @@ -1046,7 +1193,6 @@ if (!spool_open_datafile(id)) strerror(errno)); return FALSE; } - } /* Read the spool header file for the message. Again, continue after an error only in the case of deleting by an administrator. Setting the third @@ -1062,7 +1208,7 @@ if (spool_read_header(spoolname, TRUE, FALSE) != spool_read_OK) printf("Spool read error for %s: %s\n", spoolname, strerror(errno)); else printf("Spool format error for %s\n", spoolname); - if (action != MSG_REMOVE || !admin_user) + if (action != MSG_REMOVE || !f.admin_user) { (void)close(deliver_datafile); deliver_datafile = -1; @@ -1076,7 +1222,7 @@ message. 
Only admin users may freeze/thaw, add/cancel recipients, or otherwise mess about, but the original sender is permitted to remove a message. That's why we leave this check until after the headers are read. */ -if (!admin_user && (action != MSG_REMOVE || real_uid != originator_uid)) +if (!f.admin_user && (action != MSG_REMOVE || real_uid != originator_uid)) { printf("Permission denied\n"); (void)close(deliver_datafile); @@ -1097,22 +1243,26 @@ if (action != MSG_SHOW_COPY) printf("Message %s ", id); switch(action) { case MSG_SHOW_COPY: - deliver_in_buffer = store_malloc(DELIVER_IN_BUFFER_SIZE); - deliver_out_buffer = store_malloc(DELIVER_OUT_BUFFER_SIZE); - transport_write_message(NULL, 1, 0, 0, NULL, NULL, NULL, NULL, NULL, 0); - break; + { + transport_ctx tctx = {{0}}; + deliver_in_buffer = store_malloc(DELIVER_IN_BUFFER_SIZE); + deliver_out_buffer = store_malloc(DELIVER_OUT_BUFFER_SIZE); + tctx.u.fd = 1; + (void) transport_write_message(&tctx, 0); + break; + } case MSG_FREEZE: - if (deliver_freeze) + if (f.deliver_freeze) { yield = FALSE; printf("is already frozen\n"); } else { - deliver_freeze = TRUE; - deliver_manual_thaw = FALSE; + f.deliver_freeze = TRUE; + f.deliver_manual_thaw = FALSE; deliver_frozen_at = time(NULL); if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0) { @@ -1129,15 +1279,15 @@ switch(action) case MSG_THAW: - if (!deliver_freeze) + if (!f.deliver_freeze) { yield = FALSE; printf("is not frozen\n"); } else { - deliver_freeze = FALSE; - deliver_manual_thaw = TRUE; + f.deliver_freeze = FALSE; + f.deliver_manual_thaw = TRUE; if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0) { printf("is no longer frozen\n"); @@ -1159,60 +1309,119 @@ switch(action) operation, just run everything twice. 
*/ case MSG_REMOVE: - message_subdir[0] = id[5]; - for (j = 0; j < 2; message_subdir[0] = 0, j++) { - sprintf(CS spoolname, "%s/msglog/%s/%s", spool_directory, message_subdir, id); - if (Uunlink(spoolname) < 0) + uschar suffix[3] = { [0]='-', [2]=0 }; + + message_subdir[0] = id[MESSAGE_ID_TIME_LEN - 1]; + + for (int j = 0; j < 2; message_subdir[0] = 0, j++) { - if (errno != ENOENT) - { - yield = FALSE; - printf("Error while removing %s: %s\n", spoolname, - strerror(errno)); - } + uschar * fname = spool_fname(US"msglog", message_subdir, id, US""); + + DEBUG(D_any) debug_printf(" removing %s", fname); + if (Uunlink(fname) < 0) + { + if (errno != ENOENT) + { + yield = FALSE; + printf("Error while removing %s: %s\n", fname, strerror(errno)); + } + else DEBUG(D_any) debug_printf(" (no file)\n"); + } + else + { + removed = TRUE; + DEBUG(D_any) debug_printf(" (ok)\n"); + } + + for (int i = 0; i < 3; i++) + { + uschar * fname; + + suffix[1] = (US"DHJ")[i]; + fname = spool_fname(US"input", message_subdir, id, suffix); + + DEBUG(D_any) debug_printf(" removing %s", fname); + if (Uunlink(fname) < 0) + { + if (errno != ENOENT) + { + yield = FALSE; + printf("Error while removing %s: %s\n", fname, strerror(errno)); + } + else DEBUG(D_any) debug_printf(" (no file)\n"); + } + else + { + removed = TRUE; + DEBUG(D_any) debug_printf(" (done)\n"); + } + } } - else removed = TRUE; - for (i = 0; i < 3; i++) + /* In the common case, the datafile is open (and locked), so give the + obvious message. Otherwise be more specific. 
*/ + + if (deliver_datafile >= 0) printf("has been removed\n"); + else printf("has been removed or did not exist\n"); + if (removed) { - sprintf(CS spoolname, "%s/input/%s/%s-%c", spool_directory, message_subdir, - id, "DHJ"[i]); - if (Uunlink(spoolname) < 0) - { - if (errno != ENOENT) - { - yield = FALSE; - printf("Error while removing %s: %s\n", spoolname, - strerror(errno)); - } - } - else removed = TRUE; +#ifndef DISABLE_EVENT + if (event_action) for (int i = 0; i < recipients_count; i++) + { + tree_node *delivered = + tree_search(tree_nonrecipients, recipients_list[i].address); + if (!delivered) + { + const uschar * save_local = deliver_localpart; + const uschar * save_domain = deliver_domain; + const uschar * addr = recipients_list[i].address; + uschar * errmsg = NULL; + int start, end, dom; + + if (!parse_extract_address(addr, &errmsg, &start, &end, &dom, TRUE)) + log_write(0, LOG_MAIN|LOG_PANIC, + "failed to parse address '%.100s'\n: %s", addr, errmsg); + else + { + deliver_localpart = + string_copyn(addr+start, dom ? (dom-1) - start : end - start); + deliver_domain = dom + ? CUS string_copyn(addr+dom, end - dom) : CUS""; + + (void) event_raise(event_action, US"msg:fail:internal", + string_sprintf("message removed by %s", username), NULL); + + deliver_localpart = save_local; + deliver_domain = save_domain; + } + } + } + (void) event_raise(event_action, US"msg:complete", NULL, NULL); +#endif + log_write(0, LOG_MAIN, "removed by %s", username); + log_write(0, LOG_MAIN, "Completed"); } + break; } - /* In the common case, the datafile is open (and locked), so give the - obvious message. Otherwise be more specific. 
*/ - if (deliver_datafile >= 0) printf("has been removed\n"); - else printf("has been removed or did not exist\n"); - if (removed) - { - log_write(0, LOG_MAIN, "removed by %s", username); - log_write(0, LOG_MAIN, "Completed"); - } - break; + case MSG_SETQUEUE: + /* The global "queue_name_dest" is used as destination, "queue_name" + as source */ + + spool_move_message(id, message_subdir, US"", US""); + break; case MSG_MARK_ALL_DELIVERED: - for (i = 0; i < recipients_count; i++) - { + for (int i = 0; i < recipients_count; i++) tree_add_nonrecipient(recipients_list[i].address); - } + if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0) { printf("has been modified\n"); - for (i = 0; i < recipients_count; i++) + for (int i = 0; i < recipients_count; i++) log_write(0, LOG_MAIN, "address <%s> marked delivered by %s", recipients_list[i].address, username); } @@ -1258,13 +1467,13 @@ switch(action) parse_extract_address(argv[recipients_arg], &errmess, &start, &end, &domain, (action == MSG_EDIT_SENDER)); - if (recipient == NULL) + if (!recipient) { yield = FALSE; printf("- error while %s:\n bad address %s: %s\n", doing, argv[recipients_arg], errmess); } - else if (recipient[0] != 0 && domain == 0) + else if (*recipient && domain == 0) { yield = FALSE; printf("- error while %s:\n bad address %s: " @@ -1274,12 +1483,16 @@ switch(action) { if (action == MSG_ADD_RECIPIENT) { +#ifdef SUPPORT_I18N + if (string_is_utf8(recipient)) allow_utf8_domains = message_smtputf8 = TRUE; +#endif receive_add_recipient(recipient, -1); log_write(0, LOG_MAIN, "recipient <%s> added by %s", recipient, username); } else if (action == MSG_MARK_DELIVERED) { + int i; for (i = 0; i < recipients_count; i++) if (Ustrcmp(recipients_list[i].address, recipient) == 0) break; if (i >= recipients_count) @@ -1297,6 +1510,9 @@ switch(action) } else /* MSG_EDIT_SENDER */ { +#ifdef SUPPORT_I18N + if (string_is_utf8(recipient)) allow_utf8_domains = message_smtputf8 = TRUE; +#endif sender_address = recipient; 
log_write(0, LOG_MAIN, "sender address changed to <%s> by %s", recipient, username); @@ -1305,7 +1521,6 @@ switch(action) } if (yield) - { if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0) printf("has been modified\n"); else @@ -1313,7 +1528,6 @@ switch(action) yield = FALSE; printf("- while %s: %s\n", doing, errmsg); } - } break; } @@ -1321,8 +1535,11 @@ switch(action) /* Closing the datafile releases the lock and permits other processes to operate on the message (if it still exists). */ -(void)close(deliver_datafile); -deliver_datafile = -1; +if (deliver_datafile >= 0) + { + (void)close(deliver_datafile); + deliver_datafile = -1; + } return yield; } @@ -1342,35 +1559,63 @@ Returns: nothing void queue_check_only(void) { -BOOL *set; int sep = 0; struct stat statbuf; -uschar *s, *ss, *name; -uschar buffer[1024]; +const uschar * s = queue_only_file; +uschar * ss; -if (queue_only_file == NULL) return; +if (s) + while ((ss = string_nextinlist(&s, &sep, NULL, 0))) + if (Ustrncmp(ss, "smtp", 4) == 0) + { + ss += 4; + if (Ustat(ss, &statbuf) == 0) + { + f.queue_smtp = TRUE; + DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss); + } + } + else + if (Ustat(ss, &statbuf) == 0) + { + queue_only = TRUE; + DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss); + } +} + + + +/******************************************************************************/ +/******************************************************************************/ + +#ifndef DISABLE_QUEUE_RAMP +void +queue_notify_daemon(const uschar * msgid) +{ +int bsize = 1 + MESSAGE_ID_LENGTH + 1 + Ustrlen(queue_name) + 1; +uschar * buf = store_get(bsize, GET_UNTAINTED); +int fd; + +DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid); + +buf[0] = NOTIFY_MSG_QRUN; +memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1); +Ustrcpy(buf+1+MESSAGE_ID_LENGTH+1, queue_name); -s = queue_only_file; -while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL) +if ((fd = 
socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0) { - if (Ustrncmp(ss, "smtp", 4) == 0) - { - name = US"queue_smtp"; - set = &queue_smtp; - ss += 4; - } - else - { - name = US"queue_only"; - set = &queue_only; - } + struct sockaddr_un sa_un = {.sun_family = AF_UNIX}; + ssize_t len = daemon_notifier_sockname(&sa_un); - if (Ustat(ss, &statbuf) == 0) - { - *set = TRUE; - DEBUG(D_receive) debug_printf("%s set because %s exists\n", name, ss); - } + if (sendto(fd, buf, bsize, 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0) + DEBUG(D_queue_run) + debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno)); + close(fd); } +else DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno)); } +#endif + +#endif /*!COMPILE_UTILITY*/ /* End of queue.c */