}
}
-/*XXX prefer to do max_parallel check before we fork. Are we allowed to defer
-this late (we could be a shadow tpt)? */
-
/* Create the pipe for inter-process communication. */
if (pipe(pfd) != 0)
+
+/* Move a whole chain of addresses onto the front of the defer list. The
+existing defer list is hooked onto the end of the chain, and the head of the
+chain becomes the new head of the defer list. Retry will happen on the next
+queue run, earlier if triggered by a new message. Callers then carry on with
+the next set of addresses.
+
+Argument:  addr  head of the chain to defer; it is dereferenced without a
+                 check, so it must not be NULL
+*/
+
+static void
+deferlist_chain(address_item * addr)
+{
+address_item * tail = addr;
+
+/* Walk to the last address in the chain */
+while (tail->next) tail = tail->next;
+
+/* Splice the old defer list on behind it and install the new head */
+tail->next = addr_defer;
+addr_defer = addr;
+}
+
+
+
/*************************************************
* Do local deliveries *
*************************************************/
int logflags = LOG_MAIN;
int logchar = dont_deliver? '*' : '=';
transport_instance *tp;
+ uschar * serialize_key = NULL;
/* Pick the first undelivered address off the chain */
last = next;
batch_count++;
}
- else anchor = &(next->next); /* Skip the address */
+ else anchor = &next->next; /* Skip the address */
}
}
if (!addr) continue;
+ /* If the transport is limited for parallelism, enforce that here.
+ We use a hints DB entry, incremented here and decremented after
+ the transport (and any shadow transport) completes. */
+
+ if (tp->max_parallel)
+   {
+   int_eximarith_t max_parallel =
+     expand_string_integer(tp->max_parallel, TRUE);
+
+   /* A failed expansion is a configuration problem: log it to the panic log
+   as well, and defer every address in this batch. */
+
+   if (expand_string_message)
+     {
+     logflags |= LOG_PANIC;
+     log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+       "in %s transport (%s): %s", tp->name, addr->address,
+       expand_string_message);
+
+     /* Pick up the successor before handing each address on, because
+     post_process_one() moves the address to another chain. Fetching it at
+     the top of the loop also stops us stepping past the end of the chain
+     after the last address. */
+
+     while (addr)
+       {
+       addr2 = addr->next;
+       post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
+       addr = addr2;
+       }
+     continue;
+     }
+
+   /* A positive limit is enforced via enq_start(). The key is remembered in
+   serialize_key so that the matching enq_end() can be done once the
+   transport has run. If enq_start() refuses, put the whole batch on the
+   defer list and move on. */
+
+   if (  max_parallel > 0
+      && !enq_start(
+           serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+           (unsigned) max_parallel)
+      )
+     {
+     DEBUG(D_transport)
+       debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+         tp->name, (unsigned) max_parallel);
+
+     deferlist_chain(addr);
+     continue;
+     }
+   }
+
+
/* So, finally, we do have some addresses that can be passed to the
transport. Before doing so, set up variables that are relevant to a
single delivery. */
deliver_set_expansions(NULL);
+ /* If the transport was parallelism-limited, decrement the hints DB record. */
+
+ if (serialize_key) enq_end(serialize_key);
+
/* Now we can process the results of the real transport. We must take each
address off the chain first, because post_process_one() puts it on another
chain. */
"remote delivery process count got out of step");
parcount = 0;
}
- else remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+ else
+ {
+ transport_instance * tp = doneaddr->transport;
+ if (tp->max_parallel)
+ enq_end(string_sprintf("tpt-serialize-%s", tp->name));
+
+ remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+ }
}
}
address_item *last = addr;
address_item *next;
uschar * panicmsg;
+ uschar * serialize_key = NULL;
/* Pull the first address right off the list. */
return FALSE;
}
+ /* If the transport is limited for parallelism, enforce that here.
+ The hints DB entry is decremented in par_reduce(), when we reap the
+ transport process. */
+
+ if (tp->max_parallel)
+ {
+ int_eximarith_t max_parallel =
+ expand_string_integer(tp->max_parallel, TRUE);
+ if (expand_string_message)
+ {
+ panicmsg = expand_string_message;
+ goto panic_continue;
+ }
+ if ( max_parallel > 0
+ && !enq_start(
+ serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+ (unsigned) max_parallel)
+ )
+ {
+ DEBUG(D_transport)
+ debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+ tp->name, (unsigned) max_parallel);
+
+ deferlist_chain(addr);
+ continue;
+ }
+ }
+
/* Set up the expansion variables for this set of addresses */
deliver_set_expansions(addr);
{
panicmsg = string_sprintf("Failed to expand return path \"%s\": %s",
tp->return_path, expand_string_message);
- goto panic_continue;
+ goto enq_continue;
}
}
if (!findugid(addr, tp, &uid, &gid, &use_initgroups))
{
panicmsg = NULL;
- goto panic_continue;
+ goto enq_continue;
}
/* If this transport has a setup function, call it now so that it gets
if (!ok)
{
DEBUG(D_deliver) debug_printf("not suitable for continue_transport\n");
- next = addr;
+ if (serialize_key) enq_end(serialize_key);
if (addr->fallback_hosts && !fallback)
{
- for (;; next = next->next)
+ for (next = addr; ; next = next->next)
{
next->host_list = next->fallback_hosts;
DEBUG(D_deliver) debug_printf("%s queued for fallback host(s)\n", next->address);
}
else
- {
- while (next->next) next = next->next;
- next->next = addr_defer;
- addr_defer = addr;
- }
+ deferlist_chain(addr);
continue;
}
if (!pipe_done)
{
panicmsg = string_sprintf("unable to create pipe: %s", strerror(errno));
- goto panic_continue;
+ goto enq_continue;
}
/* Find a free slot in the pardata list. Must do this after the possible
(void)close(pfd[pipe_write]);
(void)close(pfd[pipe_read]);
panicmsg = US"Unexpectedly no free subprocess slot";
- goto panic_continue;
+ goto enq_continue;
}
/* Now fork a subprocess to do the remote delivery, but before doing so,
(void)close(pfd[pipe_read]);
panicmsg = string_sprintf("fork failed for remote delivery to %s: %s",
addr->domain, strerror(errno));
- goto panic_continue;
+ goto enq_continue;
}
/* Fork succeeded; increment the count, and remember relevant data for
continue;
+enq_continue:
+ if (serialize_key) enq_end(serialize_key);
panic_continue:
remote_post_process(addr, LOG_MAIN|LOG_PANIC, panicmsg, fallback);
continue;
#ifdef EXPERIMENTAL_EVENT
(void) event_raise(event_action, US"msg:complete", NULL);
#endif
-}
+ }
/* If there are deferred addresses, we are keeping this message because it is
not yet completed. Lose any temporary files that were catching output from