*************************************************/
/* Copyright (c) University of Cambridge 1995 - 2018 */
+/* Copyright (c) The Exim Maintainers 2020 */
/* See the file NOTICE for conditions of use and distribution. */
/* Functions that operate on the input queue. */
subdirs vector to store list of subdirchars
subcount pointer to int in which to store count of subdirs
randomize TRUE if the order of the list is to be unpredictable
+ pcount If not NULL, fill in with count of files and do not return list
Returns: pointer to a chain of queue name items
*/
static queue_filename *
queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount,
- BOOL randomize)
+ BOOL randomize, unsigned * pcount)
{
int i;
int flags = 0;
int subptr;
queue_filename *yield = NULL;
queue_filename *last = NULL;
-struct dirent *ent;
-DIR *dd;
uschar buffer[256];
queue_filename *root[LOG2_MAXNODES];
current time. Use the bottom 16 and just keep re-using them if necessary. When
not randomizing, initialize the sublists for the bottom-up merge sort. */
-if (randomize)
+if (pcount)
+ *pcount = 0;
+else if (randomize)
resetflags = time(NULL) & 0xFFFF;
else
for (i = 0; i < LOG2_MAXNODES; i++)
{
int count = 0;
int subdirchar = subdirs[i]; /* 0 for main directory */
+ DIR *dd;
if (subdirchar != 0)
{
}
DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer);
- if (!(dd = opendir(CS buffer)))
+ if (!(dd = exim_opendir(buffer)))
continue;
/* Now scan the directory. */
- while ((ent = readdir(dd)))
+ for (struct dirent *ent; ent = readdir(dd); )
{
uschar *name = US ent->d_name;
int len = Ustrlen(name);
if (len == SPOOL_NAME_LENGTH &&
Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
- {
- queue_filename *next =
- store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
- Ustrcpy(next->text, name);
- next->dir_uschar = subdirchar;
-
- /* Handle the creation of a randomized list. The first item becomes both
- the top and bottom of the list. Subsequent items are inserted either at
- the top or the bottom, randomly. This is, I argue, faster than doing a
- sort by allocating a random number to each item, and it also saves having
- to store the number with each item. */
-
- if (randomize)
- if (!yield)
- {
- next->next = NULL;
- yield = last = next;
- }
- else
- {
- if (flags == 0)
- flags = resetflags;
- if ((flags & 1) == 0)
- {
- next->next = yield;
- yield = next;
- }
- else
- {
- next->next = NULL;
- last->next = next;
- last = next;
- }
- flags = flags >> 1;
- }
+ if (pcount)
+ (*pcount)++;
+ else
+ {
+ queue_filename *next =
+ store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
+ Ustrcpy(next->text, name);
+ next->dir_uschar = subdirchar;
+
+ /* Handle the creation of a randomized list. The first item becomes both
+ the top and bottom of the list. Subsequent items are inserted either at
+ the top or the bottom, randomly. This is, I argue, faster than doing a
+ sort by allocating a random number to each item, and it also saves having
+ to store the number with each item. */
+
+ if (randomize)
+ if (!yield)
+ {
+ next->next = NULL;
+ yield = last = next;
+ }
+ else
+ {
+ if (flags == 0)
+ flags = resetflags;
+ if ((flags & 1) == 0)
+ {
+ next->next = yield;
+ yield = next;
+ }
+ else
+ {
+ next->next = NULL;
+ last->next = next;
+ last = next;
+ }
+ flags = flags >> 1;
+ }
- /* Otherwise do a bottom-up merge sort based on the name. */
+ /* Otherwise do a bottom-up merge sort based on the name. */
- else
- {
- next->next = NULL;
- for (int j = 0; j < LOG2_MAXNODES; j++)
- if (root[j])
- {
- next = merge_queue_lists(next, root[j]);
- root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
- }
- else
- {
- root[j] = next;
- break;
- }
- }
- }
+ else
+ {
+ next->next = NULL;
+ for (int j = 0; j < LOG2_MAXNODES; j++)
+ if (root[j])
+ {
+ next = merge_queue_lists(next, root[j]);
+ root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
+ }
+ else
+ {
+ root[j] = next;
+ break;
+ }
+ }
+ }
}
/* Finished with this directory */
/* When using a bottom-up merge sort, do the final merging of the sublists.
Then pass back the final list of file items. */
-if (!randomize)
+if (!pcount && !randomize)
for (i = 0; i < LOG2_MAXNODES; ++i)
yield = merge_queue_lists(yield, root[i]);
int subcount = 0;
uschar subdirs[64];
pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */
+BOOL single_id = FALSE;
#ifdef MEASURE_TIMING
report_time_since(&timestamp_startup, US"queue_run start");
queue_name, log_detail);
else
log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
+
+ single_id = start_id && stop_id && !f.queue_2stage
+ && Ustrcmp(start_id, stop_id) == 0;
}
/* If deliver_selectstring is a regex, compile it. */
}
for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount,
- !queue_run_in_order);
+ !queue_run_in_order, NULL);
fq; fq = fq->next)
{
pid_t pid;
int i;
if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
{
- DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n");
+ DEBUG(D_queue_run) debug_printf("q2stage waiting for child %d\n", (int)qpid[0]);
waitpid(qpid[0], NULL, 0);
DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
- for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+ if (f.running_in_test_harness) i = 0;
+ else for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
qpid[i] = 0;
}
else
for (i = 0; qpid[i]; ) i++;
- DEBUG(D_queue_run) debug_printf("q2stage forking\n");
- if ((qpid[i] = fork()))
+ if ((qpid[i] = exim_fork(US"qrun-phase-one")))
continue; /* parent loops around */
- DEBUG(D_queue_run) debug_printf("q2stage child\n");
}
/* Skip this message unless it's within the ID limits */
report_time_since(&timestamp_startup, US"queue msg selected");
#endif
- if ((pid = fork()) == 0)
+single_item_retry:
+ if ((pid = exim_fork(US"qrun-delivery")) == 0)
{
int rc;
- testharness_pause_ms(100);
(void)close(pfd[pipe_read]);
rc = deliver_message(fq->text, force_delivery, FALSE);
- exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED);
+ exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED
+ ? EXIT_FAILURE : EXIT_SUCCESS);
}
if (pid < 0)
log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
"queue run: process %d crashed with signal %d while delivering %s",
(int)pid, status & 0x00ff, fq->text);
+ /* If single-item delivery was untried (likely due to locking)
+ retry once after a delay */
+
+ if (status & 0xff00 && single_id)
+ {
+ single_id = FALSE;
+ DEBUG(D_queue_run) debug_printf("qrun single-item pause before retry\n");
+ millisleep(500);
+ DEBUG(D_queue_run) debug_printf("qrun single-item retry after pause\n");
+ goto single_item_retry;
+ }
+
/* Before continuing, wait till the pipe gets closed at the far end. This
tells us that any children created by the delivery to re-use any SMTP
channels have all finished. Since no process actually writes to the pipe,
/* If initial of a 2-phase run, we are a child - so just exit */
if (f.queue_2stage && !queue_run_in_order)
- exim_exit(EXIT_SUCCESS, US"2-phase child");
+ exim_exit(EXIT_SUCCESS);
/* If we are in the test harness, and this is not the first of a 2-stage
queue run, update fudged queue times. */
go_around:
/* If initial of a 2-phase run, we are a child - so just exit */
if (f.queue_2stage && !queue_run_in_order)
- exim_exit(EXIT_SUCCESS, US"2-phase child");
+ exim_exit(EXIT_SUCCESS);
} /* End loop for list of messages */
tree_nonrecipients = NULL;
unsigned count = 0;
uschar subdirs[64];
-for (queue_filename * f = queue_get_spool_list(
- -1, /* entire queue */
- subdirs, /* for holding sub list */
- &subcount, /* for subcount */
- FALSE); /* not random */
- f; f = f->next) count++;
+(void) queue_get_spool_list(-1, /* entire queue */
+ subdirs, /* for holding sub list */
+ &subcount, /* for subcount */
+ FALSE, /* not random */
+ &count); /* just get the count */
return count;
}
-1, /* entire queue */
subdirs, /* for holding sub list */
&subcount, /* for subcount */
- option >= 8); /* randomize if required */
+ option >= 8, /* randomize if required */
+ NULL); /* don't just count */
if (option >= 8) option -= 8;
parse_extract_address(argv[recipients_arg], &errmess, &start, &end,
&domain, (action == MSG_EDIT_SENDER));
- if (recipient == NULL)
+ if (!recipient)
{
yield = FALSE;
printf("- error while %s:\n bad address %s: %s\n",
doing, argv[recipients_arg], errmess);
}
- else if (recipient[0] != 0 && domain == 0)
+ else if (*recipient && domain == 0)
{
yield = FALSE;
printf("- error while %s:\n bad address %s: "
/******************************************************************************/
/******************************************************************************/
-#ifdef EXPERIMENTAL_QUEUE_RAMP
+#ifndef DISABLE_QUEUE_RAMP
void
queue_notify_daemon(const uschar * msgid)
{
if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
{
- struct sockaddr_un sun = {.sun_family = AF_UNIX};
- int slen;
+ struct sockaddr_un sa_un = {.sun_family = AF_UNIX};
#ifdef EXIM_HAVE_ABSTRACT_UNIX_SOCKETS
int len = offsetof(struct sockaddr_un, sun_path) + 1
- + snprintf(sun.sun_path+1, sizeof(sun.sun_path)-1, "%s",
- NOTIFIER_SOCKET_NAME);
- sun.sun_path[0] = 0;
+ + snprintf(sa_un.sun_path+1, sizeof(sa_un.sun_path)-1, "%s",
+ expand_string(notifier_socket));
+ sa_un.sun_path[0] = 0;
#else
int len = offsetof(struct sockaddr_un, sun_path)
- + snprintf(sun.sun_path, sizeof(sun.sun_path), "%s/%s",
- spool_directory, NOTIFIER_SOCKET_NAME);
+ + snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), "%s",
+ expand_string(notifier_socket));
#endif
- if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sun, len) < 0)
+ if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, len) < 0)
DEBUG(D_queue_run)
debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
close(fd);