Pipe transport: fix log_defer_output option. Bug 840
[exim.git] / src / src / deliver.c
index 6a3df89bb2170abbfac927b4e8b22c0ee0993a9d..3dfa84261e1f468f9b4515a609a8ad66f0f0d3ec 100644 (file)
@@ -1929,7 +1929,7 @@ address. This feature is not available for shadow transports. */
 
 if (  !shadowing
    && (  tp->return_output || tp->return_fail_output
-      || tp->log_output || tp->log_fail_output
+      || tp->log_output || tp->log_fail_output || tp->log_defer_output
    )  )
   {
   uschar *error;
@@ -2315,17 +2315,47 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message)
 
 
 
-/* Put the chain of addrs on the defer list.  Retry will happen
-on the next queue run, earlier if triggered by a new message.
-Loop for the next set of addresses. */
+/* Check transport for the given concurrency limit.  Return TRUE if over
+the limit (or an expansion failure), else FALSE and if there was a limit,
+the key for the hints database used for the concurrency count. */
 
-static void
-deferlist_chain(address_item * addr)
+static BOOL
+tpt_parallel_check(transport_instance * tp, address_item * addr, uschar ** key)
 {
-address_item * next;
-for (next = addr; next->next; next = next->next) ;
-next->next = addr_defer;
-addr_defer = addr;
+unsigned max_parallel;
+
+if (!tp->max_parallel) return FALSE;
+
+max_parallel = (unsigned) expand_string_integer(tp->max_parallel, TRUE);
+if (expand_string_message)
+  {
+  log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+       "in %s transport (%s): %s", tp->name, addr->address,
+       expand_string_message);
+  return TRUE;
+  }
+
+if (max_parallel > 0)
+  {
+  uschar * serialize_key = string_sprintf("tpt-serialize-%s", tp->name);
+  if (!enq_start(serialize_key, max_parallel))
+    {
+    address_item * next;
+    DEBUG(D_transport)
+      debug_printf("skipping tpt %s because concurrency limit %u reached\n",
+                 tp->name, max_parallel);
+    do
+      {
+      next = addr->next;
+      addr->message = US"concurrency limit reached for transport";
+      addr->basic_errno = ERRNO_TRETRY;
+      post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
+      } while ((addr = next));
+    return TRUE;
+    }
+  *key = serialize_key;
+  }
+return FALSE;
 }
 
 
@@ -2632,33 +2662,18 @@ while (addr_local)
   We use a hints DB entry, incremented here and decremented after
   the transport (and any shadow transport) completes. */
 
-  if (tp->max_parallel)
+  if (tpt_parallel_check(tp, addr, &serialize_key))
     {
-    int_eximarith_t max_parallel =
-      expand_string_integer(tp->max_parallel, TRUE);
     if (expand_string_message)
       {
       logflags |= LOG_PANIC;
-      log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
-            "in %s transport (%s): %s", tp->name, addr->address,
-            expand_string_message);
-      for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next)
+      do
+       {
+       addr = addr->next;
        post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
-      continue;
-      }
-    if (  max_parallel > 0
-       && !enq_start(
-           serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
-           (unsigned) max_parallel)
-       )
-      {
-      DEBUG(D_transport)
-       debug_printf("skipping tpt %s because parallelism limit %u reached\n",
-                   tp->name, (unsigned) max_parallel);
-
-      deferlist_chain(addr);
-      continue;
+       } while ((addr = addr2));
       }
+    continue;                  /* Loop for the next set of addresses. */
     }
 
 
@@ -4091,29 +4106,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
   The hints DB entry is decremented in par_reduce(), when we reap the
   transport process. */
 
-  if (tp->max_parallel)
-    {
-    int_eximarith_t max_parallel =
-      expand_string_integer(tp->max_parallel, TRUE);
-    if (expand_string_message)
-      {
-      panicmsg = expand_string_message;
+  if (tpt_parallel_check(tp, addr, &serialize_key))
+    if ((panicmsg = expand_string_message))
       goto panic_continue;
-      }
-    if (  max_parallel > 0
-       && !enq_start(
-           serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
-           (unsigned) max_parallel)
-       )
-      {
-      DEBUG(D_transport)
-       debug_printf("skipping tpt %s because parallelism limit %u reached\n",
-                   tp->name, (unsigned) max_parallel);
-
-      deferlist_chain(addr);
-      continue;
-      }
-    }
+    else
+      continue;                        /* Loop for the next set of addresses. */
 
   /* Set up the expansion variables for this set of addresses */
 
@@ -4207,7 +4204,11 @@ for (delivery_count = 0; addr_remote; delivery_count++)
         }
 
       else
-       deferlist_chain(addr);
+       {
+       while (next->next) next = next->next;
+       next->next = addr_defer;
+       addr_defer = addr;
+       }
 
       continue;
       }
@@ -6674,7 +6675,6 @@ if (addr_local)
 so just queue them all. */
 
 if (queue_run_local)
-  {
   while (addr_remote)
     {
     address_item *addr = addr_remote;
@@ -6684,7 +6684,6 @@ if (queue_run_local)
     addr->message = US"remote deliveries suppressed";
     (void)post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0);
     }
-  }
 
 /* Handle remote deliveries */