max_parallel transport option
[exim.git] / src / src / deliver.c
index a1d16ecedb815d4b186c09f7e9c9d37aa3db22c6..6a3df89bb2170abbfac927b4e8b22c0ee0993a9d 100644 (file)
@@ -1945,9 +1945,6 @@ if (  !shadowing
     }
   }
 
-/*XXX prefer to do max_parallel check before we fork. Are we allowed to defer
-this late (we could be a shadow tpt)? */
-
 /* Create the pipe for inter-process communication. */
 
 if (pipe(pfd) != 0)
@@ -2317,6 +2314,22 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message)
 
 
 
+
+/* Put the chain of addrs on the defer list.  Retry will happen
+on the next queue run, earlier if triggered by a new message.
+Loop for the next set of addresses. */
+
+static void
+deferlist_chain(address_item * addr)
+{
+address_item * next;
+for (next = addr; next->next; next = next->next) ;
+next->next = addr_defer;
+addr_defer = addr;
+}
+
+
+
 /*************************************************
 *              Do local deliveries               *
 *************************************************/
@@ -2348,6 +2361,7 @@ while (addr_local)
   int logflags = LOG_MAIN;
   int logchar = dont_deliver? '*' : '=';
   transport_instance *tp;
+  uschar * serialize_key = NULL;
 
   /* Pick the first undelivered address off the chain */
 
@@ -2483,7 +2497,7 @@ while (addr_local)
         last = next;
         batch_count++;
         }
-      else anchor = &(next->next);      /* Skip the address */
+      else anchor = &next->next;        /* Skip the address */
       }
     }
 
@@ -2614,6 +2628,40 @@ while (addr_local)
 
   if (!addr) continue;
 
+  /* If the transport is limited for parallellism, enforce that here.
+  We use a hints DB entry, incremented here and decremented after
+  the transport (and any shadow transport) completes. */
+
+  if (tp->max_parallel)
+    {
+    int_eximarith_t max_parallel =
+      expand_string_integer(tp->max_parallel, TRUE);
+    if (expand_string_message)
+      {
+      logflags |= LOG_PANIC;
+      log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+            "in %s transport (%s): %s", tp->name, addr->address,
+            expand_string_message);
+      for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next)
+       post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
+      continue;
+      }
+    if (  max_parallel > 0
+       && !enq_start(
+           serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+           (unsigned) max_parallel)
+       )
+      {
+      DEBUG(D_transport)
+       debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+                   tp->name, (unsigned) max_parallel);
+
+      deferlist_chain(addr);
+      continue;
+      }
+    }
+
+
   /* So, finally, we do have some addresses that can be passed to the
   transport. Before doing so, set up variables that are relevant to a
   single delivery. */
@@ -2719,6 +2767,10 @@ while (addr_local)
 
   deliver_set_expansions(NULL);
 
+  /* If the transport was parallelism-limited, decrement the hints DB record. */
+
+  if (serialize_key) enq_end(serialize_key);
+
   /* Now we can process the results of the real transport. We must take each
   address off the chain first, because post_process_one() puts it on another
   chain. */
@@ -3730,7 +3782,14 @@ while (parcount > max)
       "remote delivery process count got out of step");
     parcount = 0;
     }
-  else remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+  else
+    {
+    transport_instance * tp = doneaddr->transport;
+    if (tp->max_parallel)
+      enq_end(string_sprintf("tpt-serialize-%s", tp->name));
+
+    remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+    }
   }
 }
 
@@ -3853,6 +3912,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   address_item *last = addr;
   address_item *next;
   uschar * panicmsg;
+  uschar * serialize_key = NULL;
 
   /* Pull the first address right off the list. */
 
@@ -4027,6 +4087,34 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     return FALSE;
     }
 
+  /* If the transport is limited for parallellism, enforce that here.
+  The hints DB entry is decremented in par_reduce(), when we reap the
+  transport process. */
+
+  if (tp->max_parallel)
+    {
+    int_eximarith_t max_parallel =
+      expand_string_integer(tp->max_parallel, TRUE);
+    if (expand_string_message)
+      {
+      panicmsg = expand_string_message;
+      goto panic_continue;
+      }
+    if (  max_parallel > 0
+       && !enq_start(
+           serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+           (unsigned) max_parallel)
+       )
+      {
+      DEBUG(D_transport)
+       debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+                   tp->name, (unsigned) max_parallel);
+
+      deferlist_chain(addr);
+      continue;
+      }
+    }
+
   /* Set up the expansion variables for this set of addresses */
 
   deliver_set_expansions(addr);
@@ -4055,7 +4143,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
       {
       panicmsg = string_sprintf("Failed to expand return path \"%s\": %s",
        tp->return_path, expand_string_message);
-      goto panic_continue;
+      goto enq_continue;
       }
     }
 
@@ -4066,7 +4154,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   if (!findugid(addr, tp, &uid, &gid, &use_initgroups))
     {
     panicmsg = NULL;
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* If this transport has a setup function, call it now so that it gets
@@ -4104,11 +4192,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     if (!ok)
       {
       DEBUG(D_deliver) debug_printf("not suitable for continue_transport\n");
-      next = addr;
+      if (serialize_key) enq_end(serialize_key);
 
       if (addr->fallback_hosts && !fallback)
         {
-        for (;; next = next->next)
+       for (next = addr; ; next = next->next)
           {
           next->host_list = next->fallback_hosts;
           DEBUG(D_deliver) debug_printf("%s queued for fallback host(s)\n", next->address);
@@ -4119,11 +4207,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
         }
 
       else
-        {
-        while (next->next) next = next->next;
-        next->next = addr_defer;
-        addr_defer = addr;
-        }
+       deferlist_chain(addr);
 
       continue;
       }
@@ -4185,7 +4269,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   if (!pipe_done)
     {
     panicmsg = string_sprintf("unable to create pipe: %s", strerror(errno));
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* Find a free slot in the pardata list. Must do this after the possible
@@ -4203,7 +4287,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     (void)close(pfd[pipe_write]);
     (void)close(pfd[pipe_read]);
     panicmsg = US"Unexpectedly no free subprocess slot";
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* Now fork a subprocess to do the remote delivery, but before doing so,
@@ -4532,7 +4616,7 @@ for (delivery_count = 0; addr_remote; delivery_count++)
     (void)close(pfd[pipe_read]);
     panicmsg = string_sprintf("fork failed for remote delivery to %s: %s",
         addr->domain, strerror(errno));
-    goto panic_continue;
+    goto enq_continue;
     }
 
   /* Fork succeeded; increment the count, and remember relevant data for
@@ -4567,6 +4651,8 @@ for (delivery_count = 0; addr_remote; delivery_count++)
 
   continue;
 
+enq_continue:
+  if (serialize_key) enq_end(serialize_key);
 panic_continue:
   remote_post_process(addr, LOG_MAIN|LOG_PANIC, panicmsg, fallback);
   continue;
@@ -7448,7 +7534,7 @@ if (!addr_defer)
 #ifdef EXPERIMENTAL_EVENT
   (void) event_raise(event_action, US"msg:complete", NULL);
 #endif
-}
+  }
 
 /* If there are deferred addresses, we are keeping this message because it is
 not yet completed. Lose any temporary files that were catching output from