X-Git-Url: https://git.exim.org/exim.git/blobdiff_plain/59a93276e38d2d8ae297a9581a5388a475c209c1..HEAD:/src/src/queue.c diff --git a/src/src/queue.c b/src/src/queue.c index f65c65262..6e72a33d5 100644 --- a/src/src/queue.c +++ b/src/src/queue.c @@ -2,8 +2,10 @@ * Exim - an Internet mail transport agent * *************************************************/ +/* Copyright (c) The Exim Maintainers 2020 - 2024 */ /* Copyright (c) University of Cambridge 1995 - 2018 */ /* See the file NOTICE for conditions of use and distribution. */ +/* SPDX-License-Identifier: GPL-2.0-or-later */ /* Functions that operate on the input queue. */ @@ -25,6 +27,9 @@ Michael Haardt. */ #define LOG2_MAXNODES 32 +#ifndef DISABLE_TLS +static BOOL queue_tls_init = FALSE; +#endif /************************************************* * Helper sort function for queue_get_spool_list * @@ -49,8 +54,13 @@ queue_filename **append = &first; while (a && b) { int d; - if ((d = Ustrncmp(a->text, b->text, 6)) == 0) - d = Ustrcmp(a->text + 14, b->text + 14); + if ((d = Ustrncmp(a->text, b->text, MESSAGE_ID_TIME_LEN)) == 0) + { + BOOL a_old = is_old_message_id(a->text), b_old = is_old_message_id(b->text); + /* Do not worry over the sub-second sorting wrt. old vs. new */ + d = Ustrcmp(a->text + (a_old ? 6+1+6+1 : MESSAGE_ID_TIME_LEN + 1 + MESSAGE_ID_PID_LEN + 1), + b->text + (b_old ? 6+1+6+1 : MESSAGE_ID_TIME_LEN + 1 + MESSAGE_ID_PID_LEN + 1)); + } if (d < 0) { *append = a; @@ -110,13 +120,14 @@ Arguments: subdirs vector to store list of subdirchars subcount pointer to int in which to store count of subdirs randomize TRUE if the order of the list is to be unpredictable + pcount If not NULL, fill in with count of files and do not return list Returns: pointer to a chain of queue name items */ static queue_filename * queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount, - BOOL randomize) + BOOL randomize, unsigned * pcount) { int i; int flags = 0; @@ -124,8 +135,6 @@ int resetflags = -1; int subptr; queue_filename *yield = NULL; queue_filename *last = NULL; -struct dirent *ent; -DIR *dd; uschar buffer[256]; queue_filename *root[LOG2_MAXNODES]; @@ -134,7 +143,9 @@ according to the bits of the flags variable. Get a collection of bits from the current time. Use the bottom 16 and just keep re-using them if necessary. When not randomizing, initialize the sublists for the bottom-up merge sort. */ -if (randomize) +if (pcount) + *pcount = 0; +else if (randomize) resetflags = time(NULL) & 0xFFFF; else for (i = 0; i < LOG2_MAXNODES; i++) @@ -168,6 +179,7 @@ for (; i <= *subcount; i++) { int count = 0; int subdirchar = subdirs[i]; /* 0 for main directory */ + DIR *dd; if (subdirchar != 0) { @@ -176,14 +188,14 @@ for (; i <= *subcount; i++) } DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer); - if (!(dd = opendir(CS buffer))) + if (!(dd = exim_opendir(buffer))) continue; /* Now scan the directory. */ - while ((ent = readdir(dd))) + for (struct dirent * ent; ent = readdir(dd); ) { - uschar *name = US ent->d_name; + uschar * name = US ent->d_name; int len = Ustrlen(name); /* Count entries */ @@ -202,62 +214,66 @@ for (; i <= *subcount; i++) /* Otherwise, if it is a header spool file, add it to the list */ - if (len == SPOOL_NAME_LENGTH && - Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0) - { - queue_filename *next = - store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name)); - Ustrcpy(next->text, name); - next->dir_uschar = subdirchar; - - /* Handle the creation of a randomized list. The first item becomes both - the top and bottom of the list. Subsequent items are inserted either at - the top or the bottom, randomly. This is, I argue, faster than doing a - sort by allocating a random number to each item, and it also saves having - to store the number with each item. */ - - if (randomize) - if (!yield) - { - next->next = NULL; - yield = last = next; - } - else - { - if (flags == 0) - flags = resetflags; - if ((flags & 1) == 0) - { - next->next = yield; - yield = next; - } - else - { - next->next = NULL; - last->next = next; - last = next; - } - flags = flags >> 1; - } + if ( (len == SPOOL_NAME_LENGTH || len == SPOOL_NAME_LENGTH_OLD) + && Ustrcmp(name + len - 2, "-H") == 0 + ) + if (pcount) + (*pcount)++; + else + { + queue_filename * next = + store_get(sizeof(queue_filename) + len, name); + Ustrcpy(next->text, name); + next->dir_uschar = subdirchar; + + /* Handle the creation of a randomized list. The first item becomes both + the top and bottom of the list. Subsequent items are inserted either at + the top or the bottom, randomly. This is, I argue, faster than doing a + sort by allocating a random number to each item, and it also saves having + to store the number with each item. */ + + if (randomize) + if (!yield) + { + next->next = NULL; + yield = last = next; + } + else + { + if (flags == 0) + flags = resetflags; + if ((flags & 1) == 0) + { + next->next = yield; + yield = next; + } + else + { + next->next = NULL; + last->next = next; + last = next; + } + flags = flags >> 1; + } - /* Otherwise do a bottom-up merge sort based on the name. */ + /* Otherwise do a bottom-up merge sort based on the name. */ - else - { - next->next = NULL; - for (int j = 0; j < LOG2_MAXNODES; j++) - if (root[j]) - { - next = merge_queue_lists(next, root[j]); - root[j] = j == LOG2_MAXNODES - 1 ? next : NULL; - } - else - { - root[j] = next; - break; - } - } - } + else + { + next->next = NULL; + for (int j = 0; j < LOG2_MAXNODES; j++) + if (root[j]) + { + next = merge_queue_lists(next, root[j]); + root[j] = j == LOG2_MAXNODES - 1 ? next : NULL; + } + else + { + root[j] = next; + break; + } + } + } } /* Finished with this directory */ @@ -294,7 +310,7 @@ for (; i <= *subcount; i++) /* When using a bottom-up merge sort, do the final merging of the sublists. Then pass back the final list of file items. */ -if (!randomize) +if (!pcount && !randomize) for (i = 0; i < LOG2_MAXNODES; ++i) yield = merge_queue_lists(yield, root[i]); @@ -315,8 +331,8 @@ previous lexically lesser one if the given stop message doesn't exist. Because a queue run can take some time, stat each file before forking, in case it has been delivered in the meantime by some other means. -The global variables queue_run_force and queue_run_local may be set to cause -forced deliveries or local-only deliveries, respectively. +The qrun descriptor variables queue_run_force and queue_run_local may be set to +cause forced deliveries or local-only deliveries, respectively. If deliver_selectstring[_sender] is not NULL, skip messages whose recipients do not contain the string. As this option is typically used when a machine comes @@ -329,6 +345,7 @@ is set so that routing is done for all messages. Thus in the second run those that are routed to the same host should go down the same SMTP connection. Arguments: + q queue-runner descriptor start_id message id to start at, or NULL for all stop_id message id to end at, or NULL for all recurse TRUE if recursing for 2-stage run @@ -337,15 +354,28 @@ Returns: nothing */ void -queue_run(uschar *start_id, uschar *stop_id, BOOL recurse) +queue_run(qrunner * q, const uschar * start_id, const uschar * stop_id, BOOL recurse) { -BOOL force_delivery = f.queue_run_force || deliver_selectstring != NULL || - deliver_selectstring_sender != NULL; -const pcre *selectstring_regex = NULL; -const pcre *selectstring_regex_sender = NULL; +BOOL force_delivery = q->queue_run_force + || deliver_selectstring || deliver_selectstring_sender; +const pcre2_code *selectstring_regex = NULL; +const pcre2_code *selectstring_regex_sender = NULL; uschar *log_detail = NULL; int subcount = 0; uschar subdirs[64]; +pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */ +BOOL single_id = FALSE; + +#ifdef MEASURE_TIMING +report_time_since(×tamp_startup, US"queue_run start"); +#endif + +/* Copy the legacy globals from the newer per-qrunner-desc */ + +queue_name = q->name ? q->name : US""; +f.queue_2stage = q->queue_2stage; +f.deliver_force_thaw = q->deliver_force_thaw; +f.queue_run_local = q->queue_run_local; /* Cancel any specific queue domains. Turn off the flag that causes SMTP deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag @@ -355,7 +385,7 @@ on TCP/IP channels have queue_run_pid set, but not queue_running. */ queue_domains = NULL; queue_smtp_domains = NULL; -f.queue_smtp = f.queue_2stage; +f.queue_smtp = q->queue_2stage; queue_run_pid = getpid(); f.queue_running = TRUE; @@ -367,11 +397,11 @@ if (!recurse) uschar extras[8]; uschar *p = extras; - if (f.queue_2stage) *p++ = 'q'; - if (f.queue_run_first_delivery) *p++ = 'i'; - if (f.queue_run_force) *p++ = 'f'; - if (f.deliver_force_thaw) *p++ = 'f'; - if (f.queue_run_local) *p++ = 'l'; + if (q->queue_2stage) *p++ = 'q'; + if (q->queue_run_first_delivery) *p++ = 'i'; + if (q->queue_run_force) *p++ = 'f'; + if (q->deliver_force_thaw) *p++ = 'f'; + if (q->queue_run_local) *p++ = 'l'; *p = 0; p = big_buffer; @@ -381,29 +411,38 @@ if (!recurse) p += sprintf(CS p, " -q%s", extras); if (deliver_selectstring) - p += sprintf(CS p, " -R%s %s", f.deliver_selectstring_regex? "r" : "", - deliver_selectstring); + { + snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s", + f.deliver_selectstring_regex ? "r" : "", deliver_selectstring); + p += Ustrlen(CCS p); + } if (deliver_selectstring_sender) - p += sprintf(CS p, " -S%s %s", f.deliver_selectstring_sender_regex? "r" : "", - deliver_selectstring_sender); + { + snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s", + f.deliver_selectstring_sender_regex ? "r" : "", deliver_selectstring_sender); + p += Ustrlen(CCS p); + } log_detail = string_copy(big_buffer); - if (*queue_name) + if (q->name) log_write(L_queue_run, LOG_MAIN, "Start '%s' queue run: %s", - queue_name, log_detail); + q->name, log_detail); else log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail); + + single_id = start_id && stop_id && !q->queue_2stage + && Ustrcmp(start_id, stop_id) == 0; } /* If deliver_selectstring is a regex, compile it. */ if (deliver_selectstring && f.deliver_selectstring_regex) - selectstring_regex = regex_must_compile(deliver_selectstring, TRUE, FALSE); + selectstring_regex = regex_must_compile(deliver_selectstring, MCS_CASELESS, FALSE); if (deliver_selectstring_sender && f.deliver_selectstring_sender_regex) selectstring_regex_sender = - regex_must_compile(deliver_selectstring_sender, TRUE, FALSE); + regex_must_compile(deliver_selectstring_sender, MCS_CASELESS, FALSE); /* If the spool is split into subdirectories, we want to process it one directory at a time, so as to spread out the directory scanning and the @@ -437,7 +476,7 @@ for (int i = queue_run_in_order ? -1 : 0; } for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount, - !queue_run_in_order); + !queue_run_in_order, NULL); fq; fq = fq->next) { pid_t pid; @@ -449,7 +488,7 @@ for (int i = queue_run_in_order ? -1 : 0; /* Unless deliveries are forced, if deliver_queue_load_max is non-negative, check that the load average is low enough to permit deliveries. */ - if (!f.queue_run_force && deliver_queue_load_max >= 0) + if (!q->queue_run_force && deliver_queue_load_max >= 0) if ((load_average = os_getloadavg()) > deliver_queue_load_max) { log_write(L_queue_run, LOG_MAIN, "Abandon queue run: %s (load %.2f, max %.2f)", @@ -464,18 +503,39 @@ for (int i = queue_run_in_order ? -1 : 0; (double)load_average/1000.0, (double)deliver_queue_load_max/1000.0); + /* If initial of a 2-phase run, maintain a set of child procs + to get disk parallelism */ + + if (q->queue_2stage && !queue_run_in_order) + { + int i; + if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1]) + { + DEBUG(D_queue_run) debug_printf("q2stage waiting for child %d\n", (int)qpid[0]); + waitpid(qpid[0], NULL, 0); + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]); + if (f.running_in_test_harness) i = 0; + else for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1]; + qpid[i] = 0; + } + else + for (i = 0; qpid[i]; ) i++; + if ((qpid[i] = exim_fork(US"qrun-phase-one"))) + continue; /* parent loops around */ + } + /* Skip this message unless it's within the ID limits */ if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0) - continue; + goto go_around; if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0) - continue; + goto go_around; /* Check that the message still exists */ message_subdir[0] = fq->dir_uschar; if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0) - continue; + goto go_around; /* There are some tests that require the reading of the header file. Ensure the store used is scavenged afterwards so that this process doesn't keep @@ -484,7 +544,7 @@ for (int i = queue_run_in_order ? -1 : 0; message when many are not going to be delivered. */ if (deliver_selectstring || deliver_selectstring_sender || - f.queue_run_first_delivery) + q->queue_run_first_delivery) { BOOL wanted = TRUE; BOOL orig_dont_deliver = f.dont_deliver; @@ -495,14 +555,14 @@ for (int i = queue_run_in_order ? -1 : 0; follow. If the message is chosen for delivery, the header is read again in the deliver_message() function, in a subprocess. */ - if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue; + if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around; f.dont_deliver = orig_dont_deliver; /* Now decide if we want to deliver this message. As we have read the header file, we might as well do the freeze test now, and save forking another process. */ - if (f.deliver_freeze && !f.deliver_force_thaw) + if (f.deliver_freeze && !q->deliver_force_thaw) { log_write(L_skip_delivery, LOG_MAIN, "Message is frozen"); wanted = FALSE; @@ -510,7 +570,7 @@ for (int i = queue_run_in_order ? -1 : 0; /* Check first_delivery in the case when there are no message logs. */ - else if (f.queue_run_first_delivery && !f.deliver_firsttime) + else if (q->queue_run_first_delivery && !f.deliver_firsttime) { DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", fq->text); wanted = FALSE; @@ -524,10 +584,8 @@ for (int i = queue_run_in_order ? -1 : 0; else if ( deliver_selectstring_sender && !(f.deliver_selectstring_sender_regex - ? (pcre_exec(selectstring_regex_sender, NULL, - CS sender_address, Ustrlen(sender_address), 0, PCRE_EOPT, - NULL, 0) >= 0) - : (strstric(sender_address, deliver_selectstring_sender, FALSE) + ? regex_match(selectstring_regex_sender, sender_address, -1, NULL) + : (strstric_c(sender_address, deliver_selectstring_sender, FALSE) != NULL) ) ) { @@ -543,11 +601,10 @@ for (int i = queue_run_in_order ? -1 : 0; int i; for (i = 0; i < recipients_count; i++) { - uschar *address = recipients_list[i].address; + const uschar * address = recipients_list[i].address; if ( (f.deliver_selectstring_regex - ? (pcre_exec(selectstring_regex, NULL, CS address, - Ustrlen(address), 0, PCRE_EOPT, NULL, 0) >= 0) - : (strstric(address, deliver_selectstring, FALSE) != NULL) + ? regex_match(selectstring_regex, address, -1, NULL) + : (strstric_c(address, deliver_selectstring, FALSE) != NULL) ) && tree_search(tree_nonrecipients, address) == NULL ) @@ -567,7 +624,7 @@ for (int i = queue_run_in_order ? -1 : 0; spool_clear_header_globals(); store_reset(reset_point2); - if (!wanted) continue; /* With next message */ + if (!wanted) goto go_around; /* With next message */ } /* OK, got a message we want to deliver. Create a pipe which will @@ -606,15 +663,30 @@ for (int i = queue_run_in_order ? -1 : 0; /* Now deliver the message; get the id by cutting the -H off the file name. The return of the process is zero if a delivery was attempted. */ + fq->text[Ustrlen(fq->text)-2] = 0; set_process_info("running queue: %s", fq->text); - fq->text[SPOOL_NAME_LENGTH-2] = 0; - if ((pid = fork()) == 0) +#ifdef MEASURE_TIMING + report_time_since(×tamp_startup, US"queue msg selected"); +#endif + +#ifndef DISABLE_TLS + if (!queue_tls_init) + { + queue_tls_init = TRUE; + /* Preload TLS library info for smtp transports. Once, and only if we + have a delivery to do. */ + tls_client_creds_reload(FALSE); + } +#endif + +single_item_retry: + if ((pid = exim_fork(US"qrun-delivery")) == 0) { int rc; - if (f.running_in_test_harness) millisleep(100); (void)close(pfd[pipe_read]); rc = deliver_message(fq->text, force_delivery, FALSE); - exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED); + exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED + ? EXIT_FAILURE : EXIT_SUCCESS); } if (pid < 0) log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from " @@ -630,15 +702,27 @@ for (int i = queue_run_in_order ? -1 : 0; /* A zero return means a delivery was attempted; turn off the force flag for any subsequent calls unless queue_force is set. */ - if ((status & 0xffff) == 0) force_delivery = f.queue_run_force; + if (!(status & 0xffff)) force_delivery = q->queue_run_force; /* If the process crashed, tell somebody */ - else if ((status & 0x00ff) != 0) + else if (status & 0x00ff) log_write(0, LOG_MAIN|LOG_PANIC, "queue run: process %d crashed with signal %d while delivering %s", (int)pid, status & 0x00ff, fq->text); + /* If single-item delivery was untried (likely due to locking) + retry once after a delay */ + + if (status & 0xff00 && single_id) + { + single_id = FALSE; + DEBUG(D_queue_run) debug_printf("qrun single-item pause before retry\n"); + millisleep(500); + DEBUG(D_queue_run) debug_printf("qrun single-item retry after pause\n"); + goto single_item_retry; + } + /* Before continuing, wait till the pipe gets closed at the far end. This tells us that any children created by the delivery to re-use any SMTP channels have all finished. Since no process actually writes to the pipe, @@ -646,19 +730,32 @@ for (int i = queue_run_in_order ? -1 : 0; set_process_info("running queue: waiting for children of %d", pid); if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0) - log_write(0, LOG_MAIN|LOG_PANIC, "queue run: %s on pipe", - status > 0 ? "unexpected data" : "error"); + log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ? + "queue run: unexpected data on pipe" : "queue run: error on pipe: %s", + strerror(errno)); (void)close(pfd[pipe_read]); set_process_info("running queue"); + /* If initial of a 2-phase run, we are a child - so just exit */ + if (q->queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS); + /* If we are in the test harness, and this is not the first of a 2-stage queue run, update fudged queue times. */ - if (f.running_in_test_harness && !f.queue_2stage) + if (f.running_in_test_harness && !q->queue_2stage) { - uschar *fqtnext = Ustrchr(fudged_queue_times, '/'); - if (fqtnext != NULL) fudged_queue_times = fqtnext + 1; + uschar * fqtnext = Ustrchr(fudged_queue_times, '/'); + if (fqtnext) fudged_queue_times = fqtnext + 1; } + + + continue; + + go_around: + /* If initial of a 2-phase run, we are a child - so just exit */ + if (q->queue_2stage && !queue_run_in_order) + exim_exit(EXIT_SUCCESS); } /* End loop for list of messages */ tree_nonrecipients = NULL; @@ -684,24 +781,55 @@ for (int i = queue_run_in_order ? -1 : 0; /* If queue_2stage is true, we do it all again, with the 2stage flag turned off. */ -if (f.queue_2stage) +if (q->queue_2stage) { - f.queue_2stage = FALSE; - queue_run(start_id, stop_id, TRUE); + + /* wait for last children */ + for (int i = 0; i < nelem(qpid); i++) + if (qpid[i]) + { + DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]); + waitpid(qpid[i], NULL, 0); + } + else break; + +#ifdef MEASURE_TIMING + report_time_since(×tamp_startup, US"queue_run 1st phase done"); +#endif + q->queue_2stage = f.queue_2stage = FALSE; + queue_run(q, start_id, stop_id, TRUE); } /* At top level, log the end of the run. */ if (!recurse) - if (*queue_name) + if (q->name) log_write(L_queue_run, LOG_MAIN, "End '%s' queue run: %s", - queue_name, log_detail); + q->name, log_detail); else log_write(L_queue_run, LOG_MAIN, "End queue run: %s", log_detail); } +void +single_queue_run(qrunner * q, const uschar * start_id, const uschar * stop_id) +{ +DEBUG(D_queue_run) debug_printf("Single queue run%s%s%s%s\n", + start_id ? US" starting at " : US"", + start_id ? start_id: US"", + stop_id ? US" stopping at " : US"", + stop_id ? stop_id : US""); + +if (*queue_name) + set_process_info("running the '%s' queue (single queue run)", queue_name); +else + set_process_info("running the queue (single queue run)"); +queue_run(q, start_id, stop_id, FALSE); +} + + + /************************************************ * Count messages on the queue * @@ -710,26 +838,38 @@ if (!recurse) /* Called as a result of -bpc Arguments: none -Returns: nothing +Returns: count */ -void +unsigned queue_count(void) { int subcount; -int count = 0; +unsigned count = 0; uschar subdirs[64]; -for (queue_filename *f = queue_get_spool_list( - -1, /* entire queue */ - subdirs, /* for holding sub list */ - &subcount, /* for subcount */ - FALSE); /* not random */ - f; f = f->next) count++; -fprintf(stdout, "%d\n", count); +(void) queue_get_spool_list(-1, /* entire queue */ + subdirs, /* for holding sub list */ + &subcount, /* for subcount */ + FALSE, /* not random */ + &count); /* just get the count */ +return count; } +#define QUEUE_SIZE_AGE 60 /* update rate for queue_size */ + +unsigned +queue_count_cached(void) +{ +time_t now; +if ((now = time(NULL)) >= queue_size_next) + { + queue_size = queue_count(); + queue_size_next = now + (f.running_in_test_harness ? 3 : QUEUE_SIZE_AGE); + } +return queue_size; +} /************************************************ * List extra deliveries * @@ -779,7 +919,7 @@ Returns: nothing */ void -queue_list(int option, uschar **list, int count) +queue_list(int option, const uschar ** list, int count) { int subcount; int now = (int)time(NULL); @@ -794,8 +934,8 @@ if (count > 0) queue_filename *last = NULL; for (int i = 0; i < count; i++) { - queue_filename *next = - store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, is_tainted(list[i])); + queue_filename * next = + store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, list[i]); sprintf(CS next->text, "%s-H", list[i]); next->dir_uschar = '*'; next->next = NULL; @@ -808,20 +948,27 @@ if (count > 0) else qf = queue_get_spool_list( - -1, /* entire queue */ - subdirs, /* for holding sub list */ - &subcount, /* for subcount */ - option >= 8); /* randomize if required */ + -1, /* entire queue */ + subdirs, /* for holding sub list */ + &subcount, /* for subcount */ + option >= QL_UNSORTED, /* randomize if required */ + NULL); /* don't just count */ -if (option >= 8) option -= 8; +option &= ~QL_UNSORTED; /* Now scan the chain and print information, resetting store used each time. */ -for (; - qf && (reset_point = store_mark()); - spool_clear_header_globals(), store_reset(reset_point), qf = qf->next - ) +if (option == QL_MSGID_ONLY) /* Print only the message IDs from the chain */ + for (; qf; qf = qf->next) + fprintf(stdout, "%.*s\n", + is_old_message_id(qf->text) ? MESSAGE_ID_LENGTH_OLD : MESSAGE_ID_LENGTH, + qf->text); + +else for (; + qf && (reset_point = store_mark()); + spool_clear_header_globals(), store_reset(reset_point), qf = qf->next + ) { int rc, save_errno; int size = 0; @@ -851,7 +998,7 @@ for (; that precedes the data. */ if (Ustat(fname, &statbuf) == 0) - size = message_size + statbuf.st_size - SPOOL_DATA_START_OFFSET + 1; + size = message_size + statbuf.st_size - spool_data_start_offset(qf->text) + 1; i = (now - received_time.tv_sec)/60; /* minutes on queue */ if (i > 90) { @@ -875,8 +1022,10 @@ for (; } } - fprintf(stdout, "%s ", string_format_size(size, big_buffer)); - for (int i = 0; i < 16; i++) fputc(qf->text[i], stdout); + fprintf(stdout, "%s %.*s", + string_format_size(size, big_buffer), + is_old_message_id(qf->text) ? MESSAGE_ID_LENGTH_OLD : MESSAGE_ID_LENGTH, + qf->text); if (env_read && sender_address) { @@ -913,14 +1062,14 @@ for (; { for (int i = 0; i < recipients_count; i++) { - tree_node *delivered = + tree_node * delivered = tree_search(tree_nonrecipients, recipients_list[i].address); - if (!delivered || option != 1) + if (!delivered || option != QL_UNDELIVERED_ONLY) printf(" %s %s\n", delivered ? "D" : " ", recipients_list[i].address); if (delivered) delivered->data.val = TRUE; } - if (option == 2 && tree_nonrecipients) + if (option == QL_PLUS_GENERATED && tree_nonrecipients) queue_list_extras(tree_nonrecipients); printf("\n"); } @@ -948,7 +1097,8 @@ Returns: FALSE if there was any problem */ BOOL -queue_action(uschar *id, int action, uschar **argv, int argc, int recipients_arg) +queue_action(const uschar * id, int action, const uschar ** argv, int argc, + int recipients_arg) { BOOL yield = TRUE; BOOL removed = FALSE; @@ -1017,7 +1167,7 @@ if (action >= MSG_SHOW_BODY) } while((rc = read(fd, big_buffer, big_buffer_size)) > 0) - rc = write(fileno(stdout), big_buffer, rc); + rc = write(fileno(stdout), big_buffer, rc); /*XXX why not fwrite() ? */ (void)close(fd); return TRUE; @@ -1160,11 +1310,9 @@ switch(action) case MSG_REMOVE: { - uschar suffix[3]; + uschar suffix[3] = { [0]='-', [2]=0 }; - suffix[0] = '-'; - suffix[2] = 0; - message_subdir[0] = id[5]; + message_subdir[0] = id[MESSAGE_ID_TIME_LEN - 1]; for (int j = 0; j < 2; message_subdir[0] = 0, j++) { @@ -1225,9 +1373,10 @@ switch(action) tree_search(tree_nonrecipients, recipients_list[i].address); if (!delivered) { - uschar * save_local = deliver_localpart; + const uschar * save_local = deliver_localpart; const uschar * save_domain = deliver_domain; - uschar * addr = recipients_list[i].address, * errmsg = NULL; + const uschar * addr = recipients_list[i].address; + uschar * errmsg = NULL; int start, end, dom; if (!parse_extract_address(addr, &errmsg, &start, &end, &dom, TRUE)) @@ -1240,15 +1389,15 @@ switch(action) deliver_domain = dom ? CUS string_copyn(addr+dom, end - dom) : CUS""; - event_raise(event_action, US"msg:fail:internal", - string_sprintf("message removed by %s", username)); + (void) event_raise(event_action, US"msg:fail:internal", + string_sprintf("message removed by %s", username), NULL); deliver_localpart = save_local; deliver_domain = save_domain; } } } - (void) event_raise(event_action, US"msg:complete", NULL); + (void) event_raise(event_action, US"msg:complete", NULL, NULL); #endif log_write(0, LOG_MAIN, "removed by %s", username); log_write(0, LOG_MAIN, "Completed"); @@ -1257,6 +1406,14 @@ switch(action) } + case MSG_SETQUEUE: + /* The global "queue_name_dest" is used as destination, "queue_name" + as source */ + + spool_move_message(id, message_subdir, US"", US""); + break; + + case MSG_MARK_ALL_DELIVERED: for (int i = 0; i < recipients_count; i++) tree_add_nonrecipient(recipients_list[i].address); @@ -1310,13 +1467,13 @@ switch(action) parse_extract_address(argv[recipients_arg], &errmess, &start, &end, &domain, (action == MSG_EDIT_SENDER)); - if (recipient == NULL) + if (!recipient) { yield = FALSE; printf("- error while %s:\n bad address %s: %s\n", doing, argv[recipients_arg], errmess); } - else if (recipient[0] != 0 && domain == 0) + else if (*recipient && domain == 0) { yield = FALSE; printf("- error while %s:\n bad address %s: " @@ -1404,34 +1561,60 @@ queue_check_only(void) { int sep = 0; struct stat statbuf; -const uschar *s; -uschar *ss; -uschar buffer[1024]; +const uschar * s = queue_only_file; +uschar * ss; -if (queue_only_file == NULL) return; - -s = queue_only_file; -while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL) - { - if (Ustrncmp(ss, "smtp", 4) == 0) - { - ss += 4; - if (Ustat(ss, &statbuf) == 0) - { - f.queue_smtp = TRUE; - DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss); - } - } - else - { - if (Ustat(ss, &statbuf) == 0) +if (s) + while ((ss = string_nextinlist(&s, &sep, NULL, 0))) + if (Ustrncmp(ss, "smtp", 4) == 0) { - queue_only = TRUE; - DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss); + ss += 4; + if (Ustat(ss, &statbuf) == 0) + { + f.queue_smtp = TRUE; + DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss); + } } - } + else + if (Ustat(ss, &statbuf) == 0) + { + queue_only = TRUE; + DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss); + } +} + + + +/******************************************************************************/ +/******************************************************************************/ + +#ifndef DISABLE_QUEUE_RAMP +void +queue_notify_daemon(const uschar * msgid) +{ +int bsize = 1 + MESSAGE_ID_LENGTH + 1 + Ustrlen(queue_name) + 1; +uschar * buf = store_get(bsize, GET_UNTAINTED); +int fd; + +DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid); + +buf[0] = NOTIFY_MSG_QRUN; +memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1); +Ustrcpy(buf+1+MESSAGE_ID_LENGTH+1, queue_name); + +if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0) + { + struct sockaddr_un sa_un = {.sun_family = AF_UNIX}; + ssize_t len = daemon_notifier_sockname(&sa_un); + + if (sendto(fd, buf, bsize, 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0) + DEBUG(D_queue_run) + debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno)); + close(fd); } +else DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno)); } +#endif #endif /*!COMPILE_UTILITY*/