Two-phase queue run perf: parallel processes for phase one
[users/jgh/exim.git] / src / src / queue.c
index d8c160a3af8527e01e4826289eef05f359ad240c..d472b985134b0fc2168589d3f6ffe6b5e8d3daac 100644 (file)
@@ -346,6 +346,7 @@ const pcre *selectstring_regex_sender = NULL;
 uschar *log_detail = NULL;
 int subcount = 0;
 uschar subdirs[64];
+pid_t qpid[4] = {0};   /* Parallelism factor for q2stage 1st phase */
 
 #ifdef MEASURE_TIMING
 report_time_since(&timestamp_startup, US"queue_run start");
@@ -468,18 +469,40 @@ for (int i = queue_run_in_order ? -1 : 0;
           (double)load_average/1000.0,
           (double)deliver_queue_load_max/1000.0);
 
+    /* If initial of a 2-phase run, maintain a set of child procs
+    to get disk parallelism */
+
+    if (f.queue_2stage && !queue_run_in_order)
+      {
+      int i;
+      if (qpid[nelem(qpid) - 1])
+       {
+       DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n");
+       waitpid(qpid[0], NULL, 0);
+       DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+       for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+       qpid[i] = 0;
+       }
+      else
+       for (i = 0; qpid[i]; ) i++;
+      DEBUG(D_queue_run) debug_printf("q2stage forking\n");
+      if ((qpid[i] = fork()))
+       continue;       /* parent loops around */
+      DEBUG(D_queue_run) debug_printf("q2stage child\n");
+      }
+
     /* Skip this message unless it's within the ID limits */
 
     if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
-      continue;
+      goto go_around;
     if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
-      continue;
+      goto go_around;
 
     /* Check that the message still exists */
 
     message_subdir[0] = fq->dir_uschar;
     if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
-      continue;
+      goto go_around;
 
     /* There are some tests that require the reading of the header file. Ensure
     the store used is scavenged afterwards so that this process doesn't keep
@@ -499,7 +522,7 @@ for (int i = queue_run_in_order ? -1 : 0;
       follow. If the message is chosen for delivery, the header is read again
       in the deliver_message() function, in a subprocess. */
 
-      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
       f.dont_deliver = orig_dont_deliver;
 
       /* Now decide if we want to deliver this message. As we have read the
@@ -571,7 +594,7 @@ for (int i = queue_run_in_order ? -1 : 0;
 
       spool_clear_header_globals();
       store_reset(reset_point2);
-      if (!wanted) continue;      /* With next message */
+      if (!wanted) goto go_around;      /* With next message */
       }
 
     /* OK, got a message we want to deliver. Create a pipe which will
@@ -638,11 +661,11 @@ for (int i = queue_run_in_order ? -1 : 0;
     /* A zero return means a delivery was attempted; turn off the force flag
     for any subsequent calls unless queue_force is set. */
 
-    if ((status & 0xffff) == 0) force_delivery = f.queue_run_force;
+    if (!(status & 0xffff)) force_delivery = f.queue_run_force;
 
     /* If the process crashed, tell somebody */
 
-    else if ((status & 0x00ff) != 0)
+    else if (status & 0x00ff)
       log_write(0, LOG_MAIN|LOG_PANIC,
         "queue run: process %d crashed with signal %d while delivering %s",
         (int)pid, status & 0x00ff, fq->text);
@@ -654,19 +677,32 @@ for (int i = queue_run_in_order ? -1 : 0;
 
     set_process_info("running queue: waiting for children of %d", pid);
     if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0)
-      log_write(0, LOG_MAIN|LOG_PANIC, "queue run: %s on pipe",
-               status > 0 ? "unexpected data" : "error");
+      log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ?
+       "queue run: unexpected data on pipe" : "queue run: error on pipe: %s",
+       strerror(errno));
     (void)close(pfd[pipe_read]);
     set_process_info("running queue");
 
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS, US"2-phase child");
+
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
 
     if (f.running_in_test_harness && !f.queue_2stage)
       {
-      uschar *fqtnext = Ustrchr(fudged_queue_times, '/');
-      if (fqtnext != NULL) fudged_queue_times = fqtnext + 1;
+      uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
+      if (fqtnext) fudged_queue_times = fqtnext + 1;
       }
+
+
+    continue;
+
+  go_around:
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS, US"2-phase child");
     }                                  /* End loop for list of messages */
 
   tree_nonrecipients = NULL;
@@ -694,6 +730,19 @@ turned off. */
 
 if (f.queue_2stage)
   {
+
+  /* wait for last children */
+  for (int i = 0; i < nelem(qpid); i++)
+    if (qpid[i])
+      {
+      DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+      waitpid(qpid[i], NULL, 0);
+      }
+    else break;
+
+#ifdef MEASURE_TIMING
+  report_time_since(&timestamp_startup, US"queue_run 1st phase done");
+#endif
   f.queue_2stage = FALSE;
   queue_run(start_id, stop_id, TRUE);
   }
@@ -1265,6 +1314,14 @@ switch(action)
     }
 
 
+  case MSG_SETQUEUE:
+    /* The global "queue_name_dest" is used as destination, "queue_name"
+    as source */
+
+    spool_move_message(id, message_subdir, US"", US"");
+    break;
+
+
   case MSG_MARK_ALL_DELIVERED:
   for (int i = 0; i < recipients_count; i++)
     tree_add_nonrecipient(recipients_list[i].address);
@@ -1412,33 +1469,26 @@ queue_check_only(void)
 {
 int sep = 0;
 struct stat statbuf;
-const uschar *s;
-uschar *ss;
-uschar buffer[1024];
+const uschar * s = queue_only_file;
+uschar * ss;
 
-if (queue_only_file == NULL) return;
-
-s = queue_only_file;
-while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL)
-  {
-  if (Ustrncmp(ss, "smtp", 4) == 0)
-    {
-    ss += 4;
-    if (Ustat(ss, &statbuf) == 0)
-      {
-      f.queue_smtp = TRUE;
-      DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
-      }
-    }
-  else
-    {
-    if (Ustat(ss, &statbuf) == 0)
+if (s)
+  while ((ss = string_nextinlist(&s, &sep, NULL, 0)))
+    if (Ustrncmp(ss, "smtp", 4) == 0)
       {
-      queue_only = TRUE;
-      DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+      ss += 4;
+      if (Ustat(ss, &statbuf) == 0)
+       {
+       f.queue_smtp = TRUE;
+       DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
+       }
       }
-    }
-  }
+    else
+      if (Ustat(ss, &statbuf) == 0)
+       {
+       queue_only = TRUE;
+       DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+       }
 }
 
 #endif /*!COMPILE_UTILITY*/