Move notifier socket to general availability
[exim.git] / src / src / queue.c
index 89ac87f94f02ba710868f157c6be9d4f2ef53598..5f75470e0358066f05ff6d6b246d69887fcd0065 100644 (file)
 
 
 
-/* Routines with knowledge of spool layout */
-
-#ifndef COMPILE_UTILITY
-static void
-spool_pname_buf(uschar * buf, int len)
-{
-snprintf(CS buf, len, "%s/%s/input", spool_directory, queue_name);
-}
-
-uschar *
-spool_dname(const uschar * purpose, uschar * subdir)
-{
-return string_sprintf("%s/%s/%s/%s",
-       spool_directory, queue_name, purpose, subdir);
-}
-#endif
-
-uschar *
-spool_sname(const uschar * purpose, uschar * subdir)
-{
-return string_sprintf("%s%s%s%s%s",
-                   queue_name, *queue_name ? "/" : "",
-                   purpose,
-                   *subdir ? "/" : "", subdir);
-}
-
-uschar *
-spool_fname(const uschar * purpose, const uschar * subdir, const uschar * fname,
-               const uschar * suffix)
-{
-return string_sprintf("%s/%s/%s/%s/%s%s",
-       spool_directory, queue_name, purpose, subdir, fname, suffix);
-}
 
 
 
@@ -239,7 +206,7 @@ for (; i <= *subcount; i++)
         Ustrcmp(name + SPOOL_NAME_LENGTH - 2, "-H") == 0)
       {
       queue_filename *next =
-        store_get(sizeof(queue_filename) + Ustrlen(name));
+        store_get(sizeof(queue_filename) + Ustrlen(name), is_tainted(name));
       Ustrcpy(next->text, name);
       next->dir_uschar = subdirchar;
 
@@ -379,6 +346,11 @@ const pcre *selectstring_regex_sender = NULL;
 uschar *log_detail = NULL;
 int subcount = 0;
 uschar subdirs[64];
+pid_t qpid[4] = {0};   /* Parallelism factor for q2stage 1st phase */
+
+#ifdef MEASURE_TIMING
+report_time_since(&timestamp_startup, US"queue_run start");
+#endif
 
 /* Cancel any specific queue domains. Turn off the flag that causes SMTP
 deliveries not to happen, unless doing a 2-stage queue run, when the SMTP flag
@@ -457,7 +429,7 @@ for (int i = queue_run_in_order ? -1 : 0;
      i <= (queue_run_in_order ? -1 : subcount);
      i++)
   {
-  void *reset_point1 = store_get(0);
+  rmark reset_point1 = store_mark();
 
   DEBUG(D_queue_run)
     {
@@ -497,18 +469,40 @@ for (int i = queue_run_in_order ? -1 : 0;
           (double)load_average/1000.0,
           (double)deliver_queue_load_max/1000.0);
 
+    /* If initial of a 2-phase run, maintain a set of child procs
+    to get disk parallelism */
+
+    if (f.queue_2stage && !queue_run_in_order)
+      {
+      int i;
+      if (qpid[nelem(qpid) - 1])
+       {
+       DEBUG(D_queue_run) debug_printf("q2stage waiting for child\n");
+       waitpid(qpid[0], NULL, 0);
+       DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[0]);
+       for (i = 0; i < nelem(qpid) - 1; i++) qpid[i] = qpid[i+1];
+       qpid[i] = 0;
+       }
+      else
+       for (i = 0; qpid[i]; ) i++;
+      DEBUG(D_queue_run) debug_printf("q2stage forking\n");
+      if ((qpid[i] = fork()))
+       continue;       /* parent loops around */
+      DEBUG(D_queue_run) debug_printf("q2stage child\n");
+      }
+
     /* Skip this message unless it's within the ID limits */
 
     if (stop_id && Ustrncmp(fq->text, stop_id, MESSAGE_ID_LENGTH) > 0)
-      continue;
+      goto go_around;
     if (start_id && Ustrncmp(fq->text, start_id, MESSAGE_ID_LENGTH) < 0)
-      continue;
+      goto go_around;
 
     /* Check that the message still exists */
 
     message_subdir[0] = fq->dir_uschar;
     if (Ustat(spool_fname(US"input", message_subdir, fq->text, US""), &statbuf) < 0)
-      continue;
+      goto go_around;
 
     /* There are some tests that require the reading of the header file. Ensure
     the store used is scavenged afterwards so that this process doesn't keep
@@ -521,14 +515,14 @@ for (int i = queue_run_in_order ? -1 : 0;
       {
       BOOL wanted = TRUE;
       BOOL orig_dont_deliver = f.dont_deliver;
-      void *reset_point2 = store_get(0);
+      rmark reset_point2 = store_mark();
 
       /* Restore the original setting of dont_deliver after reading the header,
       so that a setting for a particular message doesn't force it for any that
       follow. If the message is chosen for delivery, the header is read again
       in the deliver_message() function, in a subprocess. */
 
-      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) continue;
+      if (spool_read_header(fq->text, FALSE, TRUE) != spool_read_OK) goto go_around;
       f.dont_deliver = orig_dont_deliver;
 
       /* Now decide if we want to deliver this message. As we have read the
@@ -600,7 +594,7 @@ for (int i = queue_run_in_order ? -1 : 0;
 
       spool_clear_header_globals();
       store_reset(reset_point2);
-      if (!wanted) continue;      /* With next message */
+      if (!wanted) goto go_around;      /* With next message */
       }
 
     /* OK, got a message we want to deliver. Create a pipe which will
@@ -641,13 +635,17 @@ for (int i = queue_run_in_order ? -1 : 0;
 
     set_process_info("running queue: %s", fq->text);
     fq->text[SPOOL_NAME_LENGTH-2] = 0;
+#ifdef MEASURE_TIMING
+    report_time_since(&timestamp_startup, US"queue msg selected");
+#endif
+
     if ((pid = fork()) == 0)
       {
       int rc;
-      if (f.running_in_test_harness) millisleep(100);
+      testharness_pause_ms(100);
       (void)close(pfd[pipe_read]);
       rc = deliver_message(fq->text, force_delivery, FALSE);
-      _exit(rc == DELIVER_NOT_ATTEMPTED);
+      exim_underbar_exit(rc == DELIVER_NOT_ATTEMPTED);
       }
     if (pid < 0)
       log_write(0, LOG_MAIN|LOG_PANIC_DIE, "fork of delivery process from "
@@ -663,11 +661,11 @@ for (int i = queue_run_in_order ? -1 : 0;
     /* A zero return means a delivery was attempted; turn off the force flag
     for any subsequent calls unless queue_force is set. */
 
-    if ((status & 0xffff) == 0) force_delivery = f.queue_run_force;
+    if (!(status & 0xffff)) force_delivery = f.queue_run_force;
 
     /* If the process crashed, tell somebody */
 
-    else if ((status & 0x00ff) != 0)
+    else if (status & 0x00ff)
       log_write(0, LOG_MAIN|LOG_PANIC,
         "queue run: process %d crashed with signal %d while delivering %s",
         (int)pid, status & 0x00ff, fq->text);
@@ -679,19 +677,32 @@ for (int i = queue_run_in_order ? -1 : 0;
 
     set_process_info("running queue: waiting for children of %d", pid);
     if ((status = read(pfd[pipe_read], buffer, sizeof(buffer))) != 0)
-      log_write(0, LOG_MAIN|LOG_PANIC, "queue run: %s on pipe",
-               status > 0 ? "unexpected data" : "error");
+      log_write(0, LOG_MAIN|LOG_PANIC, status > 0 ?
+       "queue run: unexpected data on pipe" : "queue run: error on pipe: %s",
+       strerror(errno));
     (void)close(pfd[pipe_read]);
     set_process_info("running queue");
 
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS, US"2-phase child");
+
     /* If we are in the test harness, and this is not the first of a 2-stage
     queue run, update fudged queue times. */
 
     if (f.running_in_test_harness && !f.queue_2stage)
       {
-      uschar *fqtnext = Ustrchr(fudged_queue_times, '/');
-      if (fqtnext != NULL) fudged_queue_times = fqtnext + 1;
+      uschar * fqtnext = Ustrchr(fudged_queue_times, '/');
+      if (fqtnext) fudged_queue_times = fqtnext + 1;
       }
+
+
+    continue;
+
+  go_around:
+    /* If initial of a 2-phase run, we are a child - so just exit */
+    if (f.queue_2stage && !queue_run_in_order)
+      exim_exit(EXIT_SUCCESS, US"2-phase child");
     }                                  /* End loop for list of messages */
 
   tree_nonrecipients = NULL;
@@ -719,6 +730,19 @@ turned off. */
 
 if (f.queue_2stage)
   {
+
+  /* wait for last children */
+  for (int i = 0; i < nelem(qpid); i++)
+    if (qpid[i])
+      {
+      DEBUG(D_queue_run) debug_printf("q2stage reaped child %d\n", (int)qpid[i]);
+      waitpid(qpid[i], NULL, 0);
+      }
+    else break;
+
+#ifdef MEASURE_TIMING
+  report_time_since(&timestamp_startup, US"queue_run 1st phase done");
+#endif
   f.queue_2stage = FALSE;
   queue_run(start_id, stop_id, TRUE);
   }
@@ -816,7 +840,7 @@ queue_list(int option, uschar **list, int count)
 {
 int subcount;
 int now = (int)time(NULL);
-void *reset_point;
+rmark reset_point;
 queue_filename * qf = NULL;
 uschar subdirs[64];
 
@@ -828,7 +852,7 @@ if (count > 0)
   for (int i = 0; i < count; i++)
     {
     queue_filename *next =
-      store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2);
+      store_get(sizeof(queue_filename) + Ustrlen(list[i]) + 2, is_tainted(list[i]));
     sprintf(CS next->text, "%s-H", list[i]);
     next->dir_uschar = '*';
     next->next = NULL;
@@ -851,8 +875,8 @@ if (option >= 8) option -= 8;
 /* Now scan the chain and print information, resetting store used
 each time. */
 
-for (reset_point = store_get(0);
-    qf;
+for (;
+    qf && (reset_point = store_mark());
     spool_clear_header_globals(), store_reset(reset_point), qf = qf->next
     )
   {
@@ -1034,7 +1058,7 @@ if (action >= MSG_SHOW_BODY)
 
   for (int i = 0; i < 2; i++)
     {
-    message_subdir[0] = split_spool_directory == (i == 0) ? id[5] : 0;
+    set_subdir_str(message_subdir, id, i);
     if ((fd = Uopen(spool_fname(subdirectory, message_subdir, id, suffix),
                    O_RDONLY, 0)) >= 0)
       break;
@@ -1252,7 +1276,7 @@ switch(action)
     if (removed)
       {
 #ifndef DISABLE_EVENT
-      for (int i = 0; i < recipients_count; i++)
+      if (event_action) for (int i = 0; i < recipients_count; i++)
        {
        tree_node *delivered =
          tree_search(tree_nonrecipients, recipients_list[i].address);
@@ -1290,6 +1314,14 @@ switch(action)
     }
 
 
+  case MSG_SETQUEUE:
+    /* The global "queue_name_dest" is used as destination, "queue_name"
+    as source */
+
+    spool_move_message(id, message_subdir, US"", US"");
+    break;
+
+
   case MSG_MARK_ALL_DELIVERED:
   for (int i = 0; i < recipients_count; i++)
     tree_add_nonrecipient(recipients_list[i].address);
@@ -1437,34 +1469,60 @@ queue_check_only(void)
 {
 int sep = 0;
 struct stat statbuf;
-const uschar *s;
-uschar *ss;
-uschar buffer[1024];
+const uschar * s = queue_only_file;
+uschar * ss;
 
-if (queue_only_file == NULL) return;
-
-s = queue_only_file;
-while ((ss = string_nextinlist(&s, &sep, buffer, sizeof(buffer))) != NULL)
-  {
-  if (Ustrncmp(ss, "smtp", 4) == 0)
-    {
-    ss += 4;
-    if (Ustat(ss, &statbuf) == 0)
+if (s)
+  while ((ss = string_nextinlist(&s, &sep, NULL, 0)))
+    if (Ustrncmp(ss, "smtp", 4) == 0)
       {
-      f.queue_smtp = TRUE;
-      DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
-      }
-    }
-  else
-    {
-    if (Ustat(ss, &statbuf) == 0)
-      {
-      queue_only = TRUE;
-      DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+      ss += 4;
+      if (Ustat(ss, &statbuf) == 0)
+       {
+       f.queue_smtp = TRUE;
+       DEBUG(D_receive) debug_printf("queue_smtp set because %s exists\n", ss);
+       }
       }
-    }
+    else
+      if (Ustat(ss, &statbuf) == 0)
+       {
+       queue_only = TRUE;
+       DEBUG(D_receive) debug_printf("queue_only set because %s exists\n", ss);
+       }
+}
+
+
+
+/******************************************************************************/
+/******************************************************************************/
+
+#ifdef EXPERIMENTAL_QUEUE_RAMP
+void
+queue_notify_daemon(const uschar * msgid)
+{
+uschar buf[MESSAGE_ID_LENGTH + 2];
+int fd;
+
+DEBUG(D_queue_run) debug_printf("%s: %s\n", __FUNCTION__, msgid);
+
+buf[0] = NOTIFY_MSG_QRUN;
+memcpy(buf+1, msgid, MESSAGE_ID_LENGTH+1);
+
+if ((fd = socket(AF_UNIX, SOCK_DGRAM, 0)) >= 0)
+  {
+  struct sockaddr_un sun = {.sun_family = AF_UNIX};
+
+  snprintf(sun.sun_path, sizeof(sun.sun_path), "%s/%s",
+    spool_directory, NOTIFIER_SOCKET_NAME);
+
+  if (sendto(fd, buf, sizeof(buf), 0, &sun, sizeof(sun)) < 0)
+    DEBUG(D_queue_run)
+      debug_printf("%s: sendto %s\n", __FUNCTION__, strerror(errno));
+  close(fd);
   }
+else DEBUG(D_queue_run) debug_printf(" socket: %s\n", strerror(errno));
 }
+#endif
 
 #endif /*!COMPILE_UTILITY*/