Transports: pass back next id for continued-transport
authorJeremy Harris <jgh146exb@wizmail.org>
Mon, 17 Jun 2024 14:47:20 +0000 (15:47 +0100)
committerJeremy Harris <jgh146exb@wizmail.org>
Mon, 17 Jun 2024 18:33:17 +0000 (19:33 +0100)
doc/doc-txt/ChangeLog
src/src/deliver.c
src/src/globals.c
src/src/globals.h
src/src/transport.c
src/src/transports/smtp.c

index f154430f3d2481f823120fa17e3a6f9bfec2db80..dc1a02c837036008fa2f8c16fa4cdc1d4c8f4db7 100644 (file)
@@ -2,6 +2,14 @@ This document describes *changes* to previous versions, that might
 affect Exim's operation, with an unchanged configuration file.  For new
 options, and new features, see the NewStuff file next to this ChangeLog.
 
+Since version 4.98
+------------------
+
+JH/01 Use fewer forks & execs for sending many messages to a single host.
+      By passing back the next message-id from the transport to the delivery
+      process, we can loop there.  A two-phase queue run will benefit,
+      particularly for mailinglist and smarthost cases.
+
 Exim version 4.98
 -----------------
 
index eadc96d22e8b6890be4f76a6a494a57f553e3807..d35c20260497a15a1d7a1e97c01a94231c945ae2 100644 (file)
@@ -3684,20 +3684,34 @@ while (!done)
       while (*ptr++) ;
       break;
 
-    /* Z marks the logical end of the data. It is followed by '0' if
+    /* Z0 marks the logical end of the data. It is followed by '0' if
     continue_transport was NULL at the end of transporting, otherwise '1'.
     We need to know when it becomes NULL during a delivery down a passed SMTP
     channel so that we don't try to pass anything more down it. Of course, for
-    most normal messages it will remain NULL all the time. */
+    most normal messages it will remain NULL all the time.
+
+    Z1 is a suggested message_id to handle next, used during a
+    continued-transport sequence. */
 
     case 'Z':
-      if (*ptr == '0')
+      switch (*subid)
        {
-       continue_transport = NULL;
-       continue_hostname = NULL;
+       case '0':
+         if (*ptr == '0')
+           {
+           continue_transport = NULL;
+           continue_hostname = NULL;
+           }
+         done = TRUE;
+         DEBUG(D_deliver) debug_printf("Z0%c item read\n", *ptr);
+         break;
+       case '1':
+         if (continue_hostname)
+           Ustrncpy(continue_next_id, ptr, MESSAGE_ID_LENGTH);
+         DEBUG(D_deliver) debug_printf("continue_next_id: %s%s\n",
+           continue_next_id, continue_hostname ? "" : " (ignored)");
+         break;
        }
-      done = TRUE;
-      DEBUG(D_deliver) debug_printf("Z0%c item read\n", *ptr);
       break;
 
     /* Anything else is a disaster. */
@@ -4775,7 +4789,11 @@ all pipes, so I do not see a reason to use non-blocking IO here
     is flagged by an identifying byte, and is then in a fixed format (with
     strings terminated by zeros), and there is a final terminator at the
     end. The host information and retry information is all attached to
-    the first address, so that gets sent at the start. */
+    the first address, so that gets sent at the start.
+
+    Result item tags:
+      A C D H I K L P R S T X Z
+    */
 
     /* Host unusability information: for most success cases this will
     be null. */
@@ -4784,7 +4802,7 @@ all pipes, so I do not see a reason to use non-blocking IO here
       {
       if (!h->address || h->status < hstatus_unusable) continue;
       sprintf(CS big_buffer, "%c%c%s", h->status, h->why, h->address);
-      rmt_dlv_checked_write(fd, 'H', '0', big_buffer, Ustrlen(big_buffer+2) + 3);
+      rmt_dlv_checked_write(fd, 'H','0', big_buffer, Ustrlen(big_buffer+2) + 3);
       }
 
     /* The number of bytes written. This is the same for each address. Even
@@ -5017,8 +5035,12 @@ all pipes, so I do not see a reason to use non-blocking IO here
       rmt_dlv_checked_write(fd, 'I', '0', big_buffer, ptr - big_buffer);
       }
 
+    /* Continuation message-id */
+    if (*continue_next_id)
+      rmt_dlv_checked_write(fd, 'Z', '1', continue_next_id, MESSAGE_ID_LENGTH);
+
     /* Add termination flag, close the pipe, and that's it. The character
-    after 'Z' indicates whether continue_transport is now NULL or not.
+    after "Z0" indicates whether continue_transport is now NULL or not.
     A change from non-NULL to NULL indicates a problem with a continuing
     connection. */
 
@@ -5501,16 +5523,14 @@ Returns:      nothing
 */
 
 static void
-do_duplicate_check(address_item **anchor)
+do_duplicate_check(address_item ** anchor)
 {
-address_item *addr;
+address_item * addr;
 while ((addr = *anchor))
   {
-  tree_node *tnode;
+  tree_node * tnode;
   if (testflag(addr, af_pfr))
-    {
-    anchor = &(addr->next);
-    }
+    anchor = &addr->next;
   else if ((tnode = tree_search(tree_duplicates, addr->unique)))
     {
     DEBUG(D_deliver|D_route)
@@ -5523,7 +5543,7 @@ while ((addr = *anchor))
   else
     {
     tree_add_duplicate(addr->unique, addr);
-    anchor = &(addr->next);
+    anchor = &addr->next;
     }
   }
 }
@@ -6433,16 +6453,19 @@ Returns:      When the global variable mua_wrapper is FALSE:
 int
 deliver_message(const uschar * id, BOOL forced, BOOL give_up)
 {
-int i, rc;
-int final_yield = DELIVER_ATTEMPTED_NORMAL;
-time_t now = time(NULL);
-address_item *addr_last = NULL;
-uschar *filter_message = NULL;
-int process_recipients = RECIP_ACCEPT;
-open_db dbblock;
-open_db *dbm_file;
+int i, rc, final_yield, process_recipients;
+time_t now;
+address_item * addr_last;
+uschar * filter_message, * info;
+open_db dbblock, * dbm_file;
 extern int acl_where;
-uschar *info;
+CONTINUED_ID:
+
+final_yield = DELIVER_ATTEMPTED_NORMAL;
+now = time(NULL);
+addr_last = NULL;
+filter_message = NULL;
+process_recipients = RECIP_ACCEPT;
 
 #ifdef MEASURE_TIMING
 report_time_since(&timestamp_startup, US"delivery start");     /* testcase 0022, 2100 */
@@ -8626,6 +8649,16 @@ DEBUG(D_deliver) debug_printf("end delivery of %s\n", id);
 report_time_since(&timestamp_startup, US"delivery end"); /* testcase 0005 */
 #endif
 
+/* If the transport suggested another message to deliver, go round again. */
+
+if (final_yield == DELIVER_ATTEMPTED_NORMAL && *continue_next_id)
+  {
+  tree_duplicates = NULL;      /* discard dups info from old message */
+  id = string_copyn(continue_next_id, MESSAGE_ID_LENGTH);
+  continue_next_id[0] = '\0';
+  goto CONTINUED_ID;
+  }
+
 /* It is unlikely that there will be any cached resources, since they are
 released after routing, and in the delivery subprocesses. However, it's
 possible for an expansion for something afterwards (for example,
index d51644e05b235263b78eeb8fc2a573b8280e32a4..0f9d5b54f4719626f40c9961526aeaba2789891c 100644 (file)
@@ -746,6 +746,7 @@ BOOL    continue_proxy_dane    = FALSE;
 uschar *continue_proxy_sni     = NULL;
 uschar *continue_hostname      = NULL;
 uschar *continue_host_address  = NULL;
+uschar  continue_next_id[MESSAGE_ID_LENGTH +1] = {[0]='\0'};
 int     continue_sequence      = 1;
 uschar *continue_transport     = NULL;
 #ifndef DISABLE_ESMTP_LIMITS
index dc9d384dbf0e94c3ebba262cc87bb8d567214e58..a82d529c014e14b29bd35f3a0d36ffb8e4fdf8d7 100644 (file)
@@ -450,6 +450,7 @@ extern BOOL    continue_proxy_dane;    /* proxied conn is DANE */
 extern uschar *continue_proxy_sni;     /* proxied conn SNI */
 extern uschar *continue_hostname;      /* Host for continued delivery */
 extern uschar *continue_host_address;  /* IP address for ditto */
+extern uschar  continue_next_id[];     /* Next message_id from hintsdb */
 extern int     continue_sequence;      /* Sequence num for continued delivery */
 extern uschar *continue_transport;     /* Transport for continued delivery */
 #ifndef DISABLE_ESMTP_LIMITS
index 84397e9cdd2af1c2def03bb7f77d06c68469f877..741ffd4547eab6f4f7de0fc8e086a9488f1b884a 100644 (file)
@@ -1499,9 +1499,8 @@ Returns:    nothing
 void
 transport_update_waiting(host_item * hostlist, uschar * tpname)
 {
-const uschar *prevname = US"";
-open_db dbblock;
-open_db *dbm_file;
+const uschar * prevname = US"";
+open_db dbblock, * dbp;
 
 if (!is_new_message_id(message_id))
   {
@@ -1514,7 +1513,7 @@ DEBUG(D_transport) debug_printf("updating wait-%s database\n", tpname);
 
 /* Open the database for this transport */
 
-if (!(dbm_file = dbfn_open(string_sprintf("wait-%.200s", tpname),
+if (!(dbp = dbfn_open(string_sprintf("wait-%.200s", tpname),
                      O_RDWR, &dbblock, TRUE, TRUE)))
   return;
 
@@ -1536,7 +1535,7 @@ for (host_item * host = hostlist; host; host = host->next)
 
   /* Look up the host record; if there isn't one, make an empty one. */
 
-  if (!(host_record = dbfn_read(dbm_file, host->name)))
+  if (!(host_record = dbfn_read(dbp, host->name)))
     {
     host_record = store_get(sizeof(dbdata_wait) + MESSAGE_ID_LENGTH, GET_UNTAINTED);
     host_record->count = host_record->sequence = 0;
@@ -1560,9 +1559,9 @@ for (host_item * host = hostlist; host; host = host->next)
        debug_printf_indent("NOTE: old or corrupt message-id found in wait=%.200s"
          " hints DB; deleting records for %s\n", tpname, host->name);
 
-      (void) dbfn_delete(dbm_file, host->name);
+      (void) dbfn_delete(dbp, host->name);
       for (int i = host_record->sequence - 1; i >= 0; i--)
-       (void) dbfn_delete(dbm_file,
+       (void) dbfn_delete(dbp,
                    (sprintf(CS buffer, "%.200s:%d", host->name, i), buffer));
 
       host_record->count = host_record->sequence = 0;
@@ -1579,7 +1578,7 @@ for (host_item * host = hostlist; host; host = host->next)
     {
     dbdata_wait *cont;
     sprintf(CS buffer, "%.200s:%d", host->name, i);
-    if ((cont = dbfn_read(dbm_file, buffer)))
+    if ((cont = dbfn_read(dbp, buffer)))
       {
       int clen = cont->count * MESSAGE_ID_LENGTH;
       for (uschar * s = cont->text; s < cont->text + clen; s += MESSAGE_ID_LENGTH)
@@ -1605,7 +1604,7 @@ for (host_item * host = hostlist; host; host = host->next)
   if (host_record->count >= WAIT_NAME_MAX)
     {
     sprintf(CS buffer, "%.200s:%d", host->name, host_record->sequence);
-    dbfn_write(dbm_file, buffer, host_record, sizeof(dbdata_wait) + host_length);
+    dbfn_write(dbp, buffer, host_record, sizeof(dbdata_wait) + host_length);
 #ifndef DISABLE_QUEUE_RAMP
     if (f.queue_2stage && queue_fast_ramp && !queue_run_in_order)
       queue_notify_daemon(message_id);
@@ -1634,14 +1633,14 @@ for (host_item * host = hostlist; host; host = host->next)
 
   /* Update the database */
 
-  dbfn_write(dbm_file, host->name, host_record, sizeof(dbdata_wait) + host_length);
+  dbfn_write(dbp, host->name, host_record, sizeof(dbdata_wait) + host_length);
   DEBUG(D_transport) debug_printf("added %.*s to queue for %s\n",
                                  MESSAGE_ID_LENGTH, message_id, host->name);
   }
 
 /* All now done */
 
-dbfn_close(dbm_file);
+dbfn_close(dbp);
 }
 
 
@@ -1688,8 +1687,7 @@ transport_check_waiting(const uschar * transport_name, const uschar * hostname,
 {
 dbdata_wait * host_record;
 int host_length;
-open_db dbblock;
-open_db * dbm_file;
+open_db dbblock, * dbp;
 
 int         i;
 struct stat statbuf;
@@ -1715,17 +1713,16 @@ if (local_message_max > 0 && continue_sequence >= local_message_max)
 
 /* Open the waiting information database. */
 
-if (!(dbm_file = dbfn_open(string_sprintf("wait-%.200s", transport_name),
+if (!(dbp = dbfn_open(string_sprintf("wait-%.200s", transport_name),
                          O_RDWR, &dbblock, TRUE, TRUE)))
   goto retfalse;
 
 /* See if there is a record for this host; if not, there's nothing to do. */
 
-if (!(host_record = dbfn_read(dbm_file, hostname)))
+if (!(host_record = dbfn_read(dbp, hostname)))
   {
-  dbfn_close(dbm_file);
   DEBUG(D_transport) debug_printf_indent("no messages waiting for %s\n", hostname);
-  goto retfalse;
+  goto dbclose_false;
   }
 
 /* If the data in the record looks corrupt, just log something and
@@ -1733,10 +1730,9 @@ don't try to use it. */
 
 if (host_record->count > WAIT_NAME_MAX)
   {
-  dbfn_close(dbm_file);
   log_write(0, LOG_MAIN|LOG_PANIC, "smtp-wait database entry for %s has bad "
     "count=%d (max=%d)", hostname, host_record->count, WAIT_NAME_MAX);
-  goto retfalse;
+  goto dbclose_false;
   }
 
 /* Scan the message ids in the record from the end towards the beginning,
@@ -1775,12 +1771,11 @@ while (1)
       DEBUG(D_hints_lookup)
        debug_printf_indent("NOTE: old or corrupt message-id found in wait=%.200s"
          " hints DB; deleting records for %s\n", transport_name, hostname);
-      (void) dbfn_delete(dbm_file, hostname);
+      (void) dbfn_delete(dbp, hostname);
       for (int i = host_record->sequence - 1; i >= 0; i--)
-       (void) dbfn_delete(dbm_file,
+       (void) dbfn_delete(dbp,
                    (sprintf(CS buffer, "%.200s:%d", hostname, i), buffer));
-      dbfn_close(dbm_file);
-      goto retfalse;
+      goto dbclose_false;
       }
     msgq[i].bKeep = TRUE;
 
@@ -1860,20 +1855,20 @@ while (1)
     for (int i = host_record->sequence - 1; i >= 0 && !newr; i--)
       {
       sprintf(CS buffer, "%.200s:%d", hostname, i);
-      newr = dbfn_read(dbm_file, buffer);
+      newr = dbfn_read(dbp, buffer);
       }
 
     /* If no continuation, delete the current and break the loop */
 
     if (!newr)
       {
-      dbfn_delete(dbm_file, hostname);
+      dbfn_delete(dbp, hostname);
       break;
       }
 
     /* Else replace the current with the continuation */
 
-    dbfn_delete(dbm_file, buffer);
+    dbfn_delete(dbp, buffer);
     host_record = newr;
     host_length = host_record->count * MESSAGE_ID_LENGTH;
 
@@ -1889,9 +1884,8 @@ while (1)
 
   if (host_length <= 0)
     {
-    dbfn_close(dbm_file);
     DEBUG(D_transport) debug_printf_indent("waiting messages already delivered\n");
-    goto retfalse;
+    goto dbclose_false;
     }
 
   /* we were not able to find an acceptable message, nor was there a
@@ -1901,8 +1895,7 @@ while (1)
   if (!bContinuation)
     {
     Ustrcpy(new_message_id, message_id);
-    dbfn_close(dbm_file);
-    goto retfalse;
+    goto dbclose_false;
     }
   }            /* we need to process a continuation record */
 
@@ -1914,10 +1907,11 @@ record if required, close the database, and return TRUE. */
 if (host_length > 0)
   {
   host_record->count = host_length/MESSAGE_ID_LENGTH;
-  dbfn_write(dbm_file, hostname, host_record, (int)sizeof(dbdata_wait) + host_length);
+  dbfn_write(dbp, hostname, host_record, (int)sizeof(dbdata_wait) + host_length);
   }
 
-dbfn_close(dbm_file);
+dbfn_close(dbp);
+
 DEBUG(D_transport)
   {
   acl_level--;
@@ -1925,9 +1919,13 @@ DEBUG(D_transport)
   }
 return TRUE;
 
+dbclose_false:
+  dbfn_close(dbp);
+
 retfalse:
-DEBUG(D_transport) {acl_level--; debug_printf("transport_check_waiting: FALSE\n"); }
-return FALSE;
+  DEBUG(D_transport)
+    {acl_level--; debug_printf("transport_check_waiting: FALSE\n"); }
+  return FALSE;
 }
 
 /*************************************************
index a5caf3de67017010f986e52563d3dc52eb0d9c40..8e4480e1295df5502e30835a0b8af0e5f5ef48b3 100644 (file)
@@ -3800,7 +3800,6 @@ int save_errno;
 int rc;
 
 uschar *message = NULL;
-uschar new_message_id[MESSAGE_ID_LENGTH + 1];
 smtp_context * sx = store_get(sizeof(*sx), GET_TAINTED);       /* tainted, for the data buffers */
 BOOL pass_message = FALSE;
 #ifndef DISABLE_ESMTP_LIMITS
@@ -3809,9 +3808,10 @@ BOOL mail_limit = FALSE;
 #ifdef SUPPORT_DANE
 BOOL dane_held;
 #endif
-BOOL tcw_done = FALSE, tcw = FALSE;
+BOOL tcw_done = FALSE, tcw = FALSE, passback_tcw = FALSE;
 
 *message_defer = FALSE;
+continue_next_id[0] = '\0';
 
 memset(sx, 0, sizeof(*sx));
 sx->addrlist = addrlist;
@@ -4132,7 +4132,7 @@ else
         &&
 #endif
            transport_check_waiting(tblock->name, host->name,
-             tblock->connection_max_messages, new_message_id,
+             tblock->connection_max_messages, continue_next_id,
             (oicf)smtp_are_same_identities, (void*)&t_compare);
     if (!tcw)
       {
@@ -4699,9 +4699,9 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
     smtp_compare_t t_compare =
       {.tblock = tblock, .current_sender_address = sender_address};
 
-    if (  sx->first_addr                       /* more addrs for this message */
-       || f.continue_more                      /* more addrs for continued-host */
-       || tcw_done && tcw                      /* more messages for host */
+    if (  sx->first_addr               /* more addrs for this message */
+       || f.continue_more              /* more addrs for continued-host */
+       || tcw_done && tcw              /* more messages for host */
        || (
 #ifndef DISABLE_TLS
             (  tls_out.active.sock < 0  &&  !continue_proxy_cipher
@@ -4710,7 +4710,7 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
          &&
 #endif
             transport_check_waiting(tblock->name, host->name,
-              sx->max_mail, new_message_id,
+              sx->max_mail, continue_next_id,
               (oicf)smtp_are_same_identities, (void*)&t_compare)
        )  )
       {
@@ -4761,6 +4761,20 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
          goto SEND_MESSAGE;
          }
 
+       /* If there is a next-message-id from the wait-transport hintsdb,
+       pretend caller said it has further message for us.  Note that we lose
+       the TLS session (below), and that our caller will pass back the id to
+       the delivery process.  If not, remember to later cancel the
+       next-message-id so that the transport-caller code (in deliver.c) does
+       not report it back up the pipe to the delivery process.
+       XXX It would be feasible to also report the other continue_* with the
+       _id - taking out the exec for the first continued-transport. But the
+       actual conn, and it's fd, is a problem. Maybe replace the transport
+       pipe with a unix-domain socket? */
+
+       if (!f.continue_more && continue_hostname && *continue_next_id)
+         f.continue_more = passback_tcw = TRUE;
+
        /* Unless caller said it already has more messages listed for this host,
        pass the connection on to a new Exim process (below, the call to
        transport_pass_socket).  If the caller has more ready, just return with
@@ -4780,17 +4794,17 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
            been used, which we do under TLSv1.3 for the gsasl SCRAM*PLUS methods.
            But we were always doing it anyway. */
 
-         tls_close(sx->cctx.tls_ctx,
-           sx->send_tlsclose ? TLS_SHUTDOWN_WAIT : TLS_SHUTDOWN_WONLY);
-         sx->send_tlsclose = FALSE;
-         sx->cctx.tls_ctx = NULL;
-         tls_out.active.sock = -1;
-         smtp_peer_options = smtp_peer_options_wrap;
-         sx->ok = !sx->smtps
-           && smtp_write_command(sx, SCMD_FLUSH, "EHLO %s\r\n", sx->helo_data)
-               >= 0
-           && smtp_read_response(sx, sx->buffer, sizeof(sx->buffer),
-                                     '2', ob->command_timeout);
+           tls_close(sx->cctx.tls_ctx,
+             sx->send_tlsclose ? TLS_SHUTDOWN_WAIT : TLS_SHUTDOWN_WONLY);
+           sx->send_tlsclose = FALSE;
+           sx->cctx.tls_ctx = NULL;
+           tls_out.active.sock = -1;
+           smtp_peer_options = smtp_peer_options_wrap;
+           sx->ok = !sx->smtps
+             && smtp_write_command(sx, SCMD_FLUSH, "EHLO %s\r\n", sx->helo_data)
+                 >= 0
+             && smtp_read_response(sx, sx->buffer, sizeof(sx->buffer),
+                                       '2', ob->command_timeout);
 
            if (sx->ok && f.continue_more)
              goto TIDYUP;              /* More addresses for another run */
@@ -4822,7 +4836,7 @@ if (sx->completed_addr && sx->ok && sx->send_quit)
   propagate it from the initial
   */
        if (sx->ok && transport_pass_socket(tblock->name, host->name,
-             host->address, new_message_id, socket_fd
+             host->address, continue_next_id, socket_fd
 #ifndef DISABLE_ESMTP_LIMITS
              , sx->peer_limit_mail, sx->peer_limit_rcpt, sx->peer_limit_rcptdom
 #endif
@@ -5014,15 +5028,17 @@ if (mail_limit && sx->first_addr)
   }
 #endif
 
-return yield;
+OUT:
+  if (!passback_tcw) continue_next_id[0] = '\0';
+  return yield;
 
 TIDYUP:
 #ifdef SUPPORT_DANE
-if (dane_held) for (address_item * a = sx->addrlist->next; a; a = a->next)
-  if (a->transport_return == DANE)
-    a->transport_return = PENDING_DEFER;
+  if (dane_held) for (address_item * a = sx->addrlist->next; a; a = a->next)
+    if (a->transport_return == DANE)
+      a->transport_return = PENDING_DEFER;
 #endif
-return yield;
+  goto OUT;
 }