waiting for it by the time it recovers, and sending them in a single SMTP
connection is clearly beneficial. Whenever a delivery to a remote host is
deferred,
-.cindex "hints database"
+.cindex "hints database" "deferred deliveries"
Exim makes a note in its hints database, and whenever a successful
SMTP delivery has happened, it looks to see if any other messages are waiting
for the same host. If any are found, they are sent over the same SMTP
to ensure that any additional groups associated with the uid are set up.
+.new
+.option max_parallel transports integer&!! unset
+.cindex limit "transport parallelism"
+.cindex transport "parallel processes"
+.cindex transport "concurrency limit"
+.cindex "delivery" "parallelism for transport"
+If this option is set and expands to an integer greater than zero
+it limits the number of concurrent runs of the transport.
+The control does not apply to shadow transports.
+
+.cindex "hints database" "transport concurrency control"
+Exim implements this control by means of a hints database in which a record is
+incremented whenever a transport process is beaing created. The record
+is decremented and possibly removed when the process terminates.
+Obviously there is scope for
+records to get left lying around if there is a system or program crash. To
+guard against this, Exim ignores any records that are more than six hours old.
+
+If you use this option, you should also arrange to delete the
+relevant hints database whenever your system reboots. The names of the files
+start with &_misc_& and they are kept in the &_spool/db_& directory. There
+may be one or two files, depending on the type of DBM in use. The same files
+are used for ETRN and smtp transport serialization.
+.wen
+
+
.option message_size_limit transports string&!! 0
.cindex "limit" "message size per transport"
.cindex "size" "of message, limit"
delivery, the two pipe transports may be run concurrently. You must ensure that
any pipe commands you set up are robust against this happening. If the commands
write to a file, the &%exim_lock%& utility might be of use.
+.new
+Alternatively the &%max_parallel%& option could be used with a value
+of "1" to enforce serialization.
+.wen
may be one or two files, depending on the type of DBM in use. The same files
are used for ETRN serialization.
+.new
+See also the &%max_parallel%& generic transport option.
+.wen
+
.option size_addition smtp integer 1024
.cindex "SMTP" "SIZE"
.next
Serializing delivery to a specific host (when &%serialize_hosts%& is set in an
&(smtp)& transport)
+.next
+Limiting the concurrency of specific transports (when &%max_parallel%& is set
+in a transport)
.endlist
2. New $callout_address variable records the address used for a spam=,
malware= or verify= callout.
+ 3. Transports now take a "max_parallel" option, to limit concurrency.
+
Version 4.86
------------
}
}
-/*XXX prefer to do max_parallel check before we fork. Are we allowed to defer
-this late (we could be a shadow tpt)? */
-
/* Create the pipe for inter-process communication. */
if (pipe(pfd) != 0)
+
+/* Put the chain of addrs on the defer list. Retry will happen
+on the next queue run, earlier if triggered by a new message.
+Loop for the next set of addresses. */
+
+static void
+deferlist_chain(address_item * addr)
+{
+address_item * next;
+for (next = addr; next->next; next = next->next) ;
+next->next = addr_defer;
+addr_defer = addr;
+}
+
+
+
/*************************************************
* Do local deliveries *
*************************************************/
int logflags = LOG_MAIN;
int logchar = dont_deliver? '*' : '=';
transport_instance *tp;
+ uschar * serialize_key = NULL;
/* Pick the first undelivered address off the chain */
last = next;
batch_count++;
}
- else anchor = &(next->next); /* Skip the address */
+ else anchor = &next->next; /* Skip the address */
}
}
if (!addr) continue;
+ /* If the transport is limited for parallellism, enforce that here.
+ We use a hints DB entry, incremented here and decremented after
+ the transport (and any shadow transport) completes. */
+
+ if (tp->max_parallel)
+ {
+ int_eximarith_t max_parallel =
+ expand_string_integer(tp->max_parallel, TRUE);
+ if (expand_string_message)
+ {
+ logflags |= LOG_PANIC;
+ log_write(0, LOG_MAIN|LOG_PANIC, "Failed to expand max_parallel option "
+ "in %s transport (%s): %s", tp->name, addr->address,
+ expand_string_message);
+ for (addr2 = addr->next; addr; addr = addr2, addr2 = addr2->next)
+ post_process_one(addr, DEFER, logflags, DTYPE_TRANSPORT, 0);
+ continue;
+ }
+ if ( max_parallel > 0
+ && !enq_start(
+ serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+ (unsigned) max_parallel)
+ )
+ {
+ DEBUG(D_transport)
+ debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+ tp->name, (unsigned) max_parallel);
+
+ deferlist_chain(addr);
+ continue;
+ }
+ }
+
+
/* So, finally, we do have some addresses that can be passed to the
transport. Before doing so, set up variables that are relevant to a
single delivery. */
deliver_set_expansions(NULL);
+ /* If the transport was parallelism-limited, decrement the hints DB record. */
+
+ if (serialize_key) enq_end(serialize_key);
+
/* Now we can process the results of the real transport. We must take each
address off the chain first, because post_process_one() puts it on another
chain. */
"remote delivery process count got out of step");
parcount = 0;
}
- else remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+ else
+ {
+ transport_instance * tp = doneaddr->transport;
+ if (tp->max_parallel)
+ enq_end(string_sprintf("tpt-serialize-%s", tp->name));
+
+ remote_post_process(doneaddr, LOG_MAIN, NULL, fallback);
+ }
}
}
address_item *last = addr;
address_item *next;
uschar * panicmsg;
+ uschar * serialize_key = NULL;
/* Pull the first address right off the list. */
return FALSE;
}
+ /* If the transport is limited for parallellism, enforce that here.
+ The hints DB entry is decremented in par_reduce(), when we reap the
+ transport process. */
+
+ if (tp->max_parallel)
+ {
+ int_eximarith_t max_parallel =
+ expand_string_integer(tp->max_parallel, TRUE);
+ if (expand_string_message)
+ {
+ panicmsg = expand_string_message;
+ goto panic_continue;
+ }
+ if ( max_parallel > 0
+ && !enq_start(
+ serialize_key = string_sprintf("tpt-serialize-%s", tp->name),
+ (unsigned) max_parallel)
+ )
+ {
+ DEBUG(D_transport)
+ debug_printf("skipping tpt %s because parallelism limit %u reached\n",
+ tp->name, (unsigned) max_parallel);
+
+ deferlist_chain(addr);
+ continue;
+ }
+ }
+
/* Set up the expansion variables for this set of addresses */
deliver_set_expansions(addr);
{
panicmsg = string_sprintf("Failed to expand return path \"%s\": %s",
tp->return_path, expand_string_message);
- goto panic_continue;
+ goto enq_continue;
}
}
if (!findugid(addr, tp, &uid, &gid, &use_initgroups))
{
panicmsg = NULL;
- goto panic_continue;
+ goto enq_continue;
}
/* If this transport has a setup function, call it now so that it gets
if (!ok)
{
DEBUG(D_deliver) debug_printf("not suitable for continue_transport\n");
- next = addr;
+ if (serialize_key) enq_end(serialize_key);
if (addr->fallback_hosts && !fallback)
{
- for (;; next = next->next)
+ for (next = addr; ; next = next->next)
{
next->host_list = next->fallback_hosts;
DEBUG(D_deliver) debug_printf("%s queued for fallback host(s)\n", next->address);
}
else
- {
- while (next->next) next = next->next;
- next->next = addr_defer;
- addr_defer = addr;
- }
+ deferlist_chain(addr);
continue;
}
if (!pipe_done)
{
panicmsg = string_sprintf("unable to create pipe: %s", strerror(errno));
- goto panic_continue;
+ goto enq_continue;
}
/* Find a free slot in the pardata list. Must do this after the possible
(void)close(pfd[pipe_write]);
(void)close(pfd[pipe_read]);
panicmsg = US"Unexpectedly no free subprocess slot";
- goto panic_continue;
+ goto enq_continue;
}
/* Now fork a subprocess to do the remote delivery, but before doing so,
(void)close(pfd[pipe_read]);
panicmsg = string_sprintf("fork failed for remote delivery to %s: %s",
addr->domain, strerror(errno));
- goto panic_continue;
+ goto enq_continue;
}
/* Fork succeeded; increment the count, and remember relevant data for
continue;
+enq_continue:
+ if (serialize_key) enq_end(serialize_key);
panic_continue:
remote_post_process(addr, LOG_MAIN|LOG_PANIC, panicmsg, fallback);
continue;
#ifdef EXPERIMENTAL_EVENT
(void) event_raise(event_action, US"msg:complete", NULL);
#endif
-}
+ }
/* If there are deferred addresses, we are keeping this message because it is
not yet completed. Lose any temporary files that were catching output from
NULL, /* remove_headers */
NULL, /* return_path */
NULL, /* debug_string */
+ NULL, /* max_parallel */
NULL, /* message_size_limit */
NULL, /* headers_rewrite */
NULL, /* rewrite_rules */
uschar *remove_headers; /* Remove these headers */
uschar *return_path; /* Overriding (rewriting) return path */
uschar *debug_string; /* Debugging output */
+ uschar *max_parallel; /* Number of concurrent instances */
uschar *message_size_limit; /* Biggest message this transport handles */
uschar *headers_rewrite; /* Rules for rewriting headers */
rewrite_rule *rewrite_rules; /* Parsed rewriting rules */
(void *)offsetof(transport_instance, home_dir) },
{ "initgroups", opt_bool|opt_public,
(void *)offsetof(transport_instance, initgroups) },
+ { "max_parallel", opt_stringptr|opt_public,
+ (void *)offsetof(transport_instance, max_parallel) },
{ "message_size_limit", opt_stringptr|opt_public,
(void *)offsetof(transport_instance, message_size_limit) },
{ "rcpt_include_affixes", opt_bool|opt_public,
# Exim test configuration 0288
+# serialize_hosts option on smtp transport
exim_path = EXIM_PATH
host_lookup_order = bydns
--- /dev/null
+# Exim test configuration 0611
+# max_parallel on transport
+
+SERVER=
+
+exim_path = EXIM_PATH
+host_lookup_order = bydns
+primary_hostname = myhost.test.ex
+spool_directory = DIR/spool
+log_file_path = DIR/spool/log/%slog
+gecos_pattern = ""
+gecos_name = CALLER_NAME
+
+# ----- Main settings -----
+
+qualify_domain = test.ex
+queue_run_in_order
+log_selector = +received_recipients
+
+acl_smtp_rcpt = accept ${if eq {SERVER}{server} {delay = 2s}}
+
+# ----- Routers -----
+
+begin routers
+
+server:
+ condition = ${if eq {SERVER}{server} {yes}{no}}
+ driver = redirect
+ data = :blackhole:
+
+rmt_client:
+ local_parts = a:b:c
+ driver = manualroute
+ route_list = * 127.0.0.1
+ self = send
+ transport = smtp
+
+lcl_client:
+ local_parts = x:y:z
+ driver = accept
+ transport = pipe
+
+# ----- Transports -----
+
+begin transports
+
+smtp:
+ driver = smtp
+ port = PORT_D
+ max_rcpt = 1
+ connection_max_messages = 1
+ max_parallel = 2
+
+pipe:
+ driver = pipe
+ command = "sleep 2; cat > /dev/null"
+ use_shell = true
+ max_parallel = 1
+
+# ----- Retry -----
+
+
+begin retry
+
+* * F,1h,10m
+
+
+# End
1999-03-02 09:44:33 10HmaX-0005vi-00 == b@test.ex R=all T=smtp defer (-53): connection limit reached for all hosts
1999-03-02 09:44:33 10HmaX-0005vi-00 => a@test.ex R=all T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK"
1999-03-02 09:44:33 End queue run: pid=pppp
+1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmaX-0005vi-00 => b@test.ex R=all T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK"
+1999-03-02 09:44:33 10HmaX-0005vi-00 Completed
+1999-03-02 09:44:33 End queue run: pid=pppp
--- /dev/null
+1999-03-02 09:44:33 10HmaX-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for a b c
+1999-03-02 09:44:33 exim x.yz daemon started: pid=pppp, no queue runs, listening for SMTP on port 1225
+1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmaY-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for a@test.ex
+1999-03-02 09:44:33 10HmaY-0005vi-00 => :blackhole: <a@test.ex> R=server
+1999-03-02 09:44:33 10HmaY-0005vi-00 Completed
+1999-03-02 09:44:33 10HmaX-0005vi-00 => a@test.ex R=rmt_client T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK id=10HmaY-0005vi-00"
+1999-03-02 09:44:33 10HmaZ-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for b@test.ex
+1999-03-02 09:44:33 10HmaZ-0005vi-00 => :blackhole: <b@test.ex> R=server
+1999-03-02 09:44:33 10HmaZ-0005vi-00 Completed
+1999-03-02 09:44:33 10HmaX-0005vi-00 => b@test.ex R=rmt_client T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK id=10HmaZ-0005vi-00"
+1999-03-02 09:44:33 End queue run: pid=pppp
+1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmbA-0005vi-00 <= CALLER@test.ex H=localhost (myhost.test.ex) [127.0.0.1] P=esmtp S=sss id=E10HmaX-0005vi-00@myhost.test.ex for c@test.ex
+1999-03-02 09:44:33 10HmbA-0005vi-00 => :blackhole: <c@test.ex> R=server
+1999-03-02 09:44:33 10HmbA-0005vi-00 Completed
+1999-03-02 09:44:33 10HmaX-0005vi-00 => c@test.ex R=rmt_client T=smtp H=127.0.0.1 [127.0.0.1] C="250 OK id=10HmbA-0005vi-00"
+1999-03-02 09:44:33 10HmaX-0005vi-00 Completed
+1999-03-02 09:44:33 End queue run: pid=pppp
+1999-03-02 09:44:33 10HmbB-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for y
+1999-03-02 09:44:33 10HmbC-0005vi-00 <= CALLER@test.ex U=CALLER P=local S=sss for z
+1999-03-02 09:44:33 10HmbB-0005vi-00 => y <y@test.ex> R=lcl_client T=pipe
+1999-03-02 09:44:33 10HmbB-0005vi-00 Completed
+1999-03-02 09:44:33 Start queue run: pid=pppp
+1999-03-02 09:44:33 10HmbC-0005vi-00 => z <z@test.ex> R=lcl_client T=pipe
+1999-03-02 09:44:33 10HmbC-0005vi-00 Completed
+1999-03-02 09:44:33 End queue run: pid=pppp
# serialize_hosts
need_ipv4
#
+# preload the spool
exim -odq a b
.
****
+#
+# a slow server as a test target
server PORT_S
220 ESMTP
EHLO
QUIT
250 OK
****
+#
+# First message should go; second does not wait for 1st complete
+# on same conn due to connection_max_messages, then is deferred
+# as second transport run aborted by serialize_hosts.
+exim -q
+****
+#
+# a server as a test target
+server PORT_S
+220 ESMTP
+EHLO
+250-OK
+250 HELP
+MAIL FROM:
+250 Sender OK
+RCPT TO:
+250 Recipient OK
+DATA
+354 Send data
+.
+250 OK
+QUIT
+250 OK
+****
+#
+# Remaining message on queue should go immediately; no delay
+# associated with retry rules
exim -q
****
-no_msglog_check
--- /dev/null
+# max_parallel on transport
+need_ipv4
+#
+# Remote transport:
+# preload the spool
+exim -odq a b c
+.
+****
+#
+# a slow server as a test target
+exim -DSERVER=server -bd -oX PORT_D
+****
+#
+# First and second messages should go, as separate conns due to
+# connection_max_messages, third is deferred
+# as third transport run denied by max_parallel
+exim -q
+****
+#
+#
+# Remaining message on queue should go immediately; no delay
+# associated with retry rules
+exim -q
+****
+killdaemon
+#
+########
+#
+#
+# Local transport:
+# Only one message should go as the transport takes a long
+# time and we set max_parallel=1 to serialize it
+exim y
+****
+exim z
+****
+#
+#
+sleep 3
+#
+# Remaining message on queue should go immediately; no delay
+# associated with retry rules
+exim -q
QUIT
250 OK
End of script
+Listening on port 1224 ...
+Connection request from [127.0.0.1]
+220 ESMTP
+EHLO myhost.test.ex
+250-OK
+250 HELP
+MAIL FROM:<CALLER@test.ex>
+250 Sender OK
+RCPT TO:<b@test.ex>
+250 Recipient OK
+DATA
+354 Send data
+Received: from CALLER by myhost.test.ex with local (Exim x.yz)
+ (envelope-from <CALLER@test.ex>)
+ id 10HmaX-0005vi-00; Tue, 2 Mar 1999 09:44:33 +0000
+Message-Id: <E10HmaX-0005vi-00@myhost.test.ex>
+From: CALLER_NAME <CALLER@test.ex>
+Date: Tue, 2 Mar 1999 09:44:33 +0000
+
+.
+250 OK
+QUIT
+250 OK
+End of script
headers_rewrite =
home_directory =
no_initgroups
+max_parallel =
message_size_limit =
no_rcpt_include_affixes
retry_use_local_part
headers_rewrite =
home_directory =
no_initgroups
+max_parallel =
message_size_limit =
no_rcpt_include_affixes
retry_use_local_part