* Exim - an Internet mail transport agent *
*************************************************/
-/* Copyright (c) University of Cambridge 1995 - 2015 */
+/* Copyright (c) University of Cambridge 1995 - 2018 */
/* See the file NOTICE for conditions of use and distribution. */
/* Functions that operate on the input queue. */
-/* Routines with knowlege of spool layout */
-
-static void
-spool_pname_buf(uschar * buf, int len)
-{
-snprintf(CS buf, len, "%s/%s/input", spool_directory, queue_name);
-}
-
-static uschar *
-spool_dname(const uschar * purpose, uschar * subdir)
-{
-return string_sprintf("%s/%s/%s/%s",
- spool_directory, queue_name, purpose, subdir);
-}
-
-uschar *
-spool_sname(const uschar * purpose, uschar * subdir)
-{
-return string_sprintf("%s%s%s%s%s",
- queue_name, *queue_name ? "/" : "",
- purpose,
- *subdir ? "/" : "", subdir);
-}
-
-uschar *
-spool_fname(const uschar * purpose, uschar * subdir, uschar * fname, uschar * suffix)
-{
-return string_sprintf("%s/%s/%s/%s/%s%s",
- spool_directory, queue_name, purpose, subdir, fname, suffix);
-}
queue_filename *first = NULL;
queue_filename **append = &first;
-while (a != NULL && b != NULL)
+while (a && b)
{
- if (Ustrcmp(a->text, b->text) < 0)
+ int d;
+ if ((d = Ustrncmp(a->text, b->text, 6)) == 0)
+ d = Ustrcmp(a->text + 14, b->text + 14);
+ if (d < 0)
{
*append = a;
append= &a->next;
}
}
-*append=((a != NULL)? a : b);
+*append = a ? a : b;
return first;
}
subdirs vector to store list of subdirchars
subcount pointer to int in which to store count of subdirs
randomize TRUE if the order of the list is to be unpredictable
+ pcount If not NULL, fill in with count of files and do not return list
Returns: pointer to a chain of queue name items
*/
static queue_filename *
queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount,
- BOOL randomize)
+ BOOL randomize, unsigned * pcount)
{
int i;
int flags = 0;
current time. Use the bottom 16 and just keep re-using them if necessary. When
not randomizing, initialize the sublists for the bottom-up merge sort. */
-if (randomize) resetflags = time(NULL) & 0xFFFF;
- else for (i = 0; i < LOG2_MAXNODES; i++) root[i] = NULL;
+if (pcount)
+ *pcount = 0;
+else if (randomize)
+ resetflags = time(NULL) & 0xFFFF;
+else
+ for (i = 0; i < LOG2_MAXNODES; i++)
+ root[i] = NULL;
/* If processing the full queue, or just the top-level, start at the base
directory, and initialize the first subdirectory name (as none). Otherwise,
subdirs[0] = 0;
*subcount = 0;
}
-else i = subdiroffset;
+else
+ i = subdiroffset;
/* Set up prototype for the directory name. */
/* Now scan the directory. */
- while ((ent = readdir(dd)) != NULL)
+ while ((ent = readdir(dd)))
{
uschar *name = US ent->d_name;
int len = Ustrlen(name);
if (len == SPOOL_NAME_LENGTH &&
Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
- {
- queue_filename *next =
- store_get(sizeof(queue_filename) + Ustrlen(name));
- Ustrcpy(next->text, name);
- next->dir_uschar = subdirchar;
-
- /* Handle the creation of a randomized list. The first item becomes both
- the top and bottom of the list. Subsequent items are inserted either at
- the top or the bottom, randomly. This is, I argue, faster than doing a
- sort by allocating a random number to each item, and it also saves having
- to store the number with each item. */
-
- if (randomize)
- {
- if (yield == NULL)
- {
- next->next = NULL;
- yield = last = next;
- }
- else
- {
- if (flags == 0) flags = resetflags;
- if ((flags & 1) == 0)
- {
- next->next = yield;
- yield = next;
- }
- else
- {
- next->next = NULL;
- last->next = next;
- last = next;
- }
- flags = flags >> 1;
- }
- }
+ if (pcount)
+ (*pcount)++;
+ else
+ {
+ queue_filename *next =
+ store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
+ Ustrcpy(next->text, name);
+ next->dir_uschar = subdirchar;
+
+ /* Handle the creation of a randomized list. The first item becomes both
+ the top and bottom of the list. Subsequent items are inserted either at
+ the top or the bottom, randomly. This is, I argue, faster than doing a
+ sort by allocating a random number to each item, and it also saves having
+ to store the number with each item. */
+
+ if (randomize)
+ if (!yield)
+ {
+ next->next = NULL;
+ yield = last = next;
+ }
+ else
+ {
+ if (flags == 0)
+ flags = resetflags;
+ if ((flags & 1) == 0)
+ {
+ next->next = yield;
+ yield = next;
+ }
+ else
+ {
+ next->next = NULL;
+ last->next = next;
+ last = next;
+ }
+ flags = flags >> 1;
+ }
- /* Otherwise do a bottom-up merge sort based on the name. */
+ /* Otherwise do a bottom-up merge sort based on the name. */
- else
- {
- int j;
- next->next = NULL;
- for (j = 0; j < LOG2_MAXNODES; j++)
- {
- if (root[j] != NULL)
- {
- next = merge_queue_lists(next, root[j]);
- root[j] = (j == LOG2_MAXNODES - 1)? next : NULL;
- }
- else
- {
- root[j] = next;
- break;
- }
- }
- }
- }
+ else
+ {
+ next->next = NULL;
+ for (int j = 0; j < LOG2_MAXNODES; j++)
+ if (root[j])
+ {
+ next = merge_queue_lists(next, root[j]);
+ root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
+ }
+ else
+ {
+ root[j] = next;
+ break;
+ }
+ }
+ }
}
/* Finished with this directory */
/* If we have just scanned the base directory, and subdiroffset is 0,
we do not want to continue scanning the sub-directories. */
- else
- {
- if (subdiroffset == 0) break;
- }
+ else if (subdiroffset == 0)
+ break;
} /* Loop for multiple subdirectories */
/* When using a bottom-up merge sort, do the final merging of the sublists.
Then pass back the final list of file items. */
-if (!randomize)
+if (!pcount && !randomize)
for (i = 0; i < LOG2_MAXNODES; ++i)
yield = merge_queue_lists(yield, root[i]);
void
queue_run(uschar *start_id, uschar *stop_id, BOOL recurse)
{
-BOOL force_delivery = queue_run_force || deliver_selectstring != NULL ||
+BOOL force_delivery = f.queue_run_force || deliver_selectstring != NULL ||
deliver_selectstring_sender != NULL;
const pcre *selectstring_regex = NULL;
const pcre *selectstring_regex_sender = NULL;
uschar *log_detail = NULL;
int subcount = 0;
-int i;
uschar subdirs[64];
+pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */
+BOOL single_id = FALSE;
+
+#ifdef MEASURE_TIMING
+report_time_since(×tamp_startup, US"queue_run start");
+#endif
/* Cancel any specific queue domains. Turn off the flag that causes SMTP
deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag
queue_domains = NULL;
queue_smtp_domains = NULL;
-queue_smtp = queue_2stage;
+f.queue_smtp = f.queue_2stage;
queue_run_pid = getpid();
-queue_running = TRUE;
+f.queue_running = TRUE;
/* Log the true start of a queue run, and fancy options */
uschar extras[8];
uschar *p = extras;
- if (queue_2stage) *p++ = 'q';
- if (queue_run_first_delivery) *p++ = 'i';
- if (queue_run_force) *p++ = 'f';
- if (deliver_force_thaw) *p++ = 'f';
- if (queue_run_local) *p++ = 'l';
+ if (f.queue_2stage) *p++ = 'q';
+ if (f.queue_run_first_delivery) *p++ = 'i';
+ if (f.queue_run_force) *p++ = 'f';
+ if (f.deliver_force_thaw) *p++ = 'f';
+ if (f.queue_run_local) *p++ = 'l';
*p = 0;
p = big_buffer;
- sprintf(CS p, "pid=%d", (int)queue_run_pid);
- while (*p != 0) p++;
+ p += sprintf(CS p, "pid=%d", (int)queue_run_pid);
if (extras[0] != 0)
- {
- sprintf(CS p, " -q%s", extras);
- while (*p != 0) p++;
- }
+ p += sprintf(CS p, " -q%s", extras);
- if (deliver_selectstring != NULL)
- {
- sprintf(CS p, " -R%s %s", deliver_selectstring_regex? "r" : "",
+ if (deliver_selectstring)
+ p += sprintf(CS p, " -R%s %s", f.deliver_selectstring_regex? "r" : "",
deliver_selectstring);
- while (*p != 0) p++;
- }
- if (deliver_selectstring_sender != NULL)
- {
- sprintf(CS p, " -S%s %s", deliver_selectstring_sender_regex? "r" : "",
+ if (deliver_selectstring_sender)
+ p += sprintf(CS p, " -S%s %s", f.deliver_selectstring_sender_regex? "r" : "",
deliver_selectstring_sender);
- while (*p != 0) p++;
- }
log_detail = string_copy(big_buffer);
if (*queue_name)
queue_name, log_detail);
else
log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
+
+ single_id = start_id && stop_id && !f.queue_2stage
+ && Ustrcmp(start_id, stop_id) == 0;
}
/* If deliver_selectstring is a regex, compile it. */
-if (deliver_selectstring != NULL && deliver_selectstring_regex)
+if (deliver_selectstring && f.deliver_selectstring_regex)
selectstring_regex = regex_must_compile(deliver_selectstring, TRUE, FALSE);
-if (deliver_selectstring_sender != NULL && deliver_selectstring_sender_regex)
+if (deliver_selectstring_sender && f.deliver_selectstring_sender_regex)
selectstring_regex_sender =
regex_must_compile(deliver_selectstring_sender, TRUE, FALSE);
When the first argument of queue_get_spool_list() is 0, it scans the top
directory, fills in subdirs, and sets subcount. The order of the directories is
then randomized after the first time through, before they are scanned in
-subsqeuent iterations.
+subsequent iterations.
When the first argument of queue_get_spool_list() is -1 (for queue_run_in_
order), it scans all directories and makes a single message list. */
-for (i = (queue_run_in_order? -1 : 0);
- i <= (queue_run_in_order? -1 : subcount);
+for (int i = queue_run_in_order ? -1 : 0;
+ i <= (queue_run_in_order ? -1 : subcount);
i++)
{
- queue_filename *f;
- void *reset_point1 = store_get(0);
+ rmark reset_point1 = store_mark();
DEBUG(D_queue_run)
{
debug_printf("queue running subdirectory '%c'\n", subdirs[i]);
}
- for (f = queue_get_spool_list(i, subdirs, &subcount, !queue_run_in_order);
- f != NULL;
- f = f->next)
+ for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount,
+ !queue_run_in_order, NULL);
+ fq; fq = fq->next)
{
pid_t pid;
int status;
/* Unless deliveries are forced, if deliver_queue_load_max is non-negative,
check that the load average is low enough to permit deliveries. */
- if (!queue_run_force && deliver_queue_load_max >= 0)
- {
- load_average = os_getloadavg();
- if (load_average > deliver_queue_load_max)
+ if (!f.queue_run_force && deliver_queue_load_max >= 0)
+ if ((load_average = os_getloadavg()) > deliver_queue_load_max)
{
log_write(L_queue_run, LOG_MAIN, "Abandon queue run: %s (load %.2f, max %.2f)",
log_detail,
break;
}
else
- {
DEBUG(D_load) debug_printf("load average = %.2f max = %.2f\n",
(double)load_average/1000.0,
(double)deliver_queue_load_max/1000.0);
- }
+
+ /* If initial of a 2-phase run, maintain a set of child procs
+ to get disk parallelism */
+
+ if (f.queue_2stage && !queue_run_in_order)
+ {
+ int i;
+ if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage waiting for child %d\n", (int)qpid[0]);
+ waitpid(qpid[0], NULL, 0);
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+ if (f.running_in_test_harness) i = 0;
+ else for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+ qpid[i] = 0;
+ }
+ else
+ for (i = 0; qpid[i]; ) i++;
+ DEBUG(D_queue_run) debug_printf("q2stage forking\n");
+ if ((qpid[i] = fork()))
+ continue; /* parent loops around */
+ DEBUG(D_queue_run) debug_printf("q2stage child\n");
}
/* Skip this message unless it's within the ID limits */
- if (stop_id != NULL && Ustrncmp(f->text, stop_id, MESSAGE_ID_LENGTH) > 0)
- continue;
- if (start_id != NULL && Ustrncmp(f->text, start_id, MESSAGE_ID_LENGTH) < 0)
- continue;
+ if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
+ goto go_around;
+ if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
+ goto go_around;
/* Check that the message still exists */
- message_subdir[0] = f->dir_uschar;
- if (Ustat(spool_fname(US"input", message_subdir, f->text, US""), &statbuf) < 0)
- continue;
+ message_subdir[0] = fq->dir_uschar;
+ if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
+ goto go_around;
/* There are some tests that require the reading of the header file. Ensure
the store used is scavenged afterwards so that this process doesn't keep
delivering, but it's cheaper than forking a delivery process for each
message when many are not going to be delivered. */
- if (deliver_selectstring != NULL || deliver_selectstring_sender != NULL ||
- queue_run_first_delivery)
+ if (deliver_selectstring || deliver_selectstring_sender ||
+ f.queue_run_first_delivery)
{
BOOL wanted = TRUE;
- BOOL orig_dont_deliver = dont_deliver;
- void *reset_point2 = store_get(0);
+ BOOL orig_dont_deliver = f.dont_deliver;
+ rmark reset_point2 = store_mark();
/* Restore the original setting of dont_deliver after reading the header,
so that a setting for a particular message doesn't force it for any that
follow. If the message is chosen for delivery, the header is read again
in the deliver_message() function, in a subprocess. */
- if (spool_read_header(f->text, FALSE, TRUE) != spool_read_OK) continue;
- dont_deliver = orig_dont_deliver;
+ if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
+ f.dont_deliver = orig_dont_deliver;
/* Now decide if we want to deliver this message. As we have read the
header file, we might as well do the freeze test now, and save forking
another process. */
- if (deliver_freeze && !deliver_force_thaw)
+ if (f.deliver_freeze && !f.deliver_force_thaw)
{
log_write(L_skip_delivery, LOG_MAIN, "Message is frozen");
wanted = FALSE;
/* Check first_delivery in the case when there are no message logs. */
- else if (queue_run_first_delivery && !deliver_firsttime)
+ else if (f.queue_run_first_delivery && !f.deliver_firsttime)
{
- DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", f->text);
+ DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", fq->text);
wanted = FALSE;
}
- /* Check for a matching address if deliver_selectstring[_sender} is set.
+ /* Check for a matching address if deliver_selectstring[_sender] is set.
If so, we do a fully delivery - don't want to omit other addresses since
their routing might trigger re-writing etc. */
/* Sender matching */
- else if (deliver_selectstring_sender != NULL &&
- !(deliver_selectstring_sender_regex?
- (pcre_exec(selectstring_regex_sender, NULL, CS sender_address,
- Ustrlen(sender_address), 0, PCRE_EOPT, NULL, 0) >= 0)
- :
- (strstric(sender_address, deliver_selectstring_sender, FALSE)
- != NULL)))
+ else if ( deliver_selectstring_sender
+ && !(f.deliver_selectstring_sender_regex
+ ? (pcre_exec(selectstring_regex_sender, NULL,
+ CS sender_address, Ustrlen(sender_address), 0, PCRE_EOPT,
+ NULL, 0) >= 0)
+ : (strstric(sender_address, deliver_selectstring_sender, FALSE)
+ != NULL)
+ ) )
{
DEBUG(D_queue_run) debug_printf("%s: sender address did not match %s\n",
- f->text, deliver_selectstring_sender);
+ fq->text, deliver_selectstring_sender);
wanted = FALSE;
}
/* Recipient matching */
- else if (deliver_selectstring != NULL)
+ else if (deliver_selectstring)
{
int i;
for (i = 0; i < recipients_count; i++)
{
uschar *address = recipients_list[i].address;
- if ((deliver_selectstring_regex?
- (pcre_exec(selectstring_regex, NULL, CS address,
- Ustrlen(address), 0, PCRE_EOPT, NULL, 0) >= 0)
- :
- (strstric(address, deliver_selectstring, FALSE) != NULL))
- &&
- tree_search(tree_nonrecipients, address) == NULL)
+ if ( (f.deliver_selectstring_regex
+ ? (pcre_exec(selectstring_regex, NULL, CS address,
+ Ustrlen(address), 0, PCRE_EOPT, NULL, 0) >= 0)
+ : (strstric(address, deliver_selectstring, FALSE) != NULL)
+ )
+ && tree_search(tree_nonrecipients, address) == NULL
+ )
break;
}
{
DEBUG(D_queue_run)
debug_printf("%s: no recipient address matched %s\n",
- f->text, deliver_selectstring);
+ fq->text, deliver_selectstring);
wanted = FALSE;
}
}
/* Recover store used when reading the header */
+ spool_clear_header_globals();
store_reset(reset_point2);
- if (!wanted) continue; /* With next message */
+ if (!wanted) goto go_around; /* With next message */
}
/* OK, got a message we want to deliver. Create a pipe which will
pretty cheap. */
if (pipe(pfd) < 0)
- {
log_write(0, LOG_MAIN|LOG_PANIC_DIE, "failed to create pipe in queue "
"runner process %d: %s", queue_run_pid, strerror(errno));
- }
queue_run_pipe = pfd[pipe_write]; /* To ensure it gets passed on. */
/* Make sure it isn't stdin. This seems unlikely, but just to be on the
/* Now deliver the message; get the id by cutting the -H off the file
name. The return of the process is zero if a delivery was attempted. */
- set_process_info("running queue: %s", f->text);
- f->text[SPOOL_NAME_LENGTH-2] = 0;
+ set_process_info("running queue: %s", fq->text);
+ fq->text[SPOOL_NAME_LENGTH-2] = 0;
+#ifdef MEASURE_TIMING
+ report_time_since(×tamp_startup, US"queue msg selected");
+#endif
+
+single_item_retry:
if ((pid = fork()) == 0)
{
int rc;
- if (running_in_test_harness) millisleep(100);
+ testharness_pause_ms(100);
(void)close(pfd[pipe_read]);
- rc = deliver_message(f->text, force_delivery, FALSE);
- _exit(rc == DELIVER_NOT_ATTEMPTED);
+ rc = deliver_message(fq->text, force_delivery, FALSE);
+ exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED, US"qrun-delivery");
}
if (pid < 0)
log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
then wait for the first level process to terminate. */
(void)close(pfd[pipe_write]);
- set_process_info("running queue: waiting for %s (%d)", f->text, pid);
+ set_process_info("running queue: waiting for %s (%d)", fq->text, pid);
while (wait(&status) != pid);
/* A zero return means a delivery was attempted; turn off the force flag
for any subsequent calls unless queue_force is set. */
- if ((status & 0xffff) == 0) force_delivery = queue_run_force;
+ if (!(status & 0xffff)) force_delivery = f.queue_run_force;
/* If the process crashed, tell somebody */
- else if ((status & 0x00ff) != 0)
- {
+ else if (status & 0x00ff)
log_write(0, LOG_MAIN|LOG_PANIC,
"queue run: process %d crashed with signal %d while delivering %s",
- (int)pid, status & 0x00ff, f->text);
+ (int)pid, status & 0x00ff, fq->text);
+
+ /* If single-item delivery was untried (likely due to locking)
+ retry once after a delay */
+
+ if (status & 0xff00 && single_id)
+ {
+ single_id = FALSE;
+ DEBUG(D_queue_run) debug_printf("qrun single-item pause before retry\n");
+ millisleep(500);
+ DEBUG(D_queue_run) debug_printf("qrun single-item retry after pause\n");
+ goto single_item_retry;
}
/* Before continuing, wait till the pipe gets closed at the far end. This
the mere fact that read() unblocks is enough. */
set_process_info("running queue: waiting for children of %d", pid);
- if (read(pfd[pipe_read], buffer, sizeof(buffer)) > 0)
- log_write(0, LOG_MAIN|LOG_PANIC, "queue run: unexpected data on pipe");
+ if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0)
+ log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ?
+ "queue run: unexpected data on pipe" : "queue run: error on pipe: %s",
+ strerror(errno));
(void)close(pfd[pipe_read]);
set_process_info("running queue");
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
+
/* If we are in the test harness, and this is not the first of a 2-stage
queue run, update fudged queue times. */
- if (running_in_test_harness && !queue_2stage)
+ if (f.running_in_test_harness && !f.queue_2stage)
{
- uschar *fqtnext = Ustrchr(fudged_queue_times, '/');
- if (fqtnext != NULL) fudged_queue_times = fqtnext + 1;
+ uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
+ if (fqtnext) fudged_queue_times = fqtnext + 1;
}
+
+
+ continue;
+
+ go_around:
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
} /* End loop for list of messages */
+ tree_nonrecipients = NULL;
store_reset(reset_point1); /* Scavenge list of messages */
/* If this was the first time through for random order processing, and
sub-directories have been found, randomize their order if necessary. */
if (i == 0 && subcount > 1 && !queue_run_in_order)
- {
- int j;
- for (j = 1; j <= subcount; j++)
+ for (int j = 1; j <= subcount; j++)
{
- int r = random_number(100);
- if (r >= 50)
+ int r;
+ if ((r = random_number(100)) >= 50)
{
int k = (r % subcount) + 1;
int x = subdirs[j];
subdirs[k] = x;
}
}
- }
} /* End loop for multiple directories */
/* If queue_2stage is true, we do it all again, with the 2stage flag
turned off. */
-if (queue_2stage)
+if (f.queue_2stage)
{
- queue_2stage = FALSE;
+
+ /* wait for last children */
+ for (int i = 0; i < nelem(qpid); i++)
+ if (qpid[i])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+ waitpid(qpid[i], NULL, 0);
+ }
+ else break;
+
+#ifdef MEASURE_TIMING
+ report_time_since(×tamp_startup, US"queue_run 1st phase done");
+#endif
+ f.queue_2stage = FALSE;
queue_run(start_id, stop_id, TRUE);
}
/* Called as a result of -bpc
Arguments: none
-Returns: nothing
+Returns: count
*/
-void
+unsigned
queue_count(void)
{
int subcount;
-int count = 0;
-queue_filename *f = NULL;
+unsigned count = 0;
uschar subdirs[64];
-f = queue_get_spool_list(
- -1, /* entire queue */
- subdirs, /* for holding sub list */
- &subcount, /* for subcount */
- FALSE); /* not random */
-for (; f != NULL; f = f->next) count++;
-fprintf(stdout, "%d\n", count);
+
+(void) queue_get_spool_list(-1, /* entire queue */
+ subdirs, /* for holding sub list */
+ &subcount, /* for subcount */
+ FALSE, /* not random */
+ &count); /* just get the count */
+return count;
}
+#define QUEUE_SIZE_AGE 60 /* update rate for queue_size */
+
+unsigned
+queue_count_cached(void)
+{
+time_t now;
+if ((now = time(NULL)) >= queue_size_next)
+ {
+ queue_size = queue_count();
+ queue_size_next = now + (f.running_in_test_harness ? 3 : QUEUE_SIZE_AGE);
+ }
+return queue_size;
+}
/************************************************
* List extra deliveries *
Returns: nothing
*/
-static void queue_list_extras(tree_node *p)
+static void
+queue_list_extras(tree_node *p)
{
-if (p->left != NULL) queue_list_extras(p->left);
+if (p->left) queue_list_extras(p->left);
if (!p->data.val) printf(" +D %s\n", p->name);
-if (p->right != NULL) queue_list_extras(p->right);
+if (p->right) queue_list_extras(p->right);
}
void
queue_list(int option, uschar **list, int count)
{
-int i;
int subcount;
int now = (int)time(NULL);
-void *reset_point;
-queue_filename *f = NULL;
+rmark reset_point;
+queue_filename * qf = NULL;
uschar subdirs[64];
/* If given a list of messages, build a chain containing their ids. */
if (count > 0)
{
queue_filename *last = NULL;
- for (i = 0; i < count; i++)
+ for (int i = 0; i < count; i++)
{
queue_filename *next =
- store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2);
+ store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, is_tainted(list[i]));
sprintf(CS next->text, "%s-H", list[i]);
next->dir_uschar = '*';
next->next = NULL;
- if (i == 0) f = next; else last->next = next;
+ if (i == 0) qf = next; else last->next = next;
last = next;
}
}
/* Otherwise get a list of the entire queue, in order if necessary. */
else
- f = queue_get_spool_list(
+ qf = queue_get_spool_list(
-1, /* entire queue */
subdirs, /* for holding sub list */
&subcount, /* for subcount */
- option >= 8); /* randomize if required */
+ option >= 8, /* randomize if required */
+ NULL); /* don't just count */
if (option >= 8) option -= 8;
/* Now scan the chain and print information, resetting store used
each time. */
-reset_point = store_get(0);
-
-for (; f != NULL; f = f->next)
+for (;
+ qf && (reset_point = store_mark());
+ spool_clear_header_globals(), store_reset(reset_point), qf = qf->next
+ )
{
int rc, save_errno;
int size = 0;
BOOL env_read;
- store_reset(reset_point);
message_size = 0;
- message_subdir[0] = f->dir_uschar;
- rc = spool_read_header(f->text, FALSE, count <= 0);
- if (rc == spool_read_notopen && errno == ENOENT && count <= 0) continue;
+ message_subdir[0] = qf->dir_uschar;
+ rc = spool_read_header(qf->text, FALSE, count <= 0);
+ if (rc == spool_read_notopen && errno == ENOENT && count <= 0)
+ continue;
save_errno = errno;
env_read = (rc == spool_read_OK || rc == spool_read_hdrerror);
if (env_read)
{
- int ptr;
+ int i, ptr;
FILE *jread;
struct stat statbuf;
- uschar * fname = spool_fname(US"input", message_subdir, f->text, US"");
+ uschar * fname = spool_fname(US"input", message_subdir, qf->text, US"");
ptr = Ustrlen(fname)-1;
fname[ptr] = 'D';
if (Ustat(fname, &statbuf) == 0)
size = message_size + statbuf.st_size - SPOOL_DATA_START_OFFSET + 1;
- i = (now - received_time)/60; /* minutes on queue */
+ i = (now - received_time.tv_sec)/60; /* minutes on queue */
if (i > 90)
{
i = (i + 30)/60;
/* Collect delivered addresses from any J file */
fname[ptr] = 'J';
- jread = Ufopen(fname, "rb");
- if (jread != NULL)
+ if ((jread = Ufopen(fname, "rb")))
{
while (Ufgets(big_buffer, big_buffer_size, jread) != NULL)
{
}
fprintf(stdout, "%s ", string_format_size(size, big_buffer));
- for (i = 0; i < 16; i++) fputc(f->text[i], stdout);
+ for (int i = 0; i < 16; i++) fputc(qf->text[i], stdout);
- if (env_read && sender_address != NULL)
+ if (env_read && sender_address)
{
printf(" <%s>", sender_address);
- if (sender_set_untrusted) printf(" (%s)", originator_login);
+ if (f.sender_set_untrusted) printf(" (%s)", originator_login);
}
if (rc != spool_read_OK)
if (save_errno == ERRNO_SPOOLFORMAT)
{
struct stat statbuf;
- uschar * fname = spool_fname(US"input", message_subdir, f->text, US"");
+ uschar * fname = spool_fname(US"input", message_subdir, qf->text, US"");
if (Ustat(fname, &statbuf) == 0)
printf("*** spool format error: size=" OFF_T_FMT " ***",
}
}
- if (deliver_freeze) printf(" *** frozen ***");
+ if (f.deliver_freeze) printf(" *** frozen ***");
printf("\n");
- if (recipients_list != NULL)
+ if (recipients_list)
{
- for (i = 0; i < recipients_count; i++)
+ for (int i = 0; i < recipients_count; i++)
{
tree_node *delivered =
tree_search(tree_nonrecipients, recipients_list[i].address);
if (!delivered || option != 1)
- printf(" %s %s\n", (delivered != NULL)? "D":" ",
- recipients_list[i].address);
- if (delivered != NULL) delivered->data.val = TRUE;
+ printf(" %s %s\n",
+ delivered ? "D" : " ", recipients_list[i].address);
+ if (delivered) delivered->data.val = TRUE;
}
- if (option == 2 && tree_nonrecipients != NULL)
+ if (option == 2 && tree_nonrecipients)
queue_list_extras(tree_nonrecipients);
printf("\n");
}
BOOL
queue_action(uschar *id, int action, uschar **argv, int argc, int recipients_arg)
{
-int i, j;
BOOL yield = TRUE;
BOOL removed = FALSE;
struct passwd *pw;
if (action >= MSG_SHOW_BODY)
{
- int fd, i, rc;
+ int fd, rc;
uschar *subdirectory, *suffix;
- if (!admin_user)
+ if (!f.admin_user)
{
printf("Permission denied\n");
return FALSE;
suffix = US"";
}
- for (i = 0; i < 2; i++)
+ for (int i = 0; i < 2; i++)
{
- message_subdir[0] = split_spool_directory == (i == 0) ? id[5] : 0;
+ set_subdir_str(message_subdir, id, i);
if ((fd = Uopen(spool_fname(subdirectory, message_subdir, id, suffix),
O_RDONLY, 0)) >= 0)
break;
{
yield = FALSE;
printf("Spool data file for %s does not exist\n", id);
- if (action != MSG_REMOVE || !admin_user) return FALSE;
+ if (action != MSG_REMOVE || !f.admin_user) return FALSE;
printf("Continuing, to ensure all files removed\n");
}
else
printf("Spool read error for %s: %s\n", spoolname, strerror(errno));
else
printf("Spool format error for %s\n", spoolname);
- if (action != MSG_REMOVE || !admin_user)
+ if (action != MSG_REMOVE || !f.admin_user)
{
(void)close(deliver_datafile);
deliver_datafile = -1;
mess about, but the original sender is permitted to remove a message. That's
why we leave this check until after the headers are read. */
-if (!admin_user && (action != MSG_REMOVE || real_uid != originator_uid))
+if (!f.admin_user && (action != MSG_REMOVE || real_uid != originator_uid))
{
printf("Permission denied\n");
(void)close(deliver_datafile);
switch(action)
{
case MSG_SHOW_COPY:
- deliver_in_buffer = store_malloc(DELIVER_IN_BUFFER_SIZE);
- deliver_out_buffer = store_malloc(DELIVER_OUT_BUFFER_SIZE);
- transport_write_message(1, NULL, 0);
- break;
+ {
+ transport_ctx tctx = {{0}};
+ deliver_in_buffer = store_malloc(DELIVER_IN_BUFFER_SIZE);
+ deliver_out_buffer = store_malloc(DELIVER_OUT_BUFFER_SIZE);
+ tctx.u.fd = 1;
+ (void) transport_write_message(&tctx, 0);
+ break;
+ }
case MSG_FREEZE:
- if (deliver_freeze)
+ if (f.deliver_freeze)
{
yield = FALSE;
printf("is already frozen\n");
}
else
{
- deliver_freeze = TRUE;
- deliver_manual_thaw = FALSE;
+ f.deliver_freeze = TRUE;
+ f.deliver_manual_thaw = FALSE;
deliver_frozen_at = time(NULL);
if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0)
{
case MSG_THAW:
- if (!deliver_freeze)
+ if (!f.deliver_freeze)
{
yield = FALSE;
printf("is not frozen\n");
}
else
{
- deliver_freeze = FALSE;
- deliver_manual_thaw = TRUE;
+ f.deliver_freeze = FALSE;
+ f.deliver_manual_thaw = TRUE;
if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0)
{
printf("is no longer frozen\n");
suffix[2] = 0;
message_subdir[0] = id[5];
- for (j = 0; j < 2; message_subdir[0] = 0, j++)
+ for (int j = 0; j < 2; message_subdir[0] = 0, j++)
{
uschar * fname = spool_fname(US"msglog", message_subdir, id, US"");
DEBUG(D_any) debug_printf(" (ok)\n");
}
- for (i = 0; i < 3; i++)
+ for (int i = 0; i < 3; i++)
{
uschar * fname;
else printf("has been removed or did not exist\n");
if (removed)
{
+#ifndef DISABLE_EVENT
+ if (event_action) for (int i = 0; i < recipients_count; i++)
+ {
+ tree_node *delivered =
+ tree_search(tree_nonrecipients, recipients_list[i].address);
+ if (!delivered)
+ {
+ uschar * save_local = deliver_localpart;
+ const uschar * save_domain = deliver_domain;
+ uschar * addr = recipients_list[i].address, * errmsg = NULL;
+ int start, end, dom;
+
+ if (!parse_extract_address(addr, &errmsg, &start, &end, &dom, TRUE))
+ log_write(0, LOG_MAIN|LOG_PANIC,
+ "failed to parse address '%.100s'\n: %s", addr, errmsg);
+ else
+ {
+ deliver_localpart =
+ string_copyn(addr+start, dom ? (dom-1) - start : end - start);
+ deliver_domain = dom
+ ? CUS string_copyn(addr+dom, end - dom) : CUS"";
+
+ event_raise(event_action, US"msg:fail:internal",
+ string_sprintf("message removed by %s", username));
+
+ deliver_localpart = save_local;
+ deliver_domain = save_domain;
+ }
+ }
+ }
+ (void) event_raise(event_action, US"msg:complete", NULL);
+#endif
log_write(0, LOG_MAIN, "removed by %s", username);
log_write(0, LOG_MAIN, "Completed");
}
}
+ case MSG_SETQUEUE:
+ /* The global "queue_name_dest" is used as destination, "queue_name"
+ as source */
+
+ spool_move_message(id, message_subdir, US"", US"");
+ break;
+
+
case MSG_MARK_ALL_DELIVERED:
- for (i = 0; i < recipients_count; i++)
- {
+ for (int i = 0; i < recipients_count; i++)
tree_add_nonrecipient(recipients_list[i].address);
- }
+
if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0)
{
printf("has been modified\n");
- for (i = 0; i < recipients_count; i++)
+ for (int i = 0; i < recipients_count; i++)
log_write(0, LOG_MAIN, "address <%s> marked delivered by %s",
recipients_list[i].address, username);
}
parse_extract_address(argv[recipients_arg], &errmess, &start, &end,
&domain, (action == MSG_EDIT_SENDER));
- if (recipient == NULL)
+ if (!recipient)
{
yield = FALSE;
printf("- error while %s:\n bad address %s: %s\n",
doing, argv[recipients_arg], errmess);
}
- else if (recipient[0] != 0 && domain == 0)
+ else if (*recipient && domain == 0)
{
yield = FALSE;
printf("- error while %s:\n bad address %s: "
}
else if (action == MSG_MARK_DELIVERED)
{
+ int i;
for (i = 0; i < recipients_count; i++)
if (Ustrcmp(recipients_list[i].address, recipient) == 0) break;
if (i >= recipients_count)
void
queue_check_only(void)
{
-BOOL *set;
int sep = 0;
struct stat statbuf;
-const uschar *s;
-uschar *ss, *name;
-uschar buffer[1024];
+const uschar * s = queue_only_file;
+uschar * ss;
-if (queue_only_file == NULL) return;
+if (s)
+ while ((ss = string_nextinlist(&s, &sep, NULL, 0)))
+ if (Ustrncmp(ss, "smtp", 4) == 0)
+ {
+ ss += 4;
+ if (Ustat(ss, &statbuf) == 0)
+ {
+ f.queue_smtp = TRUE;
+ DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
+ }
+ }
+ else
+ if (Ustat(ss, &statbuf) == 0)
+ {
+ queue_only = TRUE;
+ DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+ }
+}
-s = queue_only_file;
-while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL)
+
+
+/******************************************************************************/
+/******************************************************************************/
+
+#ifdef EXPERIMENTAL_QUEUE_RAMP
+void
+queue_notify_daemon(const uschar * msgid)
+{
+uschar buf[MESSAGE_ID_LENGTH + 2];
+int fd;
+
+DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid);
+
+buf[0] = NOTIFY_MSG_QRUN;
+memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
+
+if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
{
- if (Ustrncmp(ss, "smtp", 4) == 0)
- {
- name = US"queue_smtp";
- set = &queue_smtp;
- ss += 4;
- }
- else
- {
- name = US"queue_only";
- set = &queue_only;
- }
+ struct sockaddr_un sa_un = {.sun_family = AF_UNIX};
+
+#ifdef EXIM_HAVE_ABSTRACT_UNIX_SOCKETS
+ int len = offsetof(struct sockaddr_un, sun_path) + 1
+ + snprintf(sa_un.sun_path+1, sizeof(sa_un.sun_path)-1, "%s",
+ expand_string(notifier_socket));
+ sa_un.sun_path[0] = 0;
+#else
+ int len = offsetof(struct sockaddr_un, sun_path)
+ + snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), "%s",
+ expand_string(notifier_socket));
+#endif
- if (Ustat(ss, &statbuf) == 0)
- {
- *set = TRUE;
- DEBUG(D_receive) debug_printf("%s set because %s exists\n", name, ss);
- }
+ if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, len) < 0)
+ DEBUG(D_queue_run)
+ debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
+ close(fd);
}
+else DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno));
}
+#endif
#endif /*!COMPILE_UTILITY*/