X-Git-Url: https://git.exim.org/exim.git/blobdiff_plain/63deec8a3ba77fcabf405d9c30fdd65a8b909526..1e835086d1592bdfbcd8577133965b78470840ac:/src/src/queue.c diff --git a/src/src/queue.c b/src/src/queue.c index f86e24b42..d01cde655 100644 --- a/src/src/queue.c +++ b/src/src/queue.c @@ -325,8 +325,8 @@ previous lexically lesser one if the given stop message doesn't exist. Because a queue run can take some time, stat each file before forking, in case it has been delivered in the meantime by some other means. -The global variables queue_run_force and queue_run_local may be set to cause -forced deliveries or local-only deliveries, respectively. +The qrun descriptor variables queue_run_force and queue_run_local may be set to +cause forced deliveries or local-only deliveries, respectively. If deliver_selectstring[_sender] is not NULL, skip messages whose recipients do not contain the string. As this option is typically used when a machine comes @@ -339,6 +339,7 @@ is set so that routing is done for all messages. Thus in the second run those that are routed to the same host should go down the same SMTP connection. Arguments: + q queue-runner descriptor start_id message id to start at, or NULL for all stop_id message id to end at, or NULL for all recurse TRUE if recursing for 2-stage run @@ -347,10 +348,10 @@ Returns: nothing */ void -queue_run(uschar *start_id, uschar *stop_id, BOOL recurse) +queue_run(qrunner * q, uschar * start_id, uschar * stop_id, BOOL recurse) { -BOOL force_delivery = f.queue_run_force || deliver_selectstring != NULL || - deliver_selectstring_sender != NULL; +BOOL force_delivery = q->queue_run_force + || deliver_selectstring || deliver_selectstring_sender; const pcre2_code *selectstring_regex = NULL; const pcre2_code *selectstring_regex_sender = NULL; uschar *log_detail = NULL; @@ -363,6 +364,13 @@ BOOL single_id = FALSE; report_time_since(×tamp_startup, US"queue_run start"); #endif +/* Copy the legacy globals from the newer per-qrunner-desc */ + +queue_name = q->name ? q->name : US""; +f.queue_2stage = q->queue_2stage; +f.deliver_force_thaw = q->deliver_force_thaw; +f.queue_run_local = q->queue_run_local; + /* Cancel any specific queue domains. Turn off the flag that causes SMTP deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag gets set. Save the queue_runner's pid and the flag that indicates any @@ -371,7 +379,7 @@ on TCP/IP channels have queue_run_pid set, but not queue_running. */ queue_domains = NULL; queue_smtp_domains = NULL; -f.queue_smtp = f.queue_2stage; +f.queue_smtp = q->queue_2stage; queue_run_pid = getpid(); f.queue_running = TRUE; @@ -383,11 +391,11 @@ if (!recurse) uschar extras[8]; uschar *p = extras; - if (f.queue_2stage) *p++ = 'q'; - if (f.queue_run_first_delivery) *p++ = 'i'; - if (f.queue_run_force) *p++ = 'f'; - if (f.deliver_force_thaw) *p++ = 'f'; - if (f.queue_run_local) *p++ = 'l'; + if (q->queue_2stage) *p++ = 'q'; + if (q->queue_run_first_delivery) *p++ = 'i'; + if (q->queue_run_force) *p++ = 'f'; + if (q->deliver_force_thaw) *p++ = 'f'; + if (q->queue_run_local) *p++ = 'l'; *p = 0; p = big_buffer; @@ -399,25 +407,25 @@ if (!recurse) if (deliver_selectstring) { snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s", - f.deliver_selectstring_regex? "r" : "", deliver_selectstring); + f.deliver_selectstring_regex ? "r" : "", deliver_selectstring); p += Ustrlen(CCS p); } if (deliver_selectstring_sender) { snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s", - f.deliver_selectstring_sender_regex? "r" : "", deliver_selectstring_sender); + f.deliver_selectstring_sender_regex ? "r" : "", deliver_selectstring_sender); p += Ustrlen(CCS p); } log_detail = string_copy(big_buffer); - if (*queue_name) + if (q->name) log_write(L_queue_run, LOG_MAIN, "Start '%s' queue run: %s", - queue_name, log_detail); + q->name, log_detail); else log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail); - single_id = start_id && stop_id && !f.queue_2stage + single_id = start_id && stop_id && !q->queue_2stage && Ustrcmp(start_id, stop_id) == 0; } @@ -474,7 +482,7 @@ for (int i = queue_run_in_order ? -1 : 0; /* Unless deliveries are forced, if deliver_queue_load_max is non-negative, check that the load average is low enough to permit deliveries. */ - if (!f.queue_run_force && deliver_queue_load_max >= 0) + if (!q->queue_run_force && deliver_queue_load_max >= 0) if ((load_average = os_getloadavg()) > deliver_queue_load_max) { log_write(L_queue_run, LOG_MAIN, "Abandon queue run: %s (load %.2f, max %.2f)", @@ -492,7 +500,7 @@ for (int i = queue_run_in_order ? -1 : 0; /* If initial of a 2-phase run, maintain a set of child procs to get disk parallelism */ - if (f.queue_2stage && !queue_run_in_order) + if (q->queue_2stage && !queue_run_in_order) { int i; if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1]) @@ -530,7 +538,7 @@ for (int i = queue_run_in_order ? -1 : 0; message when many are not going to be delivered. */ if (deliver_selectstring || deliver_selectstring_sender || - f.queue_run_first_delivery) + q->queue_run_first_delivery) { BOOL wanted = TRUE; BOOL orig_dont_deliver = f.dont_deliver; @@ -548,7 +556,7 @@ for (int i = queue_run_in_order ? -1 : 0; header file, we might as well do the freeze test now, and save forking another process. */ - if (f.deliver_freeze && !f.deliver_force_thaw) + if (f.deliver_freeze && !q->deliver_force_thaw) { log_write(L_skip_delivery, LOG_MAIN, "Message is frozen"); wanted = FALSE; @@ -556,7 +564,7 @@ for (int i = queue_run_in_order ? -1 : 0; /* Check first_delivery in the case when there are no message logs. */ - else if (f.queue_run_first_delivery && !f.deliver_firsttime) + else if (q->queue_run_first_delivery && !f.deliver_firsttime) { DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", fq->text); wanted = FALSE; @@ -688,7 +696,7 @@ single_item_retry: /* A zero return means a delivery was attempted; turn off the force flag for any subsequent calls unless queue_force is set. */ - if (!(status & 0xffff)) force_delivery = f.queue_run_force; + if (!(status & 0xffff)) force_delivery = q->queue_run_force; /* If the process crashed, tell somebody */ @@ -723,13 +731,13 @@ single_item_retry: set_process_info("running queue"); /* If initial of a 2-phase run, we are a child - so just exit */ - if (f.queue_2stage && !queue_run_in_order) + if (q->queue_2stage && !queue_run_in_order) exim_exit(EXIT_SUCCESS); /* If we are in the test harness, and this is not the first of a 2-stage queue run, update fudged queue times. */ - if (f.running_in_test_harness && !f.queue_2stage) + if (f.running_in_test_harness && !q->queue_2stage) { uschar * fqtnext = Ustrchr(fudged_queue_times, '/'); if (fqtnext) fudged_queue_times = fqtnext + 1; @@ -740,7 +748,7 @@ single_item_retry: go_around: /* If initial of a 2-phase run, we are a child - so just exit */ - if (f.queue_2stage && !queue_run_in_order) + if (q->queue_2stage && !queue_run_in_order) exim_exit(EXIT_SUCCESS); } /* End loop for list of messages */ @@ -767,7 +775,7 @@ single_item_retry: /* If queue_2stage is true, we do it all again, with the 2stage flag turned off. */ -if (f.queue_2stage) +if (q->queue_2stage) { /* wait for last children */ @@ -782,22 +790,40 @@ if (f.queue_2stage) #ifdef MEASURE_TIMING report_time_since(×tamp_startup, US"queue_run 1st phase done"); #endif - f.queue_2stage = FALSE; - queue_run(start_id, stop_id, TRUE); + q->queue_2stage = f.queue_2stage = FALSE; + queue_run(q, start_id, stop_id, TRUE); } /* At top level, log the end of the run. */ if (!recurse) - if (*queue_name) + if (q->name) log_write(L_queue_run, LOG_MAIN, "End '%s' queue run: %s", - queue_name, log_detail); + q->name, log_detail); else log_write(L_queue_run, LOG_MAIN, "End queue run: %s", log_detail); } +void +single_queue_run(qrunner * q, uschar * start_id, uschar * stop_id) +{ +DEBUG(D_queue_run) debug_printf("Single queue run%s%s%s%s\n", + start_id ? US" starting at " : US"", + start_id ? start_id: US"", + stop_id ? US" stopping at " : US"", + stop_id ? stop_id : US""); + +if (*queue_name) + set_process_info("running the '%s' queue (single queue run)", queue_name); +else + set_process_info("running the queue (single queue run)"); +queue_run(q, start_id, stop_id, FALSE); +} + + + /************************************************ * Count messages on the queue * @@ -1552,20 +1578,22 @@ if (s) void queue_notify_daemon(const uschar * msgid) { -uschar buf[MESSAGE_ID_LENGTH + 2]; +int bsize = 1 + MESSAGE_ID_LENGTH + 1 + Ustrlen(queue_name) + 1; +uschar * buf = store_get(bsize, GET_UNTAINTED); int fd; DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid); buf[0] = NOTIFY_MSG_QRUN; memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1); +Ustrcpy(buf+1+MESSAGE_ID_LENGTH+1, queue_name); if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0) { struct sockaddr_un sa_un = {.sun_family = AF_UNIX}; ssize_t len = daemon_notifier_sockname(&sa_un); - if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0) + if (sendto(fd, buf, bsize, 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0) DEBUG(D_queue_run) debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno)); close(fd);