*************************************************/
/* Copyright (c) University of Cambridge 1995 - 2018 */
+/* Copyright (c) The Exim Maintainers 2020 */
/* See the file NOTICE for conditions of use and distribution. */
/* Functions that operate on the input queue. */
#define LOG2_MAXNODES 32
+#ifndef DISABLE_TLS
+static BOOL queue_tls_init = FALSE;
+#endif
/*************************************************
* Helper sort function for queue_get_spool_list *
subdirs vector to store list of subdirchars
subcount pointer to int in which to store count of subdirs
randomize TRUE if the order of the list is to be unpredictable
+ pcount If not NULL, fill in with count of files and do not return list
Returns: pointer to a chain of queue name items
*/
static queue_filename *
queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount,
- BOOL randomize)
+ BOOL randomize, unsigned * pcount)
{
int i;
int flags = 0;
int subptr;
queue_filename *yield = NULL;
queue_filename *last = NULL;
-struct dirent *ent;
-DIR *dd;
uschar buffer[256];
queue_filename *root[LOG2_MAXNODES];
current time. Use the bottom 16 and just keep re-using them if necessary. When
not randomizing, initialize the sublists for the bottom-up merge sort. */
-if (randomize)
+if (pcount)
+ *pcount = 0;
+else if (randomize)
resetflags = time(NULL) & 0xFFFF;
else
for (i = 0; i < LOG2_MAXNODES; i++)
{
int count = 0;
int subdirchar = subdirs[i]; /* 0 for main directory */
+ DIR *dd;
if (subdirchar != 0)
{
}
DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer);
- if (!(dd = opendir(CS buffer)))
+ if (!(dd = exim_opendir(buffer)))
continue;
/* Now scan the directory. */
- while ((ent = readdir(dd)))
+ for (struct dirent *ent; ent = readdir(dd); )
{
uschar *name = US ent->d_name;
int len = Ustrlen(name);
if (len == SPOOL_NAME_LENGTH &&
Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
- {
- queue_filename *next =
- store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
- Ustrcpy(next->text, name);
- next->dir_uschar = subdirchar;
-
- /* Handle the creation of a randomized list. The first item becomes both
- the top and bottom of the list. Subsequent items are inserted either at
- the top or the bottom, randomly. This is, I argue, faster than doing a
- sort by allocating a random number to each item, and it also saves having
- to store the number with each item. */
-
- if (randomize)
- if (!yield)
- {
- next->next = NULL;
- yield = last = next;
- }
- else
- {
- if (flags == 0)
- flags = resetflags;
- if ((flags & 1) == 0)
- {
- next->next = yield;
- yield = next;
- }
- else
- {
- next->next = NULL;
- last->next = next;
- last = next;
- }
- flags = flags >> 1;
- }
+ if (pcount)
+ (*pcount)++;
+ else
+ {
+ queue_filename *next =
+ store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
+ Ustrcpy(next->text, name);
+ next->dir_uschar = subdirchar;
+
+ /* Handle the creation of a randomized list. The first item becomes both
+ the top and bottom of the list. Subsequent items are inserted either at
+ the top or the bottom, randomly. This is, I argue, faster than doing a
+ sort by allocating a random number to each item, and it also saves having
+ to store the number with each item. */
+
+ if (randomize)
+ if (!yield)
+ {
+ next->next = NULL;
+ yield = last = next;
+ }
+ else
+ {
+ if (flags == 0)
+ flags = resetflags;
+ if ((flags & 1) == 0)
+ {
+ next->next = yield;
+ yield = next;
+ }
+ else
+ {
+ next->next = NULL;
+ last->next = next;
+ last = next;
+ }
+ flags = flags >> 1;
+ }
- /* Otherwise do a bottom-up merge sort based on the name. */
+ /* Otherwise do a bottom-up merge sort based on the name. */
- else
- {
- next->next = NULL;
- for (int j = 0; j < LOG2_MAXNODES; j++)
- if (root[j])
- {
- next = merge_queue_lists(next, root[j]);
- root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
- }
- else
- {
- root[j] = next;
- break;
- }
- }
- }
+ else
+ {
+ next->next = NULL;
+ for (int j = 0; j < LOG2_MAXNODES; j++)
+ if (root[j])
+ {
+ next = merge_queue_lists(next, root[j]);
+ root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
+ }
+ else
+ {
+ root[j] = next;
+ break;
+ }
+ }
+ }
}
/* Finished with this directory */
/* When using a bottom-up merge sort, do the final merging of the sublists.
Then pass back the final list of file items. */
-if (!randomize)
+if (!pcount && !randomize)
for (i = 0; i < LOG2_MAXNODES; ++i)
yield = merge_queue_lists(yield, root[i]);
uschar *log_detail = NULL;
int subcount = 0;
uschar subdirs[64];
+pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */
+BOOL single_id = FALSE;
#ifdef MEASURE_TIMING
report_time_since(&timestamp_startup, US"queue_run start");
p += sprintf(CS p, " -q%s", extras);
if (deliver_selectstring)
- p += sprintf(CS p, " -R%s %s", f.deliver_selectstring_regex? "r" : "",
- deliver_selectstring);
+ {
+ snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s",
+ f.deliver_selectstring_regex? "r" : "", deliver_selectstring);
+ p += Ustrlen(CCS p);
+ }
if (deliver_selectstring_sender)
- p += sprintf(CS p, " -S%s %s", f.deliver_selectstring_sender_regex? "r" : "",
- deliver_selectstring_sender);
+ {
+ snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s",
+ f.deliver_selectstring_sender_regex? "r" : "", deliver_selectstring_sender);
+ p += Ustrlen(CCS p);
+ }
log_detail = string_copy(big_buffer);
if (*queue_name)
queue_name, log_detail);
else
log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
+
+ single_id = start_id && stop_id && !f.queue_2stage
+ && Ustrcmp(start_id, stop_id) == 0;
}
/* If deliver_selectstring is a regex, compile it. */
}
for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount,
- !queue_run_in_order);
+ !queue_run_in_order, NULL);
fq; fq = fq->next)
{
pid_t pid;
(double)load_average/1000.0,
(double)deliver_queue_load_max/1000.0);
+ /* If initial of a 2-phase run, maintain a set of child procs
+ to get disk parallelism */
+
+ if (f.queue_2stage && !queue_run_in_order)
+ {
+ int i;
+ if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage waiting for child %d\n", (int)qpid[0]);
+ waitpid(qpid[0], NULL, 0);
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+ if (f.running_in_test_harness) i = 0;
+ else for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+ qpid[i] = 0;
+ }
+ else
+ for (i = 0; qpid[i]; ) i++;
+ if ((qpid[i] = exim_fork(US"qrun-phase-one")))
+ continue; /* parent loops around */
+ }
+
/* Skip this message unless it's within the ID limits */
if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
- continue;
+ goto go_around;
if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
- continue;
+ goto go_around;
/* Check that the message still exists */
message_subdir[0] = fq->dir_uschar;
if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
- continue;
+ goto go_around;
/* There are some tests that require the reading of the header file. Ensure
the store used is scavenged afterwards so that this process doesn't keep
follow. If the message is chosen for delivery, the header is read again
in the deliver_message() function, in a subprocess. */
- if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+ if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
f.dont_deliver = orig_dont_deliver;
/* Now decide if we want to deliver this message. As we have read the
spool_clear_header_globals();
store_reset(reset_point2);
- if (!wanted) continue; /* With next message */
+ if (!wanted) goto go_around; /* With next message */
}
/* OK, got a message we want to deliver. Create a pipe which will
report_time_since(&timestamp_startup, US"queue msg selected");
#endif
- if ((pid = fork()) == 0)
+#ifndef DISABLE_TLS
+ if (!queue_tls_init)
+ {
+ queue_tls_init = TRUE;
+ /* Preload TLS library info for smtp transports. Once, and only if we
+ have a delivery to do. */
+ tls_client_creds_reload(FALSE);
+ }
+#endif
+
+single_item_retry:
+ if ((pid = exim_fork(US"qrun-delivery")) == 0)
{
int rc;
- testharness_pause_ms(100);
(void)close(pfd[pipe_read]);
rc = deliver_message(fq->text, force_delivery, FALSE);
- exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED);
+ exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED
+ ? EXIT_FAILURE : EXIT_SUCCESS);
}
if (pid < 0)
log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
"queue run: process %d crashed with signal %d while delivering %s",
(int)pid, status & 0x00ff, fq->text);
+ /* If single-item delivery was untried (likely due to locking)
+ retry once after a delay */
+
+ if (status & 0xff00 && single_id)
+ {
+ single_id = FALSE;
+ DEBUG(D_queue_run) debug_printf("qrun single-item pause before retry\n");
+ millisleep(500);
+ DEBUG(D_queue_run) debug_printf("qrun single-item retry after pause\n");
+ goto single_item_retry;
+ }
+
/* Before continuing, wait till the pipe gets closed at the far end. This
tells us that any children created by the delivery to re-use any SMTP
channels have all finished. Since no process actually writes to the pipe,
(void)close(pfd[pipe_read]);
set_process_info("running queue");
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS);
+
/* If we are in the test harness, and this is not the first of a 2-stage
queue run, update fudged queue times. */
uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
if (fqtnext) fudged_queue_times = fqtnext + 1;
}
+
+
+ continue;
+
+ go_around:
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS);
} /* End loop for list of messages */
tree_nonrecipients = NULL;
if (f.queue_2stage)
{
+
+ /* wait for last children */
+ for (int i = 0; i < nelem(qpid); i++)
+ if (qpid[i])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+ waitpid(qpid[i], NULL, 0);
+ }
+ else break;
+
+#ifdef MEASURE_TIMING
+ report_time_since(&timestamp_startup, US"queue_run 1st phase done");
+#endif
f.queue_2stage = FALSE;
queue_run(start_id, stop_id, TRUE);
}
/* Called as a result of -bpc
Arguments: none
-Returns: nothing
+Returns: count
*/
-void
+unsigned
queue_count(void) /* scan the entire queue and hand the file count back to the caller */
{
int subcount; /* receives the subdir count from the scan; not read again here */
-int count = 0;
+unsigned count = 0; /* written by queue_get_spool_list() through its pcount argument */
uschar subdirs[64]; /* vector the scan fills with its list of subdir chars */
-for (queue_filename *f = queue_get_spool_list(
- -1, /* entire queue */
- subdirs, /* for holding sub list */
- &subcount, /* for subcount */
- FALSE); /* not random */
- f; f = f->next) count++;
-fprintf(stdout, "%d\n", count);
+(void) queue_get_spool_list(-1, /* entire queue */
+ subdirs, /* for holding sub list */
+ &subcount, /* for subcount */
+ FALSE, /* not random */
+ &count); /* non-NULL pcount: only count the files, no list is returned */
+return count; /* printing, if wanted, is now left to the caller */
}
+#define QUEUE_SIZE_AGE 60 /* update rate for queue_size */
+
+/* Return the queue size, rescanning the spool only when the cached figure
+has reached its refresh time. The refresh interval is QUEUE_SIZE_AGE seconds,
+or 3 seconds when running in the test harness.
+
+Arguments: none
+Returns:   the (possibly cached) count of queued messages
+*/
+
+unsigned
+queue_count_cached(void)
+{
+const time_t t_now = time(NULL);
+
+/* Cached value not yet due for refresh: use it as-is */
+
+if (t_now < queue_size_next)
+  return queue_size;
+
+queue_size = queue_count();
+queue_size_next = t_now + (f.running_in_test_harness ? 3 : QUEUE_SIZE_AGE);
+return queue_size;
+}
/************************************************
* List extra deliveries *
-1, /* entire queue */
subdirs, /* for holding sub list */
&subcount, /* for subcount */
- option >= 8); /* randomize if required */
+ option >= 8, /* randomize if required */
+ NULL); /* don't just count */
if (option >= 8) option -= 8;
parse_extract_address(argv[recipients_arg], &errmess, &start, &end,
&domain, (action == MSG_EDIT_SENDER));
- if (recipient == NULL)
+ if (!recipient)
{
yield = FALSE;
printf("- error while %s:\n bad address %s: %s\n",
doing, argv[recipients_arg], errmess);
}
- else if (recipient[0] != 0 && domain == 0)
+ else if (*recipient && domain == 0)
{
yield = FALSE;
printf("- error while %s:\n bad address %s: "
}
}
+
+
+/******************************************************************************/
+/******************************************************************************/
+
+#ifndef DISABLE_QUEUE_RAMP
+/* Send a datagram to the daemon's notifier socket naming one message.
+The packet is a single NOTIFY_MSG_QRUN type byte followed by the message id
+and its terminating NUL. The socket name is the expansion of notifier_socket;
+with EXIM_HAVE_ABSTRACT_UNIX_SOCKETS it is placed after a leading NUL byte in
+sun_path. This is best-effort: every failure is reported only as debug output.
+
+Argument:  msgid   message id; MESSAGE_ID_LENGTH+1 bytes are copied from it
+Returns:   nothing
+*/
+
+void
+queue_notify_daemon(const uschar * msgid)
+{
+#ifdef EXIM_HAVE_ABSTRACT_UNIX_SOCKETS
+const size_t skip = 1;		/* name goes after a leading NUL in sun_path */
+#else
+const size_t skip = 0;
+#endif
+uschar buf[MESSAGE_ID_LENGTH + 2];
+struct sockaddr_un sa_un = {.sun_family = AF_UNIX};	/* sun_path all zero */
+const uschar * sockname;
+socklen_t len;
+int fd, n;
+
+DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid);
+
+/* Guard against a NULL result (failed expansion): handing NULL to a %s
+conversion is undefined behaviour. */
+
+if (!(sockname = expand_string(notifier_socket)))
+  {
+  DEBUG(D_queue_run)
+    debug_printf("%s: notifier socket name expansion failed\n", __FUNCTION__);
+  return;
+  }
+
+/* On truncation snprintf() returns the length it wanted to write, not the
+length written, so the result must be range-checked before it is used to
+build the address length passed to sendto(). */
+
+n = snprintf(sa_un.sun_path + skip, sizeof(sa_un.sun_path) - skip, "%s",
+	      sockname);
+if (n < 0 || (size_t)n >= sizeof(sa_un.sun_path) - skip)
+  {
+  DEBUG(D_queue_run)
+    debug_printf("%s: notifier socket name unusable (too long?)\n",
+		  __FUNCTION__);
+  return;
+  }
+len = offsetof(struct sockaddr_un, sun_path) + skip + n;
+
+buf[0] = NOTIFY_MSG_QRUN;
+memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
+
+if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) < 0)
+  {
+  DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno));
+  return;
+  }
+
+if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, len) < 0)
+  DEBUG(D_queue_run)
+    debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
+(void)close(fd);
+}
+#endif
+
#endif /*!COMPILE_UTILITY*/
/* End of queue.c */