Merge branch '4.next'
[users/jgh/exim.git] / src / src / queue.c
index 4452163d0336b8e9a6fd4e2a34691d7bbe6c885d..6748afd5d8c99e5eaa237ccf29f530faaf165edc 100644 (file)
@@ -3,6 +3,7 @@
 *************************************************/
 
 /* Copyright (c) University of Cambridge 1995 - 2018 */
 *************************************************/
 
 /* Copyright (c) University of Cambridge 1995 - 2018 */
+/* Copyright (c) The Exim Maintainers 2020 */
 /* See the file NOTICE for conditions of use and distribution. */
 
 /* Functions that operate on the input queue. */
 /* See the file NOTICE for conditions of use and distribution. */
 
 /* Functions that operate on the input queue. */
@@ -110,13 +111,14 @@ Arguments:
   subdirs        vector to store list of subdirchars
   subcount       pointer to int in which to store count of subdirs
   randomize      TRUE if the order of the list is to be unpredictable
   subdirs        vector to store list of subdirchars
   subcount       pointer to int in which to store count of subdirs
   randomize      TRUE if the order of the list is to be unpredictable
+  pcount        If not NULL, fill in with count of files and do not return list
 
 Returns:         pointer to a chain of queue name items
 */
 
 static queue_filename *
 queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount,
 
 Returns:         pointer to a chain of queue name items
 */
 
 static queue_filename *
 queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount,
-  BOOL randomize)
+  BOOL randomize, unsigned * pcount)
 {
 int i;
 int flags = 0;
 {
 int i;
 int flags = 0;
@@ -124,8 +126,6 @@ int resetflags = -1;
 int subptr;
 queue_filename *yield = NULL;
 queue_filename *last = NULL;
 int subptr;
 queue_filename *yield = NULL;
 queue_filename *last = NULL;
-struct dirent *ent;
-DIR *dd;
 uschar buffer[256];
 queue_filename *root[LOG2_MAXNODES];
 
 uschar buffer[256];
 queue_filename *root[LOG2_MAXNODES];
 
@@ -134,7 +134,9 @@ according to the bits of the flags variable. Get a collection of bits from the
 current time. Use the bottom 16 and just keep re-using them if necessary. When
 not randomizing, initialize the sublists for the bottom-up merge sort. */
 
 current time. Use the bottom 16 and just keep re-using them if necessary. When
 not randomizing, initialize the sublists for the bottom-up merge sort. */
 
-if (randomize)
+if (pcount)
+  *pcount = 0;
+else if (randomize)
   resetflags = time(NULL) & 0xFFFF;
 else
    for (i = 0; i < LOG2_MAXNODES; i++)
   resetflags = time(NULL) & 0xFFFF;
 else
    for (i = 0; i < LOG2_MAXNODES; i++)
@@ -168,6 +170,7 @@ for (; i <= *subcount; i++)
   {
   int count = 0;
   int subdirchar = subdirs[i];      /* 0 for main directory */
   {
   int count = 0;
   int subdirchar = subdirs[i];      /* 0 for main directory */
+  DIR *dd;
 
   if (subdirchar != 0)
     {
 
   if (subdirchar != 0)
     {
@@ -176,12 +179,12 @@ for (; i <= *subcount; i++)
     }
 
   DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer);
     }
 
   DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer);
-  if (!(dd = opendir(CS buffer)))
+  if (!(dd = exim_opendir(buffer)))
     continue;
 
   /* Now scan the directory. */
 
     continue;
 
   /* Now scan the directory. */
 
-  while ((ent = readdir(dd)))
+  for (struct dirent *ent; ent = readdir(dd); )
     {
     uschar *name = US ent->d_name;
     int len = Ustrlen(name);
     {
     uschar *name = US ent->d_name;
     int len = Ustrlen(name);
@@ -204,60 +207,63 @@ for (; i <= *subcount; i++)
 
     if (len == SPOOL_NAME_LENGTH &&
         Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
 
     if (len == SPOOL_NAME_LENGTH &&
         Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
-      {
-      queue_filename *next =
-        store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
-      Ustrcpy(next->text, name);
-      next->dir_uschar = subdirchar;
-
-      /* Handle the creation of a randomized list. The first item becomes both
-      the top and bottom of the list. Subsequent items are inserted either at
-      the top or the bottom, randomly. This is, I argue, faster than doing a
-      sort by allocating a random number to each item, and it also saves having
-      to store the number with each item. */
-
-      if (randomize)
-        if (!yield)
-          {
-          next->next = NULL;
-          yield = last = next;
-          }
-        else
-          {
-          if (flags == 0)
-           flags = resetflags;
-          if ((flags & 1) == 0)
-            {
-            next->next = yield;
-            yield = next;
-            }
-          else
-            {
-            next->next = NULL;
-            last->next = next;
-            last = next;
-            }
-          flags = flags >> 1;
-          }
+      if (pcount)
+       (*pcount)++;
+      else
+       {
+       queue_filename *next =
+         store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
+       Ustrcpy(next->text, name);
+       next->dir_uschar = subdirchar;
+
+       /* Handle the creation of a randomized list. The first item becomes both
+       the top and bottom of the list. Subsequent items are inserted either at
+       the top or the bottom, randomly. This is, I argue, faster than doing a
+       sort by allocating a random number to each item, and it also saves having
+       to store the number with each item. */
+
+       if (randomize)
+         if (!yield)
+           {
+           next->next = NULL;
+           yield = last = next;
+           }
+         else
+           {
+           if (flags == 0)
+             flags = resetflags;
+           if ((flags & 1) == 0)
+             {
+             next->next = yield;
+             yield = next;
+             }
+           else
+             {
+             next->next = NULL;
+             last->next = next;
+             last = next;
+             }
+           flags = flags >> 1;
+           }
 
 
-      /* Otherwise do a bottom-up merge sort based on the name. */
+       /* Otherwise do a bottom-up merge sort based on the name. */
 
 
-      else
-        {
-        next->next = NULL;
-        for (int j = 0; j < LOG2_MAXNODES; j++)
-          if (root[j])
-            {
-            next = merge_queue_lists(next, root[j]);
-            root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
-            }
-          else
-            {
-            root[j] = next;
-            break;
-            }
-        }
-      }
+       else
+         {
+         next->next = NULL;
+         for (int j = 0; j < LOG2_MAXNODES; j++)
+           if (root[j])
+             {
+             next = merge_queue_lists(next, root[j]);
+             root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
+             }
+           else
+             {
+             root[j] = next;
+             break;
+             }
+         }
+       }
     }
 
   /* Finished with this directory */
     }
 
   /* Finished with this directory */
@@ -294,7 +300,7 @@ for (; i <= *subcount; i++)
 /* When using a bottom-up merge sort, do the final merging of the sublists.
 Then pass back the final list of file items. */
 
 /* When using a bottom-up merge sort, do the final merging of the sublists.
 Then pass back the final list of file items. */
 
-if (!randomize)
+if (!pcount && !randomize)
   for (i = 0; i < LOG2_MAXNODES; ++i)
     yield = merge_queue_lists(yield, root[i]);
 
   for (i = 0; i < LOG2_MAXNODES; ++i)
     yield = merge_queue_lists(yield, root[i]);
 
@@ -347,6 +353,7 @@ uschar *log_detail = NULL;
 int subcount = 0;
 uschar subdirs[64];
 pid_t qpid[4] = {0};   /* Parallelism factor for q2stage 1st phase */
 int subcount = 0;
 uschar subdirs[64];
 pid_t qpid[4] = {0};   /* Parallelism factor for q2stage 1st phase */
+BOOL single_id = FALSE;
 
 #ifdef MEASURE_TIMING
 report_time_since(&timestamp_startup, US"queue_run start");
 
 #ifdef MEASURE_TIMING
 report_time_since(&timestamp_startup, US"queue_run start");
@@ -399,6 +406,9 @@ if (!recurse)
       queue_name, log_detail);
   else
     log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
       queue_name, log_detail);
   else
     log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
+
+  single_id = start_id && stop_id && !f.queue_2stage
+             && Ustrcmp(start_id, stop_id) == 0;
   }
 
 /* If deliver_selectstring is a regex, compile it. */
   }
 
 /* If deliver_selectstring is a regex, compile it. */
@@ -442,7 +452,7 @@ for (int i = queue_run_in_order ? -1 : 0;
     }
 
   for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount,
     }
 
   for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount,
-                                              !queue_run_in_order);
+                                            !queue_run_in_order, NULL);
        fq; fq = fq->next)
     {
     pid_t pid;
        fq; fq = fq->next)
     {
     pid_t pid;
@@ -477,18 +487,17 @@ for (int i = queue_run_in_order ? -1 : 0;
       int i;
       if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
        {
       int i;
       if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
        {
-       DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n");
+       DEBUG(D_queue_run) debug_printf("q2stage waiting for child %d\n", (int)qpid[0]);
        waitpid(qpid[0], NULL, 0);
        DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
        waitpid(qpid[0], NULL, 0);
        DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
-       for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+       if (f.running_in_test_harness) i = 0;
+       else for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
        qpid[i] = 0;
        }
       else
        for (i = 0; qpid[i]; ) i++;
        qpid[i] = 0;
        }
       else
        for (i = 0; qpid[i]; ) i++;
-      DEBUG(D_queue_run) debug_printf("q2stage forking\n");
-      if ((qpid[i] = fork()))
+      if ((qpid[i] = exim_fork(US"qrun-phase-one")))
        continue;       /* parent loops around */
        continue;       /* parent loops around */
-      DEBUG(D_queue_run) debug_printf("q2stage child\n");
       }
 
     /* Skip this message unless it's within the ID limits */
       }
 
     /* Skip this message unless it's within the ID limits */
@@ -639,13 +648,14 @@ for (int i = queue_run_in_order ? -1 : 0;
     report_time_since(&timestamp_startup, US"queue msg selected");
 #endif
 
     report_time_since(&timestamp_startup, US"queue msg selected");
 #endif
 
-    if ((pid = fork()) == 0)
+single_item_retry:
+    if ((pid = exim_fork(US"qrun-delivery")) == 0)
       {
       int rc;
       {
       int rc;
-      testharness_pause_ms(100);
       (void)close(pfd[pipe_read]);
       rc = deliver_message(fq->text, force_delivery, FALSE);
       (void)close(pfd[pipe_read]);
       rc = deliver_message(fq->text, force_delivery, FALSE);
-      exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED);
+      exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED
+               ? EXIT_FAILURE : EXIT_SUCCESS);
       }
     if (pid < 0)
       log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
       }
     if (pid < 0)
       log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
@@ -670,6 +680,18 @@ for (int i = queue_run_in_order ? -1 : 0;
         "queue run: process %d crashed with signal %d while delivering %s",
         (int)pid, status & 0x00ff, fq->text);
 
         "queue run: process %d crashed with signal %d while delivering %s",
         (int)pid, status & 0x00ff, fq->text);
 
+    /* If single-item delivery was untried (likely due to locking)
+    retry once after a delay */
+
+    if (status & 0xff00 && single_id)
+      {
+      single_id = FALSE;
+      DEBUG(D_queue_run) debug_printf("qrun single-item pause before retry\n");
+      millisleep(500);
+      DEBUG(D_queue_run) debug_printf("qrun single-item retry after pause\n");
+      goto single_item_retry;
+      }
+
     /* Before continuing, wait till the pipe gets closed at the far end. This
     tells us that any children created by the delivery to re-use any SMTP
     channels have all finished. Since no process actually writes to the pipe,
     /* Before continuing, wait till the pipe gets closed at the far end. This
     tells us that any children created by the delivery to re-use any SMTP
     channels have all finished. Since no process actually writes to the pipe,
@@ -685,7 +707,7 @@ for (int i = queue_run_in_order ? -1 : 0;
 
     /* If initial of a 2-phase run, we are a child - so just exit */
     if (f.queue_2stage && !queue_run_in_order)
 
     /* If initial of a 2-phase run, we are a child - so just exit */
     if (f.queue_2stage && !queue_run_in_order)
-      exim_exit(EXIT_SUCCESS, US"2-phase child");
+      exim_exit(EXIT_SUCCESS);
 
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
 
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
@@ -702,7 +724,7 @@ for (int i = queue_run_in_order ? -1 : 0;
   go_around:
     /* If initial of a 2-phase run, we are a child - so just exit */
     if (f.queue_2stage && !queue_run_in_order)
   go_around:
     /* If initial of a 2-phase run, we are a child - so just exit */
     if (f.queue_2stage && !queue_run_in_order)
-      exim_exit(EXIT_SUCCESS, US"2-phase child");
+      exim_exit(EXIT_SUCCESS);
     }                                  /* End loop for list of messages */
 
   tree_nonrecipients = NULL;
     }                                  /* End loop for list of messages */
 
   tree_nonrecipients = NULL;
@@ -777,12 +799,11 @@ int subcount;
 unsigned count = 0;
 uschar subdirs[64];
 
 unsigned count = 0;
 uschar subdirs[64];
 
-for (queue_filename * f = queue_get_spool_list(
-                               -1,             /* entire queue */
-                               subdirs,        /* for holding sub list */
-                               &subcount,      /* for subcount */
-                               FALSE);         /* not random */
-    f; f = f->next) count++;
+(void) queue_get_spool_list(-1,                /* entire queue */
+                       subdirs,        /* for holding sub list */
+                       &subcount,      /* for subcount */
+                       FALSE,          /* not random */
+                       &count);        /* just get the count */
 return count;
 }
 
 return count;
 }
 
@@ -881,7 +902,8 @@ else
           -1,             /* entire queue */
           subdirs,        /* for holding sub list */
           &subcount,      /* for subcount */
           -1,             /* entire queue */
           subdirs,        /* for holding sub list */
           &subcount,      /* for subcount */
-          option >= 8);   /* randomize if required */
+          option >= 8,   /* randomize if required */
+         NULL);          /* don't just count */
 
 if (option >= 8) option -= 8;
 
 
 if (option >= 8) option -= 8;
 
@@ -1388,13 +1410,13 @@ switch(action)
       parse_extract_address(argv[recipients_arg], &errmess, &start, &end,
         &domain, (action == MSG_EDIT_SENDER));
 
       parse_extract_address(argv[recipients_arg], &errmess, &start, &end,
         &domain, (action == MSG_EDIT_SENDER));
 
-    if (recipient == NULL)
+    if (!recipient)
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: %s\n",
         doing, argv[recipients_arg], errmess);
       }
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: %s\n",
         doing, argv[recipients_arg], errmess);
       }
-    else if (recipient[0] != 0 && domain == 0)
+    else if (*recipient && domain == 0)
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: "
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: "
@@ -1509,7 +1531,7 @@ if (s)
 /******************************************************************************/
 /******************************************************************************/
 
 /******************************************************************************/
 /******************************************************************************/
 
-#ifdef EXPERIMENTAL_QUEUE_RAMP
+#ifndef DISABLE_QUEUE_RAMP
 void
 queue_notify_daemon(const uschar * msgid)
 {
 void
 queue_notify_daemon(const uschar * msgid)
 {
@@ -1523,21 +1545,20 @@ memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
 
 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
   {
 
 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
   {
-  struct sockaddr_un sun = {.sun_family = AF_UNIX};
-  int slen;
+  struct sockaddr_un sa_un = {.sun_family = AF_UNIX};
 
 #ifdef EXIM_HAVE_ABSTRACT_UNIX_SOCKETS
   int len = offsetof(struct sockaddr_un, sun_path) + 1
 
 #ifdef EXIM_HAVE_ABSTRACT_UNIX_SOCKETS
   int len = offsetof(struct sockaddr_un, sun_path) + 1
-    + snprintf(sun.sun_path+1, sizeof(sun.sun_path)-1, "%s",
-       NOTIFIER_SOCKET_NAME);
-  sun.sun_path[0] = 0;
+    + snprintf(sa_un.sun_path+1, sizeof(sa_un.sun_path)-1, "%s",
+               expand_string(notifier_socket));
+  sa_un.sun_path[0] = 0;
 #else
   int len = offsetof(struct sockaddr_un, sun_path)
 #else
   int len = offsetof(struct sockaddr_un, sun_path)
-    + snprintf(sun.sun_path, sizeof(sun.sun_path), "%s/%s",
-       spool_directory, NOTIFIER_SOCKET_NAME);
+    + snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), "%s",
+               expand_string(notifier_socket));
 #endif
 
 #endif
 
-  if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sun, len) < 0)
+  if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, len) < 0)
     DEBUG(D_queue_run)
       debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
   close(fd);
     DEBUG(D_queue_run)
       debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
   close(fd);