提问者:小点点

c语言中多线程服务器的设计


当尝试在linux上实现具有并发支持的简单回显服务器时。

使用以下方法:

  • 使用pthread函数创建线程池,并将其保存在链表中。它在进程启动时创建,在进程终止时销毁

这个程序现在似乎起作用了。

问题是:

  • 是否适合在中间使用消息队列,它是否足够有效

共2个答案

匿名用户

这对我来说似乎太复杂了。多线程服务器的常用方法是:

  • 在线程进程中创建侦听套接字
  • 在线程中接受客户端连接
  • 对于每个接受的客户端连接,创建一个新线程,该线程接收相应的文件描述符并执行工作
  • 工作线程在完全处理客户端连接时关闭客户端连接

我看不到在这里预填充线程池有什么好处。

如果确实需要线程池:

我只会使用一个链接列表来接受连接,并使用pthread\u互斥体来同步对它的访问:

  • 侦听器进程将客户端FD排在列表末尾
  • 客户排在最前面

如果队列为空,线程可以等待变量(pthread\u cond\u wait),并在连接可用时由侦听器进程(pthread\u cond\u signal)通知。

另一种选择

根据处理请求的复杂性,可以选择将服务器设为单线程,即在一个线程中处理所有连接。这完全消除了上下文切换,因此性能非常好。

一个缺点是,只使用一个CPU内核。为了改进这一点,可以使用混合模型:

  • 每个核心创建一个工作线程

然而,您必须实施各种机制,以便在工人之间公平分配工作。

匿名用户

除了使用pthread_mutex,您还需要使用pthread_cond_t(p线程条件),这将允许您在线程池中的线程实际上没有工作时将它们置于睡眠状态。否则,如果它们坐在那里循环检查工作队列中的某些内容,您将浪费计算周期。

我肯定会考虑使用C而不仅仅是纯C。我建议使用C的原因是,在C中,您可以使用模板。使用纯虚拟基类(让我们称之为:“vtask”),可以创建模板化的派生类,这些派生类接受参数,并在调用重载运算符()时插入参数,从而在任务中实现更多功能:

//============================================================================//

void* thread_pool::execute_thread()
{
    vtask* task = NULL;
    while(true)
    {
        //--------------------------------------------------------------------//
        // Try to pick a task
        m_task_lock.lock();
        //--------------------------------------------------------------------//

        // We need to put condition.wait() in a loop for two reasons:
        // 1. There can be spurious wake-ups (due to signal/ENITR)
        // 2. When mutex is released for waiting, another thread can be waken up
        //    from a signal/broadcast and that thread can mess up the condition.
        //    So when the current thread wakes up the condition may no longer be
        //    actually true!
        while ((m_pool_state != state::STOPPED) && (m_main_tasks.empty()))
        {
            // Wait until there is a task in the queue
            // Unlock mutex while wait, then lock it back when signaled
            m_task_cond.wait(m_task_lock.base_mutex_ptr());
        }

        // If the thread was waked to notify process shutdown, return from here
        if (m_pool_state == state::STOPPED)
        {
            //m_has_exited.
            m_task_lock.unlock();
            //----------------------------------------------------------------//
            if(mad::details::allocator_list_tl::get_allocator_list_if_exists() &&
               tids.find(CORETHREADSELF()) != tids.end())
                mad::details::allocator_list_tl::get_allocator_list()
                        ->Destroy(tids.find(CORETHREADSELF())->second, 1);
            //----------------------------------------------------------------//

            CORETHREADEXIT(NULL);
        }

        task = m_main_tasks.front();
        m_main_tasks.pop_front();
        //--------------------------------------------------------------------//
        //run(task);
        // Unlock
        m_task_lock.unlock();
        //--------------------------------------------------------------------//

        // execute the task
        run(task);

        m_task_count -= 1;
        m_join_lock.lock();
        m_join_cond.signal();
        m_join_lock.unlock();

        //--------------------------------------------------------------------//
    }
    return NULL;
}

//============================================================================//

int thread_pool::add_task(vtask* task)
{
#ifndef ENABLE_THREADING
    run(task);
    return 0;
#endif

    if(!is_alive_flag)
    {
        run(task);
        return 0;
    }

    // do outside of lock because is thread-safe and needs to be updated as
    // soon as possible
    m_task_count += 1;

    m_task_lock.lock();

    // if the thread pool hasn't been initialize, initialize it
    if(m_pool_state == state::NONINIT)
        initialize_threadpool();

    // TODO: put a limit on how many tasks can be added at most
    m_main_tasks.push_back(task);

    // wake up one thread that is waiting for a task to be available
    m_task_cond.signal();

    m_task_lock.unlock();

    return 0;
}

//============================================================================//

void thread_pool::run(vtask*& task)
{
    (*task)();

    if(task->force_delete())
    {
        delete task;
        task = 0;
    } else {
        if(task->get() && !task->is_stored_elsewhere())
            save_task(task);
        else if(!task->is_stored_elsewhere())
        {
            delete task;
            task = 0;
        }
    }
}

在上面,每个创建的线程都会运行execute\u thread(),直到m\u pool\u state设置为state::STOPPED。锁定m\u task\u锁,如果状态未停止且列表为空,则将m\u task\u锁传递给您的条件,从而使线程进入睡眠状态并释放锁。创建任务(未显示),添加任务(顺便说一下,m\u task\u count是一个原子,这就是为什么它是线程安全的)。在add任务期间,该条件被通知唤醒一个线程,从该线程开始执行m\u task\u cond。获取并锁定m\u task\u lock后,执行线程()的wait(m\u task\u lock.base\u mutex\u ptr())部分。

注意:这是一个高度定制的实现,它将大多数pthread函数/对象封装到C类中,因此复制和粘贴无论如何都不起作用。。。很抱歉和w.r.t.线程池::run(),除非您担心返回值,否则(*task)()行就是您所需要的。

我希望这能有所帮助。

编辑:m\u join\u*引用用于检查是否已完成所有任务。主线程处于类似的条件等待中,检查所有任务是否已完成,因为这对于我在中使用此实现的应用程序来说是必要的,然后再继续。