Fix periodic queue runs. Bug 3046
authorJeremy Harris <jgh146exb@wizmail.org>
Fri, 15 Dec 2023 21:05:32 +0000 (21:05 +0000)
committerJeremy Harris <jgh146exb@wizmail.org>
Fri, 15 Dec 2023 21:12:20 +0000 (21:12 +0000)
Broken-by: 7d5055276a22
doc/doc-txt/ChangeLog
src/src/daemon.c
src/src/structs.h

index 85064cc8d8beea1651a86592099f635b49722f12..c46f3b8c02f4692640cbb08584cecbdcd7800785 100644 (file)
@@ -53,6 +53,11 @@ JH/10 Bug 3058: Ensure that a failing expansion in a router "set" option defers
       the routing operation.  Previously it would silently stop routing the
       message.
 
       the routing operation.  Previously it would silently stop routing the
       message.
 
+JH/11 Bug 3046: Fix queue-runs.  Previously, the arrivel of a notification or
+      info-request event close in time to a scheduled run timer could result in
+      the latter being missed, and no further queue scheduled runs being
+      initiated.  This ouwld be more likely on high-load systems.
+
 
 
 Exim version 4.97
 
 
 Exim version 4.97
index f2183c735f6acb3f9813cb8c01844491230cb9c0..aff05120ae870787fe3248fbf72c4d04dff10abe 100644 (file)
@@ -1258,10 +1258,9 @@ static const uschar * queuerun_msg_qname;
 
 /* The notifier socket has something to read. Pull the message from it, decode
 and do the action.
 
 /* The notifier socket has something to read. Pull the message from it, decode
 and do the action.
+*/
 
 
-Return TRUE if a sigalrm should be emulated */
-
-static BOOL
+static void
 daemon_notification(void)
 {
 uschar buf[256], cbuf[256];
 daemon_notification(void)
 {
 uschar buf[256], cbuf[256];
@@ -1277,8 +1276,8 @@ struct msghdr msg = { .msg_name = &sa_un,
 ssize_t sz;
 
 buf[sizeof(buf)-1] = 0;
 ssize_t sz;
 
 buf[sizeof(buf)-1] = 0;
-if ((sz = recvmsg(daemon_notifier_fd, &msg, 0)) <= 0) return FALSE;
-if (sz >= sizeof(buf)) return FALSE;
+if ((sz = recvmsg(daemon_notifier_fd, &msg, 0)) <= 0) return;
+if (sz >= sizeof(buf)) return;
 
 #ifdef notdef
 debug_printf("addrlen %d\n", msg.msg_namelen);
 
 #ifdef notdef
 debug_printf("addrlen %d\n", msg.msg_namelen);
@@ -1351,7 +1350,7 @@ switch (buf[0])
          : !buf[1+MESSAGE_ID_LENGTH+1]
         )
        { queuerun_msg_qname = q->name; break; }
          : !buf[1+MESSAGE_ID_LENGTH+1]
         )
        { queuerun_msg_qname = q->name; break; }
-    return TRUE;
+    return;
 #endif
 
   case NOTIFY_QUEUE_SIZE_REQ:
 #endif
 
   case NOTIFY_QUEUE_SIZE_REQ:
@@ -1373,7 +1372,7 @@ switch (buf[0])
     regex_at_daemon(buf);
     break;
   }
     regex_at_daemon(buf);
     break;
   }
-return FALSE;
+return;
 }
 
 
 }
 
 
@@ -1432,7 +1431,7 @@ for (qrunner * q = qrunners, * next; q; q = next)
   if (sorted)
     {
     qrunner ** p = &sorted;
   if (sorted)
     {
     qrunner ** p = &sorted;
-    for (qrunner * qq; qq = *p; p = &(qq->next))
+    for (qrunner * qq; qq = *p; p = &qq->next)
       if (  q->next_tick < qq->next_tick
         || q->next_tick == qq->next_tick && q->interval < qq->interval
         )
       if (  q->next_tick < qq->next_tick
         || q->next_tick == qq->next_tick && q->interval < qq->interval
         )
@@ -1451,6 +1450,13 @@ qrunners = sorted;
 return qrunners ? qrunners->next_tick - time(NULL) : 0;
 }
 
 return qrunners ? qrunners->next_tick - time(NULL) : 0;
 }
 
+/* See if we can do a queue run.  If policy limit permit, kick one off.
+If both notification and timer events are present, handle the former
+and leave the timer outstanding.
+
+Return the number of seconds until the next due runner.
+*/
+
 static int
 daemon_qrun(int local_queue_run_max, struct pollfd * fd_polls, int listen_socket_count)
 {
 static int
 daemon_qrun(int local_queue_run_max, struct pollfd * fd_polls, int listen_socket_count)
 {
@@ -1464,13 +1470,16 @@ DEBUG(D_any) debug_printf("%s received\n",
 enough queue runners on the go. If we are not running as root, a re-exec is
 required. In the calling process, restart the alamr timer for the next run.  */
 
 enough queue runners on the go. If we are not running as root, a re-exec is
 required. In the calling process, restart the alamr timer for the next run.  */
 
-if (is_multiple_qrun())
+if (is_multiple_qrun())                                /* we are managing periodic runs */
   if (local_queue_run_max <= 0 || queue_run_count < local_queue_run_max)
     {
     qrunner * q = NULL;
 
 #ifndef DISABLE_QUEUE_RAMP
   if (local_queue_run_max <= 0 || queue_run_count < local_queue_run_max)
     {
     qrunner * q = NULL;
 
 #ifndef DISABLE_QUEUE_RAMP
-    if (*queuerun_msgid)       /* See if we can start another runner for this queue */
+    /* If this is a triggered run for a specific message, see if we can start
+    another runner for this queue. */
+
+    if (*queuerun_msgid)
       {
       for (qrunner * qq = qrunners; qq; qq = qq->next)
        if (qq->name == queuerun_msg_qname)
       {
       for (qrunner * qq = qrunners; qq; qq = qq->next)
        if (qq->name == queuerun_msg_qname)
@@ -1481,13 +1490,13 @@ if (is_multiple_qrun())
       }
     else
 #endif
       }
     else
 #endif
-      /* In order of run priority, find the first queue for which we can start
-      a runner */
+      /* Normal periodic runL in order of run priority, find the first queue
+      for which we can start a runner */
 
       for (q = qrunners; q; q = q->next)
        if (q->run_count < q->run_max) break;
 
 
       for (q = qrunners; q; q = q->next)
        if (q->run_count < q->run_max) break;
 
-    if (q)
+    if (q)                                     /* found a queue to run */
       {
       pid_t pid;
 
       {
       pid_t pid;
 
@@ -1619,19 +1628,23 @@ if (is_multiple_qrun())
       }
     }
 
       }
     }
 
-sigalrm_seen = FALSE;
+/* The queue run has been initiated (unless we were already running enough) */
+
 #ifndef DISABLE_QUEUE_RAMP
 #ifndef DISABLE_QUEUE_RAMP
-if (*queuerun_msgid)           /* it was a fast-ramp kick */
+if (*queuerun_msgid)           /* it was a fast-ramp kick; dealt with */
   *queuerun_msgid = 0;
 else                           /* periodic or one-time queue run */
 #endif
   *queuerun_msgid = 0;
 else                           /* periodic or one-time queue run */
 #endif
-  {            /* Impose a minimum 1s tick, even when a run was outstanding */
+  /* Set up next timer callback. Impose a minimum 1s tick,
+  even when a run was outstanding */
+  {
   int interval = next_qrunner_interval();
   if (interval <= 0) interval = 1;
 
   int interval = next_qrunner_interval();
   if (interval <= 0) interval = 1;
 
+  sigalrm_seen = FALSE;
   if (qrunners)                        /* there are still periodic qrunners */
     {
   if (qrunners)                        /* there are still periodic qrunners */
     {
-    ALARM(interval);
+    ALARM(interval);           /* set up next qrun tick */
     return interval;
     }
   }
     return interval;
     }
   }
@@ -2612,7 +2625,7 @@ for (;;)
 
   The other option is that we have an inetd wait timeout specified to -bw. */
 
 
   The other option is that we have an inetd wait timeout specified to -bw. */
 
-  if (sigalrm_seen)
+  if (sigalrm_seen || *queuerun_msgid)
     if (inetd_wait_timeout > 0)
       daemon_inetd_wtimeout(last_connection_time);     /* Might not return */
     else
     if (inetd_wait_timeout > 0)
       daemon_inetd_wtimeout(last_connection_time);     /* Might not return */
     else
@@ -2641,7 +2654,9 @@ for (;;)
     select() was interrupted so that we reap the child. This might still leave
     a small window when a SIGCHLD could get lost. However, since we use SIGCHLD
     only to do the reaping more quickly, it shouldn't result in anything other
     select() was interrupted so that we reap the child. This might still leave
     a small window when a SIGCHLD could get lost. However, since we use SIGCHLD
     only to do the reaping more quickly, it shouldn't result in anything other
-    than a delay until something else causes a wake-up. */
+    than a delay until something else causes a wake-up.
+    For the normal case, wait for either a pollable fd (eg. new connection) or
+    or a SIGALRM (for a periodic queue run) */
 
     if (sigchld_seen)
       {
 
     if (sigchld_seen)
       {
@@ -2706,10 +2721,13 @@ for (;;)
          break;        /* to top of daemon loop */
          }
 #endif
          break;        /* to top of daemon loop */
          }
 #endif
+       /* Handle the daemon-notifier socket.  If it was a fast-ramp
+       notification then queuerun_msgid will have a nonzerolength string. */
+
        if (dnotify_poll && dnotify_poll->revents & POLLIN)
          {
          dnotify_poll->revents = 0;
        if (dnotify_poll && dnotify_poll->revents & POLLIN)
          {
          dnotify_poll->revents = 0;
-         sigalrm_seen = daemon_notification();
+         daemon_notification();
          break;        /* to top of daemon loop */
          }
        for (struct pollfd * p = fd_polls; p < fd_polls + listen_socket_count;
          break;        /* to top of daemon loop */
          }
        for (struct pollfd * p = fd_polls; p < fd_polls + listen_socket_count;
index 209d657c6294319a86303c8a0b0d0e4c1d77371a..256560ef897dd7fb59aa14dbbbe9e57bdb894ea1 100644 (file)
@@ -964,7 +964,7 @@ typedef struct qrunner {
   struct qrunner * next;       /* list sorted by next tick */
 
   uschar *     name;           /* NULL for the default queue */
   struct qrunner * next;       /* list sorted by next tick */
 
   uschar *     name;           /* NULL for the default queue */
-  unsigned     interval;       /* tick rate, seconds */
+  unsigned     interval;       /* tick rate, seconds. Zero for a one-time run */
   time_t       next_tick;      /* next run should, or should have, start(ed) */
   unsigned     run_max;        /* concurrent queue runner limit */
   unsigned     run_count;      /* current runners */
   time_t       next_tick;      /* next run should, or should have, start(ed) */
   unsigned     run_max;        /* concurrent queue runner limit */
   unsigned     run_count;      /* current runners */