affect Exim's operation, with an unchanged configuration file. For new
options, and new features, see the NewStuff file next to this ChangeLog.
+Since version 4.98
+------------------
+
+JH/01 Use fewer forks & execs for sending many messages to a single host.
+ By passing back the next message-id from the transport to the delivery
+ process, we can loop there. A two-phase queue run will benefit,
+ particularly for mailinglist and smarthost cases.
+
Exim version 4.98
-----------------
while (*ptr++) ;
break;
- /* Z marks the logical end of the data. It is followed by '0' if
+ /* Z0 marks the logical end of the data. It is followed by '0' if
continue_transport was NULL at the end of transporting, otherwise '1'.
We need to know when it becomes NULL during a delivery down a passed SMTP
channel so that we don't try to pass anything more down it. Of course, for
- most normal messages it will remain NULL all the time. */
+ most normal messages it will remain NULL all the time.
+
+ Z1 is a suggested message_id to handle next, used during a
+ continued-transport sequence. */
case 'Z':
- if (*ptr == '0')
+ switch (*subid)
{
- continue_transport = NULL;
- continue_hostname = NULL;
+ case '0':
+ if (*ptr == '0')
+ {
+ continue_transport = NULL;
+ continue_hostname = NULL;
+ }
+ done = TRUE;
+ DEBUG(D_deliver) debug_printf("Z0%c item read\n", *ptr);
+ break;
+ case '1':
+ if (continue_hostname)
+ Ustrncpy(continue_next_id, ptr, MESSAGE_ID_LENGTH);
+ DEBUG(D_deliver) debug_printf("continue_next_id: %s%s\n",
+ continue_next_id, continue_hostname ? "" : " (ignored)");
+ break;
}
- done = TRUE;
- DEBUG(D_deliver) debug_printf("Z0%c item read\n", *ptr);
break;
/* Anything else is a disaster. */
is flagged by an identifying byte, and is then in a fixed format (with
strings terminated by zeros), and there is a final terminator at the
end. The host information and retry information is all attached to
- the first address, so that gets sent at the start. */
+ the first address, so that gets sent at the start.
+
+ Result item tags:
+ A C D H I K L P R S T X Z
+ */
/* Host unusability information: for most success cases this will
be null. */
{
if (!h->address || h->status < hstatus_unusable) continue;
sprintf(CS big_buffer, "%c%c%s", h->status, h->why, h->address);
- rmt_dlv_checked_write(fd, 'H', '0', big_buffer, Ustrlen(big_buffer+2) + 3);
+ rmt_dlv_checked_write(fd, 'H','0', big_buffer, Ustrlen(big_buffer+2) + 3);
}
/* The number of bytes written. This is the same for each address. Even
rmt_dlv_checked_write(fd, 'I', '0', big_buffer, ptr - big_buffer);
}
+ /* Continuation message-id */
+ if (*continue_next_id)
+ rmt_dlv_checked_write(fd, 'Z', '1', continue_next_id, MESSAGE_ID_LENGTH);
+
/* Add termination flag, close the pipe, and that's it. The character
- after 'Z' indicates whether continue_transport is now NULL or not.
+ after "Z0" indicates whether continue_transport is now NULL or not.
A change from non-NULL to NULL indicates a problem with a continuing
connection. */
*/
static void
-do_duplicate_check(address_item **anchor)
+do_duplicate_check(address_item ** anchor)
{
-address_item *addr;
+address_item * addr;
while ((addr = *anchor))
{
- tree_node *tnode;
+ tree_node * tnode;
if (testflag(addr, af_pfr))
- {
- anchor = &(addr->next);
- }
+ anchor = &addr->next;
else if ((tnode = tree_search(tree_duplicates, addr->unique)))
{
DEBUG(D_deliver|D_route)
else
{
tree_add_duplicate(addr->unique, addr);
- anchor = &(addr->next);
+ anchor = &addr->next;
}
}
}
int
deliver_message(const uschar * id, BOOL forced, BOOL give_up)
{
-int i, rc;
-int final_yield = DELIVER_ATTEMPTED_NORMAL;
-time_t now = time(NULL);
-address_item *addr_last = NULL;
-uschar *filter_message = NULL;
-int process_recipients = RECIP_ACCEPT;
-open_db dbblock;
-open_db *dbm_file;
+int i, rc, final_yield, process_recipients;
+time_t now;
+address_item * addr_last;
+uschar * filter_message, * info;
+open_db dbblock, * dbm_file;
extern int acl_where;
-uschar *info;
+CONTINUED_ID:
+
+final_yield = DELIVER_ATTEMPTED_NORMAL;
+now = time(NULL);
+addr_last = NULL;
+filter_message = NULL;
+process_recipients = RECIP_ACCEPT;
#ifdef MEASURE_TIMING
report_time_since(×tamp_startup, US"delivery start"); /* testcase 0022, 2100 */
report_time_since(×tamp_startup, US"delivery end"); /* testcase 0005 */
#endif
+/* If the transport suggested another message to deliver, go round again. */
+
+if (final_yield == DELIVER_ATTEMPTED_NORMAL && *continue_next_id)
+ {
+ tree_duplicates = NULL; /* discard dups info from old message */
+ id = string_copyn(continue_next_id, MESSAGE_ID_LENGTH);
+ continue_next_id[0] = '\0';
+ goto CONTINUED_ID;
+ }
+
/* It is unlikely that there will be any cached resources, since they are
released after routing, and in the delivery subprocesses. However, it's
possible for an expansion for something afterwards (for example,
uschar *continue_proxy_sni = NULL;
uschar *continue_hostname = NULL;
uschar *continue_host_address = NULL;
+uschar continue_next_id[MESSAGE_ID_LENGTH +1] = {[0]='\0'};
int continue_sequence = 1;
uschar *continue_transport = NULL;
#ifndef DISABLE_ESMTP_LIMITS
extern uschar *continue_proxy_sni; /* proxied conn SNI */
extern uschar *continue_hostname; /* Host for continued delivery */
extern uschar *continue_host_address; /* IP address for ditto */
+extern uschar continue_next_id[]; /* Next message_id from hintsdb */
extern int continue_sequence; /* Sequence num for continued delivery */
extern uschar *continue_transport; /* Transport for continued delivery */
#ifndef DISABLE_ESMTP_LIMITS
void
transport_update_waiting(host_item * hostlist, uschar * tpname)
{
-const uschar *prevname = US"";
-open_db dbblock;
-open_db *dbm_file;
+const uschar * prevname = US"";
+open_db dbblock, * dbp;
if (!is_new_message_id(message_id))
{
/* Open the database for this transport */
-if (!(dbm_file = dbfn_open(string_sprintf("wait-%.200s", tpname),
+if (!(dbp = dbfn_open(string_sprintf("wait-%.200s", tpname),
O_RDWR, &dbblock, TRUE, TRUE)))
return;
/* Look up the host record; if there isn't one, make an empty one. */
- if (!(host_record = dbfn_read(dbm_file, host->name)))
+ if (!(host_record = dbfn_read(dbp, host->name)))
{
host_record = store_get(sizeof(dbdata_wait) + MESSAGE_ID_LENGTH, GET_UNTAINTED);
host_record->count = host_record->sequence = 0;
debug_printf_indent("NOTE: old or corrupt message-id found in wait=%.200s"
" hints DB; deleting records for %s\n", tpname, host->name);
- (void) dbfn_delete(dbm_file, host->name);
+ (void) dbfn_delete(dbp, host->name);
for (int i = host_record->sequence - 1; i >= 0; i--)
- (void) dbfn_delete(dbm_file,
+ (void) dbfn_delete(dbp,
(sprintf(CS buffer, "%.200s:%d", host->name, i), buffer));
host_record->count = host_record->sequence = 0;
{
dbdata_wait *cont;
sprintf(CS buffer, "%.200s:%d", host->name, i);
- if ((cont = dbfn_read(dbm_file, buffer)))
+ if ((cont = dbfn_read(dbp, buffer)))
{
int clen = cont->count * MESSAGE_ID_LENGTH;
for (uschar * s = cont->text; s < cont->text + clen; s += MESSAGE_ID_LENGTH)
if (host_record->count >= WAIT_NAME_MAX)
{
sprintf(CS buffer, "%.200s:%d", host->name, host_record->sequence);
- dbfn_write(dbm_file, buffer, host_record, sizeof(dbdata_wait) + host_length);
+ dbfn_write(dbp, buffer, host_record, sizeof(dbdata_wait) + host_length);
#ifndef DISABLE_QUEUE_RAMP
if (f.queue_2stage && queue_fast_ramp && !queue_run_in_order)
queue_notify_daemon(message_id);
/* Update the database */
- dbfn_write(dbm_file, host->name, host_record, sizeof(dbdata_wait) + host_length);
+ dbfn_write(dbp, host->name, host_record, sizeof(dbdata_wait) + host_length);
DEBUG(D_transport) debug_printf("added %.*s to queue for %s\n",
MESSAGE_ID_LENGTH, message_id, host->name);
}
/* All now done */
-dbfn_close(dbm_file);
+dbfn_close(dbp);
}
{
dbdata_wait * host_record;
int host_length;
-open_db dbblock;
-open_db * dbm_file;
+open_db dbblock, * dbp;
int i;
struct stat statbuf;
/* Open the waiting information database. */
-if (!(dbm_file = dbfn_open(string_sprintf("wait-%.200s", transport_name),
+if (!(dbp = dbfn_open(string_sprintf("wait-%.200s", transport_name),
O_RDWR, &dbblock, TRUE, TRUE)))
goto retfalse;
/* See if there is a record for this host; if not, there's nothing to do. */
-if (!(host_record = dbfn_read(dbm_file, hostname)))
+if (!(host_record = dbfn_read(dbp, hostname)))
{
- dbfn_close(dbm_file);
DEBUG(D_transport) debug_printf_indent("no messages waiting for %s\n", hostname);
- goto retfalse;
+ goto dbclose_false;
}
/* If the data in the record looks corrupt, just log something and
if (host_record->count > WAIT_NAME_MAX)
{
- dbfn_close(dbm_file);
log_write(0, LOG_MAIN|LOG_PANIC, "smtp-wait database entry for %s has bad "
"count=%d (max=%d)", hostname, host_record->count, WAIT_NAME_MAX);
- goto retfalse;
+ goto dbclose_false;
}
/* Scan the message ids in the record from the end towards the beginning,
DEBUG(D_hints_lookup)
debug_printf_indent("NOTE: old or corrupt message-id found in wait=%.200s"
" hints DB; deleting records for %s\n", transport_name, hostname);
- (void) dbfn_delete(dbm_file, hostname);
+ (void) dbfn_delete(dbp, hostname);
for (int i = host_record->sequence - 1; i >= 0; i--)
- (void) dbfn_delete(dbm_file,
+ (void) dbfn_delete(dbp,
(sprintf(CS buffer, "%.200s:%d", hostname, i), buffer));
- dbfn_close(dbm_file);
- goto retfalse;
+ goto dbclose_false;
}
msgq[i].bKeep = TRUE;
for (int i = host_record->sequence - 1; i >= 0 && !newr; i--)
{
sprintf(CS buffer, "%.200s:%d", hostname, i);
- newr = dbfn_read(dbm_file, buffer);
+ newr = dbfn_read(dbp, buffer);
}
/* If no continuation, delete the current and break the loop */
if (!newr)
{
- dbfn_delete(dbm_file, hostname);
+ dbfn_delete(dbp, hostname);
break;
}
/* Else replace the current with the continuation */
- dbfn_delete(dbm_file, buffer);
+ dbfn_delete(dbp, buffer);
host_record = newr;
host_length = host_record->count * MESSAGE_ID_LENGTH;
if (host_length <= 0)
{
- dbfn_close(dbm_file);
DEBUG(D_transport) debug_printf_indent("waiting messages already delivered\n");
- goto retfalse;
+ goto dbclose_false;
}
/* we were not able to find an acceptable message, nor was there a
if (!bContinuation)
{
Ustrcpy(new_message_id, message_id);
- dbfn_close(dbm_file);
- goto retfalse;
+ goto dbclose_false;
}
} /* we need to process a continuation record */
if (host_length > 0)
{
host_record->count = host_length/MESSAGE_ID_LENGTH;
- dbfn_write(dbm_file, hostname, host_record, (int)sizeof(dbdata_wait) + host_length);
+ dbfn_write(dbp, hostname, host_record, (int)sizeof(dbdata_wait) + host_length);
}
-dbfn_close(dbm_file);
+dbfn_close(dbp);
+
DEBUG(D_transport)
{
acl_level--;
}
return TRUE;
+dbclose_false:
+ dbfn_close(dbp);
+
retfalse:
-DEBUG(D_transport) {acl_level--; debug_printf("transport_check_waiting: FALSE\n"); }
-return FALSE;
+ DEBUG(D_transport)
+ {acl_level--; debug_printf("transport_check_waiting: FALSE\n"); }
+ return FALSE;
}
/*************************************************
int rc;
uschar *message = NULL;
-uschar new_message_id[MESSAGE_ID_LENGTH + 1];
smtp_context * sx = store_get(sizeof(*sx), GET_TAINTED); /* tainted, for the data buffers */
BOOL pass_message = FALSE;
#ifndef DISABLE_ESMTP_LIMITS
#ifdef SUPPORT_DANE
BOOL dane_held;
#endif
-BOOL tcw_done = FALSE, tcw = FALSE;
+BOOL tcw_done = FALSE, tcw = FALSE, passback_tcw = FALSE;
*message_defer = FALSE;
+continue_next_id[0] = '\0';
memset(sx, 0, sizeof(*sx));
sx->addrlist = addrlist;
&&
#endif
transport_check_waiting(tblock->name, host->name,
- tblock->connection_max_messages, new_message_id,
+ tblock->connection_max_messages, continue_next_id,
(oicf)smtp_are_same_identities, (void*)&t_compare);
if (!tcw)
{
smtp_compare_t t_compare =
{.tblock = tblock, .current_sender_address = sender_address};
- if ( sx->first_addr /* more addrs for this message */
- || f.continue_more /* more addrs for continued-host */
- || tcw_done && tcw /* more messages for host */
+ if ( sx->first_addr /* more addrs for this message */
+ || f.continue_more /* more addrs for continued-host */
+ || tcw_done && tcw /* more messages for host */
|| (
#ifndef DISABLE_TLS
( tls_out.active.sock < 0 && !continue_proxy_cipher
&&
#endif
transport_check_waiting(tblock->name, host->name,
- sx->max_mail, new_message_id,
+ sx->max_mail, continue_next_id,
(oicf)smtp_are_same_identities, (void*)&t_compare)
) )
{
goto SEND_MESSAGE;
}
+ /* If there is a next-message-id from the wait-transport hintsdb,
+ pretend caller said it has further message for us. Note that we lose
+ the TLS session (below), and that our caller will pass back the id to
+ the delivery process. If not, remember to later cancel the
+ next-message-id so that the transport-caller code (in deliver.c) does
+ not report it back up the pipe to the delivery process.
+ XXX It would be feasible to also report the other continue_* with the
+ _id - taking out the exec for the first continued-transport. But the
+ actual conn, and it's fd, is a problem. Maybe replace the transport
+ pipe with a unix-domain socket? */
+
+ if (!f.continue_more && continue_hostname && *continue_next_id)
+ f.continue_more = passback_tcw = TRUE;
+
/* Unless caller said it already has more messages listed for this host,
pass the connection on to a new Exim process (below, the call to
transport_pass_socket). If the caller has more ready, just return with
been used, which we do under TLSv1.3 for the gsasl SCRAM*PLUS methods.
But we were always doing it anyway. */
- tls_close(sx->cctx.tls_ctx,
- sx->send_tlsclose ? TLS_SHUTDOWN_WAIT : TLS_SHUTDOWN_WONLY);
- sx->send_tlsclose = FALSE;
- sx->cctx.tls_ctx = NULL;
- tls_out.active.sock = -1;
- smtp_peer_options = smtp_peer_options_wrap;
- sx->ok = !sx->smtps
- && smtp_write_command(sx, SCMD_FLUSH, "EHLO %s\r\n", sx->helo_data)
- >= 0
- && smtp_read_response(sx, sx->buffer, sizeof(sx->buffer),
- '2', ob->command_timeout);
+ tls_close(sx->cctx.tls_ctx,
+ sx->send_tlsclose ? TLS_SHUTDOWN_WAIT : TLS_SHUTDOWN_WONLY);
+ sx->send_tlsclose = FALSE;
+ sx->cctx.tls_ctx = NULL;
+ tls_out.active.sock = -1;
+ smtp_peer_options = smtp_peer_options_wrap;
+ sx->ok = !sx->smtps
+ && smtp_write_command(sx, SCMD_FLUSH, "EHLO %s\r\n", sx->helo_data)
+ >= 0
+ && smtp_read_response(sx, sx->buffer, sizeof(sx->buffer),
+ '2', ob->command_timeout);
if (sx->ok && f.continue_more)
goto TIDYUP; /* More addresses for another run */
propagate it from the initial
*/
if (sx->ok && transport_pass_socket(tblock->name, host->name,
- host->address, new_message_id, socket_fd
+ host->address, continue_next_id, socket_fd
#ifndef DISABLE_ESMTP_LIMITS
, sx->peer_limit_mail, sx->peer_limit_rcpt, sx->peer_limit_rcptdom
#endif
}
#endif
-return yield;
+OUT:
+ if (!passback_tcw) continue_next_id[0] = '\0';
+ return yield;
TIDYUP:
#ifdef SUPPORT_DANE
-if (dane_held) for (address_item * a = sx->addrlist->next; a; a = a->next)
- if (a->transport_return == DANE)
- a->transport_return = PENDING_DEFER;
+ if (dane_held) for (address_item * a = sx->addrlist->next; a; a = a->next)
+ if (a->transport_return == DANE)
+ a->transport_return = PENDING_DEFER;
#endif
-return yield;
+ goto OUT;
}