From 3070ceeeed0574aab5e69f5026b99ca19bdf2fcc Mon Sep 17 00:00:00 2001 From: Jeremy Harris Date: Tue, 13 Oct 2015 11:43:34 +0100 Subject: [PATCH] Log deferred deliveries for transport max_parallel --- src/src/deliver.c | 109 +++++++++++++++++++++++----------------------- src/src/macros.h | 1 + test/log/0611 | 2 + 3 files changed, 57 insertions(+), 55 deletions(-) diff --git a/src/src/deliver.c b/src/src/deliver.c index 6a3df89bb..87f9cfb06 100644 --- a/src/src/deliver.c +++ b/src/src/deliver.c @@ -2315,17 +2315,47 @@ if (addr->special_action == SPECIAL_WARN && addr->transport->warn_message) -/* Put the chain of addrs on the defer list. Retry will happen -on the next queue run, earlier if triggered by a new message. -Loop for the next set of addresses. */ +/* Check transport for the given concurrency limit. Return TRUE if over +the limit (or an expansion failure), else FALSE and if there was a limit, +the key for the hints database used for the concurrency count. */ -static void -deferlist_chain(address_item * addr) +static BOOL +tpt_parallel_check(transport_instance * tp, address_item * addr, uschar ** key) { -address_item * next; -for (next = addr; next->next; next = next->next) ; -next->next = addr_defer; -addr_defer = addr; +unsigned max_parallel; + +if (!tp->max_parallel) return FALSE; + +max_parallel = (unsigned) expand_string_integer(tp->max_parallel, TRUE); +if (expand_string_message) + { + log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option " + "in %s transport (%s): %s", tp->name, addr->address, + expand_string_message); + return TRUE; + } + +if (max_parallel > 0) + { + uschar * serialize_key = string_sprintf("tpt-serialize-%s", tp->name); + if (!enq_start(serialize_key, max_parallel)) + { + address_item * next; + DEBUG(D_transport) + debug_printf("skipping tpt %s because concurrency limit %u reached\n", + tp->name, max_parallel); + do + { + next = addr->next; + addr->message = US"concurrency limit reached for transport"; + addr->basic_errno = ERRNO_TRETRY; + post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0); + } while ((addr = next)); + return TRUE; + } + *key = serialize_key; + } +return FALSE; } @@ -2632,33 +2662,18 @@ while (addr_local) We use a hints DB entry, incremented here and decremented after the transport (and any shadow transport) completes. */ - if (tp->max_parallel) + if (tpt_parallel_check(tp, addr, &serialize_key)) { - int_eximarith_t max_parallel = - expand_string_integer(tp->max_parallel, TRUE); if (expand_string_message) { logflags |= LOG_PANIC; - log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option " - "in %s transport (%s): %s", tp->name, addr->address, - expand_string_message); - for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next) + do + { + addr = addr->next; post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0); - continue; - } - if ( max_parallel > 0 - && !enq_start( - serialize_key = string_sprintf("tpt-serialize-%s", tp->name), - (unsigned) max_parallel) - ) - { - DEBUG(D_transport) - debug_printf("skipping tpt %s because parallelism limit %u reached\n", - tp->name, (unsigned) max_parallel); - - deferlist_chain(addr); - continue; + } while ((addr = addr2)); } + continue; /* Loop for the next set of addresses. */ } @@ -4091,29 +4106,11 @@ for (delivery_count = 0; addr_remote; delivery_count++) The hints DB entry is decremented in par_reduce(), when we reap the transport process. */ - if (tp->max_parallel) - { - int_eximarith_t max_parallel = - expand_string_integer(tp->max_parallel, TRUE); - if (expand_string_message) - { - panicmsg = expand_string_message; + if (tpt_parallel_check(tp, addr, &serialize_key)) + if ((panicmsg = expand_string_message)) goto panic_continue; - } - if ( max_parallel > 0 - && !enq_start( - serialize_key = string_sprintf("tpt-serialize-%s", tp->name), - (unsigned) max_parallel) - ) - { - DEBUG(D_transport) - debug_printf("skipping tpt %s because parallelism limit %u reached\n", - tp->name, (unsigned) max_parallel); - - deferlist_chain(addr); - continue; - } - } + else + continue; /* Loop for the next set of addresses. */ /* Set up the expansion variables for this set of addresses */ @@ -4207,7 +4204,11 @@ for (delivery_count = 0; addr_remote; delivery_count++) } else - deferlist_chain(addr); + { + while (next->next) next = next->next; + next->next = addr_defer; + addr_defer = addr; + } continue; } @@ -6674,7 +6675,6 @@ if (addr_local) so just queue them all. */ if (queue_run_local) - { while (addr_remote) { address_item *addr = addr_remote; @@ -6684,7 +6684,6 @@ if (queue_run_local) addr->message = US"remote deliveries suppressed"; (void)post_process_one(addr, DEFER, LOG_MAIN, DTYPE_TRANSPORT, 0); } - } /* Handle remote deliveries */ diff --git a/src/src/macros.h b/src/src/macros.h index 0ce24f8cb..5a35a9b56 100644 --- a/src/src/macros.h +++ b/src/src/macros.h @@ -545,6 +545,7 @@ to conflict with system errno values. */ #define ERRNO_HRETRY (-53) /* Not time for any remote host */ #define ERRNO_LOCAL_ONLY (-54) /* Local-only delivery */ #define ERRNO_QUEUE_DOMAIN (-55) /* Domain in queue_domains */ +#define ERRNO_TRETRY (-56) /* Transport concurrency limit */ /* Special actions to take after failure or deferment. */ diff --git a/test/log/0611 b/test/log/0611 index 39c10bc12..f158a20e9 100644 --- a/test/log/0611 +++ b/test/log/0611 @@ -1,6 +1,7 @@ 1999-03-02 09:44:33 10HmaX-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for a b c 1999-03-02 09:44:33 exim x.yz daemon started: pid=pppp, no queue runs, listening for SMTP on port 1225 1999-03-02 09:44:33 Start queue run: pid=pppp +1999-03-02 09:44:33 10HmaX-0005vi-00 == c@test.ex R=rmt_client T=smtp defer (-56): concurrency limit reached for transport 1999-03-02 09:44:33 10HmaY-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for a@test.ex 1999-03-02 09:44:33 10HmaY-0005vi-00 => :blackhole: R=server 1999-03-02 09:44:33 10HmaY-0005vi-00 Completed @@ -19,6 +20,7 @@ 1999-03-02 09:44:33 End queue run: pid=pppp 1999-03-02 09:44:33 10HmbB-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for y 1999-03-02 09:44:33 10HmbC-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for z +1999-03-02 09:44:33 10HmbC-0005vi-00 == z@test.ex R=lcl_client T=pipe defer (-56): concurrency limit reached for transport 1999-03-02 09:44:33 10HmbB-0005vi-00 => y R=lcl_client T=pipe 1999-03-02 09:44:33 10HmbB-0005vi-00 Completed 1999-03-02 09:44:33 Start queue run: pid=pppp -- 2.30.2