* Exim - an Internet mail transport agent *
*************************************************/
+/* Copyright (c) The Exim Maintainers 2020 - 2022 */
/* Copyright (c) University of Cambridge 1995 - 2018 */
/* See the file NOTICE for conditions of use and distribution. */
+/* SPDX-License-Identifier: GPL-2.0-or-later */
/* Functions that operate on the input queue. */
#define LOG2_MAXNODES 32
+#ifndef DISABLE_TLS
+static BOOL queue_tls_init = FALSE;
+#endif
/*************************************************
* Helper sort function for queue_get_spool_list *
int subptr;
queue_filename *yield = NULL;
queue_filename *last = NULL;
-struct dirent *ent;
-DIR *dd;
uschar buffer[256];
queue_filename *root[LOG2_MAXNODES];
{
int count = 0;
int subdirchar = subdirs[i]; /* 0 for main directory */
+ DIR *dd;
if (subdirchar != 0)
{
}
DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer);
- if (!(dd = opendir(CS buffer)))
+ if (!(dd = exim_opendir(buffer)))
continue;
/* Now scan the directory. */
- while ((ent = readdir(dd)))
+ for (struct dirent *ent; ent = readdir(dd); )
{
uschar *name = US ent->d_name;
int len = Ustrlen(name);
(*pcount)++;
else
{
- queue_filename *next =
- store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
+ queue_filename * next =
+ store_get(sizeof(queue_filename) + Ustrlen(name), name);
Ustrcpy(next->text, name);
next->dir_uschar = subdirchar;
a queue run can take some time, stat each file before forking, in case it has
been delivered in the meantime by some other means.
-The global variables queue_run_force and queue_run_local may be set to cause
-forced deliveries or local-only deliveries, respectively.
+The qrun descriptor variables queue_run_force and queue_run_local may be set to
+cause forced deliveries or local-only deliveries, respectively.
If deliver_selectstring[_sender] is not NULL, skip messages whose recipients do
not contain the string. As this option is typically used when a machine comes
that are routed to the same host should go down the same SMTP connection.
Arguments:
+ q queue-runner descriptor
start_id message id to start at, or NULL for all
stop_id message id to end at, or NULL for all
recurse TRUE if recursing for 2-stage run
*/
void
-queue_run(uschar *start_id, uschar *stop_id, BOOL recurse)
+queue_run(qrunner * q, uschar * start_id, uschar * stop_id, BOOL recurse)
{
-BOOL force_delivery = f.queue_run_force || deliver_selectstring != NULL ||
- deliver_selectstring_sender != NULL;
-const pcre *selectstring_regex = NULL;
-const pcre *selectstring_regex_sender = NULL;
+BOOL force_delivery = q->queue_run_force
+ || deliver_selectstring || deliver_selectstring_sender;
+const pcre2_code *selectstring_regex = NULL;
+const pcre2_code *selectstring_regex_sender = NULL;
uschar *log_detail = NULL;
int subcount = 0;
uschar subdirs[64];
pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */
+BOOL single_id = FALSE;
#ifdef MEASURE_TIMING
report_time_since(&timestamp_startup, US"queue_run start");
#endif
+/* Copy the legacy globals from the newer per-qrunner-desc */
+
+queue_name = q->name ? q->name : US"";
+f.queue_2stage = q->queue_2stage;
+f.deliver_force_thaw = q->deliver_force_thaw;
+f.queue_run_local = q->queue_run_local;
+
/* Cancel any specific queue domains. Turn off the flag that causes SMTP
deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag
gets set. Save the queue_runner's pid and the flag that indicates any
queue_domains = NULL;
queue_smtp_domains = NULL;
-f.queue_smtp = f.queue_2stage;
+f.queue_smtp = q->queue_2stage;
queue_run_pid = getpid();
f.queue_running = TRUE;
uschar extras[8];
uschar *p = extras;
- if (f.queue_2stage) *p++ = 'q';
- if (f.queue_run_first_delivery) *p++ = 'i';
- if (f.queue_run_force) *p++ = 'f';
- if (f.deliver_force_thaw) *p++ = 'f';
- if (f.queue_run_local) *p++ = 'l';
+ if (q->queue_2stage) *p++ = 'q';
+ if (q->queue_run_first_delivery) *p++ = 'i';
+ if (q->queue_run_force) *p++ = 'f';
+ if (q->deliver_force_thaw) *p++ = 'f';
+ if (q->queue_run_local) *p++ = 'l';
*p = 0;
p = big_buffer;
p += sprintf(CS p, " -q%s", extras);
if (deliver_selectstring)
- p += sprintf(CS p, " -R%s %s", f.deliver_selectstring_regex? "r" : "",
- deliver_selectstring);
+ {
+ snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s",
+ f.deliver_selectstring_regex ? "r" : "", deliver_selectstring);
+ p += Ustrlen(CCS p);
+ }
if (deliver_selectstring_sender)
- p += sprintf(CS p, " -S%s %s", f.deliver_selectstring_sender_regex? "r" : "",
- deliver_selectstring_sender);
+ {
+ snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s",
+ f.deliver_selectstring_sender_regex ? "r" : "", deliver_selectstring_sender);
+ p += Ustrlen(CCS p);
+ }
log_detail = string_copy(big_buffer);
- if (*queue_name)
+ if (q->name)
log_write(L_queue_run, LOG_MAIN, "Start '%s' queue run: %s",
- queue_name, log_detail);
+ q->name, log_detail);
else
log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
+
+ single_id = start_id && stop_id && !q->queue_2stage
+ && Ustrcmp(start_id, stop_id) == 0;
}
/* If deliver_selectstring is a regex, compile it. */
if (deliver_selectstring && f.deliver_selectstring_regex)
- selectstring_regex = regex_must_compile(deliver_selectstring, TRUE, FALSE);
+ selectstring_regex = regex_must_compile(deliver_selectstring, MCS_CASELESS, FALSE);
if (deliver_selectstring_sender && f.deliver_selectstring_sender_regex)
selectstring_regex_sender =
- regex_must_compile(deliver_selectstring_sender, TRUE, FALSE);
+ regex_must_compile(deliver_selectstring_sender, MCS_CASELESS, FALSE);
/* If the spool is split into subdirectories, we want to process it one
directory at a time, so as to spread out the directory scanning and the
/* Unless deliveries are forced, if deliver_queue_load_max is non-negative,
check that the load average is low enough to permit deliveries. */
- if (!f.queue_run_force && deliver_queue_load_max >= 0)
+ if (!q->queue_run_force && deliver_queue_load_max >= 0)
if ((load_average = os_getloadavg()) > deliver_queue_load_max)
{
log_write(L_queue_run, LOG_MAIN, "Abandon queue run: %s (load %.2f, max %.2f)",
/* If initial of a 2-phase run, maintain a set of child procs
to get disk parallelism */
- if (f.queue_2stage && !queue_run_in_order)
+ if (q->queue_2stage && !queue_run_in_order)
{
int i;
if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
}
else
for (i = 0; qpid[i]; ) i++;
- DEBUG(D_queue_run) debug_printf("q2stage forking\n");
- if ((qpid[i] = fork()))
+ if ((qpid[i] = exim_fork(US"qrun-phase-one")))
continue; /* parent loops around */
- DEBUG(D_queue_run) debug_printf("q2stage child\n");
}
/* Skip this message unless it's within the ID limits */
message when many are not going to be delivered. */
if (deliver_selectstring || deliver_selectstring_sender ||
- f.queue_run_first_delivery)
+ q->queue_run_first_delivery)
{
BOOL wanted = TRUE;
BOOL orig_dont_deliver = f.dont_deliver;
header file, we might as well do the freeze test now, and save forking
another process. */
- if (f.deliver_freeze && !f.deliver_force_thaw)
+ if (f.deliver_freeze && !q->deliver_force_thaw)
{
log_write(L_skip_delivery, LOG_MAIN, "Message is frozen");
wanted = FALSE;
/* Check first_delivery in the case when there are no message logs. */
- else if (f.queue_run_first_delivery && !f.deliver_firsttime)
+ else if (q->queue_run_first_delivery && !f.deliver_firsttime)
{
DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", fq->text);
wanted = FALSE;
else if ( deliver_selectstring_sender
&& !(f.deliver_selectstring_sender_regex
- ? (pcre_exec(selectstring_regex_sender, NULL,
- CS sender_address, Ustrlen(sender_address), 0, PCRE_EOPT,
- NULL, 0) >= 0)
+ ? regex_match(selectstring_regex_sender, sender_address, -1, NULL)
: (strstric(sender_address, deliver_selectstring_sender, FALSE)
!= NULL)
) )
{
uschar *address = recipients_list[i].address;
if ( (f.deliver_selectstring_regex
- ? (pcre_exec(selectstring_regex, NULL, CS address,
- Ustrlen(address), 0, PCRE_EOPT, NULL, 0) >= 0)
+ ? regex_match(selectstring_regex, address, -1, NULL)
: (strstric(address, deliver_selectstring, FALSE) != NULL)
)
&& tree_search(tree_nonrecipients, address) == NULL
report_time_since(&timestamp_startup, US"queue msg selected");
#endif
- if ((pid = fork()) == 0)
+#ifndef DISABLE_TLS
+ if (!queue_tls_init)
+ {
+ queue_tls_init = TRUE;
+ /* Preload TLS library info for smtp transports. Once, and only if we
+ have a delivery to do. */
+ tls_client_creds_reload(FALSE);
+ }
+#endif
+
+single_item_retry:
+ if ((pid = exim_fork(US"qrun-delivery")) == 0)
{
int rc;
- testharness_pause_ms(100);
(void)close(pfd[pipe_read]);
rc = deliver_message(fq->text, force_delivery, FALSE);
- exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED);
+ exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED
+ ? EXIT_FAILURE : EXIT_SUCCESS);
}
if (pid < 0)
log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
/* A zero return means a delivery was attempted; turn off the force flag
for any subsequent calls unless queue_force is set. */
- if (!(status & 0xffff)) force_delivery = f.queue_run_force;
+ if (!(status & 0xffff)) force_delivery = q->queue_run_force;
/* If the process crashed, tell somebody */
"queue run: process %d crashed with signal %d while delivering %s",
(int)pid, status & 0x00ff, fq->text);
+ /* If single-item delivery was untried (likely due to locking)
+ retry once after a delay */
+
+ if (status & 0xff00 && single_id)
+ {
+ single_id = FALSE;
+ DEBUG(D_queue_run) debug_printf("qrun single-item pause before retry\n");
+ millisleep(500);
+ DEBUG(D_queue_run) debug_printf("qrun single-item retry after pause\n");
+ goto single_item_retry;
+ }
+
/* Before continuing, wait till the pipe gets closed at the far end. This
tells us that any children created by the delivery to re-use any SMTP
channels have all finished. Since no process actually writes to the pipe,
set_process_info("running queue");
/* If initial of a 2-phase run, we are a child - so just exit */
- if (f.queue_2stage && !queue_run_in_order)
- exim_exit(EXIT_SUCCESS, US"2-phase child");
+ if (q->queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS);
/* If we are in the test harness, and this is not the first of a 2-stage
queue run, update fudged queue times. */
- if (f.running_in_test_harness && !f.queue_2stage)
+ if (f.running_in_test_harness && !q->queue_2stage)
{
uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
if (fqtnext) fudged_queue_times = fqtnext + 1;
go_around:
/* If initial of a 2-phase run, we are a child - so just exit */
- if (f.queue_2stage && !queue_run_in_order)
- exim_exit(EXIT_SUCCESS, US"2-phase child");
+ if (q->queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS);
} /* End loop for list of messages */
tree_nonrecipients = NULL;
/* If queue_2stage is true, we do it all again, with the 2stage flag
turned off. */
-if (f.queue_2stage)
+if (q->queue_2stage)
{
/* wait for last children */
#ifdef MEASURE_TIMING
report_time_since(&timestamp_startup, US"queue_run 1st phase done");
#endif
- f.queue_2stage = FALSE;
- queue_run(start_id, stop_id, TRUE);
+ q->queue_2stage = f.queue_2stage = FALSE;
+ queue_run(q, start_id, stop_id, TRUE);
}
/* At top level, log the end of the run. */
if (!recurse)
- if (*queue_name)
+ if (q->name)
log_write(L_queue_run, LOG_MAIN, "End '%s' queue run: %s",
- queue_name, log_detail);
+ q->name, log_detail);
else
log_write(L_queue_run, LOG_MAIN, "End queue run: %s", log_detail);
}
+/* Do one top-level pass over the queue for a single queue-runner.
+
+A debug line is written naming the start and stop message ids, when given.
+The process-info string is then set, mentioning the queue name when the
+global queue_name is a non-empty string, and queue_run() is called with its
+recurse argument FALSE.
+
+Arguments:
+  q          queue-runner descriptor, handed on to queue_run()
+  start_id   message id to start at, or NULL
+  stop_id    message id to stop at, or NULL
+
+Returns:     nothing
+*/
+
+void
+single_queue_run(qrunner * q, uschar * start_id, uschar * stop_id)
+{
+DEBUG(D_queue_run)
+  {
+  /* Each label/id pair stays empty unless the matching id was supplied */
+  uschar * from_label = US"", * from_id = US"";
+  uschar * to_label = US"", * to_id = US"";
+
+  if (start_id) { from_label = US" starting at "; from_id = start_id; }
+  if (stop_id)  { to_label = US" stopping at "; to_id = stop_id; }
+
+  debug_printf("Single queue run%s%s%s%s\n",
+    from_label, from_id, to_label, to_id);
+  }
+
+if (!*queue_name)
+  set_process_info("running the queue (single queue run)");
+else
+  set_process_info("running the '%s' queue (single queue run)", queue_name);
+
+queue_run(q, start_id, stop_id, FALSE);
+}
+
+
+
/************************************************
* Count messages on the queue *
*/
void
-queue_list(int option, uschar **list, int count)
+queue_list(int option, uschar ** list, int count)
{
int subcount;
int now = (int)time(NULL);
queue_filename *last = NULL;
for (int i = 0; i < count; i++)
{
- queue_filename *next =
- store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, is_tainted(list[i]));
+ queue_filename * next =
+ store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, list[i]);
sprintf(CS next->text, "%s-H", list[i]);
next->dir_uschar = '*';
next->next = NULL;
else
qf = queue_get_spool_list(
- -1, /* entire queue */
- subdirs, /* for holding sub list */
- &subcount, /* for subcount */
- option >= 8, /* randomize if required */
- NULL); /* don't just count */
+ -1, /* entire queue */
+ subdirs, /* for holding sub list */
+ &subcount, /* for subcount */
+ option >= QL_UNSORTED, /* randomize if required */
+ NULL); /* don't just count */
-if (option >= 8) option -= 8;
+option &= ~QL_UNSORTED;
/* Now scan the chain and print information, resetting store used
each time. */
-for (;
- qf && (reset_point = store_mark());
- spool_clear_header_globals(), store_reset(reset_point), qf = qf->next
- )
+if (option == QL_MSGID_ONLY) /* Print only the message IDs from the chain */
+ for (; qf; qf = qf->next)
+ fprintf(stdout, "%.*s\n", MESSAGE_ID_LENGTH, qf->text);
+
+else for (;
+ qf && (reset_point = store_mark());
+ spool_clear_header_globals(), store_reset(reset_point), qf = qf->next
+ )
{
int rc, save_errno;
int size = 0;
}
}
- fprintf(stdout, "%s ", string_format_size(size, big_buffer));
- for (int i = 0; i < 16; i++) fputc(qf->text[i], stdout);
+ fprintf(stdout, "%s %.*s",
+ string_format_size(size, big_buffer), MESSAGE_ID_LENGTH, qf->text);
if (env_read && sender_address)
{
{
for (int i = 0; i < recipients_count; i++)
{
- tree_node *delivered =
+ tree_node * delivered =
tree_search(tree_nonrecipients, recipients_list[i].address);
- if (!delivered || option != 1)
+ if (!delivered || option != QL_UNDELIVERED_ONLY)
printf(" %s %s\n",
delivered ? "D" : " ", recipients_list[i].address);
if (delivered) delivered->data.val = TRUE;
}
- if (option == 2 && tree_nonrecipients)
+ if (option == QL_PLUS_GENERATED && tree_nonrecipients)
queue_list_extras(tree_nonrecipients);
printf("\n");
}
deliver_domain = dom
? CUS string_copyn(addr+dom, end - dom) : CUS"";
- event_raise(event_action, US"msg:fail:internal",
- string_sprintf("message removed by %s", username));
+ (void) event_raise(event_action, US"msg:fail:internal",
+ string_sprintf("message removed by %s", username), NULL);
deliver_localpart = save_local;
deliver_domain = save_domain;
}
}
}
- (void) event_raise(event_action, US"msg:complete", NULL);
+ (void) event_raise(event_action, US"msg:complete", NULL, NULL);
#endif
log_write(0, LOG_MAIN, "removed by %s", username);
log_write(0, LOG_MAIN, "Completed");
parse_extract_address(argv[recipients_arg], &errmess, &start, &end,
&domain, (action == MSG_EDIT_SENDER));
- if (recipient == NULL)
+ if (!recipient)
{
yield = FALSE;
printf("- error while %s:\n bad address %s: %s\n",
doing, argv[recipients_arg], errmess);
}
- else if (recipient[0] != 0 && domain == 0)
+ else if (*recipient && domain == 0)
{
yield = FALSE;
printf("- error while %s:\n bad address %s: "
/******************************************************************************/
/******************************************************************************/
-#ifdef EXPERIMENTAL_QUEUE_RAMP
+#ifndef DISABLE_QUEUE_RAMP
void
queue_notify_daemon(const uschar * msgid)
{
-uschar buf[MESSAGE_ID_LENGTH + 2];
+int bsize = 1 + MESSAGE_ID_LENGTH + 1 + Ustrlen(queue_name) + 1;
+uschar * buf = store_get(bsize, GET_UNTAINTED);
int fd;
DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid);
buf[0] = NOTIFY_MSG_QRUN;
memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
+Ustrcpy(buf+1+MESSAGE_ID_LENGTH+1, queue_name);
if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
{
struct sockaddr_un sa_un = {.sun_family = AF_UNIX};
- int slen;
-
-#ifdef EXIM_HAVE_ABSTRACT_UNIX_SOCKETS
- int len = offsetof(struct sockaddr_un, sun_path) + 1
- + snprintf(sa_un.sun_path+1, sizeof(sa_un.sun_path)-1, "%s",
- NOTIFIER_SOCKET_NAME);
- sa_un.sun_path[0] = 0;
-#else
- int len = offsetof(struct sockaddr_un, sun_path)
- + snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), "%s/%s",
- spool_directory, NOTIFIER_SOCKET_NAME);
-#endif
+ ssize_t len = daemon_notifier_sockname(&sa_un);
- if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, len) < 0)
+ if (sendto(fd, buf, bsize, 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0)
DEBUG(D_queue_run)
debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
close(fd);