uschar *log_detail = NULL;
int subcount = 0;
uschar subdirs[64];
+pid_t qpid[4] = {0}; /* Child pids for q2stage 1st phase; the array size is the parallelism factor */
#ifdef MEASURE_TIMING
report_time_since(&timestamp_startup, US"queue_run start");
(double)load_average/1000.0,
(double)deliver_queue_load_max/1000.0);
+ /* If this is the initial phase of a 2-phase run, maintain a set of child
+ processes to get disk parallelism */
+
+ if (f.queue_2stage && !queue_run_in_order)
+ {
+ int i;
+ if (qpid[nelem(qpid) - 1])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n");
+ waitpid(qpid[0], NULL, 0);
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+ for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+ qpid[i] = 0;
+ }
+ else
+ for (i = 0; qpid[i]; ) i++;
+ DEBUG(D_queue_run) debug_printf("q2stage forking\n");
+ if ((qpid[i] = fork()))
+ continue; /* parent loops around */
+ DEBUG(D_queue_run) debug_printf("q2stage child\n");
+ }
+
/* Skip this message unless it's within the ID limits */
if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
- continue;
+ goto go_around;
if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
- continue;
+ goto go_around;
/* Check that the message still exists */
message_subdir[0] = fq->dir_uschar;
if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
- continue;
+ goto go_around;
/* There are some tests that require the reading of the header file. Ensure
the store used is scavenged afterwards so that this process doesn't keep
follow. If the message is chosen for delivery, the header is read again
in the deliver_message() function, in a subprocess. */
- if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+ if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
f.dont_deliver = orig_dont_deliver;
/* Now decide if we want to deliver this message. As we have read the
spool_clear_header_globals();
store_reset(reset_point2);
- if (!wanted) continue; /* With next message */
+ if (!wanted) goto go_around; /* With next message */
}
/* OK, got a message we want to deliver. Create a pipe which will
(void)close(pfd[pipe_read]);
set_process_info("running queue");
+ /* If this is the initial phase of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
+
/* If we are in the test harness, and this is not the first of a 2-stage
queue run, update fudged queue times. */
uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
if (fqtnext) fudged_queue_times = fqtnext + 1;
}
+
+
+ continue;
+
+ go_around:
+ /* If this is the initial phase of a 2-phase run, we are a child - so just exit */
+ if (f.queue_2stage && !queue_run_in_order)
+ exim_exit(EXIT_SUCCESS, US"2-phase child");
} /* End loop for list of messages */
tree_nonrecipients = NULL;
if (f.queue_2stage)
{
+
+ /* Wait for the remaining children */
+ for (int i = 0; i < nelem(qpid); i++)
+ if (qpid[i])
+ {
+ DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+ waitpid(qpid[i], NULL, 0);
+ }
+ else break;
+
+#ifdef MEASURE_TIMING
+ report_time_since(&timestamp_startup, US"queue_run 1st phase done");
+#endif
f.queue_2stage = FALSE;
queue_run(start_id, stop_id, TRUE);
}