-/* Routines with knowledge of spool layout */
-
-#ifndef COMPILE_UTILITY
-static void
-spool_pname_buf(uschar * buf, int len)
-{
-snprintf(CS buf, len, "%s/%s/input", spool_directory, queue_name);
-}
-
-uschar *
-spool_dname(const uschar * purpose, uschar * subdir)
-{
-return string_sprintf("%s/%s/%s/%s",
- spool_directory, queue_name, purpose, subdir);
-}
-#endif
-
-uschar *
-spool_sname(const uschar * purpose, uschar * subdir)
-{
-return string_sprintf("%s%s%s%s%s",
- queue_name, *queue_name ? "/" : "",
- purpose,
- *subdir ? "/" : "", subdir);
-}
-
-uschar *
-spool_fname(const uschar * purpose, const uschar * subdir, const uschar * fname,
- const uschar * suffix)
-{
-return string_sprintf("%s/%s/%s/%s/%s%s",
- spool_directory, queue_name, purpose, subdir, fname, suffix);
-}
Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
{
queue_filename *next =
- store_get(sizeof(queue_filename) + Ustrlen(name));
+ store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
Ustrcpy(next->text, name);
next->dir_uschar = subdirchar;
uschar *log_detail = NULL;
int subcount = 0;
uschar subdirs[64];
+pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */
+
+#ifdef MEASURE_TIMING
+report_time_since(×tamp_startup, US"queue_run start");
+#endif
/* Cancel any specific queue domains. Turn off the flag that causes SMTP
deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag
i <= (queue_run_in_order ? -1 : subcount);
i++)
{
- void *reset_point1 = store_get(0);
+ rmark reset_point1 = store_mark();
DEBUG(D_queue_run)
{
(double)load_average/1000.0,
(double)deliver_queue_load_max/1000.0);
+ /* If initial of a 2-phase run, maintain a set of child procs
+ to get disk parallelism */
+
+ if (f.queue_2stage && !queue_run_in_order)
+ {
+ int i;
+ if (qpid[nelem(qpid) - 1])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n");
+ waitpid(qpid[0], NULL, 0);
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+ for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+ qpid[i] = 0;
+ }
+ else
+ for (i = 0; qpid[i]; ) i++;
+ DEBUG(D_queue_run) debug_printf("q2stage forking\n");
+ if ((qpid[i] = fork()))
+ continue; /* parent loops around */
+ DEBUG(D_queue_run) debug_printf("q2stage child\n");
+ }
+
/* Skip this message unless it's within the ID limits */
if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
- continue;
+ goto go_around;
if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
- continue;
+ goto go_around;
/* Check that the message still exists */
message_subdir[0] = fq->dir_uschar;
if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
- continue;
+ goto go_around;
/* There are some tests that require the reading of the header file. Ensure
the store used is scavenged afterwards so that this process doesn't keep
{
BOOL wanted = TRUE;
BOOL orig_dont_deliver = f.dont_deliver;
- void *reset_point2 = store_get(0);
+ rmark reset_point2 = store_mark();
/* Restore the original setting of dont_deliver after reading the header,
so that a setting for a particular message doesn't force it for any that
follow. If the message is chosen for delivery, the header is read again
in the deliver_message() function, in a subprocess. */
- if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+ if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
f.dont_deliver = orig_dont_deliver;
/* Now decide if we want to deliver this message. As we have read the
spool_clear_header_globals();
store_reset(reset_point2);
- if (!wanted) continue; /* With next message */
+ if (!wanted) goto go_around; /* With next message */
}
/* OK, got a message we want to deliver. Create a pipe which will
set_process_info("running queue: %s", fq->text);
fq->text[SPOOL_NAME_LENGTH-2] = 0;
+#ifdef MEASURE_TIMING
+ report_time_since(×tamp_startup, US"queue msg selected");
+#endif
+
if ((pid = fork()) == 0)
{
int rc;
- if (f.running_in_test_harness) millisleep(100);
+ testharness_pause_ms(100);
(void)close(pfd[pipe_read]);
rc = deliver_message(fq->text, force_delivery, FALSE);
- _exit(rc == DELIVER_NOT_ATTEMPTED);
+ exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED);
}
if (pid < 0)
log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
/* A zero return means a delivery was attempted; turn off the force flag
for any subsequent calls unless queue_force is set. */
- if ((status & 0xffff) == 0) force_delivery = f.queue_run_force;
+ if (!(status & 0xffff)) force_delivery = f.queue_run_force;
/* If the process crashed, tell somebody */
- else if ((status & 0x00ff) != 0)
+ else if (status & 0x00ff)
log_write(0, LOG_MAIN|LOG_PANIC,
"queue run: process %d crashed with signal %d while delivering %s",
(int)pid, status & 0x00ff, fq->text);
set_process_info("running queue: waiting for children of %d", pid);
if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0)
- log_write(0, LOG_MAIN|LOG_PANIC, "queue run: %s on pipe",
- status > 0 ? "unexpected data" : "error");
+ log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ?
+ "queue run: unexpected data on pipe" : "queue run: error on pipe: %s",
+ strerror(errno));
(void)close(pfd[pipe_read]);
set_process_info("running queue");
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
+
/* If we are in the test harness, and this is not the first of a 2-stage
queue run, update fudged queue times. */
if (f.running_in_test_harness && !f.queue_2stage)
{
- uschar *fqtnext = Ustrchr(fudged_queue_times, '/');
- if (fqtnext != NULL) fudged_queue_times = fqtnext + 1;
+ uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
+ if (fqtnext) fudged_queue_times = fqtnext + 1;
}
+
+
+ continue;
+
+ go_around:
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
} /* End loop for list of messages */
tree_nonrecipients = NULL;
if (f.queue_2stage)
{
+
+ /* wait for last children */
+ for (int i = 0; i < nelem(qpid); i++)
+ if (qpid[i])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+ waitpid(qpid[i], NULL, 0);
+ }
+ else break;
+
+#ifdef MEASURE_TIMING
+ report_time_since(×tamp_startup, US"queue_run 1st phase done");
+#endif
f.queue_2stage = FALSE;
queue_run(start_id, stop_id, TRUE);
}
Returns: nothing
*/
-static void queue_list_extras(tree_node *p)
+static void
+queue_list_extras(tree_node *p)
{
-if (p->left != NULL) queue_list_extras(p->left);
+if (p->left) queue_list_extras(p->left);
if (!p->data.val) printf(" +D %s\n", p->name);
-if (p->right != NULL) queue_list_extras(p->right);
+if (p->right) queue_list_extras(p->right);
}
{
int subcount;
int now = (int)time(NULL);
-void *reset_point;
+rmark reset_point;
queue_filename * qf = NULL;
uschar subdirs[64];
for (int i = 0; i < count; i++)
{
queue_filename *next =
- store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2);
+ store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, is_tainted(list[i]));
sprintf(CS next->text, "%s-H", list[i]);
next->dir_uschar = '*';
next->next = NULL;
/* Now scan the chain and print information, resetting store used
each time. */
-for (reset_point = store_get(0);
- qf;
+for (;
+ qf && (reset_point = store_mark());
spool_clear_header_globals(), store_reset(reset_point), qf = qf->next
)
{
for (int i = 0; i < 2; i++)
{
- message_subdir[0] = split_spool_directory == (i == 0) ? id[5] : 0;
+ set_subdir_str(message_subdir, id, i);
if ((fd = Uopen(spool_fname(subdirectory, message_subdir, id, suffix),
O_RDONLY, 0)) >= 0)
break;
if (removed)
{
#ifndef DISABLE_EVENT
- for (int i = 0; i < recipients_count; i++)
+ if (event_action) for (int i = 0; i < recipients_count; i++)
{
tree_node *delivered =
tree_search(tree_nonrecipients, recipients_list[i].address);
}
+ case MSG_SETQUEUE:
+ /* The global "queue_name_dest" is used as destination, "queue_name"
+ as source */
+
+ spool_move_message(id, message_subdir, US"", US"");
+ break;
+
+
case MSG_MARK_ALL_DELIVERED:
for (int i = 0; i < recipients_count; i++)
tree_add_nonrecipient(recipients_list[i].address);
{
int sep = 0;
struct stat statbuf;
-const uschar *s;
-uschar *ss;
-uschar buffer[1024];
-
-if (queue_only_file == NULL) return;
+const uschar * s = queue_only_file;
+uschar * ss;
-s = queue_only_file;
-while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL)
- {
- if (Ustrncmp(ss, "smtp", 4) == 0)
- {
- ss += 4;
- if (Ustat(ss, &statbuf) == 0)
- {
- f.queue_smtp = TRUE;
- DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
- }
- }
- else
- {
- if (Ustat(ss, &statbuf) == 0)
+if (s)
+ while ((ss = string_nextinlist(&s, &sep, NULL, 0)))
+ if (Ustrncmp(ss, "smtp", 4) == 0)
{
- queue_only = TRUE;
- DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+ ss += 4;
+ if (Ustat(ss, &statbuf) == 0)
+ {
+ f.queue_smtp = TRUE;
+ DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
+ }
}
- }
- }
+ else
+ if (Ustat(ss, &statbuf) == 0)
+ {
+ queue_only = TRUE;
+ DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+ }
}
#endif /*!COMPILE_UTILITY*/