Provide readn() as a wrapper around read()
[exim.git] / src / src / deliver.c
index 94dabab02a6eaed5188b1b9ffa5d04b72734a199..a8012804d16913809e0dd8289333c289f2682a63 100644 (file)
@@ -3270,6 +3270,9 @@ small items (less than PIPE_BUF, which seems to be at least 512 in any Unix and
 often bigger) so even if we are reading while the subprocess is still going, we
 should never have only a partial item in the buffer.
 
+hs12: This assumption is not true anymore, since we got quit large items (certificate
+information and such)
+
 Argument:
   poffset     the offset of the parlist item
   eop         TRUE if the process has completed
@@ -3288,138 +3291,97 @@ address_item *addrlist = p->addrlist;
 address_item *addr = p->addr;
 pid_t pid = p->pid;
 int fd = p->fd;
-uschar *endptr = big_buffer;
-uschar *ptr = endptr;
+int required = PIPE_HEADER_SIZE; /* size including id, subid and length */
+
 uschar *msg = p->msg;
 BOOL done = p->done;
 BOOL finished = FALSE;
-/* minimum size to read is header size including id, subid and length */
-int required = PIPE_HEADER_SIZE;
 
 /* Loop through all items, reading from the pipe when necessary. The pipe
-is set up to be non-blocking, but there are two different Unix mechanisms in
-use. Exim uses O_NONBLOCK if it is defined. This returns 0 for end of file,
-and EAGAIN for no more data. If O_NONBLOCK is not defined, Exim uses O_NDELAY,
-which returns 0 for both end of file and no more data. We distinguish the
-two cases by taking 0 as end of file only when we know the process has
-completed.
-
-Each separate item is written to the pipe in a single write(), and as they are
-all short items, the writes will all be atomic and we should never find
-ourselves in the position of having read an incomplete item. "Short" in this
-case can mean up to about 1K in the case when there is a long error message
-associated with an address.
-
-write(3) [Linux] says that atomic writes are not interleaved with each other.
-Not more.
+used to be non-blocking. But I do not see a reason for using non-blocking I/O
+here, as the preceding select() tells us, if data is available for reading.
+
+A read() on a "selected" handle should never block, but(!) it may return
+less data then we expected. (The buffer size we pass to read() shouldn't be
+understood as a "request", but as a "limit".)
+
+Each separate item is written to the pipe in a timely manner. But, especially for
+larger items, the read(2) may already return partial data from the write(2).
+
+The write is atomic mostly (depending on the amount written), but atomic does
+not imply "all or noting", it just is "not intermixed" with other writes on the
+same channel (pipe).
 
 */
 
 DEBUG(D_deliver) debug_printf("reading pipe for subprocess %d (%s)\n",
-  (int)p->pid, eop? "ended" : "not ended");
+  (int)p->pid, eop? "ended" : "not ended yet");
 
 while (!done)
   {
   retry_item *r, **rp;
-  int remaining = endptr - ptr;
-  uschar header[PIPE_HEADER_SIZE + 1];
-  uschar id, subid;
-  uschar *endc;
-
-  /* Read (first time) or top up the chars in the buffer if necessary.
-  There will be only one read if we get all the available data (i.e. don't
-  fill the buffer completely). */
-
-  if (remaining < required && !finished)
-    {
-    int len;
-    int available = big_buffer_size - remaining;
-
-    if (remaining > 0) memmove(big_buffer, ptr, remaining);
-
-    ptr = big_buffer;
-    endptr = big_buffer + remaining;
-    len = read(fd, endptr, available);
-
-    DEBUG(D_deliver) debug_printf("read() yielded %d\n", len);
-
-    /* If the result is EAGAIN and the process is not complete, just
-    stop reading any more and process what we have already. */
-
-    if (len < 0)
-      {
-      if (!eop && errno == EAGAIN) len = 0; else
-        {
-        msg = string_sprintf("failed to read pipe from transport process "
-          "%d for transport %s: %s", pid, addr->transport->driver_name,
-          strerror(errno));
-        break;
-        }
-      }
-
-    /* If the length is zero (eof or no-more-data), just process what we
-    already have. Note that if the process is still running and we have
-    read all the data in the pipe (but less that "available") then we
-    won't read any more, as "finished" will get set. */
-
-    endptr += len;
-    remaining += len;
-    finished = len != available;
+  uschar pipeheader[PIPE_HEADER_SIZE+1];
+  uschar *id = &pipeheader[0];
+  uschar *subid = &pipeheader[1];
+  uschar *ptr = big_buffer;
+  size_t required = PIPE_HEADER_SIZE; /* first the pipehaeder, later the data */
+  ssize_t got;
+
+  DEBUG(D_deliver) debug_printf("expect %d bytes (pipeheader) from transport process %d\n", required, pid);
+
+  /* We require(!) all the PIPE_HEADER_SIZE bytes here, as we know,
+  they're written in a timely manner, so waiting for the write shouldn't hurt a lot.
+  If we get less, we can assume the subprocess do be done and do not expect any further
+  information from it. */
+  got = readn(fd, pipeheader, required);
+  if (got != required)
+    {
+      msg = string_sprintf("got %d of %d bytes (pipeheader) "
+        "from transport process %d for transport %s",
+        got, PIPE_HEADER_SIZE, pid, addr->transport->driver_name);
+      done = TRUE;
+      break;
     }
 
-  /* If we are at the end of the available data, exit the loop. */
-  if (ptr >= endptr) break;
+  pipeheader[PIPE_HEADER_SIZE] = '\0';
+  DEBUG(D_deliver)
+    debug_printf("got %d bytes (pipeheader) from transport process %d\n", got, pid);
 
-  /* copy and read header */
-  memcpy(header, ptr, PIPE_HEADER_SIZE);
-  header[PIPE_HEADER_SIZE] = '\0';
-  id = header[0];
-  subid = header[1];
-  required = Ustrtol(header + 2, &endc, 10) + PIPE_HEADER_SIZE;     /* header + data */
+  {
+  /* If we can't decode the pipeheader, the subprocess seems to have a
+  problem, we do not expect any furher information from it. */
+  char *endc;
+  required = strtol(pipeheader+2, &endc, 10);
   if (*endc)
     {
-    msg = string_sprintf("failed to read pipe from transport process "
-      "%d for transport %s: error reading size from header", pid, addr->transport->driver_name);
+    msg = string_sprintf("failed to read pipe "
+      "from transport process %d for transport %s: error decoding size from header",
+      pid, addr->transport->driver_name);
     done = TRUE;
     break;
     }
+  }
 
   DEBUG(D_deliver)
-    debug_printf("header read  id:%c,subid:%c,size:%s,required:%d,remaining:%d,finished:%d\n",
-                    id, subid, header+2, required, remaining, finished);
-
-  /* is there room for the dataset we want to read ? */
-  if (required > big_buffer_size - PIPE_HEADER_SIZE)
-    {
-    msg = string_sprintf("failed to read pipe from transport process "
-      "%d for transport %s: big_buffer too small! required size=%d buffer size=%d", pid, addr->transport->driver_name,
-      required, big_buffer_size - PIPE_HEADER_SIZE);
-    done = TRUE;
-    break;
-    }
+    debug_printf("expect %d bytes (pipedata) from transport process %d\n", required, pid);
 
-  /* We wrote all datasets with atomic write() calls.  Remaining < required only
-  happens if big_buffer was too small to get all available data from pipe;
-  finished has to be false as well. */
-
-  if (remaining < required)
+  /* Same as above, the transport process will write the bytes announced
+  in a timely manner, so we can just wait for the bytes, getting less than expected
+  is considered a problem of the subprocess, we do not expect anything else from it. */
+  got = readn(fd, big_buffer, required);
+  if (got != required)
     {
-    if (!finished)
-      continue;
-    msg = string_sprintf("failed to read pipe from transport process "
-      "%d for transport %s: required size=%d > remaining size=%d and finished=true",
-      pid, addr->transport->driver_name, required, remaining);
-    done = TRUE;
-    break;
+      msg = string_sprintf("got only %d of %d bytes (pipedata) "
+        "from transport process %d for transport %s",
+        got, required, pid, addr->transport->driver_name);
+      done = TRUE;
+      break;
     }
 
-  /* Step past the header */
-  ptr += PIPE_HEADER_SIZE;
-
   /* Handle each possible type of item, assuming the complete item is
   available in store. */
 
-  switch (id)
+  switch (*id)
     {
     /* Host items exist only if any hosts were marked unusable. Match
     up by checking the IP address. */
@@ -3513,7 +3475,7 @@ while (!done)
 #ifdef SUPPORT_TLS
     case 'X':
       if (!addr) goto ADDR_MISMATCH;          /* Below, in 'A' handler */
-      switch (subid)
+      switch (*subid)
        {
        case '1':
          addr->cipher = NULL;
@@ -3551,7 +3513,7 @@ while (!done)
 #endif /*SUPPORT_TLS*/
 
     case 'C':  /* client authenticator information */
-      switch (subid)
+      switch (*subid)
        {
        case '1': addr->authenticator = *ptr ? string_copy(ptr) : NULL; break;
        case '2': addr->auth_id = *ptr ? string_copy(ptr) : NULL;       break;
@@ -3592,7 +3554,7 @@ while (!done)
        break;
        }
 
-      switch (subid)
+      switch (*subid)
        {
   #ifdef SUPPORT_SOCKS
        case '2':       /* proxy information; must arrive before A0 and applies to that addr XXX oops*/
@@ -3703,7 +3665,7 @@ call the function again when the process finishes. */
 p->done = done;
 
 /* If the process hadn't finished, and we haven't seen the end of the data
-or suffered a disaster, update the rest of the state, and return FALSE to
+or if we suffered a disaster, update the rest of the state, and return FALSE to
 indicate "not finished". */
 
 if (!eop && !done)
@@ -3736,6 +3698,7 @@ if (msg)
     addr->transport_return = DEFER;
     addr->special_action = SPECIAL_FREEZE;
     addr->message = msg;
+    log_write(0, LOG_MAIN|LOG_PANIC, "Delivery status for %s: %s\n", addr->address, addr->message);
     }
 
 /* Return TRUE to indicate we have got all we need from this process, even
@@ -4133,10 +4096,8 @@ while (parcount > max)
   }
 }
 
-
-
 static void
-rmt_dlv_checked_write(int fd, char id, char subid, void * buf, int size)
+rmt_dlv_checked_write(int fd, char id, char subid, void * buf, ssize_t size)
 {
 uschar pipe_header[PIPE_HEADER_SIZE+1];
 size_t total_len = PIPE_HEADER_SIZE + size;
@@ -4146,7 +4107,7 @@ struct iovec iov[2] = {
   { buf, size }                       /* *the* data */
 };
 
-int ret;
+ssize_t ret;
 
 /* we assume that size can't get larger then BIG_BUFFER_SIZE which currently is set to 16k */
 /* complain to log if someone tries with buffer sizes we can't handle*/
@@ -4171,7 +4132,7 @@ DEBUG(D_deliver) debug_printf("header write id:%c,subid:%c,size:%d,final:%s\n",
 
 if ((ret = writev(fd, iov, 2)) != total_len)
   log_write(0, LOG_MAIN|LOG_PANIC_DIE, "Failed writing transport result to pipe (%d of %d bytes): %s",
-    ret == -1 ? 0 : ret, total_len,
+    ret, total_len,
     ret == -1 ? strerror(errno) : "short write");
 }
 
@@ -4670,10 +4631,12 @@ all pipes, so I do not see a reason to use non-blocking IO here
 
   search_tidyup();
 
+
   if ((pid = fork()) == 0)
     {
     int fd = pfd[pipe_write];
     host_item *h;
+    DEBUG(D_deliver) debug_selector |= D_pid;  // hs12
 
     /* Setting this global in the subprocess means we need never clear it */
     transport_name = tp->name;
@@ -5007,7 +4970,7 @@ all pipes, so I do not see a reason to use non-blocking IO here
 
   /* Fork failed; defer with error message */
 
-  if (pid < 0)
+  if (pid == -1)
     {
     (void)close(pfd[pipe_read]);
     panicmsg = string_sprintf("fork failed for remote delivery to %s: %s",