Two-phase queue run perf: parallel processes for phase one
[users/jgh/exim.git] / src / src / queue.c
index f808ed1036875a5d095e771db7276c77a3d6c522..d472b985134b0fc2168589d3f6ffe6b5e8d3daac 100644 (file)
@@ -346,6 +346,7 @@ const pcre *selectstring_regex_sender = NULL;
 uschar *log_detail = NULL;
 int subcount = 0;
 uschar subdirs[64];
+pid_t qpid[4] = {0};   /* Parallelism factor for q2stage 1st phase */
 
 #ifdef MEASURE_TIMING
 report_time_since(&timestamp_startup, US"queue_run start");
@@ -468,18 +469,40 @@ for (int i = queue_run_in_order ? -1 : 0;
           (double)load_average/1000.0,
           (double)deliver_queue_load_max/1000.0);
 
+    /* If initial of a 2-phase run, maintain a set of child procs
+    to get disk parallelism */
+
+    if (f.queue_2stage && !queue_run_in_order)
+      {
+      int i;
+      if (qpid[nelem(qpid) - 1])
+       {
+       DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n");
+       waitpid(qpid[0], NULL, 0);
+       DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+       for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+       qpid[i] = 0;
+       }
+      else
+       for (i = 0; qpid[i]; ) i++;
+      DEBUG(D_queue_run) debug_printf("q2stage forking\n");
+      if ((qpid[i] = fork()))
+       continue;       /* parent loops around */
+      DEBUG(D_queue_run) debug_printf("q2stage child\n");
+      }
+
     /* Skip this message unless it's within the ID limits */
 
     if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
-      continue;
+      goto go_around;
     if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
-      continue;
+      goto go_around;
 
     /* Check that the message still exists */
 
     message_subdir[0] = fq->dir_uschar;
     if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
-      continue;
+      goto go_around;
 
     /* There are some tests that require the reading of the header file. Ensure
     the store used is scavenged afterwards so that this process doesn't keep
@@ -499,7 +522,7 @@ for (int i = queue_run_in_order ? -1 : 0;
       follow. If the message is chosen for delivery, the header is read again
       in the deliver_message() function, in a subprocess. */
 
-      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
       f.dont_deliver = orig_dont_deliver;
 
       /* Now decide if we want to deliver this message. As we have read the
@@ -571,7 +594,7 @@ for (int i = queue_run_in_order ? -1 : 0;
 
       spool_clear_header_globals();
       store_reset(reset_point2);
-      if (!wanted) continue;      /* With next message */
+      if (!wanted) goto go_around;      /* With next message */
       }
 
     /* OK, got a message we want to deliver. Create a pipe which will
@@ -660,6 +683,10 @@ for (int i = queue_run_in_order ? -1 : 0;
     (void)close(pfd[pipe_read]);
     set_process_info("running queue");
 
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS, US"2-phase child");
+
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
 
@@ -668,6 +695,14 @@ for (int i = queue_run_in_order ? -1 : 0;
       uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
       if (fqtnext) fudged_queue_times = fqtnext + 1;
       }
+
+
+    continue;
+
+  go_around:
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS, US"2-phase child");
     }                                  /* End loop for list of messages */
 
   tree_nonrecipients = NULL;
@@ -695,6 +730,19 @@ turned off. */
 
 if (f.queue_2stage)
   {
+
+  /* wait for last children */
+  for (int i = 0; i < nelem(qpid); i++)
+    if (qpid[i])
+      {
+      DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+      waitpid(qpid[i], NULL, 0);
+      }
+    else break;
+
+#ifdef MEASURE_TIMING
+  report_time_since(&timestamp_startup, US"queue_run 1st phase done");
+#endif
   f.queue_2stage = FALSE;
   queue_run(start_id, stop_id, TRUE);
   }