[BACK]Return to netmgr.c CVS log [TXT][DIR] Up to [cvs.NetBSD.org] / src / external / mpl / bind / dist / lib / isc / netmgr

Please note that diffs are not public domain; they are subject to the copyright notices on the relevant files.

Diff for /src/external/mpl/bind/dist/lib/isc/netmgr/netmgr.c between version 1.1.1.5 and 1.1.1.6

version 1.1.1.5, 2021/08/19 11:45:28 version 1.1.1.6, 2022/09/23 12:09:22
Line 3 
Line 3 
 /*  /*
  * Copyright (C) Internet Systems Consortium, Inc. ("ISC")   * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
  *   *
    * SPDX-License-Identifier: MPL-2.0
    *
  * This Source Code Form is subject to the terms of the Mozilla Public   * This Source Code Form is subject to the terms of the Mozilla Public
  * License, v. 2.0. If a copy of the MPL was not distributed with this   * License, v. 2.0. If a copy of the MPL was not distributed with this
  * file, you can obtain one at https://mozilla.org/MPL/2.0/.   * file, you can obtain one at https://mozilla.org/MPL/2.0/.
Line 23 
Line 25 
 #include <isc/buffer.h>  #include <isc/buffer.h>
 #include <isc/condition.h>  #include <isc/condition.h>
 #include <isc/errno.h>  #include <isc/errno.h>
   #include <isc/list.h>
 #include <isc/log.h>  #include <isc/log.h>
 #include <isc/magic.h>  #include <isc/magic.h>
 #include <isc/mem.h>  #include <isc/mem.h>
Line 146  static isc_threadresult_t
Line 149  static isc_threadresult_t
 nm_thread(isc_threadarg_t worker0);  nm_thread(isc_threadarg_t worker0);
 static void  static void
 async_cb(uv_async_t *handle);  async_cb(uv_async_t *handle);
   
 static bool  static bool
 process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);  process_netievent(isc__networker_t *worker, isc__netievent_t *ievent);
 static isc_result_t  static isc_result_t
Line 155  wait_for_priority_queue(isc__networker_t
Line 159  wait_for_priority_queue(isc__networker_t
 static void  static void
 drain_queue(isc__networker_t *worker, netievent_type_t type);  drain_queue(isc__networker_t *worker, netievent_type_t type);
   
 #define ENQUEUE_NETIEVENT(worker, queue, event) \  
         isc_queue_enqueue(worker->ievents[queue], (uintptr_t)event)  
 #define DEQUEUE_NETIEVENT(worker, queue) \  
         (isc__netievent_t *)isc_queue_dequeue(worker->ievents[queue])  
   
 #define ENQUEUE_PRIORITY_NETIEVENT(worker, event) \  
         ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY, event)  
 #define ENQUEUE_PRIVILEGED_NETIEVENT(worker, event) \  
         ENQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED, event)  
 #define ENQUEUE_TASK_NETIEVENT(worker, event) \  
         ENQUEUE_NETIEVENT(worker, NETIEVENT_TASK, event)  
 #define ENQUEUE_NORMAL_NETIEVENT(worker, event) \  
         ENQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL, event)  
   
 #define DEQUEUE_PRIORITY_NETIEVENT(worker) \  
         DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIORITY)  
 #define DEQUEUE_PRIVILEGED_NETIEVENT(worker) \  
         DEQUEUE_NETIEVENT(worker, NETIEVENT_PRIVILEGED)  
 #define DEQUEUE_TASK_NETIEVENT(worker) DEQUEUE_NETIEVENT(worker, NETIEVENT_TASK)  
 #define DEQUEUE_NORMAL_NETIEVENT(worker) \  
         DEQUEUE_NETIEVENT(worker, NETIEVENT_NORMAL)  
   
 #define INCREMENT_NETIEVENT(worker, queue) \  
         atomic_fetch_add_release(&worker->nievents[queue], 1)  
 #define DECREMENT_NETIEVENT(worker, queue) \  
         atomic_fetch_sub_release(&worker->nievents[queue], 1)  
   
 #define INCREMENT_PRIORITY_NETIEVENT(worker) \  
         INCREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)  
 #define INCREMENT_PRIVILEGED_NETIEVENT(worker) \  
         INCREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)  
 #define INCREMENT_TASK_NETIEVENT(worker) \  
         INCREMENT_NETIEVENT(worker, NETIEVENT_TASK)  
 #define INCREMENT_NORMAL_NETIEVENT(worker) \  
         INCREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)  
   
 #define DECREMENT_PRIORITY_NETIEVENT(worker) \  
         DECREMENT_NETIEVENT(worker, NETIEVENT_PRIORITY)  
 #define DECREMENT_PRIVILEGED_NETIEVENT(worker) \  
         DECREMENT_NETIEVENT(worker, NETIEVENT_PRIVILEGED)  
 #define DECREMENT_TASK_NETIEVENT(worker) \  
         DECREMENT_NETIEVENT(worker, NETIEVENT_TASK)  
 #define DECREMENT_NORMAL_NETIEVENT(worker) \  
         DECREMENT_NETIEVENT(worker, NETIEVENT_NORMAL)  
   
 static void  static void
 isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);  isc__nm_async_stop(isc__networker_t *worker, isc__netievent_t *ev0);
 static void  static void
Line 218  isc__nm_work_cb(uv_work_t *req);
Line 177  isc__nm_work_cb(uv_work_t *req);
 static void  static void
 isc__nm_after_work_cb(uv_work_t *req, int status);  isc__nm_after_work_cb(uv_work_t *req, int status);
   
   void
   isc__nmsocket_reset(isc_nmsocket_t *sock);
   
 /*%<  /*%<
  * Issue a 'handle closed' callback on the socket.   * Issue a 'handle closed' callback on the socket.
  */   */
Line 281  isc__nm_threadpool_initialize(uint32_t w
Line 243  isc__nm_threadpool_initialize(uint32_t w
         }          }
 }  }
   
   #if HAVE_DECL_UV_UDP_LINUX_RECVERR
   #define MINIMAL_UV_VERSION UV_VERSION(1, 42, 0)
   #elif HAVE_DECL_UV_UDP_MMSG_FREE
   #define MINIMAL_UV_VERSION UV_VERSION(1, 40, 0)
   #elif HAVE_DECL_UV_UDP_RECVMMSG
   #define MINIMAL_UV_VERSION UV_VERSION(1, 37, 0)
   #elif HAVE_DECL_UV_UDP_MMSG_CHUNK
   #define MINIMAL_UV_VERSION UV_VERSION(1, 35, 0)
   #else
   #define MINIMAL_UV_VERSION UV_VERSION(1, 0, 0)
   #endif
   
 void  void
 isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {  isc__netmgr_create(isc_mem_t *mctx, uint32_t workers, isc_nm_t **netmgrp) {
         isc_nm_t *mgr = NULL;          isc_nm_t *mgr = NULL;
Line 288  isc__netmgr_create(isc_mem_t *mctx, uint
Line 262  isc__netmgr_create(isc_mem_t *mctx, uint
   
         REQUIRE(workers > 0);          REQUIRE(workers > 0);
   
           if (uv_version() < MINIMAL_UV_VERSION) {
                   isc_error_fatal(__FILE__, __LINE__,
                                   "libuv version too old: running with libuv %s "
                                   "when compiled with libuv %s will lead to "
                                   "libuv failures because of unknown flags",
                                   uv_version_string(), UV_VERSION_STRING);
           }
   
 #ifdef WIN32  #ifdef WIN32
         isc__nm_winsock_initialize();          isc__nm_winsock_initialize();
 #endif /* WIN32 */  #endif /* WIN32 */
Line 307  isc__netmgr_create(isc_mem_t *mctx, uint
Line 289  isc__netmgr_create(isc_mem_t *mctx, uint
         atomic_init(&mgr->workers_paused, 0);          atomic_init(&mgr->workers_paused, 0);
         atomic_init(&mgr->paused, false);          atomic_init(&mgr->paused, false);
         atomic_init(&mgr->closing, false);          atomic_init(&mgr->closing, false);
   #if HAVE_SO_REUSEPORT_LB
           mgr->load_balance_sockets = true;
   #else
           mgr->load_balance_sockets = false;
   #endif
   
 #ifdef NETMGR_TRACE  #ifdef NETMGR_TRACE
         ISC_LIST_INIT(mgr->active_sockets);          ISC_LIST_INIT(mgr->active_sockets);
Line 321  isc__netmgr_create(isc_mem_t *mctx, uint
Line 308  isc__netmgr_create(isc_mem_t *mctx, uint
         atomic_init(&mgr->keepalive, 30000);          atomic_init(&mgr->keepalive, 30000);
         atomic_init(&mgr->advertised, 30000);          atomic_init(&mgr->advertised, 30000);
   
         isc_mutex_init(&mgr->reqlock);  
         isc_mempool_create(mgr->mctx, sizeof(isc__nm_uvreq_t), &mgr->reqpool);  
         isc_mempool_setname(mgr->reqpool, "nm_reqpool");  
         isc_mempool_setfreemax(mgr->reqpool, 4096);  
         isc_mempool_associatelock(mgr->reqpool, &mgr->reqlock);  
         isc_mempool_setfillcount(mgr->reqpool, 32);  
   
         isc_mutex_init(&mgr->evlock);  
         isc_mempool_create(mgr->mctx, sizeof(isc__netievent_storage_t),  
                            &mgr->evpool);  
         isc_mempool_setname(mgr->evpool, "nm_evpool");  
         isc_mempool_setfreemax(mgr->evpool, 4096);  
         isc_mempool_associatelock(mgr->evpool, &mgr->evlock);  
         isc_mempool_setfillcount(mgr->evpool, 32);  
   
         isc_barrier_init(&mgr->pausing, workers);          isc_barrier_init(&mgr->pausing, workers);
         isc_barrier_init(&mgr->resuming, workers);          isc_barrier_init(&mgr->resuming, workers);
   
         mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t));          mgr->workers = isc_mem_get(mctx, workers * sizeof(isc__networker_t));
         for (size_t i = 0; i < workers; i++) {          for (size_t i = 0; i < workers; i++) {
                 int r;  
                 isc__networker_t *worker = &mgr->workers[i];                  isc__networker_t *worker = &mgr->workers[i];
                   int r;
   
                 *worker = (isc__networker_t){                  *worker = (isc__networker_t){
                         .mgr = mgr,                          .mgr = mgr,
                         .id = i,                          .id = i,
                 };                  };
   
                 r = uv_loop_init(&worker->loop);                  r = uv_loop_init(&worker->loop);
                 RUNTIME_CHECK(r == 0);                  UV_RUNTIME_CHECK(uv_loop_init, r);
   
                 worker->loop.data = &mgr->workers[i];                  worker->loop.data = &mgr->workers[i];
   
                 r = uv_async_init(&worker->loop, &worker->async, async_cb);                  r = uv_async_init(&worker->loop, &worker->async, async_cb);
                 RUNTIME_CHECK(r == 0);                  UV_RUNTIME_CHECK(uv_async_init, r);
   
                 isc_mutex_init(&worker->lock);  
                 isc_condition_init(&worker->cond_prio);  
   
                 for (size_t type = 0; type < NETIEVENT_MAX; type++) {                  for (size_t type = 0; type < NETIEVENT_MAX; type++) {
                         worker->ievents[type] = isc_queue_new(mgr->mctx, 128);                          isc_mutex_init(&worker->ievents[type].lock);
                         atomic_init(&worker->nievents[type], 0);                          isc_condition_init(&worker->ievents[type].cond);
                           ISC_LIST_INIT(worker->ievents[type].list);
                 }                  }
   
                 worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);                  worker->recvbuf = isc_mem_get(mctx, ISC_NETMGR_RECVBUF_SIZE);
Line 412  nm_destroy(isc_nm_t **mgr0) {
Line 383  nm_destroy(isc_nm_t **mgr0) {
   
         for (int i = 0; i < mgr->nworkers; i++) {          for (int i = 0; i < mgr->nworkers; i++) {
                 isc__networker_t *worker = &mgr->workers[i];                  isc__networker_t *worker = &mgr->workers[i];
                 isc__netievent_t *ievent = NULL;  
                 int r;                  int r;
   
                 /* Empty the async event queues */  
                 while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {  
                         isc_mempool_put(mgr->evpool, ievent);  
                 }  
   
                 INSIST(DEQUEUE_PRIVILEGED_NETIEVENT(worker) == NULL);  
                 INSIST(DEQUEUE_TASK_NETIEVENT(worker) == NULL);  
   
                 while ((ievent = DEQUEUE_PRIORITY_NETIEVENT(worker)) != NULL) {  
                         isc_mempool_put(mgr->evpool, ievent);  
                 }  
                 isc_condition_destroy(&worker->cond_prio);  
   
                 r = uv_loop_close(&worker->loop);                  r = uv_loop_close(&worker->loop);
                 INSIST(r == 0);                  UV_RUNTIME_CHECK(uv_loop_close, r);
   
                 for (size_t type = 0; type < NETIEVENT_MAX; type++) {                  for (size_t type = 0; type < NETIEVENT_MAX; type++) {
                         isc_queue_destroy(worker->ievents[type]);                          INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
                           isc_condition_destroy(&worker->ievents[type].cond);
                           isc_mutex_destroy(&worker->ievents[type].lock);
                 }                  }
   
                 isc_mem_put(mgr->mctx, worker->sendbuf,                  isc_mem_put(mgr->mctx, worker->sendbuf,
Line 453  nm_destroy(isc_nm_t **mgr0) {
Line 412  nm_destroy(isc_nm_t **mgr0) {
         isc_condition_destroy(&mgr->wkpausecond);          isc_condition_destroy(&mgr->wkpausecond);
         isc_mutex_destroy(&mgr->lock);          isc_mutex_destroy(&mgr->lock);
   
         isc_mempool_destroy(&mgr->evpool);  
         isc_mutex_destroy(&mgr->evlock);  
   
         isc_mempool_destroy(&mgr->reqpool);  
         isc_mutex_destroy(&mgr->reqlock);  
   
         isc_mem_put(mgr->mctx, mgr->workers,          isc_mem_put(mgr->mctx, mgr->workers,
                     mgr->nworkers * sizeof(isc__networker_t));                      mgr->nworkers * sizeof(isc__networker_t));
         isc_mem_putanddetach(&mgr->mctx, mgr, sizeof(*mgr));          isc_mem_putanddetach(&mgr->mctx, mgr, sizeof(*mgr));
Line 633  isc__netmgr_destroy(isc_nm_t **netmgrp) 
Line 586  isc__netmgr_destroy(isc_nm_t **netmgrp) 
 #ifdef NETMGR_TRACE  #ifdef NETMGR_TRACE
         if (isc_refcount_current(&mgr->references) > 1) {          if (isc_refcount_current(&mgr->references) > 1) {
                 isc__nm_dump_active(mgr);                  isc__nm_dump_active(mgr);
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 #endif  #endif
   
Line 659  isc_nm_maxudp(isc_nm_t *mgr, uint32_t ma
Line 611  isc_nm_maxudp(isc_nm_t *mgr, uint32_t ma
 }  }
   
 void  void
   isc_nmhandle_setwritetimeout(isc_nmhandle_t *handle, uint64_t write_timeout) {
           REQUIRE(VALID_NMHANDLE(handle));
           REQUIRE(VALID_NMSOCK(handle->sock));
   
           handle->sock->write_timeout = write_timeout;
   }
   
   void
 isc_nm_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,  isc_nm_settimeouts(isc_nm_t *mgr, uint32_t init, uint32_t idle,
                    uint32_t keepalive, uint32_t advertised) {                     uint32_t keepalive, uint32_t advertised) {
         REQUIRE(VALID_NM(mgr));          REQUIRE(VALID_NM(mgr));
Line 669  isc_nm_settimeouts(isc_nm_t *mgr, uint32
Line 629  isc_nm_settimeouts(isc_nm_t *mgr, uint32
         atomic_store(&mgr->advertised, advertised);          atomic_store(&mgr->advertised, advertised);
 }  }
   
   bool
   isc_nm_getloadbalancesockets(isc_nm_t *mgr) {
           REQUIRE(VALID_NM(mgr));
   
           return (mgr->load_balance_sockets);
   }
   
   void
   isc_nm_setloadbalancesockets(isc_nm_t *mgr, bool enabled) {
           REQUIRE(VALID_NM(mgr));
   
   #if HAVE_SO_REUSEPORT_LB
           mgr->load_balance_sockets = enabled;
   #else
           UNUSED(enabled);
   #endif
   }
   
 void  void
 isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,  isc_nm_gettimeouts(isc_nm_t *mgr, uint32_t *initial, uint32_t *idle,
                    uint32_t *keepalive, uint32_t *advertised) {                     uint32_t *keepalive, uint32_t *advertised) {
Line 772  nm_thread(isc_threadarg_t worker0) {
Line 750  nm_thread(isc_threadarg_t worker0) {
         }          }
   
         /*          /*
          * We are shutting down. Process the task queues           * We are shutting down.  Drain the queues.
          * (they may include shutdown events) but do not process  
          * the netmgr event queue.  
          */           */
         drain_queue(worker, NETIEVENT_PRIVILEGED);          drain_queue(worker, NETIEVENT_PRIVILEGED);
         drain_queue(worker, NETIEVENT_TASK);          drain_queue(worker, NETIEVENT_TASK);
   
           for (size_t type = 0; type < NETIEVENT_MAX; type++) {
                   LOCK(&worker->ievents[type].lock);
                   INSIST(ISC_LIST_EMPTY(worker->ievents[type].list));
                   UNLOCK(&worker->ievents[type].lock);
           }
   
         LOCK(&mgr->lock);          LOCK(&mgr->lock);
         mgr->workers_running--;          mgr->workers_running--;
         SIGNAL(&mgr->wkstatecond);          SIGNAL(&mgr->wkstatecond);
Line 800  process_all_queues(isc__networker_t *wor
Line 782  process_all_queues(isc__networker_t *wor
                 isc_result_t result = process_queue(worker, type);                  isc_result_t result = process_queue(worker, type);
                 switch (result) {                  switch (result) {
                 case ISC_R_SUSPEND:                  case ISC_R_SUSPEND:
                         return (true);                          reschedule = true;
                           break;
                 case ISC_R_EMPTY:                  case ISC_R_EMPTY:
                         /* empty queue */                          /* empty queue */
                         break;                          break;
Line 808  process_all_queues(isc__networker_t *wor
Line 791  process_all_queues(isc__networker_t *wor
                         reschedule = true;                          reschedule = true;
                         break;                          break;
                 default:                  default:
                         INSIST(0);                          UNREACHABLE();
                         ISC_UNREACHABLE();  
                 }                  }
         }          }
   
Line 887  isc__nm_async_task(isc__networker_t *wor
Line 869  isc__nm_async_task(isc__networker_t *wor
         case ISC_R_SUCCESS:          case ISC_R_SUCCESS:
                 return;                  return;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
 static void  static void
 wait_for_priority_queue(isc__networker_t *worker) {  wait_for_priority_queue(isc__networker_t *worker) {
         isc_condition_t *cond = &worker->cond_prio;          isc_condition_t *cond = &worker->ievents[NETIEVENT_PRIORITY].cond;
         bool wait_for_work = true;          isc_mutex_t *lock = &worker->ievents[NETIEVENT_PRIORITY].lock;
           isc__netievent_list_t *list =
         while (true) {                  &(worker->ievents[NETIEVENT_PRIORITY].list);
                 isc__netievent_t *ievent;  
                 LOCK(&worker->lock);          LOCK(lock);
                 ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);          while (ISC_LIST_EMPTY(*list)) {
                 if (wait_for_work) {                  WAIT(cond, lock);
                         while (ievent == NULL) {  
                                 WAIT(cond, &worker->lock);  
                                 ievent = DEQUEUE_PRIORITY_NETIEVENT(worker);  
                         }  
                 }  
                 UNLOCK(&worker->lock);  
                 wait_for_work = false;  
   
                 if (ievent == NULL) {  
                         return;  
                 }  
                 DECREMENT_PRIORITY_NETIEVENT(worker);  
   
                 (void)process_netievent(worker, ievent);  
         }          }
           UNLOCK(lock);
   
           drain_queue(worker, NETIEVENT_PRIORITY);
 }  }
   
 static void  static void
 drain_queue(isc__networker_t *worker, netievent_type_t type) {  drain_queue(isc__networker_t *worker, netievent_type_t type) {
         while (process_queue(worker, type) != ISC_R_EMPTY) {          bool empty = false;
                 ;          while (!empty) {
                   if (process_queue(worker, type) == ISC_R_EMPTY) {
                           LOCK(&worker->ievents[type].lock);
                           empty = ISC_LIST_EMPTY(worker->ievents[type].list);
                           UNLOCK(&worker->ievents[type].lock);
                   }
         }          }
 }  }
   
Line 997  process_netievent(isc__networker_t *work
Line 972  process_netievent(isc__networker_t *work
                 NETIEVENT_CASE(resume);                  NETIEVENT_CASE(resume);
                 NETIEVENT_CASE_NOMORE(pause);                  NETIEVENT_CASE_NOMORE(pause);
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
         return (true);          return (true);
 }  }
   
 static isc_result_t  static isc_result_t
 process_queue(isc__networker_t *worker, netievent_type_t type) {  process_queue(isc__networker_t *worker, netievent_type_t type) {
         /*          isc__netievent_t *ievent = NULL;
          * The number of items on the queue is only loosely synchronized with          isc__netievent_list_t list;
          * the items on the queue.  But there's a guarantee that if there's an  
          * item on the queue, it will be accounted for.  However there's a          ISC_LIST_INIT(list);
          * possibility that the counter might be higher than the items on the  
          * queue stored.  
          */  
         uint_fast32_t waiting = atomic_load_acquire(&worker->nievents[type]);  
         isc__netievent_t *ievent = DEQUEUE_NETIEVENT(worker, type);  
   
         if (ievent == NULL && waiting == 0) {          LOCK(&worker->ievents[type].lock);
           ISC_LIST_MOVE(list, worker->ievents[type].list);
           UNLOCK(&worker->ievents[type].lock);
   
           ievent = ISC_LIST_HEAD(list);
           if (ievent == NULL) {
                 /* There's nothing scheduled */                  /* There's nothing scheduled */
                 return (ISC_R_EMPTY);                  return (ISC_R_EMPTY);
         } else if (ievent == NULL) {  
                 /* There's at least one item scheduled, but not on the queue yet  
                  */  
                 return (ISC_R_SUCCESS);  
         }          }
   
         while (ievent != NULL) {          while (ievent != NULL) {
                 DECREMENT_NETIEVENT(worker, type);                  isc__netievent_t *next = ISC_LIST_NEXT(ievent, link);
                 bool stop = !process_netievent(worker, ievent);                  ISC_LIST_DEQUEUE(list, ievent, link);
   
                 if (stop) {                  if (!process_netievent(worker, ievent)) {
                         /* Netievent told us to stop */                          /* The netievent told us to stop */
                           if (!ISC_LIST_EMPTY(list)) {
                                   /*
                                    * Reschedule the rest of the unprocessed
                                    * events.
                                    */
                                   LOCK(&worker->ievents[type].lock);
                                   ISC_LIST_PREPENDLIST(worker->ievents[type].list,
                                                        list, link);
                                   UNLOCK(&worker->ievents[type].lock);
                           }
                         return (ISC_R_SUSPEND);                          return (ISC_R_SUSPEND);
                 }                  }
   
                 if (waiting-- == 0) {                  ievent = next;
                         /* We reached this round "quota" */  
                         break;  
                 }  
   
                 ievent = DEQUEUE_NETIEVENT(worker, type);  
         }          }
   
         /* We processed at least one */          /* We processed at least one */
Line 1047  process_queue(isc__networker_t *worker, 
Line 1022  process_queue(isc__networker_t *worker, 
   
 void *  void *
 isc__nm_get_netievent(isc_nm_t *mgr, isc__netievent_type type) {  isc__nm_get_netievent(isc_nm_t *mgr, isc__netievent_type type) {
         isc__netievent_storage_t *event = isc_mempool_get(mgr->evpool);          isc__netievent_storage_t *event = isc_mem_get(mgr->mctx,
                                                         sizeof(*event));
   
         *event = (isc__netievent_storage_t){ .ni.type = type };          *event = (isc__netievent_storage_t){ .ni.type = type };
           ISC_LINK_INIT(&(event->ni), link);
         return (event);          return (event);
 }  }
   
 void  void
 isc__nm_put_netievent(isc_nm_t *mgr, void *ievent) {  isc__nm_put_netievent(isc_nm_t *mgr, void *ievent) {
         isc_mempool_put(mgr->evpool, ievent);          isc_mem_put(mgr->mctx, ievent, sizeof(isc__netievent_storage_t));
 }  }
   
 NETIEVENT_SOCKET_DEF(tcpclose);  NETIEVENT_SOCKET_DEF(tcpclose);
Line 1117  isc__nm_maybe_enqueue_ievent(isc__networ
Line 1094  isc__nm_maybe_enqueue_ievent(isc__networ
   
 void  void
 isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {  isc__nm_enqueue_ievent(isc__networker_t *worker, isc__netievent_t *event) {
           netievent_type_t type;
   
         if (event->type > netievent_prio) {          if (event->type > netievent_prio) {
                 /*                  type = NETIEVENT_PRIORITY;
                  * We need to make sure this signal will be delivered and  
                  * the queue will be processed.  
                  */  
                 LOCK(&worker->lock);  
                 INCREMENT_PRIORITY_NETIEVENT(worker);  
                 ENQUEUE_PRIORITY_NETIEVENT(worker, event);  
                 SIGNAL(&worker->cond_prio);  
                 UNLOCK(&worker->lock);  
         } else if (event->type == netievent_privilegedtask) {  
                 INCREMENT_PRIVILEGED_NETIEVENT(worker);  
                 ENQUEUE_PRIVILEGED_NETIEVENT(worker, event);  
         } else if (event->type == netievent_task) {  
                 INCREMENT_TASK_NETIEVENT(worker);  
                 ENQUEUE_TASK_NETIEVENT(worker, event);  
         } else {          } else {
                 INCREMENT_NORMAL_NETIEVENT(worker);                  switch (event->type) {
                 ENQUEUE_NORMAL_NETIEVENT(worker, event);                  case netievent_prio:
                           UNREACHABLE();
                           break;
                   case netievent_privilegedtask:
                           type = NETIEVENT_PRIVILEGED;
                           break;
                   case netievent_task:
                           type = NETIEVENT_TASK;
                           break;
                   default:
                           type = NETIEVENT_NORMAL;
                           break;
                   }
           }
   
           /*
            * We need to make sure this signal will be delivered and
            * the queue will be processed.
            */
           LOCK(&worker->ievents[type].lock);
           ISC_LIST_ENQUEUE(worker->ievents[type].list, event, link);
           if (type == NETIEVENT_PRIORITY) {
                   SIGNAL(&worker->ievents[type].cond);
         }          }
           UNLOCK(&worker->ievents[type].lock);
   
         uv_async_send(&worker->async);          uv_async_send(&worker->async);
 }  }
   
Line 1261  nmsocket_cleanup(isc_nmsocket_t *sock, b
Line 1250  nmsocket_cleanup(isc_nmsocket_t *sock, b
         isc_astack_destroy(sock->inactivehandles);          isc_astack_destroy(sock->inactivehandles);
   
         while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) {          while ((uvreq = isc_astack_pop(sock->inactivereqs)) != NULL) {
                 isc_mempool_put(sock->mgr->reqpool, uvreq);                  isc_mem_put(sock->mgr->mctx, uvreq, sizeof(*uvreq));
         }          }
   
         isc_astack_destroy(sock->inactivereqs);          isc_astack_destroy(sock->inactivereqs);
         sock->magic = 0;          sock->magic = 0;
   
         isc_mem_free(sock->mgr->mctx, sock->ah_frees);  
         isc_mem_free(sock->mgr->mctx, sock->ah_handles);  
         isc_mutex_destroy(&sock->lock);  
         isc_condition_destroy(&sock->scond);          isc_condition_destroy(&sock->scond);
           isc_condition_destroy(&sock->cond);
           isc_mutex_destroy(&sock->lock);
 #ifdef NETMGR_TRACE  #ifdef NETMGR_TRACE
         LOCK(&sock->mgr->lock);          LOCK(&sock->mgr->lock);
         ISC_LIST_UNLINK(sock->mgr->active_sockets, sock, active_link);          ISC_LIST_UNLINK(sock->mgr->active_sockets, sock, active_link);
Line 1444  isc___nmsocket_init(isc_nmsocket_t *sock
Line 1432  isc___nmsocket_init(isc_nmsocket_t *sock
         *sock = (isc_nmsocket_t){ .type = type,          *sock = (isc_nmsocket_t){ .type = type,
                                   .iface = *iface,                                    .iface = *iface,
                                   .fd = -1,                                    .fd = -1,
                                   .ah_size = 32,  
                                   .inactivehandles = isc_astack_new(                                    .inactivehandles = isc_astack_new(
                                           mgr->mctx, ISC_NM_HANDLES_STACK_SIZE),                                            mgr->mctx, ISC_NM_HANDLES_STACK_SIZE),
                                   .inactivereqs = isc_astack_new(                                    .inactivereqs = isc_astack_new(
Line 1462  isc___nmsocket_init(isc_nmsocket_t *sock
Line 1449  isc___nmsocket_init(isc_nmsocket_t *sock
         isc_nm_attach(mgr, &sock->mgr);          isc_nm_attach(mgr, &sock->mgr);
         sock->uv_handle.handle.data = sock;          sock->uv_handle.handle.data = sock;
   
         sock->ah_frees = isc_mem_allocate(mgr->mctx,  
                                           sock->ah_size * sizeof(size_t));  
         sock->ah_handles = isc_mem_allocate(  
                 mgr->mctx, sock->ah_size * sizeof(isc_nmhandle_t *));  
         ISC_LINK_INIT(&sock->quotacb, link);          ISC_LINK_INIT(&sock->quotacb, link);
         for (size_t i = 0; i < 32; i++) {  
                 sock->ah_frees[i] = i;  
                 sock->ah_handles[i] = NULL;  
         }  
   
         switch (type) {          switch (type) {
         case isc_nm_udpsocket:          case isc_nm_udpsocket:
Line 1518  isc___nmsocket_init(isc_nmsocket_t *sock
Line 1497  isc___nmsocket_init(isc_nmsocket_t *sock
         atomic_init(&sock->connecting, false);          atomic_init(&sock->connecting, false);
         atomic_init(&sock->keepalive, false);          atomic_init(&sock->keepalive, false);
         atomic_init(&sock->connected, false);          atomic_init(&sock->connected, false);
           atomic_init(&sock->timedout, false);
   
         atomic_init(&sock->active_child_connections, 0);          atomic_init(&sock->active_child_connections, 0);
   
Line 1542  isc__nm_free_uvbuf(isc_nmsocket_t *sock,
Line 1522  isc__nm_free_uvbuf(isc_nmsocket_t *sock,
         isc__networker_t *worker = NULL;          isc__networker_t *worker = NULL;
   
         REQUIRE(VALID_NMSOCK(sock));          REQUIRE(VALID_NMSOCK(sock));
         if (buf->base == NULL) {  
                 /* Empty buffer: might happen in case of error. */  
                 return;  
         }  
         worker = &sock->mgr->workers[sock->tid];  
   
         REQUIRE(worker->recvbuf_inuse);          worker = &sock->mgr->workers[sock->tid];
         if (sock->type == isc_nm_udpsocket && buf->base > worker->recvbuf &&  
             buf->base <= worker->recvbuf + ISC_NETMGR_RECVBUF_SIZE)  
         {  
                 /* Can happen in case of out-of-order recvmmsg in libuv1.36 */  
                 return;  
         }  
         REQUIRE(buf->base == worker->recvbuf);          REQUIRE(buf->base == worker->recvbuf);
   
         worker->recvbuf_inuse = false;          worker->recvbuf_inuse = false;
 }  }
   
Line 1578  isc_nmhandle_t *
Line 1548  isc_nmhandle_t *
 isc___nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,  isc___nmhandle_get(isc_nmsocket_t *sock, isc_sockaddr_t *peer,
                    isc_sockaddr_t *local FLARG) {                     isc_sockaddr_t *local FLARG) {
         isc_nmhandle_t *handle = NULL;          isc_nmhandle_t *handle = NULL;
         size_t handlenum;  
         int pos;  
   
         REQUIRE(VALID_NMSOCK(sock));          REQUIRE(VALID_NMSOCK(sock));
   
Line 1614  isc___nmhandle_get(isc_nmsocket_t *sock,
Line 1582  isc___nmhandle_get(isc_nmsocket_t *sock,
                 handle->local = sock->iface;                  handle->local = sock->iface;
         }          }
   
         LOCK(&sock->lock);          (void)atomic_fetch_add(&sock->ah, 1);
         /* We need to add this handle to the list of active handles */  
         if ((size_t)atomic_load(&sock->ah) == sock->ah_size) {  
                 sock->ah_frees =  
                         isc_mem_reallocate(sock->mgr->mctx, sock->ah_frees,  
                                            sock->ah_size * 2 * sizeof(size_t));  
                 sock->ah_handles = isc_mem_reallocate(  
                         sock->mgr->mctx, sock->ah_handles,  
                         sock->ah_size * 2 * sizeof(isc_nmhandle_t *));  
   
                 for (size_t i = sock->ah_size; i < sock->ah_size * 2; i++) {  
                         sock->ah_frees[i] = i;  
                         sock->ah_handles[i] = NULL;  
                 }  
   
                 sock->ah_size *= 2;  
         }  
   
         handlenum = atomic_fetch_add(&sock->ah, 1);  
         pos = sock->ah_frees[handlenum];  
   
         INSIST(sock->ah_handles[pos] == NULL);  
         sock->ah_handles[pos] = handle;  
         handle->ah_pos = pos;  
 #ifdef NETMGR_TRACE  #ifdef NETMGR_TRACE
           LOCK(&sock->lock);
         ISC_LIST_APPEND(sock->active_handles, handle, active_link);          ISC_LIST_APPEND(sock->active_handles, handle, active_link);
 #endif  
         UNLOCK(&sock->lock);          UNLOCK(&sock->lock);
   #endif
   
         switch (sock->type) {          switch (sock->type) {
         case isc_nm_udpsocket:          case isc_nm_udpsocket:
Line 1649  isc___nmhandle_get(isc_nmsocket_t *sock,
Line 1596  isc___nmhandle_get(isc_nmsocket_t *sock,
                 if (!atomic_load(&sock->client)) {                  if (!atomic_load(&sock->client)) {
                         break;                          break;
                 }                  }
                 /* fallthrough */                  FALLTHROUGH;
         case isc_nm_tcpsocket:          case isc_nm_tcpsocket:
                 INSIST(sock->statichandle == NULL);                  INSIST(sock->statichandle == NULL);
   
Line 1706  nmhandle_free(isc_nmsocket_t *sock, isc_
Line 1653  nmhandle_free(isc_nmsocket_t *sock, isc_
   
 static void  static void
 nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {  nmhandle_deactivate(isc_nmsocket_t *sock, isc_nmhandle_t *handle) {
         size_t handlenum;  
         bool reuse = false;          bool reuse = false;
   
         /*          /*
Line 1716  nmhandle_deactivate(isc_nmsocket_t *sock
Line 1662  nmhandle_deactivate(isc_nmsocket_t *sock
          */           */
         LOCK(&sock->lock);          LOCK(&sock->lock);
   
         INSIST(sock->ah_handles[handle->ah_pos] == handle);  
         INSIST(sock->ah_size > handle->ah_pos);  
         INSIST(atomic_load(&sock->ah) > 0);  
   
 #ifdef NETMGR_TRACE  #ifdef NETMGR_TRACE
         ISC_LIST_UNLINK(sock->active_handles, handle, active_link);          ISC_LIST_UNLINK(sock->active_handles, handle, active_link);
 #endif  #endif
   
         sock->ah_handles[handle->ah_pos] = NULL;          INSIST(atomic_fetch_sub(&sock->ah, 1) > 0);
         handlenum = atomic_fetch_sub(&sock->ah, 1) - 1;  
         sock->ah_frees[handlenum] = handle->ah_pos;  #if !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__
         handle->ah_pos = 0;  
         if (atomic_load(&sock->active)) {          if (atomic_load(&sock->active)) {
                 reuse = isc_astack_trypush(sock->inactivehandles, handle);                  reuse = isc_astack_trypush(sock->inactivehandles, handle);
         }          }
   #endif /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
         if (!reuse) {          if (!reuse) {
                 nmhandle_free(sock, handle);                  nmhandle_free(sock, handle);
         }          }
Line 1748  isc__nmhandle_detach(isc_nmhandle_t **ha
Line 1690  isc__nmhandle_detach(isc_nmhandle_t **ha
         handle = *handlep;          handle = *handlep;
         *handlep = NULL;          *handlep = NULL;
   
           /*
            * If the closehandle_cb is set, it needs to run asynchronously to
            * ensure correct ordering of the isc__nm_process_sock_buffer().
            */
         sock = handle->sock;          sock = handle->sock;
         if (sock->tid == isc_nm_tid()) {          if (sock->tid == isc_nm_tid() && sock->closehandle_cb == NULL) {
                 nmhandle_detach_cb(&handle FLARG_PASS);                  nmhandle_detach_cb(&handle FLARG_PASS);
         } else {          } else {
                 isc__netievent_detach_t *event =                  isc__netievent_detach_t *event =
Line 1909  isc__nm_failed_connect_cb(isc_nmsocket_t
Line 1855  isc__nm_failed_connect_cb(isc_nmsocket_t
         REQUIRE(req->cb.connect != NULL);          REQUIRE(req->cb.connect != NULL);
   
         isc__nmsocket_timer_stop(sock);          isc__nmsocket_timer_stop(sock);
         uv_handle_set_data((uv_handle_t *)&sock->timer, sock);          uv_handle_set_data((uv_handle_t *)&sock->read_timer, sock);
   
         INSIST(atomic_compare_exchange_strong(&sock->connecting,          INSIST(atomic_compare_exchange_strong(&sock->connecting,
                                               &(bool){ true }, false));                                                &(bool){ true }, false));
Line 1936  isc__nm_failed_read_cb(isc_nmsocket_t *s
Line 1882  isc__nm_failed_read_cb(isc_nmsocket_t *s
                 isc__nm_tcpdns_failed_read_cb(sock, result);                  isc__nm_tcpdns_failed_read_cb(sock, result);
                 return;                  return;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
Line 1955  isc__nmsocket_connecttimeout_cb(uv_timer
Line 1900  isc__nmsocket_connecttimeout_cb(uv_timer
   
         isc__nmsocket_timer_stop(sock);          isc__nmsocket_timer_stop(sock);
   
         /* Call the connect callback directly */          /*
            * Mark the connection as timed out and shutdown the socket.
            */
   
         req->cb.connect(req->handle, ISC_R_TIMEDOUT, req->cbarg);          INSIST(atomic_compare_exchange_strong(&sock->timedout, &(bool){ false },
                                                 true));
           isc__nmsocket_clearcb(sock);
           isc__nmsocket_shutdown(sock);
   }
   
         /* Timer is not running, cleanup and shutdown everything */  void
         if (!isc__nmsocket_timer_running(sock)) {  isc__nm_accept_connection_log(isc_result_t result, bool can_log_quota) {
                 INSIST(atomic_compare_exchange_strong(&sock->connecting,          int level;
                                                       &(bool){ true }, false));  
                 isc__nm_uvreq_put(&req, sock);          switch (result) {
                 isc__nmsocket_clearcb(sock);          case ISC_R_SUCCESS:
                 isc__nmsocket_shutdown(sock);          case ISC_R_NOCONN:
                   return;
           case ISC_R_QUOTA:
           case ISC_R_SOFTQUOTA:
                   if (!can_log_quota) {
                           return;
                   }
                   level = ISC_LOG_INFO;
                   break;
           case ISC_R_NOTCONNECTED:
                   level = ISC_LOG_INFO;
                   break;
           default:
                   level = ISC_LOG_ERROR;
         }          }
   
           isc_log_write(isc_lctx, ISC_LOGCATEGORY_GENERAL, ISC_LOGMODULE_NETMGR,
                         level, "Accepting TCP connection failed: %s",
                         isc_result_totext(result));
 }  }
   
 static void  void
   isc__nmsocket_writetimeout_cb(void *data, isc_result_t eresult) {
           isc__nm_uvreq_t *req = data;
           isc_nmsocket_t *sock = NULL;
   
           REQUIRE(eresult == ISC_R_TIMEDOUT);
           REQUIRE(VALID_UVREQ(req));
           REQUIRE(VALID_NMSOCK(req->sock));
   
           sock = req->sock;
   
           isc__nmsocket_reset(sock);
   }
   
   void
 isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {  isc__nmsocket_readtimeout_cb(uv_timer_t *timer) {
         isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)timer);          isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)timer);
   
Line 1996  isc__nmsocket_readtimeout_cb(uv_timer_t 
Line 1978  isc__nmsocket_readtimeout_cb(uv_timer_t 
   
 void  void
 isc__nmsocket_timer_restart(isc_nmsocket_t *sock) {  isc__nmsocket_timer_restart(isc_nmsocket_t *sock) {
         int r = 0;  
   
         REQUIRE(VALID_NMSOCK(sock));          REQUIRE(VALID_NMSOCK(sock));
   
         if (atomic_load(&sock->connecting)) {          if (atomic_load(&sock->connecting)) {
                   int r;
   
                 if (sock->connect_timeout == 0) {                  if (sock->connect_timeout == 0) {
                         return;                          return;
                 }                  }
   
                 r = uv_timer_start(&sock->timer,                  r = uv_timer_start(&sock->read_timer,
                                    isc__nmsocket_connecttimeout_cb,                                     isc__nmsocket_connecttimeout_cb,
                                    sock->connect_timeout + 10, 0);                                     sock->connect_timeout + 10, 0);
                   UV_RUNTIME_CHECK(uv_timer_start, r);
   
         } else {          } else {
                   int r;
   
                 if (sock->read_timeout == 0) {                  if (sock->read_timeout == 0) {
                         return;                          return;
                 }                  }
   
                 r = uv_timer_start(&sock->timer, isc__nmsocket_readtimeout_cb,                  r = uv_timer_start(&sock->read_timer,
                                      isc__nmsocket_readtimeout_cb,
                                    sock->read_timeout, 0);                                     sock->read_timeout, 0);
                   UV_RUNTIME_CHECK(uv_timer_start, r);
         }          }
   
         RUNTIME_CHECK(r == 0);  
 }  }
   
 bool  bool
 isc__nmsocket_timer_running(isc_nmsocket_t *sock) {  isc__nmsocket_timer_running(isc_nmsocket_t *sock) {
         REQUIRE(VALID_NMSOCK(sock));          REQUIRE(VALID_NMSOCK(sock));
   
         return (uv_is_active((uv_handle_t *)&sock->timer));          return (uv_is_active((uv_handle_t *)&sock->read_timer));
 }  }
   
 void  void
Line 2041  isc__nmsocket_timer_start(isc_nmsocket_t
Line 2026  isc__nmsocket_timer_start(isc_nmsocket_t
   
 void  void
 isc__nmsocket_timer_stop(isc_nmsocket_t *sock) {  isc__nmsocket_timer_stop(isc_nmsocket_t *sock) {
           int r;
   
         REQUIRE(VALID_NMSOCK(sock));          REQUIRE(VALID_NMSOCK(sock));
   
         /* uv_timer_stop() is idempotent, no need to check if running */          /* uv_timer_stop() is idempotent, no need to check if running */
   
         int r = uv_timer_stop(&sock->timer);          r = uv_timer_stop(&sock->read_timer);
         RUNTIME_CHECK(r == 0);          UV_RUNTIME_CHECK(uv_timer_stop, r);
 }  }
   
 isc__nm_uvreq_t *  isc__nm_uvreq_t *
Line 2074  isc__nm_get_read_req(isc_nmsocket_t *soc
Line 2061  isc__nm_get_read_req(isc_nmsocket_t *soc
 }  }
   
 /*%<  /*%<
  * Allocator for read operations. Limited to size 2^16.   * Allocator callback for read operations.
  *   *
  * Note this doesn't actually allocate anything, it just assigns the   * Note this doesn't actually allocate anything, it just assigns the
  * worker's receive buffer to a socket, and marks it as "in use".   * worker's receive buffer to a socket, and marks it as "in use".
Line 2086  isc__nm_alloc_cb(uv_handle_t *handle, si
Line 2073  isc__nm_alloc_cb(uv_handle_t *handle, si
   
         REQUIRE(VALID_NMSOCK(sock));          REQUIRE(VALID_NMSOCK(sock));
         REQUIRE(isc__nm_in_netthread());          REQUIRE(isc__nm_in_netthread());
           /*
            * The size provided by libuv is only suggested size, and it always
            * defaults to 64 * 1024 in the current versions of libuv (see
            * src/unix/udp.c and src/unix/stream.c).
            */
           UNUSED(size);
   
           worker = &sock->mgr->workers[sock->tid];
           INSIST(!worker->recvbuf_inuse);
           INSIST(worker->recvbuf != NULL);
   
         switch (sock->type) {          switch (sock->type) {
         case isc_nm_udpsocket:          case isc_nm_udpsocket:
                 REQUIRE(size <= ISC_NETMGR_RECVBUF_SIZE);                  buf->len = ISC_NETMGR_UDP_RECVBUF_SIZE;
                 size = ISC_NETMGR_RECVBUF_SIZE;  
                 break;                  break;
         case isc_nm_tcpsocket:          case isc_nm_tcpsocket:
         case isc_nm_tcpdnssocket:          case isc_nm_tcpdnssocket:
                   buf->len = ISC_NETMGR_TCP_RECVBUF_SIZE;
                 break;                  break;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
   
         worker = &sock->mgr->workers[sock->tid];          REQUIRE(buf->len <= ISC_NETMGR_RECVBUF_SIZE);
         INSIST(!worker->recvbuf_inuse || sock->type == isc_nm_udpsocket);  
   
         buf->base = worker->recvbuf;          buf->base = worker->recvbuf;
         buf->len = size;  
         worker->recvbuf_inuse = true;          worker->recvbuf_inuse = true;
 }  }
   
 void  isc_result_t
 isc__nm_start_reading(isc_nmsocket_t *sock) {  isc__nm_start_reading(isc_nmsocket_t *sock) {
           isc_result_t result = ISC_R_SUCCESS;
         int r;          int r;
   
         if (sock->reading) {          if (sock->reading) {
                 return;                  return (ISC_R_SUCCESS);
         }          }
   
         switch (sock->type) {          switch (sock->type) {
Line 2130  isc__nm_start_reading(isc_nmsocket_t *so
Line 2125  isc__nm_start_reading(isc_nmsocket_t *so
                                   isc__nm_tcpdns_read_cb);                                    isc__nm_tcpdns_read_cb);
                 break;                  break;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
         RUNTIME_CHECK(r == 0);  
         sock->reading = true;          if (r != 0) {
                   result = isc__nm_uverr2result(r);
           } else {
                   sock->reading = true;
           }
   
           return (result);
 }  }
   
 void  void
Line 2148  isc__nm_stop_reading(isc_nmsocket_t *soc
Line 2148  isc__nm_stop_reading(isc_nmsocket_t *soc
         switch (sock->type) {          switch (sock->type) {
         case isc_nm_udpsocket:          case isc_nm_udpsocket:
                 r = uv_udp_recv_stop(&sock->uv_handle.udp);                  r = uv_udp_recv_stop(&sock->uv_handle.udp);
                   UV_RUNTIME_CHECK(uv_udp_recv_stop, r);
                 break;                  break;
         case isc_nm_tcpsocket:          case isc_nm_tcpsocket:
         case isc_nm_tcpdnssocket:          case isc_nm_tcpdnssocket:
                 r = uv_read_stop(&sock->uv_handle.stream);                  r = uv_read_stop(&sock->uv_handle.stream);
                   UV_RUNTIME_CHECK(uv_read_stop, r);
                 break;                  break;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
         RUNTIME_CHECK(r == 0);  
         sock->reading = false;          sock->reading = false;
 }  }
   
Line 2179  processbuffer(isc_nmsocket_t *sock) {
Line 2179  processbuffer(isc_nmsocket_t *sock) {
         case isc_nm_tcpdnssocket:          case isc_nm_tcpdnssocket:
                 return (isc__nm_tcpdns_processbuffer(sock));                  return (isc__nm_tcpdns_processbuffer(sock));
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
Line 2196  processbuffer(isc_nmsocket_t *sock) {
Line 2195  processbuffer(isc_nmsocket_t *sock) {
  * limit. In this case we'll be called again by resume_processing()   * limit. In this case we'll be called again by resume_processing()
  * later.   * later.
  */   */
 void  isc_result_t
 isc__nm_process_sock_buffer(isc_nmsocket_t *sock) {  isc__nm_process_sock_buffer(isc_nmsocket_t *sock) {
         for (;;) {          for (;;) {
                 int_fast32_t ah = atomic_load(&sock->ah);                  int_fast32_t ah = atomic_load(&sock->ah);
Line 2207  isc__nm_process_sock_buffer(isc_nmsocket
Line 2206  isc__nm_process_sock_buffer(isc_nmsocket
                          * Don't reset the timer until we have a                           * Don't reset the timer until we have a
                          * full DNS message.                           * full DNS message.
                          */                           */
                         isc__nm_start_reading(sock);                          result = isc__nm_start_reading(sock);
                           if (result != ISC_R_SUCCESS) {
                                   return (result);
                           }
                         /*                          /*
                          * Start the timer only if there are no externally used                           * Start the timer only if there are no externally used
                          * active handles, there's always one active handle                           * active handles, there's always one active handle
Line 2217  isc__nm_process_sock_buffer(isc_nmsocket
Line 2219  isc__nm_process_sock_buffer(isc_nmsocket
                         if (ah == 1) {                          if (ah == 1) {
                                 isc__nmsocket_timer_start(sock);                                  isc__nmsocket_timer_start(sock);
                         }                          }
                         return;                          goto done;
                 case ISC_R_CANCELED:                  case ISC_R_CANCELED:
                         isc__nmsocket_timer_stop(sock);                          isc__nmsocket_timer_stop(sock);
                         isc__nm_stop_reading(sock);                          isc__nm_stop_reading(sock);
                         return;                          goto done;
                 case ISC_R_SUCCESS:                  case ISC_R_SUCCESS:
                         /*                          /*
                          * Stop the timer on the successful message read, this                           * Stop the timer on the successful message read, this
Line 2235  isc__nm_process_sock_buffer(isc_nmsocket
Line 2237  isc__nm_process_sock_buffer(isc_nmsocket
                             ah >= STREAM_CLIENTS_PER_CONN)                              ah >= STREAM_CLIENTS_PER_CONN)
                         {                          {
                                 isc__nm_stop_reading(sock);                                  isc__nm_stop_reading(sock);
                                 return;                                  goto done;
                         }                          }
                         break;                          break;
                 default:                  default:
                         INSIST(0);                          UNREACHABLE();
                 }                  }
         }          }
   done:
           return (ISC_R_SUCCESS);
 }  }
   
 void  void
Line 2268  isc_nmhandle_cleartimeout(isc_nmhandle_t
Line 2272  isc_nmhandle_cleartimeout(isc_nmhandle_t
         default:          default:
                 handle->sock->read_timeout = 0;                  handle->sock->read_timeout = 0;
   
                 if (uv_is_active((uv_handle_t *)&handle->sock->timer)) {                  if (uv_is_active((uv_handle_t *)&handle->sock->read_timer)) {
                         isc__nmsocket_timer_stop(handle->sock);                          isc__nmsocket_timer_stop(handle->sock);
                 }                  }
         }          }
Line 2286  isc_nmhandle_settimeout(isc_nmhandle_t *
Line 2290  isc_nmhandle_settimeout(isc_nmhandle_t *
         }          }
 }  }
   
   void
   isc_nmhandle_keepalive(isc_nmhandle_t *handle, bool value) {
           isc_nmsocket_t *sock = NULL;
   
           REQUIRE(VALID_NMHANDLE(handle));
           REQUIRE(VALID_NMSOCK(handle->sock));
   
           sock = handle->sock;
   
           switch (sock->type) {
           case isc_nm_tcpsocket:
           case isc_nm_tcpdnssocket:
                   atomic_store(&sock->keepalive, value);
                   sock->read_timeout = value ? atomic_load(&sock->mgr->keepalive)
                                              : atomic_load(&sock->mgr->idle);
                   sock->write_timeout = value ? atomic_load(&sock->mgr->keepalive)
                                               : atomic_load(&sock->mgr->idle);
                   break;
           default:
                   /*
                    * For any other protocol, this is a no-op.
                    */
                   return;
           }
   }
   
 void *  void *
 isc_nmhandle_getextra(isc_nmhandle_t *handle) {  isc_nmhandle_getextra(isc_nmhandle_t *handle) {
         REQUIRE(VALID_NMHANDLE(handle));          REQUIRE(VALID_NMHANDLE(handle));
Line 2328  isc___nm_uvreq_get(isc_nm_t *mgr, isc_nm
Line 2358  isc___nm_uvreq_get(isc_nm_t *mgr, isc_nm
         }          }
   
         if (req == NULL) {          if (req == NULL) {
                 req = isc_mempool_get(mgr->reqpool);                  req = isc_mem_get(mgr->mctx, sizeof(*req));
         }          }
   
         *req = (isc__nm_uvreq_t){ .magic = 0 };          *req = (isc__nm_uvreq_t){ .magic = 0 };
Line 2362  isc___nm_uvreq_put(isc__nm_uvreq_t **req
Line 2392  isc___nm_uvreq_put(isc__nm_uvreq_t **req
         handle = req->handle;          handle = req->handle;
         req->handle = NULL;          req->handle = NULL;
   
   #if !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__
         if (!isc__nmsocket_active(sock) ||          if (!isc__nmsocket_active(sock) ||
             !isc_astack_trypush(sock->inactivereqs, req)) {              !isc_astack_trypush(sock->inactivereqs, req)) {
                 isc_mempool_put(sock->mgr->reqpool, req);                  isc_mem_put(sock->mgr->mctx, req, sizeof(*req));
         }          }
   #else  /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
           isc_mem_put(sock->mgr->mctx, req, sizeof(*req));
   #endif /* !__SANITIZE_ADDRESS__ && !__SANITIZE_THREAD__ */
   
         if (handle != NULL) {          if (handle != NULL) {
                 isc__nmhandle_detach(&handle FLARG_PASS);                  isc__nmhandle_detach(&handle FLARG_PASS);
Line 2391  isc_nm_send(isc_nmhandle_t *handle, isc_
Line 2425  isc_nm_send(isc_nmhandle_t *handle, isc_
                 isc__nm_tcpdns_send(handle, region, cb, cbarg);                  isc__nm_tcpdns_send(handle, region, cb, cbarg);
                 break;                  break;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
Line 2418  isc_nm_read(isc_nmhandle_t *handle, isc_
Line 2451  isc_nm_read(isc_nmhandle_t *handle, isc_
                 isc__nm_tcpdns_read(handle, cb, cbarg);                  isc__nm_tcpdns_read(handle, cb, cbarg);
                 break;                  break;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
Line 2438  isc_nm_cancelread(isc_nmhandle_t *handle
Line 2470  isc_nm_cancelread(isc_nmhandle_t *handle
                 isc__nm_tcpdns_cancelread(handle);                  isc__nm_tcpdns_cancelread(handle);
                 break;                  break;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
Line 2454  isc_nm_pauseread(isc_nmhandle_t *handle)
Line 2485  isc_nm_pauseread(isc_nmhandle_t *handle)
                 isc__nm_tcp_pauseread(handle);                  isc__nm_tcp_pauseread(handle);
                 break;                  break;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
Line 2470  isc_nm_resumeread(isc_nmhandle_t *handle
Line 2500  isc_nm_resumeread(isc_nmhandle_t *handle
                 isc__nm_tcp_resumeread(handle);                  isc__nm_tcp_resumeread(handle);
                 break;                  break;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
Line 2490  isc_nm_stoplistening(isc_nmsocket_t *soc
Line 2519  isc_nm_stoplistening(isc_nmsocket_t *soc
                 isc__nm_tcp_stoplistening(sock);                  isc__nm_tcp_stoplistening(sock);
                 break;                  break;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
Line 2563  isc__nm_async_readcb(isc__networker_t *w
Line 2591  isc__nm_async_readcb(isc__networker_t *w
         isc_nmsocket_t *sock = ievent->sock;          isc_nmsocket_t *sock = ievent->sock;
         isc__nm_uvreq_t *uvreq = ievent->req;          isc__nm_uvreq_t *uvreq = ievent->req;
         isc_result_t eresult = ievent->result;          isc_result_t eresult = ievent->result;
         isc_region_t region = { .base = (unsigned char *)uvreq->uvbuf.base,          isc_region_t region;
                                 .length = uvreq->uvbuf.len };  
   
         UNUSED(worker);          UNUSED(worker);
   
Line 2573  isc__nm_async_readcb(isc__networker_t *w
Line 2600  isc__nm_async_readcb(isc__networker_t *w
         REQUIRE(VALID_NMHANDLE(uvreq->handle));          REQUIRE(VALID_NMHANDLE(uvreq->handle));
         REQUIRE(sock->tid == isc_nm_tid());          REQUIRE(sock->tid == isc_nm_tid());
   
           region.base = (unsigned char *)uvreq->uvbuf.base;
           region.length = uvreq->uvbuf.len;
   
         uvreq->cb.recv(uvreq->handle, eresult, &region, uvreq->cbarg);          uvreq->cb.recv(uvreq->handle, eresult, &region, uvreq->cbarg);
   
         isc__nm_uvreq_put(&uvreq, sock);          isc__nm_uvreq_put(&uvreq, sock);
Line 2646  isc__nm_async_detach(isc__networker_t *w
Line 2676  isc__nm_async_detach(isc__networker_t *w
         nmhandle_detach_cb(&ievent->handle FLARG_PASS);          nmhandle_detach_cb(&ievent->handle FLARG_PASS);
 }  }
   
   static void
   reset_shutdown(uv_handle_t *handle) {
           isc_nmsocket_t *sock = uv_handle_get_data(handle);
   
           isc__nmsocket_shutdown(sock);
           isc__nmsocket_detach(&sock);
   }
   
   void
   isc__nmsocket_reset(isc_nmsocket_t *sock) {
           REQUIRE(VALID_NMSOCK(sock));
   
           switch (sock->type) {
           case isc_nm_tcpsocket:
           case isc_nm_tcpdnssocket:
                   /*
                    * This can be called from the TCP write timeout.
                    */
                   REQUIRE(sock->parent == NULL);
                   break;
           default:
                   UNREACHABLE();
                   break;
           }
   
           if (!uv_is_closing(&sock->uv_handle.handle) &&
               uv_is_active(&sock->uv_handle.handle))
           {
                   /*
                    * The real shutdown will be handled in the respective
                    * close functions.
                    */
                   isc__nmsocket_attach(sock, &(isc_nmsocket_t *){ NULL });
                   int r = uv_tcp_close_reset(&sock->uv_handle.tcp,
                                              reset_shutdown);
                   UV_RUNTIME_CHECK(uv_tcp_close_reset, r);
           } else {
                   isc__nmsocket_shutdown(sock);
           }
   }
   
 void  void
 isc__nmsocket_shutdown(isc_nmsocket_t *sock) {  isc__nmsocket_shutdown(isc_nmsocket_t *sock) {
         REQUIRE(VALID_NMSOCK(sock));          REQUIRE(VALID_NMSOCK(sock));
Line 2664  isc__nmsocket_shutdown(isc_nmsocket_t *s
Line 2735  isc__nmsocket_shutdown(isc_nmsocket_t *s
         case isc_nm_tcpdnslistener:          case isc_nm_tcpdnslistener:
                 return;                  return;
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   
Line 2680  shutdown_walk_cb(uv_handle_t *handle, vo
Line 2750  shutdown_walk_cb(uv_handle_t *handle, vo
   
         switch (handle->type) {          switch (handle->type) {
         case UV_UDP:          case UV_UDP:
                   isc__nmsocket_shutdown(sock);
                   return;
         case UV_TCP:          case UV_TCP:
                 break;                  switch (sock->type) {
                   case isc_nm_tcpsocket:
                   case isc_nm_tcpdnssocket:
                           if (sock->parent == NULL) {
                                   /* Reset the TCP connections on shutdown */
                                   isc__nmsocket_reset(sock);
                                   return;
                           }
                           FALLTHROUGH;
                   default:
                           isc__nmsocket_shutdown(sock);
                   }
   
                   return;
         default:          default:
                 return;                  return;
         }          }
   
         isc__nmsocket_shutdown(sock);  
 }  }
   
 void  void
 isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) {  isc__nm_async_shutdown(isc__networker_t *worker, isc__netievent_t *ev0) {
         UNUSED(ev0);          UNUSED(ev0);
   
         uv_walk(&worker->loop, shutdown_walk_cb, NULL);          uv_walk(&worker->loop, shutdown_walk_cb, NULL);
 }  }
   
Line 3110  isc_nm_work_offload(isc_nm_t *netmgr, is
Line 3194  isc_nm_work_offload(isc_nm_t *netmgr, is
   
         r = uv_queue_work(&worker->loop, &work->req, isc__nm_work_cb,          r = uv_queue_work(&worker->loop, &work->req, isc__nm_work_cb,
                           isc__nm_after_work_cb);                            isc__nm_after_work_cb);
         RUNTIME_CHECK(r == 0);          UV_RUNTIME_CHECK(uv_queue_work, r);
   }
   
   void
   isc_nm_timer_create(isc_nmhandle_t *handle, isc_nm_timer_cb cb, void *cbarg,
                       isc_nm_timer_t **timerp) {
           isc__networker_t *worker = NULL;
           isc_nmsocket_t *sock = NULL;
           isc_nm_timer_t *timer = NULL;
           int r;
   
           REQUIRE(isc__nm_in_netthread());
           REQUIRE(VALID_NMHANDLE(handle));
           REQUIRE(VALID_NMSOCK(handle->sock));
   
           sock = handle->sock;
           worker = &sock->mgr->workers[isc_nm_tid()];
   
           timer = isc_mem_get(sock->mgr->mctx, sizeof(*timer));
           *timer = (isc_nm_timer_t){ .cb = cb, .cbarg = cbarg };
           isc_refcount_init(&timer->references, 1);
           isc_nmhandle_attach(handle, &timer->handle);
   
           r = uv_timer_init(&worker->loop, &timer->timer);
           UV_RUNTIME_CHECK(uv_timer_init, r);
   
           uv_handle_set_data((uv_handle_t *)&timer->timer, timer);
   
           *timerp = timer;
   }
   
   void
   isc_nm_timer_attach(isc_nm_timer_t *timer, isc_nm_timer_t **timerp) {
           REQUIRE(timer != NULL);
           REQUIRE(timerp != NULL && *timerp == NULL);
   
           isc_refcount_increment(&timer->references);
           *timerp = timer;
   }
   
   static void
   timer_destroy(uv_handle_t *uvhandle) {
           isc_nm_timer_t *timer = uv_handle_get_data(uvhandle);
           isc_nmhandle_t *handle = timer->handle;
           isc_mem_t *mctx = timer->handle->sock->mgr->mctx;
   
           isc_mem_put(mctx, timer, sizeof(*timer));
   
           isc_nmhandle_detach(&handle);
   }
   
   void
   isc_nm_timer_detach(isc_nm_timer_t **timerp) {
           isc_nm_timer_t *timer = NULL;
           isc_nmhandle_t *handle = NULL;
   
           REQUIRE(timerp != NULL && *timerp != NULL);
   
           timer = *timerp;
           *timerp = NULL;
   
           handle = timer->handle;
   
           REQUIRE(isc__nm_in_netthread());
           REQUIRE(VALID_NMHANDLE(handle));
           REQUIRE(VALID_NMSOCK(handle->sock));
   
           if (isc_refcount_decrement(&timer->references) == 1) {
                   int r = uv_timer_stop(&timer->timer);
                   UV_RUNTIME_CHECK(uv_timer_stop, r);
                   uv_close((uv_handle_t *)&timer->timer, timer_destroy);
           }
   }
   
   static void
   timer_cb(uv_timer_t *uvtimer) {
           isc_nm_timer_t *timer = uv_handle_get_data((uv_handle_t *)uvtimer);
   
           REQUIRE(timer->cb != NULL);
   
           timer->cb(timer->cbarg, ISC_R_TIMEDOUT);
   }
   
   void
   isc_nm_timer_start(isc_nm_timer_t *timer, uint64_t timeout) {
           int r = uv_timer_start(&timer->timer, timer_cb, timeout, 0);
           UV_RUNTIME_CHECK(uv_timer_start, r);
   }
   
   void
   isc_nm_timer_stop(isc_nm_timer_t *timer) {
           int r = uv_timer_stop(&timer->timer);
           UV_RUNTIME_CHECK(uv_timer_stop, r);
 }  }
   
 #ifdef NETMGR_TRACE  #ifdef NETMGR_TRACE
Line 3135  nmsocket_type_totext(isc_nmsocket_type t
Line 3311  nmsocket_type_totext(isc_nmsocket_type t
         case isc_nm_tcpdnssocket:          case isc_nm_tcpdnssocket:
                 return ("isc_nm_tcpdnssocket");                  return ("isc_nm_tcpdnssocket");
         default:          default:
                 INSIST(0);                  UNREACHABLE();
                 ISC_UNREACHABLE();  
         }          }
 }  }
   

Legend:
Removed from v.1.1.1.5  
changed lines
  Added in v.1.1.1.6

CVSweb <webmaster@jp.NetBSD.org>