Named queues: support multiple queue-runners from single daemon
[exim.git] / src / src / queue.c
index f86e24b42e85019304a4c6fe802c9f566111893a..d01cde655d6b0dfdd126684c3c33c585b8a093e8 100644 (file)
@@ -325,8 +325,8 @@ previous lexically lesser one if the given stop message doesn't exist. Because
 a queue run can take some time, stat each file before forking, in case it has
 been delivered in the meantime by some other means.
 
-The global variables queue_run_force and queue_run_local may be set to cause
-forced deliveries or local-only deliveries, respectively.
+The qrun descriptor  variables queue_run_force and queue_run_local may be set to
+cause forced deliveries or local-only deliveries, respectively.
 
 If deliver_selectstring[_sender] is not NULL, skip messages whose recipients do
 not contain the string. As this option is typically used when a machine comes
@@ -339,6 +339,7 @@ is set so that routing is done for all messages. Thus in the second run those
 that are routed to the same host should go down the same SMTP connection.
 
 Arguments:
+  q         queue-runner descriptor
   start_id   message id to start at, or NULL for all
   stop_id    message id to end at, or NULL for all
   recurse    TRUE if recursing for 2-stage run
@@ -347,10 +348,10 @@ Returns:     nothing
 */
 
 void
-queue_run(uschar *start_id, uschar *stop_id, BOOL recurse)
+queue_run(qrunner * q, uschar * start_id, uschar * stop_id, BOOL recurse)
 {
-BOOL force_delivery = f.queue_run_force || deliver_selectstring != NULL ||
-  deliver_selectstring_sender != NULL;
+BOOL force_delivery = q->queue_run_force
+  || deliver_selectstring || deliver_selectstring_sender;
 const pcre2_code *selectstring_regex = NULL;
 const pcre2_code *selectstring_regex_sender = NULL;
 uschar *log_detail = NULL;
@@ -363,6 +364,13 @@ BOOL single_id = FALSE;
 report_time_since(&timestamp_startup, US"queue_run start");
 #endif
 
+/* Copy the legacy globals from the newer per-qrunner-desc */
+
+queue_name =           q->name ? q->name : US"";
+f.queue_2stage =        q->queue_2stage;
+f.deliver_force_thaw =  q->deliver_force_thaw;
+f.queue_run_local =     q->queue_run_local;
+
 /* Cancel any specific queue domains. Turn off the flag that causes SMTP
 deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag
 gets set. Save the queue_runner's pid and the flag that indicates any
@@ -371,7 +379,7 @@ on TCP/IP channels have queue_run_pid set, but not queue_running. */
 
 queue_domains = NULL;
 queue_smtp_domains = NULL;
-f.queue_smtp = f.queue_2stage;
+f.queue_smtp = q->queue_2stage;
 
 queue_run_pid = getpid();
 f.queue_running = TRUE;
@@ -383,11 +391,11 @@ if (!recurse)
   uschar extras[8];
   uschar *p = extras;
 
-  if (f.queue_2stage) *p++ = 'q';
-  if (f.queue_run_first_delivery) *p++ = 'i';
-  if (f.queue_run_force) *p++ = 'f';
-  if (f.deliver_force_thaw) *p++ = 'f';
-  if (f.queue_run_local) *p++ = 'l';
+  if (q->queue_2stage)         *p++ = 'q';
+  if (q->queue_run_first_delivery) *p++ = 'i';
+  if (q->queue_run_force)      *p++ = 'f';
+  if (q->deliver_force_thaw)   *p++ = 'f';
+  if (q->queue_run_local)      *p++ = 'l';
   *p = 0;
 
   p = big_buffer;
@@ -399,25 +407,25 @@ if (!recurse)
   if (deliver_selectstring)
     {
     snprintf(CS p, big_buffer_size - (p - big_buffer), " -R%s %s",
-      f.deliver_selectstring_regex? "r" : "", deliver_selectstring);
+      f.deliver_selectstring_regex ? "r" : "", deliver_selectstring);
     p += Ustrlen(CCS p);
     }
 
   if (deliver_selectstring_sender)
     {
     snprintf(CS p, big_buffer_size - (p - big_buffer), " -S%s %s",
-      f.deliver_selectstring_sender_regex? "r" : "", deliver_selectstring_sender);
+      f.deliver_selectstring_sender_regex ? "r" : "", deliver_selectstring_sender);
     p += Ustrlen(CCS p);
     }
 
   log_detail = string_copy(big_buffer);
-  if (*queue_name)
+  if (q->name)
     log_write(L_queue_run, LOG_MAIN, "Start '%s' queue run: %s",
-      queue_name, log_detail);
+      q->name, log_detail);
   else
     log_write(L_queue_run, LOG_MAIN, "Start queue run: %s", log_detail);
 
-  single_id = start_id && stop_id && !f.queue_2stage
+  single_id = start_id && stop_id && !q->queue_2stage
              && Ustrcmp(start_id, stop_id) == 0;
   }
 
@@ -474,7 +482,7 @@ for (int i = queue_run_in_order ? -1 : 0;
     /* Unless deliveries are forced, if deliver_queue_load_max is non-negative,
     check that the load average is low enough to permit deliveries. */
 
-    if (!f.queue_run_force && deliver_queue_load_max >= 0)
+    if (!q->queue_run_force && deliver_queue_load_max >= 0)
       if ((load_average = os_getloadavg()) > deliver_queue_load_max)
         {
         log_write(L_queue_run, LOG_MAIN, "Abandon queue run: %s (load %.2f, max %.2f)",
@@ -492,7 +500,7 @@ for (int i = queue_run_in_order ? -1 : 0;
     /* If initial of a 2-phase run, maintain a set of child procs
     to get disk parallelism */
 
-    if (f.queue_2stage && !queue_run_in_order)
+    if (q->queue_2stage && !queue_run_in_order)
       {
       int i;
       if (qpid[f.running_in_test_harness ? 0 : nelem(qpid) - 1])
@@ -530,7 +538,7 @@ for (int i = queue_run_in_order ? -1 : 0;
     message when many are not going to be delivered. */
 
     if (deliver_selectstring || deliver_selectstring_sender ||
-        f.queue_run_first_delivery)
+        q->queue_run_first_delivery)
       {
       BOOL wanted = TRUE;
       BOOL orig_dont_deliver = f.dont_deliver;
@@ -548,7 +556,7 @@ for (int i = queue_run_in_order ? -1 : 0;
       header file, we might as well do the freeze test now, and save forking
       another process. */
 
-      if (f.deliver_freeze && !f.deliver_force_thaw)
+      if (f.deliver_freeze && !q->deliver_force_thaw)
         {
         log_write(L_skip_delivery, LOG_MAIN, "Message is frozen");
         wanted = FALSE;
@@ -556,7 +564,7 @@ for (int i = queue_run_in_order ? -1 : 0;
 
       /* Check first_delivery in the case when there are no message logs. */
 
-      else if (f.queue_run_first_delivery && !f.deliver_firsttime)
+      else if (q->queue_run_first_delivery && !f.deliver_firsttime)
         {
         DEBUG(D_queue_run) debug_printf("%s: not first delivery\n", fq->text);
         wanted = FALSE;
@@ -688,7 +696,7 @@ single_item_retry:
     /* A zero return means a delivery was attempted; turn off the force flag
     for any subsequent calls unless queue_force is set. */
 
-    if (!(status & 0xffff)) force_delivery = f.queue_run_force;
+    if (!(status & 0xffff)) force_delivery = q->queue_run_force;
 
     /* If the process crashed, tell somebody */
 
@@ -723,13 +731,13 @@ single_item_retry:
     set_process_info("running queue");
 
     /* If initial of a 2-phase run, we are a child - so just exit */
-    if (f.queue_2stage && !queue_run_in_order)
+    if (q->queue_2stage && !queue_run_in_order)
       exim_exit(EXIT_SUCCESS);
 
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
 
-    if (f.running_in_test_harness && !f.queue_2stage)
+    if (f.running_in_test_harness && !q->queue_2stage)
       {
       uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
       if (fqtnext) fudged_queue_times = fqtnext + 1;
@@ -740,7 +748,7 @@ single_item_retry:
 
   go_around:
     /* If initial of a 2-phase run, we are a child - so just exit */
-    if (f.queue_2stage && !queue_run_in_order)
+    if (q->queue_2stage && !queue_run_in_order)
       exim_exit(EXIT_SUCCESS);
     }                                  /* End loop for list of messages */
 
@@ -767,7 +775,7 @@ single_item_retry:
 /* If queue_2stage is true, we do it all again, with the 2stage flag
 turned off. */
 
-if (f.queue_2stage)
+if (q->queue_2stage)
   {
 
   /* wait for last children */
@@ -782,22 +790,40 @@ if (f.queue_2stage)
 #ifdef MEASURE_TIMING
   report_time_since(&timestamp_startup, US"queue_run 1st phase done");
 #endif
-  f.queue_2stage = FALSE;
-  queue_run(start_id, stop_id, TRUE);
+  q->queue_2stage = f.queue_2stage = FALSE;
+  queue_run(q, start_id, stop_id, TRUE);
   }
 
 /* At top level, log the end of the run. */
 
 if (!recurse)
-  if (*queue_name)
+  if (q->name)
     log_write(L_queue_run, LOG_MAIN, "End '%s' queue run: %s",
-      queue_name, log_detail);
+      q->name, log_detail);
   else
     log_write(L_queue_run, LOG_MAIN, "End queue run: %s", log_detail);
 }
 
 
 
+void
+single_queue_run(qrunner * q, uschar * start_id, uschar * stop_id)
+{
+DEBUG(D_queue_run) debug_printf("Single queue run%s%s%s%s\n",
+  start_id ? US" starting at " : US"",
+  start_id ? start_id: US"",
+  stop_id ?  US" stopping at " : US"",
+  stop_id ?  stop_id : US"");
+
+if (*queue_name)
+  set_process_info("running the '%s' queue (single queue run)", queue_name);
+else
+  set_process_info("running the queue (single queue run)");
+queue_run(q, start_id, stop_id, FALSE);
+}
+
+
+
 
 /************************************************
 *         Count messages on the queue           *
@@ -1552,20 +1578,22 @@ if (s)
 void
 queue_notify_daemon(const uschar * msgid)
 {
-uschar buf[MESSAGE_ID_LENGTH + 2];
+int bsize = 1 + MESSAGE_ID_LENGTH + 1 + Ustrlen(queue_name) + 1;
+uschar * buf = store_get(bsize, GET_UNTAINTED);
 int fd;
 
 DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid);
 
 buf[0] = NOTIFY_MSG_QRUN;
 memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
+Ustrcpy(buf+1+MESSAGE_ID_LENGTH+1, queue_name);
 
 if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
   {
   struct sockaddr_un sa_un = {.sun_family = AF_UNIX};
   ssize_t len = daemon_notifier_sockname(&sa_un);
 
-  if (sendto(fd, buf, sizeof(buf), 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0)
+  if (sendto(fd, buf, bsize, 0, (struct sockaddr *)&sa_un, (socklen_t)len) < 0)
     DEBUG(D_queue_run)
       debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
   close(fd);