Named queues: support multiple queue-runners from single daemon
[exim.git] / src / src / daemon.c
index 7666626ed39a7a4ee9f1833fb0e95ff3178523e3..b0533c28f41701a0bcc4af6249574e5acfb91379 100644 (file)
@@ -3,7 +3,7 @@
 *************************************************/
 
 /* Copyright (c) The Exim Maintainers 2020 - 2022 */
-/* Copyright (c) University of Cambridge 1995 - 2018 */
+/* Copyright (c) University of Cambridge 1995 - 2023 */
 /* See the file NOTICE for conditions of use and distribution. */
 /* SPDX-License-Identifier: GPL-2.0-or-later */
 
 /* Structure for holding data for each SMTP connection */
 
 typedef struct smtp_slot {
-  pid_t pid;                       /* pid of the spawned reception process */
-  uschar *host_address;            /* address of the client host */
+  pid_t                pid;            /* pid of the spawned reception process */
+  uschar *     host_address;   /* address of the client host */
 } smtp_slot;
 
+typedef struct runner_slot {
+  pid_t                pid;            /* pid of spawned queue-runner process */
+  const uschar *queue_name;    /* pointer to the name in the qrunner struct */
+} runner_slot;
+
 /* An empty slot for initializing (Standard C does not allow constructor
 expressions in assignments except as initializers in declarations). */
 
 static smtp_slot empty_smtp_slot = { .pid = 0, .host_address = NULL };
 
-
-
 /*************************************************
 *               Local static variables           *
 *************************************************/
@@ -39,9 +42,11 @@ static int   accept_retry_count = 0;
 static int   accept_retry_errno;
 static BOOL  accept_retry_select_failed;
 
-static int   queue_run_count = 0;
-static pid_t *queue_pid_slots = NULL;
-static smtp_slot *smtp_slots = NULL;
+static int   queue_run_count = 0;      /* current runners */
+
+static unsigned queue_runner_slot_count = 0;
+static runner_slot * queue_runner_slots = NULL;
+static smtp_slot * smtp_slots = NULL;
 
 static BOOL  write_pid = TRUE;
 
@@ -920,19 +925,30 @@ while ((pid = waitpid(-1, &status, WNOHANG)) > 0)
   /* If it wasn't an accepting process, see if it was a queue-runner
   process that we are tracking. */
 
-  if (queue_pid_slots)
-    {
-    int max = atoi(CS expand_string(queue_run_max));
-    for (int i = 0; i < max; i++)
-      if (queue_pid_slots[i] == pid)
+  if (queue_runner_slots)
+    for (unsigned i = 0; i < queue_runner_slot_count; i++)
+      {
+      runner_slot * r = queue_runner_slots + i;
+      if (r->pid == pid)
         {
-        queue_pid_slots[i] = 0;
+        r->pid = 0;                    /* free up the slot */
+
         if (--queue_run_count < 0) queue_run_count = 0;
         DEBUG(D_any) debug_printf("%d queue-runner process%s now running\n",
-          queue_run_count, (queue_run_count == 1)? "" : "es");
+          queue_run_count, queue_run_count == 1 ? "" : "es");
+
+       for (qrunner ** p = &qrunners, * q = qrunners; q; p = &q->next, q = *p)
+         if (q->name == r->queue_name)
+           {
+           if (q->interval)            /* a periodic queue run */
+             q->run_count--;
+           else                        /* a one-time run */
+             *p = q->next;             /* drop this qrunner */
+           break;
+           }
         break;
         }
-    }
+      }
   }
 }
 
@@ -1232,7 +1248,11 @@ bad:
 }
 
 
+/* Data for notifier-triggered queue runs */
+
 static uschar queuerun_msgid[MESSAGE_ID_LENGTH+1];
+static const uschar * queuerun_msg_qname;
+
 
 /* The notifier socket has something to read. Pull the message from it, decode
 and do the action.
@@ -1320,7 +1340,12 @@ switch (buf[0])
     /* this should be a message_id */
     DEBUG(D_queue_run)
       debug_printf("%s: qrunner trigger: %s\n", __FUNCTION__, buf+1);
+
     memcpy(queuerun_msgid, buf+1, MESSAGE_ID_LENGTH+1);
+
+    for (qrunner * q = qrunners; q; q = q->next)
+      if (Ustrcmp(q->name, buf+1+MESSAGE_ID_LENGTH+1) == 0)
+       { queuerun_msg_qname = q->name; break; }
     return TRUE;
 #endif
 
@@ -1383,7 +1408,45 @@ ALARM(resignal_interval);
 }
 
 
-static void
+/* Re-sort the qrunners list, and return the shortest interval.
+That could be negatime.
+The next-tick times should have been updated by any runs initiated,
+though will not be when the global limit on runners was reached.
+
+Unlikely to have many queues, so insertion-sort.
+*/
+
+static int
+next_qrunner_interval(void)
+{
+qrunner * sorted = NULL;
+for (qrunner * q = qrunners, * next; q; q = next)
+  {
+  next = q->next;
+  q->next = NULL;
+  if (sorted)
+    {
+    qrunner ** p = &sorted;
+    for (qrunner * qq; qq = *p; p = &(qq->next))
+      if (  q->next_tick < qq->next_tick
+        || q->next_tick == qq->next_tick && q->interval < qq->interval
+        )
+       {
+       *p = q;
+       q->next = qq;
+       goto INSERTED;
+       }
+    *p = q;
+  INSERTED: ;
+    }
+  else
+    sorted = q;
+  }
+qrunners = sorted;
+return qrunners ? qrunners->next_tick - time(NULL) : 0;
+}
+
+static int
 daemon_qrun(int local_queue_run_max, struct pollfd * fd_polls, int listen_socket_count)
 {
 DEBUG(D_any) debug_printf("%s received\n",
@@ -1392,140 +1455,207 @@ DEBUG(D_any) debug_printf("%s received\n",
 #endif
   "SIGALRM");
 
-/* Do a full queue run in a child process, if required, unless we already
-have enough queue runners on the go. If we are not running as root, a
-re-exec is required. */
-
-if (  queue_interval > 0
-   && (local_queue_run_max <= 0 || queue_run_count < local_queue_run_max))
-  {
-pid_t pid;
+/* Do a full queue run in a child process, if required, unless we already have
+enough queue runners on the go. If we are not running as root, a re-exec is
+required. In the calling process, restart the alamr timer for the next run.  */
 
-  if ((pid = exim_fork(US"queue-runner")) == 0)
+if (is_multiple_qrun())
+  if (local_queue_run_max <= 0 || queue_run_count < local_queue_run_max)
     {
-    /* Disable debugging if it's required only for the daemon process. We
-    leave the above message, because it ties up with the "child ended"
-    debugging messages. */
+    qrunner * q = NULL;
+
+#ifndef DISABLE_QUEUE_RAMP
+    if (*queuerun_msgid)       /* See if we can start another runner for this queue */
+      {
+      for (qrunner * qq = qrunners; qq; qq = qq->next)
+       if (qq->name == queuerun_msg_qname)
+         {
+         q = qq->run_count < qq->run_max ? qq : NULL;
+         break;
+         }
+      }
+    else
+#endif
+      /* In order of run priority, find the first queue for which we can start
+      a runner */
 
-    if (f.debug_daemon) debug_selector = 0;
+      for (q = qrunners; q; q = q->next)
+       if (q->run_count < q->run_max) break;
+
+    if (q)
+      {
+      pid_t pid;
 
-    /* Close any open listening sockets in the child */
+      /* Bump this queue's next-tick by it's interval */
 
-    close_daemon_sockets(daemon_notifier_fd,
-      fd_polls, listen_socket_count);
+      if (q->interval)
+       {
+       time_t now = time(NULL);
+       do ; while ((q->next_tick += q->interval) <= now);
+       }
 
-    /* Reset SIGHUP and SIGCHLD in the child in both cases. */
+      if ((pid = exim_fork(US"queue-runner")) == 0)
+       {
+       /* Disable debugging if it's required only for the daemon process. We
+       leave the above message, because it ties up with the "child ended"
+       debugging messages. */
 
-    signal(SIGHUP,  SIG_DFL);
-    signal(SIGCHLD, SIG_DFL);
-    signal(SIGTERM, SIG_DFL);
-    signal(SIGINT, SIG_DFL);
+       if (f.debug_daemon) debug_selector = 0;
 
-    /* Re-exec if privilege has been given up, unless deliver_drop_
-    privilege is set. Reset SIGALRM before exec(). */
+       /* Close any open listening sockets in the child */
 
-    if (geteuid() != root_uid && !deliver_drop_privilege)
-      {
-      uschar opt[8];
-      uschar *p = opt;
-      uschar *extra[7];
-      int extracount = 1;
-
-      signal(SIGALRM, SIG_DFL);
-      *p++ = '-';
-      *p++ = 'q';
-      if (  f.queue_2stage
+       close_daemon_sockets(daemon_notifier_fd,
+         fd_polls, listen_socket_count);
+
+       /* Reset SIGHUP and SIGCHLD in the child in both cases. */
+
+       signal(SIGHUP,  SIG_DFL);
+       signal(SIGCHLD, SIG_DFL);
+       signal(SIGTERM, SIG_DFL);
+       signal(SIGINT, SIG_DFL);
+
+       /* Re-exec if privilege has been given up, unless deliver_drop_
+       privilege is set. Reset SIGALRM before exec(). */
+
+       if (geteuid() != root_uid && !deliver_drop_privilege)
+         {
+         uschar opt[8];
+         uschar *p = opt;
+         uschar *extra[7];
+         int extracount = 1;
+
+         signal(SIGALRM, SIG_DFL);
+         queue_name = US"";
+
+         *p++ = '-';
+         *p++ = 'q';
+         if (  q->queue_2stage
 #ifndef DISABLE_QUEUE_RAMP
-        && !*queuerun_msgid
+            && !*queuerun_msgid
 #endif
-        ) *p++ = 'q';
-      if (f.queue_run_first_delivery) *p++ = 'i';
-      if (f.queue_run_force) *p++ = 'f';
-      if (f.deliver_force_thaw) *p++ = 'f';
-      if (f.queue_run_local) *p++ = 'l';
-      *p = 0;
-      extra[0] = *queue_name
-       ? string_sprintf("%sG%s", opt, queue_name) : opt;
+            ) *p++ = 'q';
+         if (q->queue_run_first_delivery) *p++ = 'i';
+         if (q->queue_run_force) *p++ = 'f';
+         if (q->deliver_force_thaw) *p++ = 'f';
+         if (q->queue_run_local) *p++ = 'l';
+         *p = 0;
+
+         extra[0] = q->name
+           ? string_sprintf("%sG%s", opt, q->name) : opt;
 
 #ifndef DISABLE_QUEUE_RAMP
-      if (*queuerun_msgid)
-       {
-       log_write(0, LOG_MAIN, "notify triggered queue run");
-       extra[extracount++] = queuerun_msgid;   /* Trigger only the */
-       extra[extracount++] = queuerun_msgid;   /* one message      */
-       }
+         if (*queuerun_msgid)
+           {
+           log_write(0, LOG_MAIN, "notify triggered queue run");
+           extra[extracount++] = queuerun_msgid;       /* Trigger only the */
+           extra[extracount++] = queuerun_msgid;       /* one message      */
+           }
 #endif
 
-      /* If -R or -S were on the original command line, ensure they get
-      passed on. */
+         /* If -R or -S were on the original command line, ensure they get
+         passed on. */
 
-      if (deliver_selectstring)
-       {
-       extra[extracount++] = f.deliver_selectstring_regex ? US"-Rr" : US"-R";
-       extra[extracount++] = deliver_selectstring;
-       }
+         if (deliver_selectstring)
+           {
+           extra[extracount++] = f.deliver_selectstring_regex ? US"-Rr" : US"-R";
+           extra[extracount++] = deliver_selectstring;
+           }
 
-      if (deliver_selectstring_sender)
-       {
-       extra[extracount++] = f.deliver_selectstring_sender_regex
-         ? US"-Sr" : US"-S";
-       extra[extracount++] = deliver_selectstring_sender;
-       }
+         if (deliver_selectstring_sender)
+           {
+           extra[extracount++] = f.deliver_selectstring_sender_regex
+             ? US"-Sr" : US"-S";
+           extra[extracount++] = deliver_selectstring_sender;
+           }
 
-      /* Overlay this process with a new execution. */
+         /* Overlay this process with a new execution. */
 
-      (void)child_exec_exim(CEE_EXEC_PANIC, FALSE, NULL, FALSE, extracount,
-       extra[0], extra[1], extra[2], extra[3], extra[4], extra[5], extra[6]);
+         (void)child_exec_exim(CEE_EXEC_PANIC, FALSE, NULL, FALSE, extracount,
+           extra[0], extra[1], extra[2], extra[3], extra[4], extra[5], extra[6]);
 
-      /* Control never returns here. */
-      }
+         /* Control never returns here. */
+         }
 
-    /* No need to re-exec; SIGALRM remains set to the default handler */
+       /* No need to re-exec; SIGALRM remains set to the default handler */
 
 #ifndef DISABLE_QUEUE_RAMP
-    if (*queuerun_msgid)
-      {
-      log_write(0, LOG_MAIN, "notify triggered queue run");
-      f.queue_2stage = FALSE;
-      queue_run(queuerun_msgid, queuerun_msgid, FALSE);
-      }
-    else
+       if (*queuerun_msgid)
+         {
+         log_write(0, LOG_MAIN, "notify triggered queue run");
+         f.queue_2stage = FALSE;
+         queue_run(q, queuerun_msgid, queuerun_msgid, FALSE);
+         }
+       else
 #endif
-      queue_run(NULL, NULL, FALSE);
-    exim_underbar_exit(EXIT_SUCCESS);
-    }
+         queue_run(q, NULL, NULL, FALSE);
+       exim_underbar_exit(EXIT_SUCCESS);
+       }
 
-  if (pid < 0)
-    {
-    log_write(0, LOG_MAIN|LOG_PANIC, "daemon: fork of queue-runner "
-      "process failed: %s", strerror(errno));
-    log_close_all();
-    }
-  else
-    {
-    for (int i = 0; i < local_queue_run_max; ++i)
-      if (queue_pid_slots[i] <= 0)
+      if (pid < 0)
        {
-       queue_pid_slots[i] = pid;
-       queue_run_count++;
-       break;
+       log_write(0, LOG_MAIN|LOG_PANIC, "daemon: fork of queue-runner "
+         "process failed: %s", strerror(errno));
+       log_close_all();
        }
-    DEBUG(D_any) debug_printf("%d queue-runner process%s running\n",
-      queue_run_count, queue_run_count == 1 ? "" : "es");
+      else
+       {
+       for (int i = 0; i < local_queue_run_max; ++i)
+         if (queue_runner_slots[i].pid <= 0)
+           {
+           queue_runner_slots[i].pid = pid;
+           queue_runner_slots[i].queue_name = q->name;
+           q->run_count++;
+           queue_run_count++;
+           break;
+           }
+       DEBUG(D_any) debug_printf("%d queue-runner process%s running\n",
+         queue_run_count, queue_run_count == 1 ? "" : "es");
+       }
+      }
     }
-  }
-
-/* Reset the alarm clock */
 
 sigalrm_seen = FALSE;
 #ifndef DISABLE_QUEUE_RAMP
-if (*queuerun_msgid)
+if (*queuerun_msgid)           /* it was a fast-ramp kick */
   *queuerun_msgid = 0;
-else
+else                           /* periodic or one-time queue run */
 #endif
-  ALARM(queue_interval);
+  {            /* Impose a minimum 1s tick, even when a run was outstanding */
+  int interval = next_qrunner_interval();
+  if (interval <= 0) interval = 1;
+
+  if (qrunners)                        /* there are still periodic qrunners */
+    {
+    ALARM(interval);
+    return interval;
+    }
+  }
+return 0;
 }
 
+
+
+
+const uschar *
+describe_queue_runners(void)
+{
+gstring * g = NULL;
+
+if (!is_multiple_qrun()) return US"no queue runs";
+
+for (qrunner * q = qrunners; q; q = q->next)
+  {
+  g = string_catn(g, US"-q", 2);
+  if (q->name) g = string_append(g, 3, US"G", q->name, US"/");
+  g = string_cat(g, readconf_printtime(q->interval));
+  g = string_catn(g, US" ", 1);
+  }
+gstring_trim(g, 1);
+gstring_release_unused(g);
+return string_from_gstring(g);
+}
+
+
 /*************************************************
 *              Exim Daemon Mainline              *
 *************************************************/
@@ -1557,7 +1687,32 @@ struct pollfd * fd_polls, * tls_watch_poll = NULL, * dnotify_poll = NULL;
 int listen_socket_count = 0, poll_fd_count;
 ip_address_item * addresses = NULL;
 time_t last_connection_time = (time_t)0;
-int local_queue_run_max = atoi(CS expand_string(queue_run_max));
+int local_queue_run_max = 0;
+BOOL queue_run_max_has_dollar;
+
+if (is_multiple_qrun())
+
+  /* Nuber of runner-tracking structs needed:  If the option queue_run_max has
+  no expandable elements then it is the overall maximum; else we assume it
+  depends on the queue name, and add them up to get the maximum.
+  Evaluate both that and the individual limits. */
+
+  if (Ustrchr(queue_run_max, '$') != NULL)
+    {
+    for (qrunner * q = qrunners; q; q = q->next)
+      {
+      queue_name = q->name;
+      local_queue_run_max +=
+       (q->run_max = atoi(CS expand_string(queue_run_max)));
+      }
+    queue_name = US"";
+    }
+  else
+    {
+    local_queue_run_max = atoi(CS expand_string(queue_run_max));
+    for (qrunner * q = qrunners; q; q = q->next)
+      q->run_max = local_queue_run_max;
+    }
 
 process_purpose = US"daemon";
 
@@ -2216,10 +2371,11 @@ originator_login = (pw = getpwuid(exim_uid))
 /* Get somewhere to keep the list of queue-runner pids if we are keeping track
 of them (and also if we are doing queue runs). */
 
-if (queue_interval > 0 && local_queue_run_max > 0)
+if (is_multiple_qrun() && local_queue_run_max > 0)
   {
-  queue_pid_slots = store_get(local_queue_run_max * sizeof(pid_t), GET_UNTAINTED);
-  for (int i = 0; i < local_queue_run_max; i++) queue_pid_slots[i] = 0;
+  queue_runner_slot_count = local_queue_run_max;
+  queue_runner_slots = store_get(local_queue_run_max * sizeof(runner_slot), GET_UNTAINTED);
+  memset(queue_runner_slots, 0, local_queue_run_max * sizeof(runner_slot));
   }
 
 /* Set up the handler for termination of child processes, and the one
@@ -2233,9 +2389,12 @@ os_non_restarting_signal(SIGTERM, main_sigterm_handler);
 os_non_restarting_signal(SIGINT, main_sigterm_handler);
 
 /* If we are to run the queue periodically, pretend the alarm has just gone
-off. This will cause the first queue-runner to get kicked off straight away. */
+off. This will cause the first queue-runner to get kicked off straight away.
+Get an initial sort of the list of queues, to prioritize the initial q-runs */
 
-sigalrm_seen = (queue_interval > 0);
+
+if ((sigalrm_seen = is_multiple_qrun()))
+  (void) next_qrunner_interval();
 
 /* Log the start up of a daemon - at least one of listening or queue running
 must be set up. */
@@ -2264,20 +2423,16 @@ else if (f.daemon_listen)
   int smtps_ports = 0;
   ip_address_item * ipa;
   uschar * p;
-  uschar * qinfo = queue_interval > 0
-    ? string_sprintf("-q%s%s",
-       f.queue_2stage ? "q" : "", readconf_printtime(queue_interval))
-    : US"no queue runs";
+  const uschar * qinfo = describe_queue_runners();
 
   /* Build a list of listening addresses in big_buffer, but limit it to 10
   items. The style is for backwards compatibility.
 
-  It is now possible to have some ports listening for SMTPS (the old,
-  deprecated protocol that starts TLS without using STARTTLS), and others
-  listening for standard SMTP. Keep their listings separate. */
+  It is possible to have some ports listening for SMTPS (as opposed to TLS
+  startted by STARTTLS), and others listening for standard SMTP. Keep their
+  listings separate. */
 
   for (int j = 0, i; j < 2; j++)
-    {
     for (i = 0, ipa = addresses; i < 10 && ipa; i++, ipa = ipa->next)
       {
       /* First time round, look for SMTP ports; second time round, look for
@@ -2315,7 +2470,7 @@ else if (f.daemon_listen)
               && Ustrcmp(ipa->address, i2->address) == 0
               )
              {                         /* found; append port to list */
-             for (p = i2->log; *p; ) p++;      /* end of existing string */
+             for (p = i2->log; *p; ) p++;      /* end of existing string   { */
              if (*--p == '}') *p = '\0';       /* drop EOL */
              while (isdigit(*--p)) ;           /* char before port */
 
@@ -2331,7 +2486,6 @@ else if (f.daemon_listen)
          }
        }
       }
-    }
 
   p = big_buffer;
   for (int j = 0, i; j < 2; j++)
@@ -2367,11 +2521,9 @@ else if (f.daemon_listen)
     version_string, qinfo, big_buffer);
   }
 
-else
+else   /* no listening sockets, only queue-runs */
   {
-  uschar * s = *queue_name
-    ? string_sprintf("-qG%s/%s", queue_name, readconf_printtime(queue_interval))
-    : string_sprintf("-q%s", readconf_printtime(queue_interval));
+  const uschar * s = describe_queue_runners();
   log_write(0, LOG_MAIN,
     "exim %s daemon started: pid=%d, %s, not listening for SMTP",
     version_string, getpid(), s);
@@ -2445,6 +2597,8 @@ report_time_since(&timestamp_startup, US"daemon loop start");     /* testcase 0022 *
 
 for (;;)
   {
+  int nolisten_sleep = 60;
+
   if (sigterm_seen)
     daemon_die();      /* Does not return */
 
@@ -2458,7 +2612,8 @@ for (;;)
     if (inetd_wait_timeout > 0)
       daemon_inetd_wtimeout(last_connection_time);     /* Might not return */
     else
-      daemon_qrun(local_queue_run_max, fd_polls, listen_socket_count);
+      nolisten_sleep =
+       daemon_qrun(local_queue_run_max, fd_polls, listen_socket_count);
 
 
   /* Sleep till a connection happens if listening, and handle the connection if
@@ -2663,7 +2818,7 @@ for (;;)
   else
     {
     struct pollfd p;
-    poll(&p, 0, queue_interval * 1000);
+    poll(&p, 0, nolisten_sleep * 1000);
     handle_ending_processes();
     }