Fix small typo
[exim.git] / src / src / queue.c
index 9f9e409ed964f0ac915181287778e39d423fc324..567784575de2a3d7e292cf0ef1474e29472a2f3a 100644 (file)
@@ -3,6 +3,7 @@
 *************************************************/
 
 /* Copyright (c) University of Cambridge 1995 - 2018 */
 *************************************************/
 
 /* Copyright (c) University of Cambridge 1995 - 2018 */
+/* Copyright (c) The Exim Maintainers 2020 */
 /* See the file NOTICE for conditions of use and distribution. */
 
 /* Functions that operate on the input queue. */
 /* See the file NOTICE for conditions of use and distribution. */
 
 /* Functions that operate on the input queue. */
 
 
 
 
 
 
-/* Routines with knowledge of spool layout */
-
-#ifndef COMPILE_UTILITY
-static void
-spool_pname_buf(uschar * buf, int len)
-{
-snprintf(CS buf, len, "%s/%s/input", spool_directory, queue_name);
-}
-
-uschar *
-spool_dname(const uschar * purpose, uschar * subdir)
-{
-return string_sprintf("%s/%s/%s/%s",
-       spool_directory, queue_name, purpose, subdir);
-}
-#endif
-
-uschar *
-spool_sname(const uschar * purpose, uschar * subdir)
-{
-return string_sprintf("%s%s%s%s%s",
-                   queue_name, *queue_name ? "/" : "",
-                   purpose,
-                   *subdir ? "/" : "", subdir);
-}
-
-uschar *
-spool_fname(const uschar * purpose, const uschar * subdir, const uschar * fname,
-               const uschar * suffix)
-{
-return string_sprintf("%s/%s/%s/%s/%s%s",
-       spool_directory, queue_name, purpose, subdir, fname, suffix);
-}
 
 
 
 
 
 
@@ -58,6 +26,9 @@ Michael Haardt. */
 #define LOG2_MAXNODES 32
 
 
 #define LOG2_MAXNODES 32
 
 
+#ifndef DISABLE_TLS
+static BOOL queue_tls_init = FALSE;
+#endif
 
 /*************************************************
 *  Helper sort function for queue_get_spool_list *
 
 /*************************************************
 *  Helper sort function for queue_get_spool_list *
@@ -143,13 +114,14 @@ Arguments:
   subdirs        vector to store list of subdirchars
   subcount       pointer to int in which to store count of subdirs
   randomize      TRUE if the order of the list is to be unpredictable
   subdirs        vector to store list of subdirchars
   subcount       pointer to int in which to store count of subdirs
   randomize      TRUE if the order of the list is to be unpredictable
+  pcount        If not NULL, fill in with count of files and do not return list
 
 Returns:         pointer to a chain of queue name items
 */
 
 static queue_filename *
 queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount,
 
 Returns:         pointer to a chain of queue name items
 */
 
 static queue_filename *
 queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount,
-  BOOL randomize)
+  BOOL randomize, unsigned * pcount)
 {
 int i;
 int flags = 0;
 {
 int i;
 int flags = 0;
@@ -157,8 +129,6 @@ int resetflags = -1;
 int subptr;
 queue_filename *yield = NULL;
 queue_filename *last = NULL;
 int subptr;
 queue_filename *yield = NULL;
 queue_filename *last = NULL;
-struct dirent *ent;
-DIR *dd;
 uschar buffer[256];
 queue_filename *root[LOG2_MAXNODES];
 
 uschar buffer[256];
 queue_filename *root[LOG2_MAXNODES];
 
@@ -167,7 +137,9 @@ according to the bits of the flags variable. Get a collection of bits from the
 current time. Use the bottom 16 and just keep re-using them if necessary. When
 not randomizing, initialize the sublists for the bottom-up merge sort. */
 
 current time. Use the bottom 16 and just keep re-using them if necessary. When
 not randomizing, initialize the sublists for the bottom-up merge sort. */
 
-if (randomize)
+if (pcount)
+  *pcount = 0;
+else if (randomize)
   resetflags = time(NULL) & 0xFFFF;
 else
    for (i = 0; i < LOG2_MAXNODES; i++)
   resetflags = time(NULL) & 0xFFFF;
 else
    for (i = 0; i < LOG2_MAXNODES; i++)
@@ -201,6 +173,7 @@ for (; i <= *subcount; i++)
   {
   int count = 0;
   int subdirchar = subdirs[i];      /* 0 for main directory */
   {
   int count = 0;
   int subdirchar = subdirs[i];      /* 0 for main directory */
+  DIR *dd;
 
   if (subdirchar != 0)
     {
 
   if (subdirchar != 0)
     {
@@ -209,12 +182,12 @@ for (; i <= *subcount; i++)
     }
 
   DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer);
     }
 
   DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer);
-  if (!(dd = opendir(CS buffer)))
+  if (!(dd = exim_opendir(buffer)))
     continue;
 
   /* Now scan the directory. */
 
     continue;
 
   /* Now scan the directory. */
 
-  while ((ent = readdir(dd)))
+  for (struct dirent *ent; ent = readdir(dd); )
     {
     uschar *name = US ent->d_name;
     int len = Ustrlen(name);
     {
     uschar *name = US ent->d_name;
     int len = Ustrlen(name);
@@ -237,61 +210,63 @@ for (; i <= *subcount; i++)
 
     if (len == SPOOL_NAME_LENGTH &&
         Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
 
     if (len == SPOOL_NAME_LENGTH &&
         Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
-      {
-      queue_filename *next =
-        store_get(sizeof(queue_filename) + Ustrlen(name));
-      Ustrcpy(next->text, name);
-      next->dir_uschar = subdirchar;
-
-      /* Handle the creation of a randomized list. The first item becomes both
-      the top and bottom of the list. Subsequent items are inserted either at
-      the top or the bottom, randomly. This is, I argue, faster than doing a
-      sort by allocating a random number to each item, and it also saves having
-      to store the number with each item. */
-
-      if (randomize)
-        if (!yield)
-          {
-          next->next = NULL;
-          yield = last = next;
-          }
-        else
-          {
-          if (flags == 0)
-           flags = resetflags;
-          if ((flags & 1) == 0)
-            {
-            next->next = yield;
-            yield = next;
-            }
-          else
-            {
-            next->next = NULL;
-            last->next = next;
-            last = next;
-            }
-          flags = flags >> 1;
-          }
+      if (pcount)
+       (*pcount)++;
+      else
+       {
+       queue_filename *next =
+         store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
+       Ustrcpy(next->text, name);
+       next->dir_uschar = subdirchar;
+
+       /* Handle the creation of a randomized list. The first item becomes both
+       the top and bottom of the list. Subsequent items are inserted either at
+       the top or the bottom, randomly. This is, I argue, faster than doing a
+       sort by allocating a random number to each item, and it also saves having
+       to store the number with each item. */
+
+       if (randomize)
+         if (!yield)
+           {
+           next->next = NULL;
+           yield = last = next;
+           }
+         else
+           {
+           if (flags == 0)
+             flags = resetflags;
+           if ((flags & 1) == 0)
+             {
+             next->next = yield;
+             yield = next;
+             }
+           else
+             {
+             next->next = NULL;
+             last->next = next;
+             last = next;
+             }
+           flags = flags >> 1;
+           }
 
 
-      /* Otherwise do a bottom-up merge sort based on the name. */
+       /* Otherwise do a bottom-up merge sort based on the name. */
 
 
-      else
-        {
-        int j;
-        next->next = NULL;
-        for (j = 0; j < LOG2_MAXNODES; j++)
-          if (root[j])
-            {
-            next = merge_queue_lists(next, root[j]);
-            root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
-            }
-          else
-            {
-            root[j] = next;
-            break;
-            }
-        }
-      }
+       else
+         {
+         next->next = NULL;
+         for (int j = 0; j < LOG2_MAXNODES; j++)
+           if (root[j])
+             {
+             next = merge_queue_lists(next, root[j]);
+             root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
+             }
+           else
+             {
+             root[j] = next;
+             break;
+             }
+         }
+       }
     }
 
   /* Finished with this directory */
     }
 
   /* Finished with this directory */
@@ -328,7 +303,7 @@ for (; i <= *subcount; i++)
 /* When using a bottom-up merge sort, do the final merging of the sublists.
 Then pass back the final list of file items. */
 
 /* When using a bottom-up merge sort, do the final merging of the sublists.
 Then pass back the final list of file items. */
 
-if (!randomize)
+if (!pcount && !randomize)
   for (i = 0; i < LOG2_MAXNODES; ++i)
     yield = merge_queue_lists(yield, root[i]);
 
   for (i = 0; i < LOG2_MAXNODES; ++i)
     yield = merge_queue_lists(yield, root[i]);
 
@@ -379,8 +354,13 @@ const pcre *selectstring_regex = NULL;
 const pcre *selectstring_regex_sender = NULL;
 uschar *log_detail = NULL;
 int subcount = 0;
 const pcre *selectstring_regex_sender = NULL;
 uschar *log_detail = NULL;
 int subcount = 0;
-int i;
 uschar subdirs[64];
 uschar subdirs[64];
+pid_t qpid[4] = {0};   /* Parallelism factor for q2stage 1st phase */
+BOOL single_id = FALSE;
+
+#ifdef MEASURE_TIMING
+report_time_since(&timestamp_startup, US"queue_run start");
+#endif
 
 /* Cancel any specific queue domains. Turn off the flag that causes SMTP
 deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag
 
 /* Cancel any specific queue domains. Turn off the flag that causes SMTP
 deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag
@@ -416,12 +396,18 @@ if (!recurse)
     p += sprintf(CS p, " -q%s", extras);
 
   if (deliver_selectstring)
     p += sprintf(CS p, " -q%s", extras);
 
   if (deliver_selectstring)
-    p += sprintf(CS p, " -R%s %s", f.deliver_selectstring_regex? "r" : "",
-      deliver_selectstring);
+    {
+    snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s",
+      f.deliver_selectstring_regex? "r" : "", deliver_selectstring);
+    p += Ustrlen(CCS p);
+    }
 
   if (deliver_selectstring_sender)
 
   if (deliver_selectstring_sender)
-    p += sprintf(CS p, " -S%s %s", f.deliver_selectstring_sender_regex? "r" : "",
-      deliver_selectstring_sender);
+    {
+    snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s",
+      f.deliver_selectstring_sender_regex? "r" : "", deliver_selectstring_sender);
+    p += Ustrlen(CCS p);
+    }
 
   log_detail = string_copy(big_buffer);
   if (*queue_name)
 
   log_detail = string_copy(big_buffer);
   if (*queue_name)
@@ -429,6 +415,9 @@ if (!recurse)
       queue_name, log_detail);
   else
     log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
       queue_name, log_detail);
   else
     log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
+
+  single_id = start_id && stop_id && !f.queue_2stage
+             && Ustrcmp(start_id, stop_id) == 0;
   }
 
 /* If deliver_selectstring is a regex, compile it. */
   }
 
 /* If deliver_selectstring is a regex, compile it. */
@@ -455,12 +444,11 @@ subsequent iterations.
 When the first argument of queue_get_spool_list() is -1 (for queue_run_in_
 order), it scans all directories and makes a single message list. */
 
 When the first argument of queue_get_spool_list() is -1 (for queue_run_in_
 order), it scans all directories and makes a single message list. */
 
-for (i = queue_run_in_order ? -1 : 0;
+for (int i = queue_run_in_order ? -1 : 0;
      i <= (queue_run_in_order ? -1 : subcount);
      i++)
   {
      i <= (queue_run_in_order ? -1 : subcount);
      i++)
   {
-  queue_filename * fq;
-  void *reset_point1 = store_get(0);
+  rmark reset_point1 = store_mark();
 
   DEBUG(D_queue_run)
     {
 
   DEBUG(D_queue_run)
     {
@@ -472,9 +460,9 @@ for (i = queue_run_in_order ? -1 : 0;
       debug_printf("queue running subdirectory '%c'\n", subdirs[i]);
     }
 
       debug_printf("queue running subdirectory '%c'\n", subdirs[i]);
     }
 
-  for (fq = queue_get_spool_list(i, subdirs, &subcount, !queue_run_in_order);
-       fq;
-       fq = fq->next)
+  for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount,
+                                            !queue_run_in_order, NULL);
+       fq; fq = fq->next)
     {
     pid_t pid;
     int status;
     {
     pid_t pid;
     int status;
@@ -500,18 +488,39 @@ for (i = queue_run_in_order ? -1 : 0;
           (double)load_average/1000.0,
           (double)deliver_queue_load_max/1000.0);
 
           (double)load_average/1000.0,
           (double)deliver_queue_load_max/1000.0);
 
+    /* If initial of a 2-phase run, maintain a set of child procs
+    to get disk parallelism */
+
+    if (f.queue_2stage && !queue_run_in_order)
+      {
+      int i;
+      if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
+       {
+       DEBUG(D_queue_run) debug_printf("q2stage waiting for child %d\n", (int)qpid[0]);
+       waitpid(qpid[0], NULL, 0);
+       DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+       if (f.running_in_test_harness) i = 0;
+       else for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+       qpid[i] = 0;
+       }
+      else
+       for (i = 0; qpid[i]; ) i++;
+      if ((qpid[i] = exim_fork(US"qrun-phase-one")))
+       continue;       /* parent loops around */
+      }
+
     /* Skip this message unless it's within the ID limits */
 
     if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
     /* Skip this message unless it's within the ID limits */
 
     if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
-      continue;
+      goto go_around;
     if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
     if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
-      continue;
+      goto go_around;
 
     /* Check that the message still exists */
 
     message_subdir[0] = fq->dir_uschar;
     if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
 
     /* Check that the message still exists */
 
     message_subdir[0] = fq->dir_uschar;
     if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
-      continue;
+      goto go_around;
 
     /* There are some tests that require the reading of the header file. Ensure
     the store used is scavenged afterwards so that this process doesn't keep
 
     /* There are some tests that require the reading of the header file. Ensure
     the store used is scavenged afterwards so that this process doesn't keep
@@ -524,14 +533,14 @@ for (i = queue_run_in_order ? -1 : 0;
       {
       BOOL wanted = TRUE;
       BOOL orig_dont_deliver = f.dont_deliver;
       {
       BOOL wanted = TRUE;
       BOOL orig_dont_deliver = f.dont_deliver;
-      void *reset_point2 = store_get(0);
+      rmark reset_point2 = store_mark();
 
       /* Restore the original setting of dont_deliver after reading the header,
       so that a setting for a particular message doesn't force it for any that
       follow. If the message is chosen for delivery, the header is read again
       in the deliver_message() function, in a subprocess. */
 
 
       /* Restore the original setting of dont_deliver after reading the header,
       so that a setting for a particular message doesn't force it for any that
       follow. If the message is chosen for delivery, the header is read again
       in the deliver_message() function, in a subprocess. */
 
-      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
       f.dont_deliver = orig_dont_deliver;
 
       /* Now decide if we want to deliver this message. As we have read the
       f.dont_deliver = orig_dont_deliver;
 
       /* Now decide if we want to deliver this message. As we have read the
@@ -603,7 +612,7 @@ for (i = queue_run_in_order ? -1 : 0;
 
       spool_clear_header_globals();
       store_reset(reset_point2);
 
       spool_clear_header_globals();
       store_reset(reset_point2);
-      if (!wanted) continue;      /* With next message */
+      if (!wanted) goto go_around;      /* With next message */
       }
 
     /* OK, got a message we want to deliver. Create a pipe which will
       }
 
     /* OK, got a message we want to deliver. Create a pipe which will
@@ -644,13 +653,28 @@ for (i = queue_run_in_order ? -1 : 0;
 
     set_process_info("running queue: %s", fq->text);
     fq->text[SPOOL_NAME_LENGTH-2] = 0;
 
     set_process_info("running queue: %s", fq->text);
     fq->text[SPOOL_NAME_LENGTH-2] = 0;
-    if ((pid = fork()) == 0)
+#ifdef MEASURE_TIMING
+    report_time_since(&timestamp_startup, US"queue msg selected");
+#endif
+
+#ifndef DISABLE_TLS
+    if (!queue_tls_init)
+      {
+      queue_tls_init = TRUE;
+      /* Preload TLS library info for smtp transports.  Once, and only if we
+      have a delivery to do. */
+      tls_client_creds_reload(FALSE);
+      }
+#endif
+
+single_item_retry:
+    if ((pid = exim_fork(US"qrun-delivery")) == 0)
       {
       int rc;
       {
       int rc;
-      if (f.running_in_test_harness) millisleep(100);
       (void)close(pfd[pipe_read]);
       rc = deliver_message(fq->text, force_delivery, FALSE);
       (void)close(pfd[pipe_read]);
       rc = deliver_message(fq->text, force_delivery, FALSE);
-      _exit(rc == DELIVER_NOT_ATTEMPTED);
+      exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED
+               ? EXIT_FAILURE : EXIT_SUCCESS);
       }
     if (pid < 0)
       log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
       }
     if (pid < 0)
       log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
@@ -666,15 +690,27 @@ for (i = queue_run_in_order ? -1 : 0;
     /* A zero return means a delivery was attempted; turn off the force flag
     for any subsequent calls unless queue_force is set. */
 
     /* A zero return means a delivery was attempted; turn off the force flag
     for any subsequent calls unless queue_force is set. */
 
-    if ((status & 0xffff) == 0) force_delivery = f.queue_run_force;
+    if (!(status & 0xffff)) force_delivery = f.queue_run_force;
 
     /* If the process crashed, tell somebody */
 
 
     /* If the process crashed, tell somebody */
 
-    else if ((status & 0x00ff) != 0)
+    else if (status & 0x00ff)
       log_write(0, LOG_MAIN|LOG_PANIC,
         "queue run: process %d crashed with signal %d while delivering %s",
         (int)pid, status & 0x00ff, fq->text);
 
       log_write(0, LOG_MAIN|LOG_PANIC,
         "queue run: process %d crashed with signal %d while delivering %s",
         (int)pid, status & 0x00ff, fq->text);
 
+    /* If single-item delivery was untried (likely due to locking)
+    retry once after a delay */
+
+    if (status & 0xff00 && single_id)
+      {
+      single_id = FALSE;
+      DEBUG(D_queue_run) debug_printf("qrun single-item pause before retry\n");
+      millisleep(500);
+      DEBUG(D_queue_run) debug_printf("qrun single-item retry after pause\n");
+      goto single_item_retry;
+      }
+
     /* Before continuing, wait till the pipe gets closed at the far end. This
     tells us that any children created by the delivery to re-use any SMTP
     channels have all finished. Since no process actually writes to the pipe,
     /* Before continuing, wait till the pipe gets closed at the far end. This
     tells us that any children created by the delivery to re-use any SMTP
     channels have all finished. Since no process actually writes to the pipe,
@@ -682,19 +718,32 @@ for (i = queue_run_in_order ? -1 : 0;
 
     set_process_info("running queue: waiting for children of %d", pid);
     if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0)
 
     set_process_info("running queue: waiting for children of %d", pid);
     if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0)
-      log_write(0, LOG_MAIN|LOG_PANIC, "queue run: %s on pipe",
-               status > 0 ? "unexpected data" : "error");
+      log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ?
+       "queue run: unexpected data on pipe" : "queue run: error on pipe: %s",
+       strerror(errno));
     (void)close(pfd[pipe_read]);
     set_process_info("running queue");
 
     (void)close(pfd[pipe_read]);
     set_process_info("running queue");
 
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS);
+
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
 
     if (f.running_in_test_harness && !f.queue_2stage)
       {
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
 
     if (f.running_in_test_harness && !f.queue_2stage)
       {
-      uschar *fqtnext = Ustrchr(fudged_queue_times, '/');
-      if (fqtnext != NULL) fudged_queue_times = fqtnext + 1;
+      uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
+      if (fqtnext) fudged_queue_times = fqtnext + 1;
       }
       }
+
+
+    continue;
+
+  go_around:
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS);
     }                                  /* End loop for list of messages */
 
   tree_nonrecipients = NULL;
     }                                  /* End loop for list of messages */
 
   tree_nonrecipients = NULL;
@@ -704,9 +753,9 @@ for (i = queue_run_in_order ? -1 : 0;
   sub-directories have been found, randomize their order if necessary. */
 
   if (i == 0 && subcount > 1 && !queue_run_in_order)
   sub-directories have been found, randomize their order if necessary. */
 
   if (i == 0 && subcount > 1 && !queue_run_in_order)
-    {
-    int j, r;
-    for (j = 1; j <= subcount; j++)
+    for (int j = 1; j <= subcount; j++)
+      {
+      int r;
       if ((r = random_number(100)) >= 50)
         {
         int k = (r % subcount) + 1;
       if ((r = random_number(100)) >= 50)
         {
         int k = (r % subcount) + 1;
@@ -714,7 +763,7 @@ for (i = queue_run_in_order ? -1 : 0;
         subdirs[j] = subdirs[k];
         subdirs[k] = x;
         }
         subdirs[j] = subdirs[k];
         subdirs[k] = x;
         }
-    }
+      }
   }                                    /* End loop for multiple directories */
 
 /* If queue_2stage is true, we do it all again, with the 2stage flag
   }                                    /* End loop for multiple directories */
 
 /* If queue_2stage is true, we do it all again, with the 2stage flag
@@ -722,6 +771,19 @@ turned off. */
 
 if (f.queue_2stage)
   {
 
 if (f.queue_2stage)
   {
+
+  /* wait for last children */
+  for (int i = 0; i < nelem(qpid); i++)
+    if (qpid[i])
+      {
+      DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+      waitpid(qpid[i], NULL, 0);
+      }
+    else break;
+
+#ifdef MEASURE_TIMING
+  report_time_since(&timestamp_startup, US"queue_run 1st phase done");
+#endif
   f.queue_2stage = FALSE;
   queue_run(start_id, stop_id, TRUE);
   }
   f.queue_2stage = FALSE;
   queue_run(start_id, stop_id, TRUE);
   }
@@ -746,26 +808,38 @@ if (!recurse)
 /* Called as a result of -bpc
 
 Arguments:  none
 /* Called as a result of -bpc
 
 Arguments:  none
-Returns:    nothing
+Returns:    count
 */
 
 */
 
-void
+unsigned
 queue_count(void)
 {
 int subcount;
 queue_count(void)
 {
 int subcount;
-int count = 0;
-queue_filename *f = NULL;
+unsigned count = 0;
 uschar subdirs[64];
 uschar subdirs[64];
-f = queue_get_spool_list(
-        -1,             /* entire queue */
-        subdirs,        /* for holding sub list */
-        &subcount,      /* for subcount */
-        FALSE);         /* not random */
-for (; f != NULL; f = f->next) count++;
-fprintf(stdout, "%d\n", count);
+
+(void) queue_get_spool_list(-1,                /* entire queue */
+                       subdirs,        /* for holding sub list */
+                       &subcount,      /* for subcount */
+                       FALSE,          /* not random */
+                       &count);        /* just get the count */
+return count;
 }
 
 
 }
 
 
+#define QUEUE_SIZE_AGE 60      /* update rate for queue_size */
+
+unsigned
+queue_count_cached(void)
+{
+time_t now;
+if ((now = time(NULL)) >= queue_size_next)
+  {
+  queue_size = queue_count();
+  queue_size_next = now + (f.running_in_test_harness ? 3 : QUEUE_SIZE_AGE);
+  }
+return queue_size;
+}
 
 /************************************************
 *          List extra deliveries                *
 
 /************************************************
 *          List extra deliveries                *
@@ -781,11 +855,12 @@ Argument:    points to the tree node
 Returns:     nothing
 */
 
 Returns:     nothing
 */
 
-static void queue_list_extras(tree_node *p)
+static void
+queue_list_extras(tree_node *p)
 {
 {
-if (p->left != NULL) queue_list_extras(p->left);
+if (p->left) queue_list_extras(p->left);
 if (!p->data.val) printf("       +D %s\n", p->name);
 if (!p->data.val) printf("       +D %s\n", p->name);
-if (p->right != NULL) queue_list_extras(p->right);
+if (p->right) queue_list_extras(p->right);
 }
 
 
 }
 
 
@@ -816,10 +891,9 @@ Returns:      nothing
 void
 queue_list(int option, uschar **list, int count)
 {
 void
 queue_list(int option, uschar **list, int count)
 {
-int i;
 int subcount;
 int now = (int)time(NULL);
 int subcount;
 int now = (int)time(NULL);
-void *reset_point;
+rmark reset_point;
 queue_filename * qf = NULL;
 uschar subdirs[64];
 
 queue_filename * qf = NULL;
 uschar subdirs[64];
 
@@ -828,10 +902,10 @@ uschar subdirs[64];
 if (count > 0)
   {
   queue_filename *last = NULL;
 if (count > 0)
   {
   queue_filename *last = NULL;
-  for (i = 0; i < count; i++)
+  for (int i = 0; i < count; i++)
     {
     queue_filename *next =
     {
     queue_filename *next =
-      store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2);
+      store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, is_tainted(list[i]));
     sprintf(CS next->text, "%s-H", list[i]);
     next->dir_uschar = '*';
     next->next = NULL;
     sprintf(CS next->text, "%s-H", list[i]);
     next->dir_uschar = '*';
     next->next = NULL;
@@ -847,15 +921,16 @@ else
           -1,             /* entire queue */
           subdirs,        /* for holding sub list */
           &subcount,      /* for subcount */
           -1,             /* entire queue */
           subdirs,        /* for holding sub list */
           &subcount,      /* for subcount */
-          option >= 8);   /* randomize if required */
+          option >= 8,   /* randomize if required */
+         NULL);          /* don't just count */
 
 if (option >= 8) option -= 8;
 
 /* Now scan the chain and print information, resetting store used
 each time. */
 
 
 if (option >= 8) option -= 8;
 
 /* Now scan the chain and print information, resetting store used
 each time. */
 
-for (reset_point = store_get(0);
-    qf;
+for (;
+    qf && (reset_point = store_mark());
     spool_clear_header_globals(), store_reset(reset_point), qf = qf->next
     )
   {
     spool_clear_header_globals(), store_reset(reset_point), qf = qf->next
     )
   {
@@ -874,7 +949,7 @@ for (reset_point = store_get(0);
 
   if (env_read)
     {
 
   if (env_read)
     {
-    int ptr;
+    int i, ptr;
     FILE *jread;
     struct stat statbuf;
     uschar * fname = spool_fname(US"input", message_subdir, qf->text, US"");
     FILE *jread;
     struct stat statbuf;
     uschar * fname = spool_fname(US"input", message_subdir, qf->text, US"");
@@ -912,7 +987,7 @@ for (reset_point = store_get(0);
     }
 
   fprintf(stdout, "%s ", string_format_size(size, big_buffer));
     }
 
   fprintf(stdout, "%s ", string_format_size(size, big_buffer));
-  for (i = 0; i < 16; i++) fputc(qf->text[i], stdout);
+  for (int i = 0; i < 16; i++) fputc(qf->text[i], stdout);
 
   if (env_read && sender_address)
     {
 
   if (env_read && sender_address)
     {
@@ -947,7 +1022,7 @@ for (reset_point = store_get(0);
 
   if (recipients_list)
     {
 
   if (recipients_list)
     {
-    for (i = 0; i < recipients_count; i++)
+    for (int i = 0; i < recipients_count; i++)
       {
       tree_node *delivered =
         tree_search(tree_nonrecipients, recipients_list[i].address);
       {
       tree_node *delivered =
         tree_search(tree_nonrecipients, recipients_list[i].address);
@@ -986,7 +1061,6 @@ Returns:          FALSE if there was any problem
 BOOL
 queue_action(uschar *id, int action, uschar **argv, int argc, int recipients_arg)
 {
 BOOL
 queue_action(uschar *id, int action, uschar **argv, int argc, int recipients_arg)
 {
-int i, j;
 BOOL yield = TRUE;
 BOOL removed = FALSE;
 struct passwd *pw;
 BOOL yield = TRUE;
 BOOL removed = FALSE;
 struct passwd *pw;
@@ -1005,7 +1079,7 @@ done. Only admin users may read the spool files. */
 
 if (action >= MSG_SHOW_BODY)
   {
 
 if (action >= MSG_SHOW_BODY)
   {
-  int fd, i, rc;
+  int fd, rc;
   uschar *subdirectory, *suffix;
 
   if (!f.admin_user)
   uschar *subdirectory, *suffix;
 
   if (!f.admin_user)
@@ -1036,9 +1110,9 @@ if (action >= MSG_SHOW_BODY)
     suffix = US"";
     }
 
     suffix = US"";
     }
 
-  for (i = 0; i < 2; i++)
+  for (int i = 0; i < 2; i++)
     {
     {
-    message_subdir[0] = split_spool_directory == (i == 0) ? id[5] : 0;
+    set_subdir_str(message_subdir, id, i);
     if ((fd = Uopen(spool_fname(subdirectory, message_subdir, id, suffix),
                    O_RDONLY, 0)) >= 0)
       break;
     if ((fd = Uopen(spool_fname(subdirectory, message_subdir, id, suffix),
                    O_RDONLY, 0)) >= 0)
       break;
@@ -1135,7 +1209,7 @@ switch(action)
     deliver_in_buffer = store_malloc(DELIVER_IN_BUFFER_SIZE);
     deliver_out_buffer = store_malloc(DELIVER_OUT_BUFFER_SIZE);
     tctx.u.fd = 1;
     deliver_in_buffer = store_malloc(DELIVER_IN_BUFFER_SIZE);
     deliver_out_buffer = store_malloc(DELIVER_OUT_BUFFER_SIZE);
     tctx.u.fd = 1;
-    transport_write_message(&tctx, 0);
+    (void) transport_write_message(&tctx, 0);
     break;
     }
 
     break;
     }
 
@@ -1203,7 +1277,7 @@ switch(action)
     suffix[2] = 0;
     message_subdir[0] = id[5];
 
     suffix[2] = 0;
     message_subdir[0] = id[5];
 
-    for (j = 0; j < 2; message_subdir[0] = 0, j++)
+    for (int j = 0; j < 2; message_subdir[0] = 0, j++)
       {
       uschar * fname = spool_fname(US"msglog", message_subdir, id, US"");
 
       {
       uschar * fname = spool_fname(US"msglog", message_subdir, id, US"");
 
@@ -1223,7 +1297,7 @@ switch(action)
        DEBUG(D_any) debug_printf(" (ok)\n");
        }
 
        DEBUG(D_any) debug_printf(" (ok)\n");
        }
 
-      for (i = 0; i < 3; i++)
+      for (int i = 0; i < 3; i++)
        {
        uschar * fname;
 
        {
        uschar * fname;
 
@@ -1255,6 +1329,38 @@ switch(action)
       else printf("has been removed or did not exist\n");
     if (removed)
       {
       else printf("has been removed or did not exist\n");
     if (removed)
       {
+#ifndef DISABLE_EVENT
+      if (event_action) for (int i = 0; i < recipients_count; i++)
+       {
+       tree_node *delivered =
+         tree_search(tree_nonrecipients, recipients_list[i].address);
+       if (!delivered)
+         {
+         uschar * save_local = deliver_localpart;
+         const uschar * save_domain = deliver_domain;
+         uschar * addr = recipients_list[i].address, * errmsg = NULL;
+         int start, end, dom;
+
+         if (!parse_extract_address(addr, &errmsg, &start, &end, &dom, TRUE))
+           log_write(0, LOG_MAIN|LOG_PANIC,
+             "failed to parse address '%.100s'\n: %s", addr, errmsg);
+         else
+           {
+           deliver_localpart =
+             string_copyn(addr+start, dom ? (dom-1) - start : end - start);
+           deliver_domain = dom
+             ? CUS string_copyn(addr+dom, end - dom) : CUS"";
+
+           event_raise(event_action, US"msg:fail:internal",
+             string_sprintf("message removed by %s", username));
+
+           deliver_localpart = save_local;
+           deliver_domain = save_domain;
+           }
+         }
+       }
+      (void) event_raise(event_action, US"msg:complete", NULL);
+#endif
       log_write(0, LOG_MAIN, "removed by %s", username);
       log_write(0, LOG_MAIN, "Completed");
       }
       log_write(0, LOG_MAIN, "removed by %s", username);
       log_write(0, LOG_MAIN, "Completed");
       }
@@ -1262,15 +1368,22 @@ switch(action)
     }
 
 
     }
 
 
+  case MSG_SETQUEUE:
+    /* The global "queue_name_dest" is used as destination, "queue_name"
+    as source */
+
+    spool_move_message(id, message_subdir, US"", US"");
+    break;
+
+
   case MSG_MARK_ALL_DELIVERED:
   case MSG_MARK_ALL_DELIVERED:
-  for (i = 0; i < recipients_count; i++)
-    {
+  for (int i = 0; i < recipients_count; i++)
     tree_add_nonrecipient(recipients_list[i].address);
     tree_add_nonrecipient(recipients_list[i].address);
-    }
+
   if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0)
     {
     printf("has been modified\n");
   if (spool_write_header(id, SW_MODIFYING, &errmsg) >= 0)
     {
     printf("has been modified\n");
-    for (i = 0; i < recipients_count; i++)
+    for (int i = 0; i < recipients_count; i++)
       log_write(0, LOG_MAIN, "address <%s> marked delivered by %s",
         recipients_list[i].address, username);
     }
       log_write(0, LOG_MAIN, "address <%s> marked delivered by %s",
         recipients_list[i].address, username);
     }
@@ -1316,13 +1429,13 @@ switch(action)
       parse_extract_address(argv[recipients_arg], &errmess, &start, &end,
         &domain, (action == MSG_EDIT_SENDER));
 
       parse_extract_address(argv[recipients_arg], &errmess, &start, &end,
         &domain, (action == MSG_EDIT_SENDER));
 
-    if (recipient == NULL)
+    if (!recipient)
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: %s\n",
         doing, argv[recipients_arg], errmess);
       }
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: %s\n",
         doing, argv[recipients_arg], errmess);
       }
-    else if (recipient[0] != 0 && domain == 0)
+    else if (*recipient && domain == 0)
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: "
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: "
@@ -1341,6 +1454,7 @@ switch(action)
         }
       else if (action == MSG_MARK_DELIVERED)
         {
         }
       else if (action == MSG_MARK_DELIVERED)
         {
+       int i;
         for (i = 0; i < recipients_count; i++)
           if (Ustrcmp(recipients_list[i].address, recipient) == 0) break;
         if (i >= recipients_count)
         for (i = 0; i < recipients_count; i++)
           if (Ustrcmp(recipients_list[i].address, recipient) == 0) break;
         if (i >= recipients_count)
@@ -1409,34 +1523,68 @@ queue_check_only(void)
 {
 int sep = 0;
 struct stat statbuf;
 {
 int sep = 0;
 struct stat statbuf;
-const uschar *s;
-uschar *ss;
-uschar buffer[1024];
+const uschar * s = queue_only_file;
+uschar * ss;
 
 
-if (queue_only_file == NULL) return;
-
-s = queue_only_file;
-while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL)
-  {
-  if (Ustrncmp(ss, "smtp", 4) == 0)
-    {
-    ss += 4;
-    if (Ustat(ss, &statbuf) == 0)
+if (s)
+  while ((ss = string_nextinlist(&s, &sep, NULL, 0)))
+    if (Ustrncmp(ss, "smtp", 4) == 0)
       {
       {
-      f.queue_smtp = TRUE;
-      DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
-      }
-    }
-  else
-    {
-    if (Ustat(ss, &statbuf) == 0)
-      {
-      queue_only = TRUE;
-      DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+      ss += 4;
+      if (Ustat(ss, &statbuf) == 0)
+       {
+       f.queue_smtp = TRUE;
+       DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
+       }
       }
       }
-    }
+    else
+      if (Ustat(ss, &statbuf) == 0)
+       {
+       queue_only = TRUE;
+       DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+       }
+}
+
+
+
+/******************************************************************************/
+/******************************************************************************/
+
+#ifndef DISABLE_QUEUE_RAMP
+void
+queue_notify_daemon(const uschar * msgid)
+{
+uschar buf[MESSAGE_ID_LENGTH + 2];
+int fd;
+
+DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid);
+
+buf[0] = NOTIFY_MSG_QRUN;
+memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
+
+if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
+  {
+  struct sockaddr_un sa_un = {.sun_family = AF_UNIX};
+
+#ifdef EXIM_HAVE_ABSTRACT_UNIX_SOCKETS
+  int len = offsetof(struct sockaddr_un, sun_path) + 1
+    + snprintf(sa_un.sun_path+1, sizeof(sa_un.sun_path)-1, "%s",
+               expand_string(notifier_socket));
+  sa_un.sun_path[0] = 0;
+#else
+  int len = offsetof(struct sockaddr_un, sun_path)
+    + snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), "%s",
+               expand_string(notifier_socket));
+#endif
+
+  if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, len) < 0)
+    DEBUG(D_queue_run)
+      debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
+  close(fd);
   }
   }
+else DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno));
 }
 }
+#endif
 
 #endif /*!COMPILE_UTILITY*/
 
 
 #endif /*!COMPILE_UTILITY*/