Pass back more info from transport to delivery process
[exim.git] / src / src / deliver.c
index 2ac56e171af631c755d69bc5703d22a1e75f47fb..b828f8094346ec5b3a13459b79511990642ace3f 100644 (file)
@@ -791,7 +791,7 @@ g = string_append(g, 3, US" [", h->address, US"]");
 if (LOGGING(outgoing_port))
   g = string_fmt_append(g, ":%d", h->port);
 
-if (continue_sequence > 1)             /*XXX this is wrong for a dropped proxyconn.  Would have to pass back from transport */
+if (testflag(addr, af_cont_conn))
   g = string_catn(g, US"*", 1);
 
 #ifdef SUPPORT_SOCKS
@@ -1588,12 +1588,6 @@ if (addr->return_file >= 0 && addr->return_filename)
   (void)close(addr->return_file);
   }
 
-/* Check if the transport notifed continue-conn status explicitly, and
-update our knowlege. */
-
-if (testflag(addr, af_new_conn))       continue_sequence = 1;
-else if (testflag(addr, af_cont_conn)) continue_sequence++;
-
 /* The success case happens only after delivery by a transport. */
 
 if (result == OK)
@@ -2717,8 +2711,7 @@ Returns:     Nothing
 static void
 do_local_deliveries(void)
 {
-open_db dbblock;
-open_db *dbm_file = NULL;
+open_db dbblock, * dbm_file = NULL;
 time_t now = time(NULL);
 
 /* Loop until we have exhausted the supply of local deliveries */
@@ -3320,6 +3313,9 @@ int fd = p->fd;
 uschar *msg = p->msg;
 BOOL done = p->done;
 
+continue_hostname = NULL;
+continue_transport = NULL;
+
 /* Loop through all items, reading from the pipe when necessary. The pipe
 used to be non-blocking. But I do not see a reason for using non-blocking I/O
 here, as the preceding poll() tells us, if data is available for reading.
@@ -3669,6 +3665,11 @@ while (!done)
            }
          else ptr++;
 
+         continue_flags = 0;
+         if (testflag(addr, af_cert_verified)) continue_flags |= CTF_CV;
+         if (testflag(addr, af_dane_verified)) continue_flags |= CTF_DV;
+         if (testflag(addr, af_tls_resume))    continue_flags |= CTF_TR;
+
          /* Finished with this address */
 
          addr = addr->next;
@@ -3686,9 +3687,10 @@ while (!done)
 
     /* Z0 marks the logical end of the data. It is followed by '0' if
     continue_transport was NULL at the end of transporting, otherwise '1'.
-    We need to know when it becomes NULL during a delivery down a passed SMTP
-    channel so that we don't try to pass anything more down it. Of course, for
-    most normal messages it will remain NULL all the time.
+    Those are now for historical reasons only; we always clear the continued
+    channel info, and then set it explicitly if the transport indicates it
+    is still open, because it could differ for each transport we are running in
+    parallel.
 
     Z1 is a suggested message_id to handle next, used during a
     continued-transport sequence. */
@@ -3696,24 +3698,68 @@ while (!done)
     case 'Z':
       switch (*subid)
        {
-       case '0':
-         if (*ptr == '0')
-           {
-           continue_transport = NULL;
-           continue_hostname = NULL;
-           }
+       case '0':                       /* End marker */
          done = TRUE;
          DEBUG(D_deliver) debug_printf("Z0%c item read\n", *ptr);
          break;
-       case '1':
-         if (continue_hostname)
-           {
-           Ustrncpy(continue_next_id, ptr, MESSAGE_ID_LENGTH);
-           continue_sequence++;
-           }
-         DEBUG(D_deliver) debug_printf("continue_next_id: %s%s\n",
-           continue_next_id, continue_hostname ? "" : " (ignored)");
+       case '1':                       /* Suggested continuation message */
+         Ustrncpy(continue_next_id, ptr, MESSAGE_ID_LENGTH);
+         continue_sequence = atoi(CS ptr + MESSAGE_ID_LENGTH + 1);
+         DEBUG(D_deliver) debug_printf("continue_next_id: %s seq %d\n",
+                                       continue_next_id, continue_sequence);
+         break;
+       case '2':                       /* Continued transport, host & addr */
+         {
+         int recvd_fd;
+
+         DEBUG(D_any) if (Ustrcmp(process_purpose, "continued-delivery") != 0)
+           debug_printf("%s becomes continued-delivery\n", process_purpose);
+         process_purpose = US"continued-delivery";
+         continue_transport = string_copy(ptr);        while (*ptr++) ;
+         continue_hostname = string_copy(ptr);         while (*ptr++) ;
+         continue_host_address = string_copy(ptr);     while (*ptr++) ;
+         continue_sequence = atoi(CS ptr);
+
+         dup2((recvd_fd = recv_fd_from_sock(fd)), 0);
+         close(recvd_fd);
+
+         DEBUG(D_deliver)
+           debug_printf("continue: tpt '%s' host '%s' addr '%s' seq %d\n",
+                         continue_transport, continue_hostname,
+                         continue_host_address, continue_sequence);
+         break;
+         }
+       case '3':                               /* Continued conn info */
+         smtp_peer_options = ptr[0];
+         f.smtp_authenticated = ptr[1] & 1;
+         break;
+#ifndef DISABLE_TLS
+       case '4':                               /* Continued TLS info */
+         continue_proxy_cipher = string_copy(ptr);
+         break;
+       case '5':                               /* Continued DANE info */
+       case '6':                               /* Continued TLS info */
+# ifdef SUPPORT_DANE
+         continue_proxy_dane = *subid == '5';
+# endif
+         continue_proxy_sni = *ptr ? string_copy(ptr) : NULL;
+         break;
+#endif
+#ifndef DISABLE_ESMTP_LIMITS
+       case '7':                               /* Continued peer limits */
+         sscanf(CS ptr, "%u %u %u",
+                 &continue_limit_mail, &continue_limit_rcpt,
+                 &continue_limit_rcptdom);
          break;
+#endif
+#ifdef SUPPORT_SOCKS
+       case '8':                               /* Continued proxy info */
+         proxy_local_address = string_copy(ptr);       while (*ptr++) ;
+         proxy_local_port = atoi(CS ptr);              while (*ptr++) ;
+         proxy_external_address = string_copy(ptr);    while (*ptr++) ;
+         proxy_external_port = atoi(CS ptr);
+         break;
+#endif
        }
       break;
 
@@ -4341,6 +4387,7 @@ So look out for the place it gets used.
   transport splitting further by max_rcp.  So we potentially lose some
   parallellism. */
 
+  GET_OPTION("max_rcpt");
   address_count_max = mua_wrapper || Ustrchr(tp->max_addresses, '$')
     ? UNLIMITED_ADDRS : expand_max_rcpt(tp->max_addresses);
 
@@ -4388,8 +4435,9 @@ So look out for the place it gets used.
      && address_count_max < remote_delivery_count/remote_max_parallel
      )
     {
-    int new_max = remote_delivery_count/remote_max_parallel;
-    int message_max = tp->connection_max_messages;
+    int new_max = remote_delivery_count/remote_max_parallel, message_max;
+    GET_OPTION("connection_max_messages");
+    message_max = tp->connection_max_messages;
     if (connection_max_messages >= 0) message_max = connection_max_messages;
     message_max -= continue_sequence - 1;
     if (message_max > 0 && new_max > address_count_max * message_max)
@@ -4482,10 +4530,8 @@ nonmatch domains
   /* Compute the return path, expanding a new one if required. The old one
   must be set first, as it might be referred to in the expansion. */
 
-  if(addr->prop.errors_address)
-    return_path = addr->prop.errors_address;
-  else
-    return_path = sender_address;
+  return_path = addr->prop.errors_address
+               ? addr->prop.errors_address : sender_address;
 
   GET_OPTION("return_path");
   if (tp->return_path)
@@ -4611,19 +4657,30 @@ nonmatch domains
 
       continue;
       }
+    }
 
-    /* Set a flag indicating whether there are further addresses that list
-    the continued host. This tells the transport to leave the channel open,
-    but not to pass it to another delivery process. We'd like to do that
-    for non-continue_transport cases too but the knowlege of which host is
-    connected to is too hard to manage.  Perhaps we need a finer-grain
-    interface to the transport. */
+    /* Once we hit the max number of parallel transports set a flag indicating
+    whether there are further addresses that list the same host. This tells the
+    transport to leave the channel open for us. */
+/*XXX maybe we should *count* possible further's, and set continue_more if
+parmax * tpt-max is exceeded? */
 
-    for (next = addr_remote; next && !f.continue_more; next = next->next)
-      for (host_item * h = next->host_list; h; h = h->next)
-        if (Ustrcmp(h->name, continue_hostname) == 0)
-          { f.continue_more = TRUE; break; }
+  if (parcount+1 >= remote_max_parallel)
+    {
+    host_item * h1 = addr->host_list;
+    if (h1)
+      {
+      const uschar * name = continue_hostname ? continue_hostname : h1->name;
+      for (next = addr_remote; next && !f.continue_more; next = next->next)
+       for (host_item * h = next->host_list; h; h = h->next)
+         if (Ustrcmp(h->name, name) == 0)
+           { f.continue_more = TRUE; break; }
+      }
     }
+  else DEBUG(D_deliver)
+    debug_printf(
+      "not reached parallelism limit (%d/%d) so not setting continue_more\n",
+      parcount+1, remote_max_parallel);
 
   /* The transports set up the process info themselves as they may connect
   to more than one remote machine. They also have to set up the filter
@@ -4636,13 +4693,16 @@ nonmatch domains
   fails, it is probably because the value of remote_max_parallel is so
   large that too many file descriptors for pipes have been created. Arrange
   to wait for a process to finish, and then try again. If we still can't
-  create a pipe when all processes have finished, break the retry loop. */
+  create a pipe when all processes have finished, break the retry loop.
+  Use socketpair() rather than pipe() so we can pass an fd back from the
+  transport process.
+  */
 
   while (!pipe_done)
     {
-    if (pipe(pfd) == 0) pipe_done = TRUE;
-      else if (parcount > 0) parmax = parcount;
-        else break;
+    if (socketpair(AF_UNIX, SOCK_STREAM, 0, pfd) == 0) pipe_done = TRUE;
+    else if (parcount > 0) parmax = parcount;
+    else break;
 
     /* We need to make the reading end of the pipe non-blocking. There are
     two different options for this. Exim is cunningly (I hope!) coded so
@@ -4700,6 +4760,32 @@ all pipes, so I do not see a reason to use non-blocking IO here
 
   search_tidyup();
 
+/*
+A continued-tpt will, in the tpt parent here, call par_reduce for
+the one child. But we are hoping to never do continued-transport...
+SO.... we may have called par_reduce for a single child, above when we'd
+hit the limit on child-count. Possibly multiple times with different
+transports and target hosts.  Does it matter if several return a suggested
+next-id, and we lose all but the last?  Hmm.  Less parallel working would
+happen. Perhaps still do continued-tpt once one has been set? No, that won't
+work for all cases.
+BAH.
+Could take the initial continued-tpt hit, and then do the next-id thing?
+
+do_remote_deliveries par_reduce par_wait par_read_pipe
+*/
+
+  /*XXX what about firsttime? */
+  if (continue_transport && !exim_lockfile_needed())
+    if (!continue_wait_db)
+      {
+      continue_wait_db = dbfn_open_multi(
+                   string_sprintf("wait-%.200s", continue_transport),
+                   O_RDWR,
+                   (open_db *) store_get(sizeof(open_db), GET_UNTAINTED));
+      continue_next_id[0] = '\0';
+      }
+
   if ((pid = exim_fork(US"transport")) == 0)
     {
     int fd = pfd[pipe_write];
@@ -4819,20 +4905,21 @@ all pipes, so I do not see a reason to use non-blocking IO here
     /* Information about what happened to each address. Four item types are
     used: an optional 'X' item first, for TLS information, then an optional "C"
     item for any client-auth info followed by 'R' items for any retry settings,
-    and finally an 'A' item for the remaining data. */
+    and finally an 'A' item for the remaining data. The actual recipient address
+    is not sent but is implicit in the address-chain being handled. */
 
     for(; addr; addr = addr->next)
       {
-      uschar *ptr;
+      uschar * ptr;
 
-      /* The certificate verification status goes into the flags */
+      /* The certificate verification status goes into the flags, in A0 */
       if (tls_out.certificate_verified) setflag(addr, af_cert_verified);
 #ifdef SUPPORT_DANE
       if (tls_out.dane_verified)        setflag(addr, af_dane_verified);
 #endif
-# ifndef DISABLE_TLS_RESUME
+#ifndef DISABLE_TLS_RESUME
       if (tls_out.resumption & RESUME_USED) setflag(addr, af_tls_resume);
-# endif
+#endif
 
       /* Use an X item only if there's something to send */
 #ifndef DISABLE_TLS
@@ -4992,7 +5079,13 @@ all pipes, so I do not see a reason to use non-blocking IO here
 #endif
 
       /* The rest of the information goes in an 'A0' item. */
-
+#ifdef notdef
+      DEBUG(D_deliver)
+       debug_printf("%s %s for MAIL\n",
+         addr->special_action == '=' ? "initial RCPT"
+         : addr->special_action == '-' ? "additional RCPT" : "?",
+         addr->address);
+#endif
       sprintf(CS big_buffer, "%c%c", addr->transport_return, addr->special_action);
       ptr = big_buffer + 2;
       memcpy(ptr, &addr->basic_errno, sizeof(addr->basic_errno));
@@ -5032,15 +5125,73 @@ all pipes, so I do not see a reason to use non-blocking IO here
     if (LOGGING(incoming_interface) && sending_ip_address)
 #endif
       {
-      uschar * ptr;
-      ptr = big_buffer + sprintf(CS big_buffer, "%.128s", sending_ip_address) + 1;
+      uschar * ptr = big_buffer
+                   + sprintf(CS big_buffer, "%.128s", sending_ip_address) + 1;
       ptr += sprintf(CS ptr, "%d", sending_port) + 1;
       rmt_dlv_checked_write(fd, 'I', '0', big_buffer, ptr - big_buffer);
       }
 
-    /* Continuation message-id */
+    /* Continuation message-id, if a continuation is for that reason,
+    and the next sequence number (MAIL FROM count) for the connection. */
+
     if (*continue_next_id)
-      rmt_dlv_checked_write(fd, 'Z', '1', continue_next_id, MESSAGE_ID_LENGTH);
+      rmt_dlv_checked_write(fd, 'Z', '1', big_buffer,
+         sprintf(CS big_buffer, "%.*s %u",
+             MESSAGE_ID_LENGTH, continue_next_id, continue_sequence+1) + 1);
+
+    /* Connection details, only on the first suggested continuation for
+    wait-db ones, but for all continue-more ones (though any after the
+    delivery proc has the info are pointless). */
+
+    if (continue_hostname)
+      {
+       {
+       uschar * ptr = big_buffer;
+       ptr += sprintf(CS ptr, "%.128s", continue_transport) + 1;
+       ptr += sprintf(CS ptr, "%.128s", continue_hostname) + 1;
+       ptr += sprintf(CS ptr, "%.128s", continue_host_address) + 1;
+       ptr += sprintf(CS ptr, "%u", continue_sequence+1) + 1;
+       rmt_dlv_checked_write(fd, 'Z', '2', big_buffer, ptr - big_buffer);
+       send_fd_over_socket(fd, continue_fd);
+       }
+
+      big_buffer[0] = smtp_peer_options;
+      big_buffer[1] = f.smtp_authenticated ? 1 : 0;
+      rmt_dlv_checked_write(fd, 'Z', '3', big_buffer, 2);
+
+      if (tls_out.active.sock >= 0 || continue_proxy_cipher)
+       rmt_dlv_checked_write(fd, 'Z', '4', big_buffer,
+             sprintf(CS big_buffer, "%.128s", continue_proxy_cipher) + 1);
+
+      if (tls_out.sni)
+       rmt_dlv_checked_write(fd, 'Z',
+#ifdef SUPPORT_DANE
+         tls_out.dane_verified ? '5' : '6',
+#else
+         '6',
+#endif
+         tls_out.sni, Ustrlen(tls_out.sni)+1);
+
+#ifndef DISABLE_ESMTP_LIMITS
+      if (continue_limit_mail || continue_limit_rcpt || continue_limit_rcptdom)
+       rmt_dlv_checked_write(fd, 'Z', '7', big_buffer,
+             sprintf(CS big_buffer, "%u %u %u",
+                 continue_limit_mail, continue_limit_rcpt,
+                 continue_limit_rcptdom) + 1);
+#endif
+
+#ifdef SUPPORT_SOCKS
+      if (proxy_session)
+       {
+       uschar * ptr = big_buffer;
+       ptr += sprintf(CS ptr, "%.128s", proxy_local_address) + 1;
+       ptr += sprintf(CS ptr, "%u", proxy_local_port) + 1;
+       ptr += sprintf(CS ptr, "%.128s", proxy_external_address) + 1;
+       ptr += sprintf(CS ptr, "%u", proxy_external_port) + 1;
+       rmt_dlv_checked_write(fd, 'Z', '8', big_buffer, ptr - big_buffer);
+       }
+#endif
+      }
 
     /* Add termination flag, close the pipe, and that's it. The character
     after "Z0" indicates whether continue_transport is now NULL or not.
@@ -5103,14 +5254,20 @@ all pipes, so I do not see a reason to use non-blocking IO here
   (continue_transport gets set to NULL) before we consider any other addresses
   in this message. */
 
-  if (continue_transport) par_reduce(0, fallback);
+  if (continue_transport)
+    {
+    par_reduce(0, fallback);
+    if (!continue_next_id && continue_wait_db)
+       { dbfn_close_multi(continue_wait_db); continue_wait_db = NULL; }
+    }
 
   /* Otherwise, if we are running in the test harness, wait a bit, to let the
   newly created process get going before we create another process. This should
   ensure repeatability in the tests. Wait long enough for most cases to complete
   the transport. */
 
-  else testharness_pause_ms(600);
+  else
+    testharness_pause_ms(600);
 
   continue;
 
@@ -5300,6 +5457,7 @@ if (continue_transport)
     if (Ustrcmp(t->name, continue_transport) == 0)
       {
       if (t->info->closedown) (t->info->closedown)(t);
+      continue_transport = NULL;
       break;
       }
 return DELIVER_NOT_ATTEMPTED;
@@ -6462,8 +6620,8 @@ address_item * addr_last;
 uschar * filter_message, * info;
 open_db dbblock, * dbm_file;
 extern int acl_where;
-CONTINUED_ID:
 
+CONTINUED_ID:
 final_yield = DELIVER_ATTEMPTED_NORMAL;
 now = time(NULL);
 addr_last = NULL;
@@ -6543,7 +6701,7 @@ opening the data file, message_subdir gets set. */
 if ((deliver_datafile = spool_open_datafile(id)) < 0)
   return continue_closedown();  /* yields DELIVER_NOT_ATTEMPTED */
 
-/* tHe value of message_size at this point has been set to the data length,
+/* The value of message_size at this point has been set to the data length,
 plus one for the blank line that notionally precedes the data. */
 
 /* Now read the contents of the header file, which will set up the headers in
@@ -7033,10 +7191,7 @@ else if (system_filter && process_recipients != RECIP_FAIL_TIMEOUT)
           transport_instance *tp;
           for (tp = transports; tp; tp = tp->next)
             if (Ustrcmp(tp->name, tpname) == 0)
-              {
-              p->transport = tp;
-              break;
-              }
+              { p->transport = tp; break; }
           if (!tp)
             p->message = string_sprintf("failed to find \"%s\" transport "
               "for system filter delivery", tpname);
@@ -7271,11 +7426,36 @@ while (addr_new)           /* Loop until all addresses dealt with */
   address_item * addr, * parent;
 
   /* Failure to open the retry database is treated the same as if it does
-  not exist. In both cases, dbm_file is NULL. */
+  not exist. In both cases, dbm_file is NULL.  For the first stage of a 2-phase
+  queue run don't bother checking domain- or address-retry info; they will take
+  effect on the second stage. */
 
-  if (!(dbm_file = dbfn_open(US"retry", O_RDONLY, &dbblock, FALSE, TRUE)))
-    DEBUG(D_deliver|D_retry|D_route|D_hints_lookup)
-      debug_printf("no retry data available\n");
+  if (f.queue_2stage)
+    dbm_file = NULL;
+  else
+    {
+    /* If we have transaction-capable hintsdbs, open the retry db without
+    locking, and leave open for the transport process and for subsequent
+    deliveries. If the open fails, tag that explicitly for the transport but
+    retry the open next time around, in case it was created in the interim. */
+
+    if (continue_retry_db == (open_db *)-1)
+      continue_retry_db = NULL;
+
+    if (continue_retry_db)
+      dbm_file = continue_retry_db;
+    else if (!exim_lockfile_needed() && continue_transport)
+      {
+      dbm_file = dbfn_open_multi(US"retry", O_RDONLY, &dbblock);
+      continue_retry_db = dbm_file ? dbm_file : (open_db *)-1;
+      }
+    else
+      dbm_file = dbfn_open(US"retry", O_RDONLY, &dbblock, FALSE, TRUE);
+
+    if (!dbm_file)
+      DEBUG(D_deliver|D_retry|D_route|D_hints_lookup)
+       debug_printf("no retry data available\n");
+    }
 
   /* Scan the current batch of new addresses, to handle pipes, files and
   autoreplies, and determine which others are ready for routing. */
@@ -7683,7 +7863,8 @@ while (addr_new)           /* Loop until all addresses dealt with */
   /* The database is closed while routing is actually happening. Requests to
   update it are put on a chain and all processed together at the end. */
 
-  if (dbm_file) dbfn_close(dbm_file);
+  if (dbm_file && !continue_retry_db)
+    { dbfn_close(dbm_file); dbm_file = NULL; }
 
   /* If queue_domains is set, we don't even want to try routing addresses in
   those domains. During queue runs, queue_domains is forced to be unset.
@@ -7852,8 +8033,10 @@ while (addr_new)           /* Loop until all addresses dealt with */
       }
     }  /* Continue with routing the next address. */
   }    /* Loop to process any child addresses that the routers created, and
-          any rerouted addresses that got put back on the new chain. */
+       any rerouted addresses that got put back on the new chain. */
 
+if (dbm_file)          /* Can only be continue_retry_db */
+  { dbfn_close_multi(continue_retry_db); continue_retry_db = dbm_file = NULL; }
 
 /* Debugging: show the results of the routing */