Copyright updates:
[exim.git] / src / src / queue.c
index a578014262863091eb123a08bdaab249e2b6c567..4bdd6fb1403a6e113effced76adfc5993740606b 100644 (file)
@@ -2,6 +2,7 @@
 *     Exim - an Internet mail transport agent    *
 *************************************************/
 
+/* Copyright (c) The Exim Maintainers 2020 - 2022 */
 /* Copyright (c) University of Cambridge 1995 - 2018 */
 /* See the file NOTICE for conditions of use and distribution. */
 
@@ -25,6 +26,9 @@ Michael Haardt. */
 #define LOG2_MAXNODES 32
 
 
+#ifndef DISABLE_TLS
+static BOOL queue_tls_init = FALSE;
+#endif
 
 /*************************************************
 *  Helper sort function for queue_get_spool_list *
@@ -110,13 +114,14 @@ Arguments:
   subdirs        vector to store list of subdirchars
   subcount       pointer to int in which to store count of subdirs
   randomize      TRUE if the order of the list is to be unpredictable
+  pcount        If not NULL, fill in with count of files and do not return list
 
 Returns:         pointer to a chain of queue name items
 */
 
 static queue_filename *
 queue_get_spool_list(int subdiroffset, uschar *subdirs, int *subcount,
-  BOOL randomize)
+  BOOL randomize, unsigned * pcount)
 {
 int i;
 int flags = 0;
@@ -124,8 +129,6 @@ int resetflags = -1;
 int subptr;
 queue_filename *yield = NULL;
 queue_filename *last = NULL;
-struct dirent *ent;
-DIR *dd;
 uschar buffer[256];
 queue_filename *root[LOG2_MAXNODES];
 
@@ -134,7 +137,9 @@ according to the bits of the flags variable. Get a collection of bits from the
 current time. Use the bottom 16 and just keep re-using them if necessary. When
 not randomizing, initialize the sublists for the bottom-up merge sort. */
 
-if (randomize)
+if (pcount)
+  *pcount = 0;
+else if (randomize)
   resetflags = time(NULL) & 0xFFFF;
 else
    for (i = 0; i < LOG2_MAXNODES; i++)
@@ -168,6 +173,7 @@ for (; i <= *subcount; i++)
   {
   int count = 0;
   int subdirchar = subdirs[i];      /* 0 for main directory */
+  DIR *dd;
 
   if (subdirchar != 0)
     {
@@ -176,12 +182,12 @@ for (; i <= *subcount; i++)
     }
 
   DEBUG(D_queue_run) debug_printf("looking in %s\n", buffer);
-  if (!(dd = opendir(CS buffer)))
+  if (!(dd = exim_opendir(buffer)))
     continue;
 
   /* Now scan the directory. */
 
-  while ((ent = readdir(dd)))
+  for (struct dirent *ent; ent = readdir(dd); )
     {
     uschar *name = US ent->d_name;
     int len = Ustrlen(name);
@@ -204,60 +210,63 @@ for (; i <= *subcount; i++)
 
     if (len == SPOOL_NAME_LENGTH &&
         Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
-      {
-      queue_filename *next =
-        store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
-      Ustrcpy(next->text, name);
-      next->dir_uschar = subdirchar;
-
-      /* Handle the creation of a randomized list. The first item becomes both
-      the top and bottom of the list. Subsequent items are inserted either at
-      the top or the bottom, randomly. This is, I argue, faster than doing a
-      sort by allocating a random number to each item, and it also saves having
-      to store the number with each item. */
-
-      if (randomize)
-        if (!yield)
-          {
-          next->next = NULL;
-          yield = last = next;
-          }
-        else
-          {
-          if (flags == 0)
-           flags = resetflags;
-          if ((flags & 1) == 0)
-            {
-            next->next = yield;
-            yield = next;
-            }
-          else
-            {
-            next->next = NULL;
-            last->next = next;
-            last = next;
-            }
-          flags = flags >> 1;
-          }
+      if (pcount)
+       (*pcount)++;
+      else
+       {
+       queue_filename * next =
+         store_get(sizeof(queue_filename) + Ustrlen(name), name);
+       Ustrcpy(next->text, name);
+       next->dir_uschar = subdirchar;
+
+       /* Handle the creation of a randomized list. The first item becomes both
+       the top and bottom of the list. Subsequent items are inserted either at
+       the top or the bottom, randomly. This is, I argue, faster than doing a
+       sort by allocating a random number to each item, and it also saves having
+       to store the number with each item. */
+
+       if (randomize)
+         if (!yield)
+           {
+           next->next = NULL;
+           yield = last = next;
+           }
+         else
+           {
+           if (flags == 0)
+             flags = resetflags;
+           if ((flags & 1) == 0)
+             {
+             next->next = yield;
+             yield = next;
+             }
+           else
+             {
+             next->next = NULL;
+             last->next = next;
+             last = next;
+             }
+           flags = flags >> 1;
+           }
 
-      /* Otherwise do a bottom-up merge sort based on the name. */
+       /* Otherwise do a bottom-up merge sort based on the name. */
 
-      else
-        {
-        next->next = NULL;
-        for (int j = 0; j < LOG2_MAXNODES; j++)
-          if (root[j])
-            {
-            next = merge_queue_lists(next, root[j]);
-            root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
-            }
-          else
-            {
-            root[j] = next;
-            break;
-            }
-        }
-      }
+       else
+         {
+         next->next = NULL;
+         for (int j = 0; j < LOG2_MAXNODES; j++)
+           if (root[j])
+             {
+             next = merge_queue_lists(next, root[j]);
+             root[j] = j == LOG2_MAXNODES - 1 ? next : NULL;
+             }
+           else
+             {
+             root[j] = next;
+             break;
+             }
+         }
+       }
     }
 
   /* Finished with this directory */
@@ -294,7 +303,7 @@ for (; i <= *subcount; i++)
 /* When using a bottom-up merge sort, do the final merging of the sublists.
 Then pass back the final list of file items. */
 
-if (!randomize)
+if (!pcount && !randomize)
   for (i = 0; i < LOG2_MAXNODES; ++i)
     yield = merge_queue_lists(yield, root[i]);
 
@@ -341,11 +350,13 @@ queue_run(uschar *start_id, uschar *stop_id, BOOL recurse)
 {
 BOOL force_delivery = f.queue_run_force || deliver_selectstring != NULL ||
   deliver_selectstring_sender != NULL;
-const pcre *selectstring_regex = NULL;
-const pcre *selectstring_regex_sender = NULL;
+const pcre2_code *selectstring_regex = NULL;
+const pcre2_code *selectstring_regex_sender = NULL;
 uschar *log_detail = NULL;
 int subcount = 0;
 uschar subdirs[64];
+pid_t qpid[4] = {0};   /* Parallelism factor for q2stage 1st phase */
+BOOL single_id = FALSE;
 
 #ifdef MEASURE_TIMING
 report_time_since(&timestamp_startup, US"queue_run start");
@@ -385,12 +396,18 @@ if (!recurse)
     p += sprintf(CS p, " -q%s", extras);
 
   if (deliver_selectstring)
-    p += sprintf(CS p, " -R%s %s", f.deliver_selectstring_regex? "r" : "",
-      deliver_selectstring);
+    {
+    snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s",
+      f.deliver_selectstring_regex? "r" : "", deliver_selectstring);
+    p += Ustrlen(CCS p);
+    }
 
   if (deliver_selectstring_sender)
-    p += sprintf(CS p, " -S%s %s", f.deliver_selectstring_sender_regex? "r" : "",
-      deliver_selectstring_sender);
+    {
+    snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s",
+      f.deliver_selectstring_sender_regex? "r" : "", deliver_selectstring_sender);
+    p += Ustrlen(CCS p);
+    }
 
   log_detail = string_copy(big_buffer);
   if (*queue_name)
@@ -398,6 +415,9 @@ if (!recurse)
       queue_name, log_detail);
   else
     log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
+
+  single_id = start_id && stop_id && !f.queue_2stage
+             && Ustrcmp(start_id, stop_id) == 0;
   }
 
 /* If deliver_selectstring is a regex, compile it. */
@@ -441,7 +461,7 @@ for (int i = queue_run_in_order ? -1 : 0;
     }
 
   for (queue_filename * fq = queue_get_spool_list(i, subdirs, &subcount,
-                                              !queue_run_in_order);
+                                            !queue_run_in_order, NULL);
        fq; fq = fq->next)
     {
     pid_t pid;
@@ -468,18 +488,39 @@ for (int i = queue_run_in_order ? -1 : 0;
           (double)load_average/1000.0,
           (double)deliver_queue_load_max/1000.0);
 
+    /* If initial of a 2-phase run, maintain a set of child procs
+    to get disk parallelism */
+
+    if (f.queue_2stage && !queue_run_in_order)
+      {
+      int i;
+      if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
+       {
+       DEBUG(D_queue_run) debug_printf("q2stage waiting for child %d\n", (int)qpid[0]);
+       waitpid(qpid[0], NULL, 0);
+       DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+       if (f.running_in_test_harness) i = 0;
+       else for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+       qpid[i] = 0;
+       }
+      else
+       for (i = 0; qpid[i]; ) i++;
+      if ((qpid[i] = exim_fork(US"qrun-phase-one")))
+       continue;       /* parent loops around */
+      }
+
     /* Skip this message unless it's within the ID limits */
 
     if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
-      continue;
+      goto go_around;
     if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
-      continue;
+      goto go_around;
 
     /* Check that the message still exists */
 
     message_subdir[0] = fq->dir_uschar;
     if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
-      continue;
+      goto go_around;
 
     /* There are some tests that require the reading of the header file. Ensure
     the store used is scavenged afterwards so that this process doesn't keep
@@ -499,7 +540,7 @@ for (int i = queue_run_in_order ? -1 : 0;
       follow. If the message is chosen for delivery, the header is read again
       in the deliver_message() function, in a subprocess. */
 
-      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
       f.dont_deliver = orig_dont_deliver;
 
       /* Now decide if we want to deliver this message. As we have read the
@@ -528,9 +569,7 @@ for (int i = queue_run_in_order ? -1 : 0;
 
       else if (  deliver_selectstring_sender
              && !(f.deliver_selectstring_sender_regex
-                 ? (pcre_exec(selectstring_regex_sender, NULL,
-                     CS sender_address, Ustrlen(sender_address), 0, PCRE_EOPT,
-                     NULL, 0) >= 0)
+                 ? regex_match(selectstring_regex_sender, sender_address, -1, NULL)
                  : (strstric(sender_address, deliver_selectstring_sender, FALSE)
                      != NULL)
              )   )
@@ -549,8 +588,7 @@ for (int i = queue_run_in_order ? -1 : 0;
           {
           uschar *address = recipients_list[i].address;
           if (  (f.deliver_selectstring_regex
-               ? (pcre_exec(selectstring_regex, NULL, CS address,
-                    Ustrlen(address), 0, PCRE_EOPT, NULL, 0) >= 0)
+               ? regex_match(selectstring_regex, address, -1, NULL)
                 : (strstric(address, deliver_selectstring, FALSE) != NULL)
                )
              && tree_search(tree_nonrecipients, address) == NULL
@@ -571,7 +609,7 @@ for (int i = queue_run_in_order ? -1 : 0;
 
       spool_clear_header_globals();
       store_reset(reset_point2);
-      if (!wanted) continue;      /* With next message */
+      if (!wanted) goto go_around;      /* With next message */
       }
 
     /* OK, got a message we want to deliver. Create a pipe which will
@@ -616,13 +654,24 @@ for (int i = queue_run_in_order ? -1 : 0;
     report_time_since(&timestamp_startup, US"queue msg selected");
 #endif
 
-    if ((pid = fork()) == 0)
+#ifndef DISABLE_TLS
+    if (!queue_tls_init)
+      {
+      queue_tls_init = TRUE;
+      /* Preload TLS library info for smtp transports.  Once, and only if we
+      have a delivery to do. */
+      tls_client_creds_reload(FALSE);
+      }
+#endif
+
+single_item_retry:
+    if ((pid = exim_fork(US"qrun-delivery")) == 0)
       {
       int rc;
-      testharness_pause_ms(100);
       (void)close(pfd[pipe_read]);
       rc = deliver_message(fq->text, force_delivery, FALSE);
-      exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED);
+      exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED
+               ? EXIT_FAILURE : EXIT_SUCCESS);
       }
     if (pid < 0)
       log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
@@ -647,6 +696,18 @@ for (int i = queue_run_in_order ? -1 : 0;
         "queue run: process %d crashed with signal %d while delivering %s",
         (int)pid, status & 0x00ff, fq->text);
 
+    /* If single-item delivery was untried (likely due to locking)
+    retry once after a delay */
+
+    if (status & 0xff00 && single_id)
+      {
+      single_id = FALSE;
+      DEBUG(D_queue_run) debug_printf("qrun single-item pause before retry\n");
+      millisleep(500);
+      DEBUG(D_queue_run) debug_printf("qrun single-item retry after pause\n");
+      goto single_item_retry;
+      }
+
     /* Before continuing, wait till the pipe gets closed at the far end. This
     tells us that any children created by the delivery to re-use any SMTP
     channels have all finished. Since no process actually writes to the pipe,
@@ -660,6 +721,10 @@ for (int i = queue_run_in_order ? -1 : 0;
     (void)close(pfd[pipe_read]);
     set_process_info("running queue");
 
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS);
+
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
 
@@ -668,6 +733,14 @@ for (int i = queue_run_in_order ? -1 : 0;
       uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
       if (fqtnext) fudged_queue_times = fqtnext + 1;
       }
+
+
+    continue;
+
+  go_around:
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS);
     }                                  /* End loop for list of messages */
 
   tree_nonrecipients = NULL;
@@ -695,6 +768,19 @@ turned off. */
 
 if (f.queue_2stage)
   {
+
+  /* wait for last children */
+  for (int i = 0; i < nelem(qpid); i++)
+    if (qpid[i])
+      {
+      DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+      waitpid(qpid[i], NULL, 0);
+      }
+    else break;
+
+#ifdef MEASURE_TIMING
+  report_time_since(&timestamp_startup, US"queue_run 1st phase done");
+#endif
   f.queue_2stage = FALSE;
   queue_run(start_id, stop_id, TRUE);
   }
@@ -719,26 +805,38 @@ if (!recurse)
 /* Called as a result of -bpc
 
 Arguments:  none
-Returns:    nothing
+Returns:    count
 */
 
-void
+unsigned
 queue_count(void)
 {
 int subcount;
-int count = 0;
+unsigned count = 0;
 uschar subdirs[64];
 
-for (queue_filename *f = queue_get_spool_list(
-                               -1,             /* entire queue */
-                               subdirs,        /* for holding sub list */
-                               &subcount,      /* for subcount */
-                               FALSE);         /* not random */
-    f; f = f->next) count++;
-fprintf(stdout, "%d\n", count);
+(void) queue_get_spool_list(-1,                /* entire queue */
+                       subdirs,        /* for holding sub list */
+                       &subcount,      /* for subcount */
+                       FALSE,          /* not random */
+                       &count);        /* just get the count */
+return count;
 }
 
 
+#define QUEUE_SIZE_AGE 60      /* update rate for queue_size */
+
+unsigned
+queue_count_cached(void)
+{
+time_t now;
+if ((now = time(NULL)) >= queue_size_next)
+  {
+  queue_size = queue_count();
+  queue_size_next = now + (f.running_in_test_harness ? 3 : QUEUE_SIZE_AGE);
+  }
+return queue_size;
+}
 
 /************************************************
 *          List extra deliveries                *
@@ -803,8 +901,8 @@ if (count > 0)
   queue_filename *last = NULL;
   for (int i = 0; i < count; i++)
     {
-    queue_filename *next =
-      store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, is_tainted(list[i]));
+    queue_filename * next =
+      store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, list[i]);
     sprintf(CS next->text, "%s-H", list[i]);
     next->dir_uschar = '*';
     next->next = NULL;
@@ -820,7 +918,8 @@ else
           -1,             /* entire queue */
           subdirs,        /* for holding sub list */
           &subcount,      /* for subcount */
-          option >= 8);   /* randomize if required */
+          option >= 8,   /* randomize if required */
+         NULL);          /* don't just count */
 
 if (option >= 8) option -= 8;
 
@@ -1249,15 +1348,15 @@ switch(action)
            deliver_domain = dom
              ? CUS string_copyn(addr+dom, end - dom) : CUS"";
 
-           event_raise(event_action, US"msg:fail:internal",
-             string_sprintf("message removed by %s", username));
+           (void) event_raise(event_action, US"msg:fail:internal",
+             string_sprintf("message removed by %s", username), NULL);
 
            deliver_localpart = save_local;
            deliver_domain = save_domain;
            }
          }
        }
-      (void) event_raise(event_action, US"msg:complete", NULL);
+      (void) event_raise(event_action, US"msg:complete", NULL, NULL);
 #endif
       log_write(0, LOG_MAIN, "removed by %s", username);
       log_write(0, LOG_MAIN, "Completed");
@@ -1327,13 +1426,13 @@ switch(action)
       parse_extract_address(argv[recipients_arg], &errmess, &start, &end,
         &domain, (action == MSG_EDIT_SENDER));
 
-    if (recipient == NULL)
+    if (!recipient)
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: %s\n",
         doing, argv[recipients_arg], errmess);
       }
-    else if (recipient[0] != 0 && domain == 0)
+    else if (*recipient && domain == 0)
       {
       yield = FALSE;
       printf("- error while %s:\n  bad address %s: "
@@ -1421,34 +1520,68 @@ queue_check_only(void)
 {
 int sep = 0;
 struct stat statbuf;
-const uschar *s;
-uschar *ss;
-uschar buffer[1024];
+const uschar * s = queue_only_file;
+uschar * ss;
 
-if (queue_only_file == NULL) return;
-
-s = queue_only_file;
-while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL)
-  {
-  if (Ustrncmp(ss, "smtp", 4) == 0)
-    {
-    ss += 4;
-    if (Ustat(ss, &statbuf) == 0)
-      {
-      f.queue_smtp = TRUE;
-      DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
-      }
-    }
-  else
-    {
-    if (Ustat(ss, &statbuf) == 0)
+if (s)
+  while ((ss = string_nextinlist(&s, &sep, NULL, 0)))
+    if (Ustrncmp(ss, "smtp", 4) == 0)
       {
-      queue_only = TRUE;
-      DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+      ss += 4;
+      if (Ustat(ss, &statbuf) == 0)
+       {
+       f.queue_smtp = TRUE;
+       DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
+       }
       }
-    }
+    else
+      if (Ustat(ss, &statbuf) == 0)
+       {
+       queue_only = TRUE;
+       DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+       }
+}
+
+
+
+/******************************************************************************/
+/******************************************************************************/
+
+#ifndef DISABLE_QUEUE_RAMP
+void
+queue_notify_daemon(const uschar * msgid)
+{
+uschar buf[MESSAGE_ID_LENGTH + 2];
+int fd;
+
+DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid);
+
+buf[0] = NOTIFY_MSG_QRUN;
+memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
+
+if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
+  {
+  struct sockaddr_un sa_un = {.sun_family = AF_UNIX};
+
+#ifdef EXIM_HAVE_ABSTRACT_UNIX_SOCKETS
+  int len = offsetof(struct sockaddr_un, sun_path) + 1
+    + snprintf(sa_un.sun_path+1, sizeof(sa_un.sun_path)-1, "%s",
+               expand_string(notifier_socket));
+  sa_un.sun_path[0] = 0;
+#else
+  int len = offsetof(struct sockaddr_un, sun_path)
+    + snprintf(sa_un.sun_path, sizeof(sa_un.sun_path), "%s",
+               expand_string(notifier_socket));
+#endif
+
+  if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, len) < 0)
+    DEBUG(D_queue_run)
+      debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
+  close(fd);
   }
+else DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno));
 }
+#endif
 
 #endif /*!COMPILE_UTILITY*/