swoole4+php7实现mysql高并发连接池

swoole4+php7实现mysql高并发连接池

先上源码:

php+swoole+pdo连接池类:

文件名:mysql.php

<?php
/**
 * Created by PhpStorm.
 * User: Administrator
 * Date: 2019/4/26
 * Time: 16:53
 */

/**
 * PDO connection pool backed by a Swoole coroutine channel.
 *
 * Idle connections are stored in the channel as
 * ['last_used_time' => int, 'conn' => PDO] entries. $currentConnection counts
 * every live connection the pool has created (idle ones in the channel plus
 * those currently borrowed), and is what the $maxConnection limit is checked
 * against.
 *
 * The class is a singleton: obtain it through mysql::get_instance().
 */
class mysql
{

    /** Upper bound on live connections; also the channel capacity. */
    protected $maxConnection = 50;
    /** Connections opened eagerly by the constructor and never reclaimed by Gc(). */
    protected $minConnection = 3;
    /** Number of live connections created by this pool (idle + borrowed). */
    protected $currentConnection = 0;

    /** Seconds getConnection() waits on the channel once the pool is at its maximum. */
    protected $timeout = 5;
    /** Seconds a pooled connection may stay unused before Gc() discards it. */
    protected $idletime = 10;

    protected static $instance = null;
    /** Not used by this class; kept so existing subclasses that reference it still work. */
    protected $pool;
    /** Channel holding the idle connection entries. */
    protected $channel;

    /**
     * Return the shared pool instance, creating it on first use.
     *
     * @return mysql|null
     */
    public static function get_instance(){
        if (is_null(self::$instance)){
            self::$instance = new self();
        }
        return self::$instance;
    }

    /**
     * Create the channel, pre-open $minConnection connections and start the
     * periodic idle-connection collector.
     *
     * Connections that fail to open are skipped silently, so the pool may
     * start with fewer than $minConnection entries.
     */
    private function __construct(){
        $this->channel = new \swoole\Coroutine\Channel($this->maxConnection);
        for ( $i=0; $i < $this->minConnection; $i++ ){
            // createConnection() maintains $currentConnection itself.
            if ( ($conn = $this->createConnection()) instanceof PDO ){
                $obj = ["last_used_time"=>time(), 'conn'=>$conn];
                $this->channel->push($obj);
            }
        }

        $this->Gc(); // start the periodic idle-connection collector
    }

    /**
     * Open a new PDO connection and account for it in $currentConnection.
     *
     * The counter is raised before connecting and rolled back if the
     * connection attempt throws, so a failed attempt leaves it unchanged.
     *
     * @return bool|PDO the new connection, or false when connecting failed
     */
    private function createConnection(){
        $this->currentConnection++;
        try{
            // The port is given as its own DSN element (the documented PDO_MYSQL
            // form) instead of being appended to the host.
            $conn = new PDO("mysql:host=127.0.0.1;port=3307;dbname=test","root","123456");
            $conn->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
            return $conn;
        }catch (\PDOException $e){
            $this->currentConnection--;
        }
        return false;
    }

    /**
     * Run a SQL statement on a pooled connection.
     *
     * On MySQL client errors 2006/2013 (lost connection) the dead connection
     * is dropped and the statement is retried, first on other idle
     * connections from the pool and then on up to three freshly created ones.
     * On any other error the connection is still usable, so it is returned to
     * the pool before false is returned.
     *
     * @param string $sql statement to execute
     * @return PDOStatement|bool|string the statement on success; false when
     *         $sql is empty or the query failed; the string "请求无连接" when no
     *         connection could be obtained (note: that string is truthy)
     * @throws Exception propagated from getConnection() when the pool is
     *         exhausted and no connection is released within $timeout seconds
     */
    public function query(string $sql){
        if(empty($sql)){
            return false;
        }

        $retry = 3;
        $errorCode = [2006,2013]; // "server has gone away" / "lost connection during query"

        $conn = $this->getConnection();
        if( !($conn instanceof PDO) ){
            return "请求无连接";
        }

        RETRY:
        try {
            $result = $conn->query($sql);
        } catch (\PDOException $e) {
            $errorInfo = $conn->errorInfo();

            if( !in_array($errorInfo[1], $errorCode) ){
                // Not a connection failure (e.g. a SQL error): the connection is
                // healthy, so give it back. Without this every failed query would
                // permanently remove one connection from the pool.
                $this->recycConnection($conn);
                return false;
            }

            // The connection is dead: stop counting it and do not recycle it.
            $this->currentConnection--;

            // Prefer another idle connection if one is available right now.
            if(!$this->channel->isEmpty()){
                $obj = $this->channel->pop();
                $conn = $obj["conn"];
                goto RETRY;
            }

            // Otherwise try to reconnect a limited number of times.
            while ($retry--){
                var_dump("重试倒数:".$retry);
                // NOTE(review): plain sleep() presumably blocks the whole worker
                // unless Swoole's runtime hooks are enabled; consider
                // \Swoole\Coroutine::sleep() — confirm against the deployment.
                sleep(5);
                $conn = $this->createConnection();
                if($conn instanceof PDO){
                    goto RETRY;
                }
            }

            return false;
        }

        // Success: hand the connection back to the pool.
        $this->recycConnection($conn);
        return $result;
    }

    /**
     * Borrow a connection from the pool.
     *
     * Order of preference: an idle connection from the channel; a newly
     * created connection while the pool is below $maxConnection; otherwise
     * wait up to $timeout seconds for another borrower to release one.
     *
     * @return bool|PDO a connection, or false when a new connection had to be
     *         created and connecting failed
     * @throws Exception when the pool is at its maximum and nothing is
     *         released within $timeout seconds
     */
    public function getConnection(){
        if(!$this->channel->isEmpty()){
            $obj = $this->channel->pop();
            return $obj["conn"];
        }

        // Nothing idle: grow the pool if the limit allows it.
        if( $this->currentConnection < $this->maxConnection ){
            return $this->createConnection();
        }

        // Pool is at its maximum: wait for a connection to be released.
        if( $obj = $this->channel->pop($this->timeout) ){
            return $obj["conn"];
        }

        throw new Exception("超时连接");
    }

    /**
     * Return a borrowed connection to the pool, stamping it with the current
     * time so Gc() can measure how long it has been idle.
     *
     * @param PDO $conn connection previously obtained from this pool
     */
    public function recycConnection(PDO $conn){
        $this->channel->push(["last_used_time"=>time(),'conn'=>$conn]);
    }


    /**
     * Register a 5-second timer that discards connections idle for longer
     * than $idletime seconds.
     *
     * Each tick pops entries while the channel holds more than $minConnection
     * of them; expired entries are dropped (and $currentConnection lowered),
     * the rest are pushed back afterwards. Because popping stops once the
     * channel is down to $minConnection entries, the pool is never shrunk
     * below that size by the collector.
     */
    public function Gc(){
        swoole_timer_tick(5000, function (){
            $newPool = []; // entries that are still fresh and must be put back
            while (true){
                // Only collect while the channel holds more than the minimum.
                if( !$this->channel->isEmpty() && ($this->channel->length()>$this->minConnection) ){
                    $obj = $this->channel->pop(0.001); // short timeout: do not stall the timer
                    if(!$obj){
                        continue;
                    }
                    if( (time()-$obj["last_used_time"]) > $this->idletime ){
                        // Idle for too long: stop counting it and do not put it back,
                        // which lets PHP release the PDO handle.
                        $this->currentConnection--;
                        echo "回收连接.".PHP_EOL;
                    }else{
                        array_push($newPool, $obj);
                    }
                }else{
                    break;
                }
            }

            // Put the still-fresh connections back into the pool.
            foreach ($newPool as $value){
                $this->channel->push($value);
            }

            // Diagnostic output: pid, idle entries in the channel, live connection count.
            var_dump(posix_getpid()."当前连接:(l/c)".$this->channel->length()."-----".$this->currentConnection);
        });
    }


}


猜您喜欢
    0条评论