Linux操作系统——管道(二) 进程池

news/2024/7/8 4:34:52 标签: linux, 运维, 服务器

概念层面理解进程池

比如说我们一开始有一个父进程,分别创建5个管道,5个子进程,这5个子进程都向管道里面进行读取,而我们对应的父进程,因为我们前面谈过管道的4种情况里面,有一个种情况是,正常情况下,如果管道没有数据了,读端必须等待,直到有数据为止(写端写入数据了),也就是说父进程只需要向某一个管道进行写入数据,对应的子进程就会被唤醒然后去读取对应的数据,其实管道里面写入的数据除了字符串还可以是整形,因为管道是面向字节流的,当子进程拿到父进程写入的int数据,就可以根据数据的值做不同的工作了。所以我们把父进程叫做主进程,一个一个的子进程就是相应的工作进程。所以我们规定,通信双方按4字节来读取的话,当子进程还没有唤醒的时候也可以继续给管道里面进行写入数据,然后一旦子进程醒来就读取管道中的数据,我们把管道当作队列来用,这样的话就可以按照写入的顺序来进行读取了。

我们都知道有很多资源创建的时候都要有成本的,目前来说这些成本对于计算机而言无外乎就是空间资源(内存),时间资源,比如说创建进程是需要花费系统的时间和空间的,创建我们的未来的线程都是在系统层面通过系统调用去创建的,可是呢在我们的系统当中有一些任务需要去处理,如果我们是在要去做任务的时候再去创建,那么就会比较耽误时间,如果我们提前把进程创建好,当有任务到来的时候呢,我们直接把任务派发给已经创建好的进程,然后让已经创建好的进程帮我们去完成相应的任务,这样就能够省略了创建进程需要等待的时间,所以我们把提前创建好的这些进程,后续帮我们完成任务的这些进程呢就叫做进程池。说到这里呢,其实我们在之前学的c/c++用到的new ,malloc这种申请内存的关键字是需要系统做很多工作的,那么关于申请内存这方面,我们是一次性100MB内存好还是申请10次10MB的内存好呢?其实当我们使用new和malloc这样申请内存的关键字时,是需要进行系统调用的来申请内存,进而把内存交给我们,而系统调用是对

相当于是什么呢,如果我们只申请一次100MB,那么就是只进行一次系统调用,申请10次10MB就是要进行10次系统调用,那么其实就是说你是希望操作系统帮你干事情干一次还是干十次,虽然我们没有去对其进行验证,但是很明显,申请一次100MB效率会更高,因为这种情况和操作系统交互调用系统调用的次数是比较小的,其实就是想说一个朴素的道理就是,调用系统调用也是有成本的。其实生活当中也有大量的池化技术,只不过这里是把池化技术搬到了我们的计算机当中。

下面我们再用代码来进行实现:

代码层面实现进程池

Makefile代码:

processpool:ProcessPool.cc
	g++ -o $@ $^ -std=c++11
.PHONY:clean
clean:
	rm -f processpool

ProcessPool.cc代码:

#include <iostream>
#include <string>
#include <vector>
#include <cassert>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "Task.hpp"

const int num = 5;
static int number = 1;

class channel
{
public:
    channel(int fd, pid_t id) : ctrlfd(fd), workerid(id)
    {
        name = "channel-" + std::to_string(number++);
    }

public:
    int ctrlfd;
    pid_t workerid;
    std::string name;
};

void Work()
{
    while (true)
    {
        int code = 0;
        ssize_t n = read(0, &code, sizeof(code));
        if (n == sizeof(code))
        {
            if (!init.CheckSafe(code))
                continue;
            init.RunTask(code);
        }
        else if (n == 0)
        {
            break;
        }
        else
        {
            // do nothing
        }
    }

    std::cout << "child quit" << std::endl;
}

void PrintFd(const std::vector<int> &fds)
{
    std::cout << getpid() << " close fds: ";
    for(auto fd : fds)
    {
        std::cout << fd << " ";
    }
    std::cout << std::endl;
}

// 传参形式:
// 1. 输入参数:const &
// 2. 输出参数:*
// 3. 输入输出参数:&
void CreateChannels(std::vector<channel> *c)
{
    // bug
    std::vector<int> old;
    for (int i = 0; i < num; i++)
    {
        // 1. 定义并创建管道
        int pipefd[2];
        int n = pipe(pipefd);
        assert(n == 0);
        (void)n;

        // 2. 创建进程
        pid_t id = fork();
        assert(id != -1);

        // 3. 构建单向通信信道
        if (id == 0) // child
        {
            if(!old.empty())
            {
                for(auto fd : old)
                {
                    close(fd);
                }
                PrintFd(old);
            }
            close(pipefd[1]);
            dup2(pipefd[0], 0);
            Work();
            exit(0); // 会自动关闭自己打开的所有的fd
        }

        // father
        close(pipefd[0]);
        c->push_back(channel(pipefd[1], id));
        old.push_back(pipefd[1]);
        // childid, pipefd[1]
    }
}

void PrintDebug(const std::vector<channel> &c)
{
    for (const auto &channel : c)
    {
        std::cout << channel.name << ", " << channel.ctrlfd << ", " << channel.workerid << std::endl;
    }
}

void SendCommand(const std::vector<channel> &c, bool flag, int num = -1)
{
    int pos = 0;
    while (true)
    {
        // 1. 选择任务
        int command = init.SelectTask();

        // 2. 选择信道(进程)
        const auto &channel = c[pos++];
        pos %= c.size();

        // debug
        std::cout << "send command " << init.ToDesc(command) << "[" << command << "]"
                  << " in "
                  << channel.name << " worker is : " << channel.workerid << std::endl;

        // 3. 发送任务
        write(channel.ctrlfd, &command, sizeof(command));

        // 4. 判断是否要退出
        if (!flag)
        {
            num--;
            if (num <= 0)
                break;
        }
        sleep(1);
    }

    std::cout << "SendCommand done..." << std::endl;
}
void ReleaseChannels(std::vector<channel> c)
{
    // version 2
    // int num = c.size() - 1;

    // for (; num >= 0; num--)
    // {
    //     close(c[num].ctrlfd);
    //     waitpid(c[num].workerid, nullptr, 0);
    // }

    // version 1
    for (const auto &channel : c)
    {
        close(channel.ctrlfd);
        waitpid(channel.workerid, nullptr, 0);
    }
    // for (const auto &channel : c)
    // {
    //     pid_t rid = waitpid(channel.workerid, nullptr, 0);
    //     if (rid == channel.workerid)
    //     {
    //         std::cout << "wait child: " << channel.workerid << " success" << std::endl;
    //     }
    // }
}
int main()
{
    std::vector<channel> channels;
    // 1. 创建信道,创建进程
    CreateChannels(&channels);

    // 2. 开始发送任务
    const bool g_always_loop = true;
    // SendCommand(channels, g_always_loop);
    SendCommand(channels, !g_always_loop, 10);

    // 3. 回收资源,想让子进程退出,并且释放管道,只要关闭写端
    ReleaseChannels(channels);

    return 0;
}

Task.hpp代码:

#pragma once

#include<iostream>
#include<functional>
#include<ctime>
#include<unistd.h>
#include<vector>
using task_t = std::function<void()>;
//typedef std::function<void()> task_t;
void Download()
{
    std::cout<<"我是一个下载任务"<<" 处理者 : "<<getpid()<<std::endl;
}

void PrintLog()
{
    std::cout<<"我是一个打印日志的任务"<<" 处理者 : "<<getpid()<<std::endl;
}

void PushVideoStream()
{
    std::cout<<"这是一个推送视频流的任务"<<" 处理者 : "<<getpid()<<std::endl;
}


class Init
{
public:

    //任务码
    const static int g_download_code = 1;
    const static int g_printlog_code = 2;
    const static int g_push_videostream_code = 3;

         //任务集合
    std::vector<task_t> tasks;
public:
    Init()
    {
        tasks.push_back(Download);
        tasks.push_back(PrintLog);
        tasks.push_back(PushVideoStream);
    }

    bool CheckSafe(int code)
    {
        if(code>=0&&code<tasks.size()) return true;
        else return false;
    }

    void RunTask(int code)
    {
        return tasks[code]();
    }

    int  SelectTask()
    {
        return rand()% tasks.size();
    }

    std::string ToDesc(int code)
    {
        switch(code)
        {
            case g_download_code:
            return "Download";
            case g_printlog_code:
            return "PrintLog";
            case g_push_videostream_code:
            return "PushVideoStream";
            default:
            return "Uknow";
        }
    }
};

Init init; //定义对象




运行结果:


http://www.niftyadmin.cn/n/5360646.html

相关文章

【Android】高仿京东三级类型列表Demo

本demo基于二级分类双列表联动Demo进行了改进&#xff0c;高仿实现了京东的三级类型列表。 京东的如图&#xff1a; 本demo的&#xff1a; 改进之处 实现了三级列表联动&#xff0c;二三级列表之间的滑动监听优化了一下&#xff0c;将二级类型选中交予自身的点击事件&#…

labview高低拼接的理解

1.高低拼接的原因 计算机的一个字节由8位组成,无符号的8位整形表示的范围为0~255,范围太小了,所以用双字节,即16位整形表示更大范围的数字0 ~65535 2.labview中的高低拼接实质是高8位字节左移8位与低8位字节相加,结果是双字节整形 3.类似的C语言中的表达为: (U8)DataO…

cocoapi的 rle 解码使用

cocoapi 是目前mask数据记录的一个比较好的解决方案, 网上有很多相关的使用资料.如COCO数据集介绍, 这里感谢大家的贡献. cocoapi 数据本身是一组json描述结构, cocoapi 项目是解析和生成json数据结构, json有个不能嵌套的弊端, 所以一个json文件往往比较大. cocoapi核心部分…

学习Android的第一天

目录 什么是 Android&#xff1f; Android 官网 Android 应用程序 Android 开发环境搭建 Android 平台架构 Android 应用程序组件 附件组件 Android 第一个程序 HelloWorld 什么是 Android&#xff1f; Android&#xff08;发音为[ˈnˌdrɔɪd]&#xff0c;非官方中文…

Java开发工具 IntelliJ IDEA 2023中文

IntelliJ IDEA 2023是一款强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;适用于多种编程语言&#xff0c;包括Java、Python、Kotlin等。它提供了许多特色功能&#xff0c;以提高开发效率和代码质量。 Java开发工具 IntelliJ IDEA 2023中文 以下是一些IntelliJ ID…

Tomcat在Java web的应用

Tomcat在Java web的应用 本来这篇博客顺应之前的内容&#xff0c;应该是需要写Tomcat的简介、基本使用、配置和部署项目、Web的项目结构、创建MavenWeb、idea本地集成以及Tomcat的Maven插件的笔记内容&#xff0c;但是总觉得没必要&#xff0c;因为这些内容网上肯定很多了&…

Emmet常用语法总结

Emmet常用语法总结 子元素&#xff1a;>兄弟元素&#xff1a;上级元素&#xff1a;^倍数&#xff1a;*分组&#xff1a;&#xff08;&#xff09;属性&#xff1a;[]id和类&#xff1a;# .迭代数字&#xff1a;$文本内容&#xff1a;{}注意事项 Emmet是许多流行文本编辑器的…

FreeRTOS 使用 二进制信号量进行任务同步

使用 FreeRTOS 的二进制信号量进行任务同步 在嵌入式系统中&#xff0c;任务之间的同步是一项重要的任务&#xff0c;特别是在多任务环境下。FreeRTOS 是一个流行的实时操作系统内核&#xff0c;它提供了许多同步机制&#xff0c;其中包括二进制信号量。本文将介绍二进制信号量…