Named queues: support multiple queue-runners from single daemon
authorJeremy Harris <jgh146exb@wizmail.org>
Mon, 13 Feb 2023 11:34:38 +0000 (11:34 +0000)
committerJeremy Harris <jgh146exb@wizmail.org>
Mon, 13 Feb 2023 11:34:38 +0000 (11:34 +0000)
12 files changed:
doc/doc-docbook/spec.xfpt
doc/doc-txt/NewStuff
src/src/daemon.c
src/src/exim.c
src/src/functions.h
src/src/globals.c
src/src/globals.h
src/src/macros.h
src/src/queue.c
src/src/structs.h
test/scripts/0000-Basic/0576
test/stdout/0576

index fd2b47f22722867095f89d94cb8313d9ae383a89..6199b5d8946e0fc071b86523f6438b48025c9ee2 100644 (file)
@@ -4615,6 +4615,15 @@ combined daemon at system boot time is to use a command such as
 Such a daemon listens for incoming SMTP calls, and also starts a queue runner
 process every 30 minutes.
 
+.new
+.cindex "named queues" "queue runners"
+It is possible to set up runners for multiple named queues within one daemon,
+For example:
+.code
+exim -qGhipri/2m -q10m -qqGmailinglist/1h
+.endd
+.wen
+
 When a daemon is started by &%-q%& with a time value, but without &%-bd%&, no
 pid file is written unless one is explicitly requested by the &%-oP%& option.
 
index c1e139e351c869000c09cb827354e8d31747c484..adad7ec57f7afe45e436d0e4660c71cfcf728470 100644 (file)
@@ -19,7 +19,9 @@ Version 4.97
 
  5. The smtp transport option "max_rcpt" is now expanded before use.
 
- 6. The tls_eccurve option for OpenSSL now takes a list of group names
+ 6. The tls_eccurve option for OpenSSL now takes a list of group names.
+
+ 7. Queue runners for several queues can now be started from one daemon.
 
 Version 4.96
 ------------
index 7666626ed39a7a4ee9f1833fb0e95ff3178523e3..b0533c28f41701a0bcc4af6249574e5acfb91379 100644 (file)
@@ -3,7 +3,7 @@
 *************************************************/
 
 /* Copyright (c) The Exim Maintainers 2020 - 2022 */
-/* Copyright (c) University of Cambridge 1995 - 2018 */
+/* Copyright (c) University of Cambridge 1995 - 2023 */
 /* See the file NOTICE for conditions of use and distribution. */
 /* SPDX-License-Identifier: GPL-2.0-or-later */
 
 /* Structure for holding data for each SMTP connection */
 
 typedef struct smtp_slot {
-  pid_t pid;                       /* pid of the spawned reception process */
-  uschar *host_address;            /* address of the client host */
+  pid_t                pid;            /* pid of the spawned reception process */
+  uschar *     host_address;   /* address of the client host */
 } smtp_slot;
 
+typedef struct runner_slot {
+  pid_t                pid;            /* pid of spawned queue-runner process */
+  const uschar *queue_name;    /* pointer to the name in the qrunner struct */
+} runner_slot;
+
 /* An empty slot for initializing (Standard C does not allow constructor
 expressions in assignments except as initializers in declarations). */
 
 static smtp_slot empty_smtp_slot = { .pid = 0, .host_address = NULL };
 
-
-
 /*************************************************
 *               Local static variables           *
 *************************************************/
@@ -39,9 +42,11 @@ static int   accept_retry_count = 0;
 static int   accept_retry_errno;
 static BOOL  accept_retry_select_failed;
 
-static int   queue_run_count = 0;
-static pid_t *queue_pid_slots = NULL;
-static smtp_slot *smtp_slots = NULL;
+static int   queue_run_count = 0;      /* current runners */
+
+static unsigned queue_runner_slot_count = 0;
+static runner_slot * queue_runner_slots = NULL;
+static smtp_slot * smtp_slots = NULL;
 
 static BOOL  write_pid = TRUE;
 
@@ -920,19 +925,30 @@ while ((pid = waitpid(-1, &status, WNOHANG)) > 0)
   /* If it wasn't an accepting process, see if it was a queue-runner
   process that we are tracking. */
 
-  if (queue_pid_slots)
-    {
-    int max = atoi(CS expand_string(queue_run_max));
-    for (int i = 0; i < max; i++)
-      if (queue_pid_slots[i] == pid)
+  if (queue_runner_slots)
+    for (unsigned i = 0; i < queue_runner_slot_count; i++)
+      {
+      runner_slot * r = queue_runner_slots + i;
+      if (r->pid == pid)
         {
-        queue_pid_slots[i] = 0;
+        r->pid = 0;                    /* free up the slot */
+
         if (--queue_run_count < 0) queue_run_count = 0;
         DEBUG(D_any) debug_printf("%d queue-runner process%s now running\n",
-          queue_run_count, (queue_run_count == 1)? "" : "es");
+          queue_run_count, queue_run_count == 1 ? "" : "es");
+
+       for (qrunner ** p = &qrunners, * q = qrunners; q; p = &q->next, q = *p)
+         if (q->name == r->queue_name)
+           {
+           if (q->interval)            /* a periodic queue run */
+             q->run_count--;
+           else                        /* a one-time run */
+             *p = q->next;             /* drop this qrunner */
+           break;
+           }
         break;
         }
-    }
+      }
   }
 }
 
@@ -1232,7 +1248,11 @@ bad:
 }
 
 
+/* Data for notifier-triggered queue runs */
+
 static uschar queuerun_msgid[MESSAGE_ID_LENGTH+1];
+static const uschar * queuerun_msg_qname;
+
 
 /* The notifier socket has something to read. Pull the message from it, decode
 and do the action.
@@ -1320,7 +1340,12 @@ switch (buf[0])
     /* this should be a message_id */
     DEBUG(D_queue_run)
       debug_printf("%s: qrunner trigger: %s\n", __FUNCTION__, buf+1);
+
     memcpy(queuerun_msgid, buf+1, MESSAGE_ID_LENGTH+1);
+
+    for (qrunner * q = qrunners; q; q = q->next)
+      if (Ustrcmp(q->name, buf+1+MESSAGE_ID_LENGTH+1) == 0)
+       { queuerun_msg_qname = q->name; break; }
     return TRUE;
 #endif
 
@@ -1383,7 +1408,45 @@ ALARM(resignal_interval);
 }
 
 
-static void
+/* Re-sort the qrunners list, and return the shortest interval.
+That could be negatime.
+The next-tick times should have been updated by any runs initiated,
+though will not be when the global limit on runners was reached.
+
+Unlikely to have many queues, so insertion-sort.
+*/
+
+static int
+next_qrunner_interval(void)
+{
+qrunner * sorted = NULL;
+for (qrunner * q = qrunners, * next; q; q = next)
+  {
+  next = q->next;
+  q->next = NULL;
+  if (sorted)
+    {
+    qrunner ** p = &sorted;
+    for (qrunner * qq; qq = *p; p = &(qq->next))
+      if (  q->next_tick < qq->next_tick
+        || q->next_tick == qq->next_tick && q->interval < qq->interval
+        )
+       {
+       *p = q;
+       q->next = qq;
+       goto INSERTED;
+       }
+    *p = q;
+  INSERTED: ;
+    }
+  else
+    sorted = q;
+  }
+qrunners = sorted;
+return qrunners ? qrunners->next_tick - time(NULL) : 0;
+}
+
+static int
 daemon_qrun(int local_queue_run_max, struct pollfd * fd_polls, int listen_socket_count)
 {
 DEBUG(D_any) debug_printf("%s received\n",
@@ -1392,140 +1455,207 @@ DEBUG(D_any) debug_printf("%s received\n",
 #endif
   "SIGALRM");
 
-/* Do a full queue run in a child process, if required, unless we already
-have enough queue runners on the go. If we are not running as root, a
-re-exec is required. */
-
-if (  queue_interval > 0
-   && (local_queue_run_max <= 0 || queue_run_count < local_queue_run_max))
-  {
-pid_t pid;
+/* Do a full queue run in a child process, if required, unless we already have
+enough queue runners on the go. If we are not running as root, a re-exec is
+required. In the calling process, restart the alamr timer for the next run.  */
 
-  if ((pid = exim_fork(US"queue-runner")) == 0)
+if (is_multiple_qrun())
+  if (local_queue_run_max <= 0 || queue_run_count < local_queue_run_max)
     {
-    /* Disable debugging if it's required only for the daemon process. We
-    leave the above message, because it ties up with the "child ended"
-    debugging messages. */
+    qrunner * q = NULL;
+
+#ifndef DISABLE_QUEUE_RAMP
+    if (*queuerun_msgid)       /* See if we can start another runner for this queue */
+      {
+      for (qrunner * qq = qrunners; qq; qq = qq->next)
+       if (qq->name == queuerun_msg_qname)
+         {
+         q = qq->run_count < qq->run_max ? qq : NULL;
+         break;
+         }
+      }
+    else
+#endif
+      /* In order of run priority, find the first queue for which we can start
+      a runner */
 
-    if (f.debug_daemon) debug_selector = 0;
+      for (q = qrunners; q; q = q->next)
+       if (q->run_count < q->run_max) break;
+
+    if (q)
+      {
+      pid_t pid;
 
-    /* Close any open listening sockets in the child */
+      /* Bump this queue's next-tick by it's interval */
 
-    close_daemon_sockets(daemon_notifier_fd,
-      fd_polls, listen_socket_count);
+      if (q->interval)
+       {
+       time_t now = time(NULL);
+       do ; while ((q->next_tick += q->interval) <= now);
+       }
 
-    /* Reset SIGHUP and SIGCHLD in the child in both cases. */
+      if ((pid = exim_fork(US"queue-runner")) == 0)
+       {
+       /* Disable debugging if it's required only for the daemon process. We
+       leave the above message, because it ties up with the "child ended"
+       debugging messages. */
 
-    signal(SIGHUP,  SIG_DFL);
-    signal(SIGCHLD, SIG_DFL);
-    signal(SIGTERM, SIG_DFL);
-    signal(SIGINT, SIG_DFL);
+       if (f.debug_daemon) debug_selector = 0;
 
-    /* Re-exec if privilege has been given up, unless deliver_drop_
-    privilege is set. Reset SIGALRM before exec(). */
+       /* Close any open listening sockets in the child */
 
-    if (geteuid() != root_uid && !deliver_drop_privilege)
-      {
-      uschar opt[8];
-      uschar *p = opt;
-      uschar *extra[7];
-      int extracount = 1;
-
-      signal(SIGALRM, SIG_DFL);
-      *p++ = '-';
-      *p++ = 'q';
-      if (  f.queue_2stage
+       close_daemon_sockets(daemon_notifier_fd,
+         fd_polls, listen_socket_count);
+
+       /* Reset SIGHUP and SIGCHLD in the child in both cases. */
+
+       signal(SIGHUP,  SIG_DFL);
+       signal(SIGCHLD, SIG_DFL);
+       signal(SIGTERM, SIG_DFL);
+       signal(SIGINT, SIG_DFL);
+
+       /* Re-exec if privilege has been given up, unless deliver_drop_
+       privilege is set. Reset SIGALRM before exec(). */
+
+       if (geteuid() != root_uid && !deliver_drop_privilege)
+         {
+         uschar opt[8];
+         uschar *p = opt;
+         uschar *extra[7];
+         int extracount = 1;
+
+         signal(SIGALRM, SIG_DFL);
+         queue_name = US"";
+
+         *p++ = '-';
+         *p++ = 'q';
+         if (  q->queue_2stage
 #ifndef DISABLE_QUEUE_RAMP
-        && !*queuerun_msgid
+            && !*queuerun_msgid
 #endif
-        ) *p++ = 'q';
-      if (f.queue_run_first_delivery) *p++ = 'i';
-      if (f.queue_run_force) *p++ = 'f';
-      if (f.deliver_force_thaw) *p++ = 'f';
-      if (f.queue_run_local) *p++ = 'l';
-      *p = 0;
-      extra[0] = *queue_name
-       ? string_sprintf("%sG%s", opt, queue_name) : opt;
+            ) *p++ = 'q';
+         if (q->queue_run_first_delivery) *p++ = 'i';
+         if (q->queue_run_force) *p++ = 'f';
+         if (q->deliver_force_thaw) *p++ = 'f';
+         if (q->queue_run_local) *p++ = 'l';
+         *p = 0;
+
+         extra[0] = q->name
+           ? string_sprintf("%sG%s", opt, q->name) : opt;
 
 #ifndef DISABLE_QUEUE_RAMP
-      if (*queuerun_msgid)
-       {
-       log_write(0, LOG_MAIN, "notify triggered queue run");
-       extra[extracount++] = queuerun_msgid;   /* Trigger only the */
-       extra[extracount++] = queuerun_msgid;   /* one message      */
-       }
+         if (*queuerun_msgid)
+           {
+           log_write(0, LOG_MAIN, "notify triggered queue run");
+           extra[extracount++] = queuerun_msgid;       /* Trigger only the */
+           extra[extracount++] = queuerun_msgid;       /* one message      */
+           }
 #endif
 
-      /* If -R or -S were on the original command line, ensure they get
-      passed on. */
+         /* If -R or -S were on the original command line, ensure they get
+         passed on. */
 
-      if (deliver_selectstring)
-       {
-       extra[extracount++] = f.deliver_selectstring_regex ? US"-Rr" : US"-R";
-       extra[extracount++] = deliver_selectstring;
-       }
+         if (deliver_selectstring)
+           {
+           extra[extracount++] = f.deliver_selectstring_regex ? US"-Rr" : US"-R";
+           extra[extracount++] = deliver_selectstring;
+           }
 
-      if (deliver_selectstring_sender)
-       {
-       extra[extracount++] = f.deliver_selectstring_sender_regex
-         ? US"-Sr" : US"-S";
-       extra[extracount++] = deliver_selectstring_sender;
-       }
+         if (deliver_selectstring_sender)
+           {
+           extra[extracount++] = f.deliver_selectstring_sender_regex
+             ? US"-Sr" : US"-S";
+           extra[extracount++] = deliver_selectstring_sender;
+           }
 
-      /* Overlay this process with a new execution. */
+         /* Overlay this process with a new execution. */
 
-      (void)child_exec_exim(CEE_EXEC_PANIC, FALSE, NULL, FALSE, extracount,
-       extra[0], extra[1], extra[2], extra[3], extra[4], extra[5], extra[6]);
+         (void)child_exec_exim(CEE_EXEC_PANIC, FALSE, NULL, FALSE, extracount,
+           extra[0], extra[1], extra[2], extra[3], extra[4], extra[5], extra[6]);
 
-      /* Control never returns here. */
-      }
+         /* Control never returns here. */
+         }
 
-    /* No need to re-exec; SIGALRM remains set to the default handler */
+       /* No need to re-exec; SIGALRM remains set to the default handler */
 
 #ifndef DISABLE_QUEUE_RAMP
-    if (*queuerun_msgid)
-      {
-      log_write(0, LOG_MAIN, "notify triggered queue run");
-      f.queue_2stage = FALSE;
-      queue_run(queuerun_msgid, queuerun_msgid, FALSE);
-      }
-    else
+       if (*queuerun_msgid)
+         {
+         log_write(0, LOG_MAIN, "notify triggered queue run");
+         f.queue_2stage = FALSE;
+         queue_run(q, queuerun_msgid, queuerun_msgid, FALSE);
+         }
+       else
 #endif
-      queue_run(NULL, NULL, FALSE);
-    exim_underbar_exit(EXIT_SUCCESS);
-    }
+         queue_run(q, NULL, NULL, FALSE);
+       exim_underbar_exit(EXIT_SUCCESS);
+       }
 
-  if (pid < 0)
-    {
-    log_write(0, LOG_MAIN|LOG_PANIC, "daemon: fork of queue-runner "
-      "process failed: %s", strerror(errno));
-    log_close_all();
-    }
-  else
-    {
-    for (int i = 0; i < local_queue_run_max; ++i)
-      if (queue_pid_slots[i] <= 0)
+      if (pid < 0)
        {
-       queue_pid_slots[i] = pid;
-       queue_run_count++;
-       break;
+       log_write(0, LOG_MAIN|LOG_PANIC, "daemon: fork of queue-runner "
+         "process failed: %s", strerror(errno));
+       log_close_all();
        }
-    DEBUG(D_any) debug_printf("%d queue-runner process%s running\n",
-      queue_run_count, queue_run_count == 1 ? "" : "es");
+      else
+       {
+       for (int i = 0; i < local_queue_run_max; ++i)
+         if (queue_runner_slots[i].pid <= 0)
+           {
+           queue_runner_slots[i].pid = pid;
+           queue_runner_slots[i].queue_name = q->name;
+           q->run_count++;
+           queue_run_count++;
+           break;
+           }
+       DEBUG(D_any) debug_printf("%d queue-runner process%s running\n",
+         queue_run_count, queue_run_count == 1 ? "" : "es");
+       }
+      }
     }
-  }
-
-/* Reset the alarm clock */
 
 sigalrm_seen = FALSE;
 #ifndef DISABLE_QUEUE_RAMP
-if (*queuerun_msgid)
+if (*queuerun_msgid)           /* it was a fast-ramp kick */
   *queuerun_msgid = 0;
-else
+else                           /* periodic or one-time queue run */
 #endif
-  ALARM(queue_interval);
+  {            /* Impose a minimum 1s tick, even when a run was outstanding */
+  int interval = next_qrunner_interval();
+  if (interval <= 0) interval = 1;
+
+  if (qrunners)                        /* there are still periodic qrunners */
+    {
+    ALARM(interval);
+    return interval;
+    }
+  }
+return 0;
 }
 
+
+
+
+const uschar *
+describe_queue_runners(void)
+{
+gstring * g = NULL;
+
+if (!is_multiple_qrun()) return US"no queue runs";
+
+for (qrunner * q = qrunners; q; q = q->next)
+  {
+  g = string_catn(g, US"-q", 2);
+  if (q->name) g = string_append(g, 3, US"G", q->name, US"/");
+  g = string_cat(g, readconf_printtime(q->interval));
+  g = string_catn(g, US" ", 1);
+  }
+gstring_trim(g, 1);
+gstring_release_unused(g);
+return string_from_gstring(g);
+}
+
+
 /*************************************************
 *              Exim Daemon Mainline              *
 *************************************************/
@@ -1557,7 +1687,32 @@ struct pollfd * fd_polls, * tls_watch_poll = NULL, * dnotify_poll = NULL;
 int listen_socket_count = 0, poll_fd_count;
 ip_address_item * addresses = NULL;
 time_t last_connection_time = (time_t)0;
-int local_queue_run_max = atoi(CS expand_string(queue_run_max));
+int local_queue_run_max = 0;
+BOOL queue_run_max_has_dollar;
+
+if (is_multiple_qrun())
+
+  /* Nuber of runner-tracking structs needed:  If the option queue_run_max has
+  no expandable elements then it is the overall maximum; else we assume it
+  depends on the queue name, and add them up to get the maximum.
+  Evaluate both that and the individual limits. */
+
+  if (Ustrchr(queue_run_max, '$') != NULL)
+    {
+    for (qrunner * q = qrunners; q; q = q->next)
+      {
+      queue_name = q->name;
+      local_queue_run_max +=
+       (q->run_max = atoi(CS expand_string(queue_run_max)));
+      }
+    queue_name = US"";
+    }
+  else
+    {
+    local_queue_run_max = atoi(CS expand_string(queue_run_max));
+    for (qrunner * q = qrunners; q; q = q->next)
+      q->run_max = local_queue_run_max;
+    }
 
 process_purpose = US"daemon";
 
@@ -2216,10 +2371,11 @@ originator_login = (pw = getpwuid(exim_uid))
 /* Get somewhere to keep the list of queue-runner pids if we are keeping track
 of them (and also if we are doing queue runs). */
 
-if (queue_interval > 0 && local_queue_run_max > 0)
+if (is_multiple_qrun() && local_queue_run_max > 0)
   {
-  queue_pid_slots = store_get(local_queue_run_max * sizeof(pid_t), GET_UNTAINTED);
-  for (int i = 0; i < local_queue_run_max; i++) queue_pid_slots[i] = 0;
+  queue_runner_slot_count = local_queue_run_max;
+  queue_runner_slots = store_get(local_queue_run_max * sizeof(runner_slot), GET_UNTAINTED);
+  memset(queue_runner_slots, 0, local_queue_run_max * sizeof(runner_slot));
   }
 
 /* Set up the handler for termination of child processes, and the one
@@ -2233,9 +2389,12 @@ os_non_restarting_signal(SIGTERM, main_sigterm_handler);
 os_non_restarting_signal(SIGINT, main_sigterm_handler);
 
 /* If we are to run the queue periodically, pretend the alarm has just gone
-off. This will cause the first queue-runner to get kicked off straight away. */
+off. This will cause the first queue-runner to get kicked off straight away.
+Get an initial sort of the list of queues, to prioritize the initial q-runs */
 
-sigalrm_seen = (queue_interval > 0);
+
+if ((sigalrm_seen = is_multiple_qrun()))
+  (void) next_qrunner_interval();
 
 /* Log the start up of a daemon - at least one of listening or queue running
 must be set up. */
@@ -2264,20 +2423,16 @@ else if (f.daemon_listen)
   int smtps_ports = 0;
   ip_address_item * ipa;
   uschar * p;
-  uschar * qinfo = queue_interval > 0
-    ? string_sprintf("-q%s%s",
-       f.queue_2stage ? "q" : "", readconf_printtime(queue_interval))
-    : US"no queue runs";
+  const uschar * qinfo = describe_queue_runners();
 
   /* Build a list of listening addresses in big_buffer, but limit it to 10
   items. The style is for backwards compatibility.
 
-  It is now possible to have some ports listening for SMTPS (the old,
-  deprecated protocol that starts TLS without using STARTTLS), and others
-  listening for standard SMTP. Keep their listings separate. */
+  It is possible to have some ports listening for SMTPS (as opposed to TLS
+  startted by STARTTLS), and others listening for standard SMTP. Keep their
+  listings separate. */
 
   for (int j = 0, i; j < 2; j++)
-    {
     for (i = 0, ipa = addresses; i < 10 && ipa; i++, ipa = ipa->next)
       {
       /* First time round, look for SMTP ports; second time round, look for
@@ -2315,7 +2470,7 @@ else if (f.daemon_listen)
               && Ustrcmp(ipa->address, i2->address) == 0
               )
              {                         /* found; append port to list */
-             for (p = i2->log; *p; ) p++;      /* end of existing string */
+             for (p = i2->log; *p; ) p++;      /* end of existing string   { */
              if (*--p == '}') *p = '\0';       /* drop EOL */
              while (isdigit(*--p)) ;           /* char before port */
 
@@ -2331,7 +2486,6 @@ else if (f.daemon_listen)
          }
        }
       }
-    }
 
   p = big_buffer;
   for (int j = 0, i; j < 2; j++)
@@ -2367,11 +2521,9 @@ else if (f.daemon_listen)
     version_string, qinfo, big_buffer);
   }
 
-else
+else   /* no listening sockets, only queue-runs */
   {
-  uschar * s = *queue_name
-    ? string_sprintf("-qG%s/%s", queue_name, readconf_printtime(queue_interval))
-    : string_sprintf("-q%s", readconf_printtime(queue_interval));
+  const uschar * s = describe_queue_runners();
   log_write(0, LOG_MAIN,
     "exim %s daemon started: pid=%d, %s, not listening for SMTP",
     version_string, getpid(), s);
@@ -2445,6 +2597,8 @@ report_time_since(&timestamp_startup, US"daemon loop start");     /* testcase 0022 *
 
 for (;;)
   {
+  int nolisten_sleep = 60;
+
   if (sigterm_seen)
     daemon_die();      /* Does not return */
 
@@ -2458,7 +2612,8 @@ for (;;)
     if (inetd_wait_timeout > 0)
       daemon_inetd_wtimeout(last_connection_time);     /* Might not return */
     else
-      daemon_qrun(local_queue_run_max, fd_polls, listen_socket_count);
+      nolisten_sleep =
+       daemon_qrun(local_queue_run_max, fd_polls, listen_socket_count);
 
 
   /* Sleep till a connection happens if listening, and handle the connection if
@@ -2663,7 +2818,7 @@ for (;;)
   else
     {
     struct pollfd p;
-    poll(&p, 0, queue_interval * 1000);
+    poll(&p, 0, nolisten_sleep * 1000);
     handle_ending_processes();
     }
 
index dcc71ea4534ab270fff63209d8e204dceb898e23..9cba8d51effb46a4552d58128e42b69d8e52f1d6 100644 (file)
@@ -1687,6 +1687,33 @@ else
 
 
 
+/*************************************************
+*          Queue-runner operations               *
+*************************************************/
+
+/* Prefix a new qrunner descriptor to the qrunners list */
+
+static qrunner *
+alloc_qrunner(void)
+{
+qrunner * q = qrunners;
+qrunners = store_get(sizeof(qrunner), GET_UNTAINTED);
+memset(qrunners, 0, sizeof(qrunner));          /* default queue, zero interval */
+qrunners->next = q;
+qrunners->next_tick = time(NULL);              /* run right away */
+return qrunners;
+}
+
+static qrunner *
+alloc_onetime_qrunner(void)
+{
+qrunners = store_get_perm(sizeof(qrunner), GET_UNTAINTED);
+memset(qrunners, 0, sizeof(qrunner));          /* default queue, zero interval */
+qrunners->next_tick = time(NULL);              /* run right away */
+qrunners->run_max = 1;
+}
+
+
 /*************************************************
 *          Entry point and high-level code       *
 *************************************************/
@@ -2051,7 +2078,7 @@ this is a smail convention. */
 if ((namelen == 4 && Ustrcmp(argv[0], "runq") == 0) ||
     (namelen  > 4 && Ustrncmp(argv[0] + namelen - 5, "/runq", 5) == 0))
   {
-  queue_interval = 0;
+  alloc_onetime_qrunner();
   receiving_message = FALSE;
   called_as = US"-runq";
   }
@@ -2110,7 +2137,7 @@ on the second character (the one after '-'), to save some effort. */
   BOOL badarg = FALSE;
   uschar * arg = argv[i];
   uschar * argrest;
-  int switchchar;
+  uschar switchchar;
 
   /* An argument not starting with '-' is the start of a recipients list;
   break out of the options-scanning loop. */
@@ -3504,87 +3531,100 @@ on the second character (the one after '-'), to save some effort. */
       }
     break;
 
+    /* -q:  set up queue runs */
 
     case 'q':
-    receiving_message = FALSE;
-    if (queue_interval >= 0)
-      exim_fail("exim: -q specified more than once\n");
-
-    /* -qq...: Do queue runs in a 2-stage manner */
-
-    if (*argrest == 'q')
       {
-      f.queue_2stage = TRUE;
-      argrest++;
-      }
+      BOOL two_stage, first_del, force, thaw = FALSE, local;
 
-    /* -qi...: Do only first (initial) deliveries */
+      receiving_message = FALSE;
 
-    if (*argrest == 'i')
-      {
-      f.queue_run_first_delivery = TRUE;
-      argrest++;
-      }
+      /* -qq...: Do queue runs in a 2-stage manner */
 
-    /* -qf...: Run the queue, forcing deliveries
-       -qff..: Ditto, forcing thawing as well */
+      if ((two_stage = *argrest == 'q'))
+       argrest++;
 
-    if (*argrest == 'f')
-      {
-      f.queue_run_force = TRUE;
-      if (*++argrest == 'f')
-        {
-        f.deliver_force_thaw = TRUE;
-        argrest++;
-        }
-      }
+      /* -qi...: Do only first (initial) deliveries */
 
-    /* -q[f][f]l...: Run the queue only on local deliveries */
+      if ((first_del = *argrest == 'i'))
+       argrest++;
 
-    if (*argrest == 'l')
-      {
-      f.queue_run_local = TRUE;
-      argrest++;
-      }
+      /* -qf...: Run the queue, forcing deliveries
+        -qff..: Ditto, forcing thawing as well */
 
-    /* -q[f][f][l][G<name>]... Work on the named queue */
+      if ((force = *argrest == 'f'))
+       if ((thaw = *++argrest == 'f'))
+         argrest++;
 
-    if (*argrest == 'G')
-      {
-      int i;
-      for (argrest++, i = 0; argrest[i] && argrest[i] != '/'; ) i++;
-      exim_len_fail_toolong(i, EXIM_DRIVERNAME_MAX, "-q*G<name>");
-      queue_name = string_copyn(argrest, i);
-      argrest += i;
-      if (*argrest == '/') argrest++;
-      }
+      /* -q[f][f]l...: Run the queue only on local deliveries */
+
+      if ((local = *argrest == 'l'))
+       argrest++;
 
-    /* -q[f][f][l][G<name>]: Run the queue, optionally forced, optionally local
-    only, optionally named, optionally starting from a given message id. */
+      /* -q[f][f][l][G<name>]... Work on the named queue */
 
-    if (!(list_queue || count_queue))
-      if (  !*argrest
-        && (i + 1 >= argc || argv[i+1][0] == '-' || mac_ismsgid(argv[i+1])))
+      if (*argrest == 'G')
        {
-       queue_interval = 0;
-       if (i+1 < argc && mac_ismsgid(argv[i+1]))
-         start_queue_run_id = string_copy_taint(argv[++i], GET_TAINTED);
-       if (i+1 < argc && mac_ismsgid(argv[i+1]))
-         stop_queue_run_id = string_copy_taint(argv[++i], GET_TAINTED);
+       int i;
+       for (argrest++, i = 0; argrest[i] && argrest[i] != '/'; ) i++;
+       exim_len_fail_toolong(i, EXIM_DRIVERNAME_MAX, "-q*G<name>");
+       queue_name = string_copyn(argrest, i);
+       argrest += i;
+       if (*argrest == '/') argrest++;
        }
 
-    /* -q[f][f][l][G<name>/]<n>: Run the queue at regular intervals, optionally
-    forced, optionally local only, optionally named. */
+      /* -q[f][f][l][G<name>]: Run the queue, optionally forced, optionally local
+      only, optionally named, optionally starting from a given message id. */
 
-      else if ((queue_interval = readconf_readtime(*argrest ? argrest : argv[++i],
-                                                 0, FALSE)) <= 0)
-       exim_fail("exim: bad time value %s: abandoned\n", argv[i]);
-    break;
+      if (!(list_queue || count_queue))
+       {
+       qrunner * q;
+
+       if (  !*argrest
+          && (i + 1 >= argc || argv[i+1][0] == '-' || mac_ismsgid(argv[i+1])))
+         {
+         q = alloc_onetime_qrunner();
+         if (i+1 < argc && mac_ismsgid(argv[i+1]))
+           start_queue_run_id = string_copy_taint(argv[++i], GET_TAINTED);
+         if (i+1 < argc && mac_ismsgid(argv[i+1]))
+           stop_queue_run_id = string_copy_taint(argv[++i], GET_TAINTED);
+         }
+
+      /* -q[f][f][l][G<name>/]<n>: Run the queue at regular intervals, optionally
+      forced, optionally local only, optionally named. */
+
+       else
+         {
+         int intvl = readconf_readtime(*argrest ? argrest : argv[++i], 0, FALSE);
+         if (intvl <= 0)
+           exim_fail("exim: bad time value %s: abandoned\n", argv[i]);
+
+         for (qrunner * qq = qrunners; qq; qq = qq->next)
+           if (  queue_name && qq->name && Ustrcmp(queue_name, qq->name) == 0
+              || !queue_name && !qq->name)
+             exim_fail("exim: queue-runner specified more than once\n");
+
+         q = alloc_qrunner();
+         q->interval = intvl;
+         }
+
+       q->name = *queue_name ? queue_name : NULL;      /* will be NULL for the default queue */
+       q->queue_run_force = force;
+       q->deliver_force_thaw = thaw;
+       q->queue_run_first_delivery = first_del;
+       q->queue_run_local = local;
+       q->queue_2stage = two_stage;
+       }
+
+      break;
+      }
 
 
     case 'R':   /* Synonymous with -qR... */
+    case 'S':   /* Synonymous with -qS... */
       {
-      const uschar *tainted_selectstr;
+      const uschar * tainted_selectstr;
+      uschar * s;
 
       receiving_message = FALSE;
 
@@ -3594,20 +3634,28 @@ on the second character (the one after '-'), to save some effort. */
        -Rrf:  Regex and force
        -Rrff: Regex and force and thaw
 
+       -S...: Like -R but works on sender.
+
     in all cases provided there are no further characters in this
     argument. */
 
+      alloc_onetime_qrunner();
+      qrunners->queue_2stage = f.queue_2stage;
       if (*argrest)
        for (int i = 0; i < nelem(rsopts); i++)
          if (Ustrcmp(argrest, rsopts[i]) == 0)
            {
-           if (i != 2) f.queue_run_force = TRUE;
-           if (i >= 2) f.deliver_selectstring_regex = TRUE;
-           if (i == 1 || i == 4) f.deliver_force_thaw = TRUE;
+           if (i != 2) qrunners->queue_run_force = TRUE;
+           if (i >= 2)
+             if (switchchar == 'R')
+               f.deliver_selectstring_regex = TRUE;
+             else
+               f.deliver_selectstring_sender_regex = TRUE;
+           if (i == 1 || i == 4) qrunners->deliver_force_thaw = TRUE;
            argrest += Ustrlen(rsopts[i]);
            }
 
-    /* -R: Set string to match in addresses for forced queue run to
+    /* -R or -S: Set string to match in addresses for forced queue run to
     pick out particular messages. */
 
       /* Avoid attacks from people providing very long strings, and do so before
@@ -3617,58 +3665,22 @@ on the second character (the one after '-'), to save some effort. */
       else if (i+1 < argc)
        tainted_selectstr = argv[++i];
       else
-       exim_fail("exim: string expected after -R\n");
-      deliver_selectstring = string_copy_taint(
+       exim_fail("exim: string expected after %s\n", switchchar == 'R' ? "-R" : "-S");
+
+      s = string_copy_taint(
        exim_str_fail_toolong(tainted_selectstr, EXIM_EMAILADDR_MAX, "-R"),
        GET_TAINTED);
-      }
-    break;
-
-    /* -r: an obsolete synonym for -f (see above) */
-
-
-    /* -S: Like -R but works on sender. */
-
-    case 'S':   /* Synonymous with -qS... */
-      {
-      const uschar *tainted_selectstr;
-
-      receiving_message = FALSE;
-
-    /* -Sf:   As -S (below) but force all deliveries,
-       -Sff:  Ditto, but also thaw all frozen messages,
-       -Sr:   String is regex
-       -Srf:  Regex and force
-       -Srff: Regex and force and thaw
-
-    in all cases provided there are no further characters in this
-    argument. */
-
-      if (*argrest)
-       for (int i = 0; i < nelem(rsopts); i++)
-         if (Ustrcmp(argrest, rsopts[i]) == 0)
-           {
-           if (i != 2) f.queue_run_force = TRUE;
-           if (i >= 2) f.deliver_selectstring_sender_regex = TRUE;
-           if (i == 1 || i == 4) f.deliver_force_thaw = TRUE;
-           argrest += Ustrlen(rsopts[i]);
-           }
-
-    /* -S: Set string to match in addresses for forced queue run to
-    pick out particular messages. */
 
-      if (*argrest)
-       tainted_selectstr = argrest;
-      else if (i+1 < argc)
-       tainted_selectstr = argv[++i];
+      if (switchchar == 'R')
+       deliver_selectstring = s;
       else
-       exim_fail("exim: string expected after -S\n");
-      deliver_selectstring_sender = string_copy_taint(
-       exim_str_fail_toolong(tainted_selectstr, EXIM_EMAILADDR_MAX, "-S"),
-       GET_TAINTED);
+       deliver_selectstring_sender = s;
       }
     break;
 
+
+    /* -r: an obsolete synonym for -f (see above) */
+
     /* -Tqt is an option that is exclusively for use by the testing suite.
     It is not recognized in other circumstances. It allows for the setting up
     of explicit "queue times" so that various warning/retry things can be
@@ -3777,9 +3789,8 @@ on the second character (the one after '-'), to save some effort. */
 
 /* If -R or -S have been specified without -q, assume a single queue run. */
 
- if (  (deliver_selectstring || deliver_selectstring_sender)
-    && queue_interval < 0)
-  queue_interval = 0;
+ if ((deliver_selectstring || deliver_selectstring_sender) && !qrunners)
+  alloc_onetime_qrunner();
 
 
 END_ARG:
@@ -3791,22 +3802,22 @@ if (usage_wanted) exim_usage(called_as);
 
 /* Arguments have been processed. Check for incompatibilities. */
 if (  (  (smtp_input || extract_recipients || recipients_arg < argc)
-      && (  f.daemon_listen || queue_interval >= 0 || bi_option
+      && (  f.daemon_listen || qrunners || bi_option
         || test_retry_arg >= 0 || test_rewrite_arg >= 0
         || filter_test != FTEST_NONE
         || msg_action_arg > 0 && !one_msg_action
       )  )
    || (  msg_action_arg > 0
-      && (  f.daemon_listen || queue_interval > 0 || list_options
+      && (  f.daemon_listen || is_multiple_qrun() || list_options
         || checking && msg_action != MSG_LOAD
         || bi_option || test_retry_arg >= 0 || test_rewrite_arg >= 0
       )  )
-   || (  (f.daemon_listen || queue_interval > 0)
+   || (  (f.daemon_listen || is_multiple_qrun())
       && (  sender_address || list_options || list_queue || checking
         || bi_option
       )  )
-   || f.daemon_listen && queue_interval == 0
-   || f.inetd_wait_mode && queue_interval >= 0
+   || f.daemon_listen && is_onetime_qrun()
+   || f.inetd_wait_mode && qrunners
    || (  list_options
       && (  checking || smtp_input || extract_recipients
         || filter_test != FTEST_NONE || bi_option
@@ -3822,7 +3833,7 @@ if (  (  (smtp_input || extract_recipients || recipients_arg < argc)
    || (  smtp_input
       && (sender_address || filter_test != FTEST_NONE || extract_recipients)
       )
-   || deliver_selectstring && queue_interval < 0
+   || deliver_selectstring && !qrunners
    || msg_action == MSG_LOAD && (!expansion_test || expansion_test_message)
    )
   exim_fail("exim: incompatible command-line options or arguments\n");
@@ -4444,7 +4455,7 @@ if (!f.admin_user)
   if (  deliver_give_up || f.daemon_listen || malware_test_file
      || count_queue && queue_list_requires_admin
      || list_queue && queue_list_requires_admin
-     || queue_interval >= 0 && prod_requires_admin
+     || qrunners && prod_requires_admin
      || queue_name_dest && prod_requires_admin
      || debugset && !f.running_in_test_harness
      )
@@ -4460,7 +4471,7 @@ regression testing. */
 if (  real_uid != root_uid && real_uid != exim_uid
    && (  continue_hostname
       || (  f.dont_deliver
-        && (queue_interval >= 0 || f.daemon_listen || msg_action_arg > 0)
+        && (qrunners || f.daemon_listen || msg_action_arg > 0)
       )  )
    && !f.running_in_test_harness
    )
@@ -4577,11 +4588,11 @@ to the state Exim usually runs in. */
 if (  !unprivileged                            /* originally had root AND */
    && !removed_privilege                       /* still got root AND      */
    && !f.daemon_listen                         /* not starting the daemon */
-   && queue_interval <= 0                      /* (either kind of daemon) */
+   && (!qrunners || is_onetime_qrun())         /* (either kind of daemon) */
    && (                                                /*    AND EITHER           */
          deliver_drop_privilege                        /* requested unprivileged  */
       || (                                     /*       OR                */
-            queue_interval < 0                 /* not running the queue   */
+            !qrunners                          /* not running the queue   */
          && (  msg_action_arg < 0              /*       and               */
             || msg_action != MSG_DELIVER       /* not delivering          */
            )                                   /*       and               */
@@ -4696,7 +4707,7 @@ if (msg_action_arg > 0 && msg_action != MSG_DELIVER && msg_action != MSG_LOAD)
   }
 
 /* We used to set up here to skip reading the ACL section, on
- (msg_action_arg > 0 || (queue_interval == 0 && !f.daemon_listen)
+ (msg_action_arg > 0 || (is_onetime_qrun() && !f.daemon_listen)
 Now, since the intro of the ${acl } expansion, ACL definitions may be
 needed in transports so we lost the optimisation. */
 
@@ -4947,18 +4958,9 @@ if (msg_action_arg > 0 && msg_action != MSG_LOAD)
 /* If only a single queue run is requested, without SMTP listening, we can just
 turn into a queue runner, with an optional starting message id. */
 
-if (queue_interval == 0 && !f.daemon_listen)
+if (is_onetime_qrun() && !f.daemon_listen)
   {
-  DEBUG(D_queue_run) debug_printf("Single queue run%s%s%s%s\n",
-    start_queue_run_id ? US" starting at " : US"",
-    start_queue_run_id ? start_queue_run_id: US"",
-    stop_queue_run_id ?  US" stopping at " : US"",
-    stop_queue_run_id ?  stop_queue_run_id : US"");
-  if (*queue_name)
-    set_process_info("running the '%s' queue (single queue run)", queue_name);
-  else
-    set_process_info("running the queue (single queue run)");
-  queue_run(start_queue_run_id, stop_queue_run_id, FALSE);
+  single_queue_run(qrunners, start_queue_run_id, stop_queue_run_id);
   exim_exit(EXIT_SUCCESS);
   }
 
@@ -5083,7 +5085,7 @@ returns. We leave this till here so that the originator_ fields are available
 for incoming messages via the daemon. The daemon cannot be run in mua_wrapper
 mode. */
 
-if (f.daemon_listen || f.inetd_wait_mode || queue_interval > 0)
+if (f.daemon_listen || f.inetd_wait_mode || is_multiple_qrun())
   {
   if (mua_wrapper)
     {
index 961db2dc03c4cb0a925f7f7a78c8bfb67a9c45e0..37f0a57bc6e8412ae73f75da03c9e44c4d34826c 100644 (file)
@@ -411,7 +411,7 @@ extern void    queue_list(int, uschar **, int);
 #ifndef DISABLE_QUEUE_RAMP
 extern void    queue_notify_daemon(const uschar * hostname);
 #endif
-extern void    queue_run(uschar *, uschar *, BOOL);
+extern void    queue_run(qrunner *, uschar *, uschar *, BOOL);
 
 extern int     random_number(int);
 extern const uschar *rc_to_string(int);
@@ -498,6 +498,7 @@ extern int     sieve_interpret(const uschar *, int, const uschar *,
                 const uschar *, const uschar *, const uschar *,
                 address_item **, uschar **);
 extern void    sigalrm_handler(int);
+extern void    single_queue_run(qrunner *, uschar *, uschar *);
 extern int     smtp_boundsock(smtp_connect_args *);
 extern void    smtp_closedown(uschar *);
 extern void    smtp_command_timeout_exit(void) NORETURN;
@@ -1368,6 +1369,22 @@ int res;
 return !s || !*s || (res = Uatoi(s)) == 0 ? UNLIMITED_ADDRS : res;
 }
 
+/******************************************************************************/
+/* Queue-runner operations */
+
+static inline BOOL
+is_onetime_qrun(void)
+{
+return qrunners && !qrunners->next && qrunners->interval == 0;
+}
+
+static inline BOOL
+is_multiple_qrun(void)
+{
+return qrunners && (qrunners->interval > 0 || qrunners->next);
+}
+
+
 # endif        /* !COMPILE_UTILITY */
 
 /******************************************************************************/
index 7af345465cdfd511df52b59da543bbba0f30f259..a4b2c6a9c21816826176e73912c2425432134d78 100644 (file)
@@ -291,8 +291,6 @@ struct global_flags f =
 
        .queue_2stage           = FALSE,
        .queue_only_policy      = FALSE,
-       .queue_run_first_delivery = FALSE,
-       .queue_run_force        = FALSE,
        .queue_run_local        = FALSE,
        .queue_running          = FALSE,
        .queue_smtp             = FALSE,
@@ -1248,6 +1246,8 @@ uschar *prvscheck_keynum       = NULL;
 uschar *prvscheck_result       = NULL;
 
 
+qrunner *qrunners             = NULL;
+
 const uschar *qualify_domain_recipient = NULL;
 uschar *qualify_domain_sender  = NULL;
 uschar *queue_domains          = NULL;
index f2e1476708034664107541f80ed97434cfe7783e..914e2d0f9d9a2d73db0bf57790ce2becda0929ab 100644 (file)
@@ -257,8 +257,6 @@ extern struct global_flags {
 
  BOOL   queue_2stage                   :1; /* Run queue in 2-stage manner */
  BOOL   queue_only_policy              :1; /* ACL or local_scan wants queue_only */
- BOOL   queue_run_first_delivery       :1; /* If TRUE, first deliveries only */
- BOOL   queue_run_force                        :1; /* TRUE to force during queue run */
  BOOL   queue_run_local                        :1; /* Local deliveries only in queue run */
  BOOL   queue_running                  :1; /* TRUE for queue running process and */
  BOOL   queue_smtp                     :1; /* Disable all immediate SMTP (-odqs)*/
@@ -837,6 +835,8 @@ extern uschar *prvscheck_address;      /* Set during prvscheck expansion item */
 extern uschar *prvscheck_keynum;       /* Set during prvscheck expansion item */
 extern uschar *prvscheck_result;       /* Set during prvscheck expansion item */
 
+extern qrunner *qrunners;             /* tracking data for queues */
+
 extern const uschar *qualify_domain_recipient; /* Domain to qualify recipients with */
 extern uschar *qualify_domain_sender;  /* Domain to qualify senders with */
 extern uschar *queue_domains;          /* Queue these domains */
index 585067fc9ea59c4ed69cd6b714145435571ea9b5..3b0293b971d5631569f3244601d18ff1c6a673aa 100644 (file)
@@ -1113,9 +1113,9 @@ should not be one active. */
 
 #define NOTIFIER_SOCKET_NAME   "exim_daemon_notify"
 /* Notify message types */
-#define NOTIFY_MSG_QRUN                1
-#define NOTIFY_QUEUE_SIZE_REQ  2
-#define NOTIFY_REGEX           3
+#define NOTIFY_MSG_QRUN                1       /* 2stage qrun fast-ramp trigger */
+#define NOTIFY_QUEUE_SIZE_REQ  2       /* obtain current queue count */
+#define NOTIFY_REGEX           3       /* an RE for caching */
 
 /* Flags for match_check_string() */
 typedef unsigned mcs_flags;
index f86e24b42e85019304a4c6fe802c9f566111893a..d01cde655d6b0dfdd126684c3c33c585b8a093e8 100644 (file)
@@ -325,8 +325,8 @@ previous lexically lesser one if the given stop message doesn't exist. Because
 a queue run can take some time, stat each file before forking, in case it has
 been delivered in the meantime by some other means.
 
-The global variables queue_run_force and queue_run_local may be set to cause
-forced deliveries or local-only deliveries, respectively.
+The qrun descriptor  variables queue_run_force and queue_run_local may be set to
+cause forced deliveries or local-only deliveries, respectively.
 
 If deliver_selectstring[_sender] is not NULL, skip messages whose recipients do
 not contain the string. As this option is typically used when a machine comes
@@ -339,6 +339,7 @@ is set so that routing is done for all messages. Thus in the second run those
 that are routed to the same host should go down the same SMTP connection.
 
 Arguments:
+  q         queue-runner descriptor
   start_id   message id to start at, or NULL for all
   stop_id    message id to end at, or NULL for all
   recurse    TRUE if recursing for 2-stage run
@@ -347,10 +348,10 @@ Returns:     nothing
 */
 
 void
-queue_run(uschar *start_id, uschar *stop_id, BOOL recurse)
+queue_run(qrunner * q, uschar * start_id, uschar * stop_id, BOOL recurse)
 {
-BOOL force_delivery = f.queue_run_force || deliver_selectstring != NULL ||
-  deliver_selectstring_sender != NULL;
+BOOL force_delivery = q->queue_run_force
+  || deliver_selectstring || deliver_selectstring_sender;
 const pcre2_code *selectstring_regex = NULL;
 const pcre2_code *selectstring_regex_sender = NULL;
 uschar *log_detail = NULL;
@@ -363,6 +364,13 @@ BOOL single_id = FALSE;
 report_time_since(&timestamp_startup, US"queue_run start");
 #endif
 
+/* Copy the legacy globals from the newer per-qrunner-desc */
+
+queue_name =           q->name ? q->name : US"";
+f.queue_2stage =        q->queue_2stage;
+f.deliver_force_thaw =  q->deliver_force_thaw;
+f.queue_run_local =     q->queue_run_local;
+
 /* Cancel any specific queue domains. Turn off the flag that causes SMTP
 deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag
 gets set. Save the queue_runner's pid and the flag that indicates any
@@ -371,7 +379,7 @@ on TCP/IP channels have queue_run_pid set, but not queue_running. */
 
 queue_domains = NULL;
 queue_smtp_domains = NULL;
-f.queue_smtp = f.queue_2stage;
+f.queue_smtp = q->queue_2stage;
 
 queue_run_pid = getpid();
 f.queue_running = TRUE;
@@ -383,11 +391,11 @@ if (!recurse)
   uschar extras[8];
   uschar *p = extras;
 
-  if (f.queue_2stage) *p++ = 'q';
-  if (f.queue_run_first_delivery) *p++ = 'i';
-  if (f.queue_run_force) *p++ = 'f';
-  if (f.deliver_force_thaw) *p++ = 'f';
-  if (f.queue_run_local) *p++ = 'l';
+  if (q->queue_2stage)         *p++ = 'q';
+  if (q->queue_run_first_delivery) *p++ = 'i';
+  if (q->queue_run_force)      *p++ = 'f';
+  if (q->deliver_force_thaw)   *p++ = 'f';
+  if (q->queue_run_local)      *p++ = 'l';
   *p = 0;
 
   p = big_buffer;
@@ -399,25 +407,25 @@ if (!recurse)
   if (deliver_selectstring)
     {
     snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s",
-      f.deliver_selectstring_regex? "r" : "", deliver_selectstring);
+      f.deliver_selectstring_regex ? "r" : "", deliver_selectstring);
     p += Ustrlen(CCS p);
     }
 
   if (deliver_selectstring_sender)
     {
     snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s",
-      f.deliver_selectstring_sender_regex? "r" : "", deliver_selectstring_sender);
+      f.deliver_selectstring_sender_regex ? "r" : "", deliver_selectstring_sender);
     p += Ustrlen(CCS p);
     }
 
   log_detail = string_copy(big_buffer);
-  if (*queue_name)
+  if (q->name)
     log_write(L_queue_run, LOG_MAIN, "Start '%s' queue run: %s",
-      queue_name, log_detail);
+      q->name, log_detail);
   else
     log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
 
-  single_id = start_id && stop_id && !f.queue_2stage
+  single_id = start_id && stop_id && !q->queue_2stage
              && Ustrcmp(start_id, stop_id) == 0;
   }
 
@@ -474,7 +482,7 @@ for (int i = queue_run_in_order ? -1 : 0;
     /* Unless deliveries are forced, if deliver_queue_load_max is non-negative,
     check that the load average is low enough to permit deliveries. */
 
-    if (!f.queue_run_force && deliver_queue_load_max >= 0)
+    if (!q->queue_run_force && deliver_queue_load_max >= 0)
       if ((load_average = os_getloadavg()) > deliver_queue_load_max)
         {
         log_write(L_queue_run, LOG_MAIN, "Abandon queue run: %s (load %.2f, max %.2f)",
@@ -492,7 +500,7 @@ for (int i = queue_run_in_order ? -1 : 0;
     /* If initial of a 2-phase run, maintain a set of child procs
     to get disk parallelism */
 
-    if (f.queue_2stage && !queue_run_in_order)
+    if (q->queue_2stage && !queue_run_in_order)
       {
       int i;
       if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
@@ -530,7 +538,7 @@ for (int i = queue_run_in_order ? -1 : 0;
     message when many are not going to be delivered. */
 
     if (deliver_selectstring || deliver_selectstring_sender ||
-        f.queue_run_first_delivery)
+        q->queue_run_first_delivery)
       {
       BOOL wanted = TRUE;
       BOOL orig_dont_deliver = f.dont_deliver;
@@ -548,7 +556,7 @@ for (int i = queue_run_in_order ? -1 : 0;
       header file, we might as well do the freeze test now, and save forking
       another process. */
 
-      if (f.deliver_freeze && !f.deliver_force_thaw)
+      if (f.deliver_freeze && !q->deliver_force_thaw)
         {
         log_write(L_skip_delivery, LOG_MAIN, "Message is frozen");
         wanted = FALSE;
@@ -556,7 +564,7 @@ for (int i = queue_run_in_order ? -1 : 0;
 
       /* Check first_delivery in the case when there are no message logs. */
 
-      else if (f.queue_run_first_delivery && !f.deliver_firsttime)
+      else if (q->queue_run_first_delivery && !f.deliver_firsttime)
         {
         DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", fq->text);
         wanted = FALSE;
@@ -688,7 +696,7 @@ single_item_retry:
     /* A zero return means a delivery was attempted; turn off the force flag
     for any subsequent calls unless queue_force is set. */
 
-    if (!(status & 0xffff)) force_delivery = f.queue_run_force;
+    if (!(status & 0xffff)) force_delivery = q->queue_run_force;
 
     /* If the process crashed, tell somebody */
 
@@ -723,13 +731,13 @@ single_item_retry:
     set_process_info("running queue");
 
     /* If initial of a 2-phase run, we are a child - so just exit */
-    if (f.queue_2stage && !queue_run_in_order)
+    if (q->queue_2stage && !queue_run_in_order)
       exim_exit(EXIT_SUCCESS);
 
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
 
-    if (f.running_in_test_harness && !f.queue_2stage)
+    if (f.running_in_test_harness && !q->queue_2stage)
       {
       uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
       if (fqtnext) fudged_queue_times = fqtnext + 1;
@@ -740,7 +748,7 @@ single_item_retry:
 
   go_around:
     /* If initial of a 2-phase run, we are a child - so just exit */
-    if (f.queue_2stage && !queue_run_in_order)
+    if (q->queue_2stage && !queue_run_in_order)
       exim_exit(EXIT_SUCCESS);
     }                                  /* End loop for list of messages */
 
@@ -767,7 +775,7 @@ single_item_retry:
 /* If queue_2stage is true, we do it all again, with the 2stage flag
 turned off. */
 
-if (f.queue_2stage)
+if (q->queue_2stage)
   {
 
   /* wait for last children */
@@ -782,22 +790,40 @@ if (f.queue_2stage)
 #ifdef MEASURE_TIMING
   report_time_since(&timestamp_startup, US"queue_run 1st phase done");
 #endif
-  f.queue_2stage = FALSE;
-  queue_run(start_id, stop_id, TRUE);
+  q->queue_2stage = f.queue_2stage = FALSE;
+  queue_run(q, start_id, stop_id, TRUE);
   }
 
 /* At top level, log the end of the run. */
 
 if (!recurse)
-  if (*queue_name)
+  if (q->name)
     log_write(L_queue_run, LOG_MAIN, "End '%s' queue run: %s",
-      queue_name, log_detail);
+      q->name, log_detail);
   else
     log_write(L_queue_run, LOG_MAIN, "End queue run: %s", log_detail);
 }
 
 
 
+void
+single_queue_run(qrunner * q, uschar * start_id, uschar * stop_id)
+{
+DEBUG(D_queue_run) debug_printf("Single queue run%s%s%s%s\n",
+  start_id ? US" starting at " : US"",
+  start_id ? start_id: US"",
+  stop_id ?  US" stopping at " : US"",
+  stop_id ?  stop_id : US"");
+
+if (*queue_name)
+  set_process_info("running the '%s' queue (single queue run)", queue_name);
+else
+  set_process_info("running the queue (single queue run)");
+queue_run(q, start_id, stop_id, FALSE);
+}
+
+
+
 
 /************************************************
 *         Count messages on the queue           *
@@ -1552,20 +1578,22 @@ if (s)
 void
 queue_notify_daemon(const uschar * msgid)
 {
-uschar buf[MESSAGE_ID_LENGTH + 2];
+int bsize = 1 + MESSAGE_ID_LENGTH + 1 + Ustrlen(queue_name) + 1;
+uschar * buf = store_get(bsize, GET_UNTAINTED);
 int fd;
 
 DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid);
 
 buf[0] = NOTIFY_MSG_QRUN;
 memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
+Ustrcpy(buf+1+MESSAGE_ID_LENGTH+1, queue_name);
 
 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
   {
   struct sockaddr_un sa_un = {.sun_family = AF_UNIX};
   ssize_t len = daemon_notifier_sockname(&sa_un);
 
-  if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0)
+  if (sendto(fd, buf, bsize, 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0)
     DEBUG(D_queue_run)
       debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
   close(fd);
index eae66e88de5922d55ab56fbde466deed88857b61..3f237fce5a6bf1b34f57916a99c690f2d2d4a27d 100644 (file)
@@ -958,4 +958,22 @@ struct ob_dkim {
 #endif
 };
 
+
+/* per-queue-runner info */
+typedef struct qrunner {
+  struct qrunner * next;       /* list sorted by next tick */
+
+  uschar *     name;           /* NULL for the default queue */
+  unsigned     interval;       /* tick rate, seconds */
+  time_t       next_tick;      /* next run should, or should have, start(ed) */
+  unsigned     run_max;        /* concurrent queue runner limit */
+  unsigned     run_count;      /* current runners */
+
+  BOOL queue_run_force :1;
+  BOOL deliver_force_thaw :1;
+  BOOL queue_run_first_delivery :1;
+  BOOL queue_run_local :1;
+  BOOL queue_2stage :1;
+} qrunner;
+
 /* End of structs.h */
index 5d6e8fc2187d1a25f5951f041225a30c3465968a..90d87c927907e35bdf7e1edf691b296cfc163d2e 100644 (file)
@@ -104,8 +104,10 @@ exim -bp
 exim -bp -qGalternate
 ****
 #
+### move msg from default to third q
 exim -MG third $msg1
 ****
+### move msg from alternate q to third q
 exim -qGalternate -MG third $msg1
 ****
 ### third q
index 48e29b7614d24f005012acc06a3970f297b5a137..48cc6fd710f2b1639f4a3929683e03bfc4248662 100644 (file)
@@ -53,7 +53,9 @@
  0m   sss 10HmbC-0005vi-00 <CALLER@the.local.host.name>
           alternate@test.ex
 
+### move msg from default to third q
 Message 10HmbB-0005vi-00 
+### move msg from alternate q to third q
 Message 10HmbC-0005vi-00 
 ### third q
  0m   sss 10HmbB-0005vi-00 <CALLER@the.local.host.name>
@@ -79,6 +81,8 @@ Message 10HmbB-0005vi-00 Message 10HmbC-0005vi-00
 ### load messages
 ### default q
 ### alternate q
+### move msg from default to third q
+### move msg from alternate q to third q
 ### third q
 ### default q
 ### alternate q