uschar *log_detail = NULL;
int subcount = 0;
uschar subdirs[64];
+pid_t qpid[4] = {0}; /* Parallelism factor for q2stage 1st phase */
#ifdef MEASURE_TIMING
report_time_since(×tamp_startup, US"queue_run start");
(double)load_average/1000.0,
(double)deliver_queue_load_max/1000.0);
+ /* If initial of a 2-phase run, maintain a set of child procs
+ to get disk parallelism */
+
+ if (f.queue_2stage && !queue_run_in_order)
+ {
+ int i;
+ if (qpid[nelem(qpid) - 1])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n");
+ waitpid(qpid[0], NULL, 0);
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+ for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+ qpid[i] = 0;
+ }
+ else
+ for (i = 0; qpid[i]; ) i++;
+ DEBUG(D_queue_run) debug_printf("q2stage forking\n");
+ if ((qpid[i] = fork()))
+ continue; /* parent loops around */
+ DEBUG(D_queue_run) debug_printf("q2stage child\n");
+ }
+
/* Skip this message unless it's within the ID limits */
if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
- continue;
+ goto go_around;
if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
- continue;
+ goto go_around;
/* Check that the message still exists */
message_subdir[0] = fq->dir_uschar;
if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
- continue;
+ goto go_around;
/* There are some tests that require the reading of the header file. Ensure
the store used is scavenged afterwards so that this process doesn't keep
follow. If the message is chosen for delivery, the header is read again
in the deliver_message() function, in a subprocess. */
- if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+ if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
f.dont_deliver = orig_dont_deliver;
/* Now decide if we want to deliver this message. As we have read the
spool_clear_header_globals();
store_reset(reset_point2);
- if (!wanted) continue; /* With next message */
+ if (!wanted) goto go_around; /* With next message */
}
/* OK, got a message we want to deliver. Create a pipe which will
/* A zero return means a delivery was attempted; turn off the force flag
for any subsequent calls unless queue_force is set. */
- if ((status & 0xffff) == 0) force_delivery = f.queue_run_force;
+ if (!(status & 0xffff)) force_delivery = f.queue_run_force;
/* If the process crashed, tell somebody */
- else if ((status & 0x00ff) != 0)
+ else if (status & 0x00ff)
log_write(0, LOG_MAIN|LOG_PANIC,
"queue run: process %d crashed with signal %d while delivering %s",
(int)pid, status & 0x00ff, fq->text);
set_process_info("running queue: waiting for children of %d", pid);
if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0)
- log_write(0, LOG_MAIN|LOG_PANIC, "queue run: %s on pipe",
- status > 0 ? "unexpected data" : "error");
+ log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ?
+ "queue run: unexpected data on pipe" : "queue run: error on pipe: %s",
+ strerror(errno));
(void)close(pfd[pipe_read]);
set_process_info("running queue");
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
+
/* If we are in the test harness, and this is not the first of a 2-stage
queue run, update fudged queue times. */
uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
if (fqtnext) fudged_queue_times = fqtnext + 1;
}
+
+
+ continue;
+
+ go_around:
+ /* If initial of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
} /* End loop for list of messages */
tree_nonrecipients = NULL;
if (f.queue_2stage)
{
+
+ /* wait for last children */
+ for (int i = 0; i < nelem(qpid); i++)
+ if (qpid[i])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+ waitpid(qpid[i], NULL, 0);
+ }
+ else break;
+
+#ifdef MEASURE_TIMING
+ report_time_since(×tamp_startup, US"queue_run 1st phase done");
+#endif
f.queue_2stage = FALSE;
queue_run(start_id, stop_id, TRUE);
}
}
+ case MSG_SETQUEUE:
+ /* The global "queue_name_dest" is used as destination, "queue_name"
+ as source */
+
+ spool_move_message(id, message_subdir, US"", US"");
+ break;
+
+
case MSG_MARK_ALL_DELIVERED:
for (int i = 0; i < recipients_count; i++)
tree_add_nonrecipient(recipients_list[i].address);
{
int sep = 0;
struct stat statbuf;
-const uschar *s;
-uschar *ss;
-uschar buffer[1024];
+const uschar * s = queue_only_file;
+uschar * ss;
-if (queue_only_file == NULL) return;
-
-s = queue_only_file;
-while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL)
- {
- if (Ustrncmp(ss, "smtp", 4) == 0)
- {
- ss += 4;
- if (Ustat(ss, &statbuf) == 0)
- {
- f.queue_smtp = TRUE;
- DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
- }
- }
- else
- {
- if (Ustat(ss, &statbuf) == 0)
+if (s)
+ while ((ss = string_nextinlist(&s, &sep, NULL, 0)))
+ if (Ustrncmp(ss, "smtp", 4) == 0)
{
- queue_only = TRUE;
- DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+ ss += 4;
+ if (Ustat(ss, &statbuf) == 0)
+ {
+ f.queue_smtp = TRUE;
+ DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
+ }
}
- }
- }
+ else
+ if (Ustat(ss, &statbuf) == 0)
+ {
+ queue_only = TRUE;
+ DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+ }
}
#endif /*!COMPILE_UTILITY*/