Hi,为确保您的账号安全,请大家将登录密码修改为包含大小写字母、标点、数字的复合密码,不要使用过于简单的登录密码。
您当前的位置:首页 :: PHP

laravel之PHP WebSocket研发+WS or WSS (websocket SSL) js websocket

时间:2018-04-28 16:59:59  来源:www.upvup.com  作者:jorsh

 网上讲php websocket的文章很多,其中以聊天程序居多,我主要也是参考了前辈们的代码。

项目描述及使用场景:公司决定做文章详情的统计,主要统计用户浏览位置及在线时长。大家知道,受HTTP无状态、请求结束即断开的特性所限,不能准确获得用户在线时长。原来打算用AJAX每1秒请求一次服务器,但这样服务器压力太大,所以就想到了使用长连接的方式,即websocket,
定时获得用户在线时长及浏览位置, 减少服务器压力,我主要做收集处理,  入库的接口由另外一位同事写
 
研发阶段就使用WS,不使用带SSL的WSS,因为WS比较好抓包测试,WSS只是在外层增加了安全协议。后面会讲到WSS的配置
 
所在平台laravel PHP WebSocket 我们框架是使用的laravel没有办法, 当然使用swoole和laravel-echo来写服务也是可以的, 但是本文讲的是php socket写法,如果你是大神可以跳过了.
 
既然我们写的是一个服务,那就得实现它的启动。启动管理你可以自己用linux shell等来搞,也可以用supervisor来管理,但是我们发现这样部署上线很不方便,所以就用laravel的定时任务来启动服务,这样不需要每台服务器单独部署,减少运维成本。
 
 
我的第三方库类都放到 App\Knowledge这个目录下的,结合自己的项目来搞吧
服务端一共四个文件,核心文件就三个,另外一个只是具体的应用文件
Daemon.php //这是一个守护进程类
WebSockets.php //这个是主要的WebSocket文件了
WSUser.php  //这个每个连接用户的socket或是用户信息类
 
先看WSUser.php
<?php

namespace App\Knowledge;


/**
 * Per-connection state for one client of the WebSocket server.
 *
 * Instances are plain data holders: the server reads and writes the public
 * properties directly. The socket together with the generated id identifies
 * the connection.
 */
class WSUser {
    /** @var mixed Client socket handle passed to the constructor. */
    public $socket;
    /** @var string Identifier passed to the constructor. */
    public $id;
    /** @var array Request headers recorded by the server during the handshake. */
    public $headers = array();
    /** @var bool|string false until the handshake is done; the server later stores a string here. */
    public $handshake = false;
    /** @var bool True while an incomplete frame is waiting in $partialBuffer. */
    public $handlingPartialPacket = false;
    /** @var string Bytes of an incomplete frame carried over to the next read. */
    public $partialBuffer = "";
    /** @var bool True while an outgoing fragmented message is in progress. */
    public $sendingContinuous = false;
    /** @var string Payload collected so far from an incoming fragmented message. */
    public $partialMessage = "";
    /** @var bool Set once the peer has sent a close frame. */
    public $hasSentClose = false;
    // The five properties below are not used by the server code shown in this
    // file; the article says the socket can also be used for file uploads.
    public $clientFileName ;
    public $serverFileName ;
    public $fileHandler ;
    public $fileSize ;
    public $recLength = 0 ;
    /** @var int Unix timestamp taken when the object was created (name kept misspelt for callers). */
    public $cennectTime = 0;
    /**
     * @var string|null Request target of the handshake request line.
     *
     * Declared explicitly because the server assigns it during the handshake;
     * creating it dynamically is deprecated since PHP 8.2.
     */
    public $requestedResource;
    /**
     * @var mixed Article id attached by the application layer on the first message.
     *
     * Declared for the same reason as $requestedResource; null counts as
     * "not set" for empty() checks.
     */
    public $articleId;

    /**
     * @param string $id     Unique identifier for this connection.
     * @param mixed  $socket Accepted client socket.
     */
    public function __construct($id, $socket) {
        $this->id = $id;
        $this->socket = $socket;
        $this->cennectTime = time();
    }
}
这里就是定义一些变量 主要是id与socket来区别每个连接的用户信息
 
然后来看WebSockets.php
<?php


namespace App\Knowledge;




/**
 * Single-process WebSocket (RFC 6455) server built on the sockets extension.
 *
 * One select() loop accepts connections, performs the HTTP upgrade handshake,
 * splits incoming data into frames and hands complete messages to process().
 * A client whose first request is not a WebSocket upgrade is treated as a raw
 * TCP client and its data is passed to process() without de-framing.
 *
 * Subclasses must implement process(), connected() and closed().
 */
abstract class Websockets{

    /** @var mixed Listening socket. */
    protected $master;
    /** @var int Backlog passed to socket_listen(). */
    protected $maxConnect;
    /** @var int Maximum number of bytes read per socket_recv() call. */
    protected $maxBufferSize;
    /** @var array Sockets watched by select(): 'm' is the listener, other keys are user ids. */
    protected $sockets                              = array();
    /** @var array Connected users keyed by user id. */
    protected $users                                = array();
    /** @var array Messages queued for users that have not finished their handshake. */
    protected $heldMessages                         = array();
    /** @var bool When true, stdout()/stderr() echo their message. */
    protected $interactive                          = true;
    protected $headerOriginRequired                 = false;
    protected $headerSecWebSocketProtocolRequired   = false;
    protected $headerSecWebSocketExtensionsRequired = false;


    /**
     * Create, bind and start listening on the master socket.
     *
     * Any failure terminates the script through die(), exactly as before.
     *
     * @param string $addr         Address to bind to.
     * @param int    $port         Port to bind to.
     * @param int    $connect      Listen backlog.
     * @param int    $bufferLength Read buffer size in KiB.
     */
    public function __construct($addr, $port, $connect=1000, $bufferLength = 1024) {
        $this->maxConnect = $connect;
        $this->maxBufferSize = $bufferLength * 1024 + 8;
        $this->master = socket_create(AF_INET, SOCK_STREAM, SOL_TCP)  or die("Failed: socket_create()");
        socket_set_option($this->master, SOL_SOCKET, SO_REUSEADDR, 1) or die("Failed: socket_option()");
        socket_bind($this->master, $addr, $port)                      or die("Failed: socket_bind()");
        socket_listen($this->master, $this->maxConnect)               or die("Failed: socket_listen()");
        $this->sockets['m'] = $this->master;
        $this->stdout("Server started\nListening on: $addr:$port\nMaster socket: ".$this->socketLabel($this->master));
    }

    abstract protected function process($user,$message); // Called immediately when the data is received.
    abstract protected function connected($user);        // Called after the handshake response is sent to the client.
    abstract protected function closed($user);           // Called after the connection is closed.

    /**
     * Hook called after the user object is created but before the handshake
     * has completed. Does nothing by default.
     */
    protected function connecting($user) {
    }

    /**
     * Send a text message to one user, or queue it until the handshake is done.
     *
     * @param object $user    Target user.
     * @param string $message Unframed payload.
     */
    protected function send($user, $message) {
        if (!$user->handshake) {
            // Handshake not finished yet: _tick() retries later.
            $this->heldMessages[] = array('user' => $user, 'message' => $message);
            return;
        }
        $message = $this->frame($message,$user);
        if (@socket_write($user->socket, $message, strlen($message)) === false) {
            // Best effort: report the failure instead of dropping it silently.
            $this->stderr('Failed: socket_write() to user ' . $user->id);
        }
    }

    /**
     * Send a message to every connected user, optionally skipping one.
     *
     * @param string $message Unframed payload.
     * @param mixed  $user    User to skip, or '' to send to everybody.
     * @return bool false when nobody is connected, true otherwise.
     */
    protected function sendAll($message, $user=''){
        if(empty($this->users)){
            return false;
        }
        foreach($this->users as $recipient){
            // Identity comparison: skip exactly the given user object.
            if(!empty($user) && $user === $recipient){
                continue;
            }
            $this->send($recipient, $message);
        }
        return true;
    }

    /**
     * Hook for periodic work. run() calls it once per loop iteration, and the
     * select() timeout in run() is one second.
     */
    protected function tick() {
    }

    /**
     * Core maintenance: deliver held messages whose user has finished the
     * handshake and drop those whose user is gone.
     */
    protected function _tick() {
        foreach ($this->heldMessages as $key => $hm) {
            $found = false;
            foreach ($this->users as $currentUser) {
                if ($hm['user']->socket === $currentUser->socket) {
                    $found = true;
                    if ($currentUser->handshake) {
                        unset($this->heldMessages[$key]);
                        $this->send($currentUser, $hm['message']);
                    }
                }
            }
            if (!$found) {
                // No longer in the list of connected users: drop the message.
                unset($this->heldMessages[$key]);
            }
        }
    }

    /**
     * Main processing loop. Never returns.
     */
    public function run() {
        while(true) {
            if (empty($this->sockets)) {
                $this->sockets['m'] = $this->master;
            }
            $read = $this->sockets;
            $write = $except = null;
            $this->_tick();
            $this->tick();
            $ready = @socket_select($read,$write,$except,1);
            if ($ready === false || $ready < 1) {
                // Error or timeout: $read cannot be trusted as a list of
                // readable sockets, and accept()/recv() on it could block.
                continue;
            }
            foreach ($read as $socket) {
                // Sockets are compared with === everywhere: PHP 8 Socket
                // objects have no properties, so == treats them all as equal.
                if ($socket === $this->master) {
                    $client = socket_accept($socket);
                    if ($client === false) { // socket_accept() reports failure with false
                        $this->stderr("Failed: socket_accept()");
                        continue;
                    }
                    $this->connect($client);
                    $this->stdout("Client connected. " . $this->socketLabel($client));
                    continue;
                }

                $numBytes = @socket_recv($socket, $buffer, $this->maxBufferSize, 0);
                if ($numBytes === false) {
                    $this->_SocketErr($socket);
                    continue;
                }
                if ($numBytes == 0) {
                    $this->disconnect($socket);
                    $this->stderr("Client disconnected. TCP connection lost: " . $this->socketLabel($socket));
                    continue;
                }
                $user = $this->getUserBySocket($socket);
                if ($user === null) {
                    // A socket without a user record cannot be served; stop
                    // watching it instead of failing on a null user below.
                    $strayKey = array_search($socket, $this->sockets, true);
                    if ($strayKey !== false) {
                        unset($this->sockets[$strayKey]);
                    }
                    @socket_close($socket);
                    continue;
                }
                if (!$user->handshake) {
                    $tmp = str_replace("\r", '', $buffer);
                    if (strpos($tmp, "\n\n") === false ) {
                        continue; // If the client has not finished sending the header, then wait before sending our upgrade response.
                    }
                    // Header names are case-insensitive and are normally followed by ": ".
                    $isWebSocket = preg_match('/^(?:upgrade:\s*websocket|sec-websocket-key:)/mi', $buffer) === 1;
                    if ($isWebSocket) { // the client speaks WebSocket
                        $this->doHandshake($user,$buffer);
                        if ($user->handshake) {
                            // Only mark the user when the handshake was accepted.
                            $user->handshake = 'WS';
                        }
                        continue;
                    }
                    // Otherwise the client is a raw TCP socket.
                    $user->handshake = "TCP" ;
                    $this->process($user, $buffer);
                    continue;
                }

                if ($user->handshake === "TCP") { // message from a raw TCP client
                    if (strpos($buffer, "\n\n") === false ) {
                        continue; // Wait until the client has sent a complete message.
                    }
                    $this->process($user, $buffer);
                    continue;
                }

                // Split the packet into frames and de-frame each of them.
                $this->split_packet($numBytes,$buffer, $user);
            }
        }
    }

    /**
     * Handle a failed socket_recv(): disconnect on the listed connection-level
     * errors, otherwise just report the error text.
     */
    protected function _SocketErr($socket){
        $sockErrNo = socket_last_error($socket);
        switch ($sockErrNo) {
            case 102: // ENETRESET    -- Network dropped connection because of reset
            case 103: // ECONNABORTED -- Software caused connection abort
            case 104: // ECONNRESET   -- Connection reset by peer
            case 108: // ESHUTDOWN    -- Cannot send after transport endpoint shutdown
            case 110: // ETIMEDOUT    -- Connection timed out
            case 111: // ECONNREFUSED -- Connection refused
            case 112: // EHOSTDOWN    -- Host is down
            case 113: // EHOSTUNREACH -- No route to host
            case 121: // EREMOTEIO    -- Remote I/O error
            case 125: // ECANCELED    -- Operation canceled
                $this->stderr("Unusual disconnect on socket " . $this->socketLabel($socket));
                // Disconnect before clearing the error so that closed() can still inspect it.
                $this->disconnect($socket, true, $sockErrNo);
                break;
            default:
                $this->stderr('Socket error: ' . socket_strerror($sockErrNo));
        }
    }

    /**
     * Register a freshly accepted socket as a new user.
     */
    protected function connect($socket) {
        $user = new WSUser(uniqid('u'), $socket);
        $this->users[$user->id] = $user;
        $this->sockets[$user->id] = $socket;
        $this->connecting($user);
    }

    /**
     * Forget a user and close its socket.
     *
     * @param mixed    $socket        Socket of the user to drop.
     * @param bool     $triggerClosed true: call closed(); false: send a close frame instead.
     * @param int|null $sockErrNo     When not null the socket error is cleared first.
     */
    protected function disconnect($socket, $triggerClosed = true, $sockErrNo = null) {
        $disconnectedUser = $this->getUserBySocket($socket);
        if ($disconnectedUser === null) {
            return;
        }
        unset($this->users[$disconnectedUser->id]);
        if (array_key_exists($disconnectedUser->id, $this->sockets)) {
            unset($this->sockets[$disconnectedUser->id]);
        }
        if (!is_null($sockErrNo)) {
            socket_clear_error($socket);
        }
        if ($triggerClosed) {
            $this->stdout("Client disconnected. ".$this->socketLabel($disconnectedUser->socket));
            $this->closed($disconnectedUser);
        }else {
            $message = $this->frame('', $disconnectedUser, 'close');
            @socket_write($disconnectedUser->socket, $message, strlen($message));
        }
        // The socket is no longer watched by run(), so it has to be closed in
        // both branches; otherwise the descriptor is leaked.
        socket_close($disconnectedUser->socket);
    }

    /**
     * Validate the upgrade request and answer it.
     *
     * On success the parsed headers are stored on the user, the 101 response
     * is written and connected() is called. On failure an HTTP error response
     * is written and the user is disconnected.
     */
    protected function doHandshake($user, $buffer) {
        $magicGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
        $headers = array();
        foreach (explode("\n",$buffer) as $line) {
            // Look for the request line first: its target may contain ':'
            // and must not be mistaken for a header.
            if (!isset($headers['get']) && preg_match('/^\s*GET\s+(\S+)\s+HTTP/i', $line, $reqResource)) {
                $headers['get'] = trim($reqResource[1]);
            }elseif (strpos($line,":") !== false) {
                $header = explode(":",$line,2);
                $headers[strtolower(trim($header[0]))] = trim($header[1]);
            }
        }

        // Every rejection ends with an empty line so the client sees a complete
        // response. As before, the last failing check decides the status.
        $badRequest = "HTTP/1.1 400 Bad Request\r\n\r\n";
        if (isset($headers['get'])) {
            $user->requestedResource = $headers['get'];
        }else {
            $handshakeResponse = "HTTP/1.1 405 Method Not Allowed\r\n\r\n";
        }
        if (!isset($headers['host']) || !$this->checkHost($headers['host'])) {
            $handshakeResponse = $badRequest;
        }
        if (!isset($headers['upgrade']) || strtolower($headers['upgrade']) != 'websocket') {
            $handshakeResponse = $badRequest;
        }
        if (!isset($headers['connection']) || strpos(strtolower($headers['connection']), 'upgrade') === false) {
            $handshakeResponse = $badRequest;
        }
        if (!isset($headers['sec-websocket-key'])) {
            $handshakeResponse = $badRequest;
        }
        if (!isset($headers['sec-websocket-version']) || $headers['sec-websocket-version'] !== '13') {
            $handshakeResponse = "HTTP/1.1 426 Upgrade Required\r\nSec-WebSocket-Version: 13\r\n\r\n";
        }
        if ($this->headerOriginRequired && (!isset($headers['origin']) || !$this->checkOrigin($headers['origin']))) {
            $handshakeResponse = "HTTP/1.1 403 Forbidden\r\n\r\n";
        }
        if ($this->headerSecWebSocketProtocolRequired && (!isset($headers['sec-websocket-protocol']) || !$this->checkWebsocProtocol($headers['sec-websocket-protocol']))) {
            $handshakeResponse = $badRequest;
        }
        if ($this->headerSecWebSocketExtensionsRequired && (!isset($headers['sec-websocket-extensions']) || !$this->checkWebsocExtensions($headers['sec-websocket-extensions']))) {
            $handshakeResponse = $badRequest;
        }
        // Done verifying the required and the optionally required headers.
        if (isset($handshakeResponse)) {
            socket_write($user->socket,$handshakeResponse,strlen($handshakeResponse));
            $this->disconnect($user->socket);
            return;
        }

        $user->headers = $headers;
        $user->handshake = $buffer;
        // Accept token: base64 of the raw SHA-1 of key + GUID.
        $handshakeToken = base64_encode(sha1($headers['sec-websocket-key'] . $magicGUID, true));
        $subProtocol = (isset($headers['sec-websocket-protocol'])) ? $this->processProtocol($headers['sec-websocket-protocol']) : "";
        $extensions = (isset($headers['sec-websocket-extensions'])) ? $this->processExtensions($headers['sec-websocket-extensions']) : "";
        $handshakeResponse = "HTTP/1.1 101 Switching Protocols\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nSec-WebSocket-Accept: $handshakeToken\r\n$subProtocol$extensions\r\n";
        socket_write($user->socket,$handshakeResponse,strlen($handshakeResponse));
        $this->connected($user);
    }

    protected function checkHost($hostName) {
        return true; // Override and return false if the host is not one that you would expect.
    }
    protected function checkOrigin($origin) {
        return true; // Override and return false if the origin is not one that you would expect.
    }
    protected function checkWebsocProtocol($protocol) {
        return true; // Override and return false if a protocol is not found that you would expect.
    }
    protected function checkWebsocExtensions($extensions) {
        return true; // Override and return false if an extension is not found that you would expect.
    }

    /**
     * Return either "Sec-WebSocket-Protocol: <selected>\r\n" or an empty string.
     *
     * A non-empty value must end with CRLF and must not start with one,
     * because it is inserted directly into the response header block.
     */
    protected function processProtocol($protocol) {
        return "";
    }

    /**
     * Return either "Sec-WebSocket-Extensions: <selected>\r\n" or an empty string.
     */
    protected function processExtensions($extensions) {
        return "";
    }

    /**
     * Find the user that owns the given socket, or null.
     */
    protected function getUserBySocket($socket) {
        foreach ($this->users as $user) {
            if ($user->socket === $socket) {
                return $user;
            }
        }
        return null;
    }

    public function stdout($msg) {
        if ($this->interactive) {
            echo "$msg\n";
        }
    }
    public function stderr($msg) {
        if ($this->interactive) {
            echo "$msg\n";
        }
    }

    /**
     * Wrap a payload into a single unmasked WebSocket frame.
     *
     * @param string $message          Payload.
     * @param object $user             User whose sendingContinuous flag tracks fragmentation.
     * @param string $messageType      continuous|text|binary|close|ping|pong.
     * @param bool   $messageContinues true when more fragments follow (FIN bit left clear).
     * @return string Encoded frame.
     * @throws \InvalidArgumentException for an unknown $messageType.
     */
    protected function frame($message, $user, $messageType='text', $messageContinues=false) {
        switch ($messageType) {
            case 'continuous':
                $b1 = 0;
                break;
            case 'text':
                $b1 = ($user->sendingContinuous) ? 0 : 1;
                break;
            case 'binary':
                $b1 = ($user->sendingContinuous) ? 0 : 2;
                break;
            case 'close':
                $b1 = 8;
                break;
            case 'ping':
                $b1 = 9;
                break;
            case 'pong':
                $b1 = 10;
                break;
            default:
                // Previously $b1 stayed undefined and a malformed frame was sent.
                throw new \InvalidArgumentException('Unknown frame type: ' . $messageType);
        }
        if ($messageContinues) {
            $user->sendingContinuous = true;
        }else {
            $b1 += 128; // FIN bit
            $user->sendingContinuous = false;
        }
        $length = strlen($message);
        $lengthField = "";
        if ($length < 126) {
            $b2 = $length;
        }elseif ($length < 65536) {
            $b2 = 126;
            $lengthField = pack('n', $length); // 16-bit big-endian
        }else {
            $b2 = 127;
            // 64-bit big-endian, built from hex so it does not depend on 64-bit pack codes.
            $hexLength = dechex($length);
            if (strlen($hexLength)%2 == 1) {
                $hexLength = '0' . $hexLength;
            }
            $lengthField = str_pad(hex2bin($hexLength), 8, chr(0), STR_PAD_LEFT);
        }
        return chr($b1) . chr($b2) . $lengthField . $message;
    }

    /**
     * Split received bytes into frames and process each frame individually.
     *
     * Bytes of a frame that is not complete yet are stored on the user and
     * prepended to the next read.
     */
    protected function split_packet($length,$packet, $user) {
        // Prepend what was left over from the previous read.
        if ($user->handlingPartialPacket) {
            $packet = $user->partialBuffer . $packet;
            $user->handlingPartialPacket = false;
            $length=strlen($packet);
        }
        $fullpacket=$packet;
        $frame_pos=0;
        while($frame_pos<$length) {
            if (strlen($packet) < $this->frameHeaderSize($packet)) {
                // Not even the frame header is complete: wait for more data
                // instead of reading past the end of the string.
                $user->handlingPartialPacket = true;
                $user->partialBuffer = $packet;
                return;
            }
            $headers = $this->extractHeaders($packet);
            $headers_size = $this->calcoffset($headers);
            $framesize=$headers['length']+$headers_size;
            // Cut the frame out of the packet and process it.
            $frame=substr($fullpacket,$frame_pos,$framesize);
            if (($message = $this->deframe($frame, $user)) !== false) {
                if ($user->hasSentClose) {
                    $this->disconnect($user->socket);
                    return; // nothing after a close frame is processed
                }
                $this->process($user, $message);
            }
            // Advance to the next frame.
            $frame_pos+=$framesize;
            $packet=substr($fullpacket,$frame_pos);
        }
    }

    /**
     * Number of header bytes announced by the first two bytes of a frame.
     * Returns 2 when fewer than two bytes are available.
     */
    private function frameHeaderSize($packet) {
        if (strlen($packet) < 2) {
            return 2;
        }
        $second = ord($packet[1]);
        $size = 2;
        if ($second & 128) {
            $size += 4; // masking key
        }
        $indicator = $second & 127;
        if ($indicator === 126) {
            $size += 2;
        }elseif ($indicator === 127) {
            $size += 8;
        }
        return $size;
    }

    /**
     * Offset of the payload inside a frame, derived from parsed headers.
     */
    protected function calcoffset($headers) {
        $offset = 2;
        if ($headers['hasmask']) {
            $offset += 4;
        }
        if ($headers['length'] > 65535) {
            $offset += 8;
        } elseif ($headers['length'] > 125) {
            $offset += 2;
        }
        return $offset;
    }

    /**
     * Decode one frame.
     *
     * @param string $message Raw bytes of one frame (possibly incomplete).
     * @param object $user    Owning user; fragmentation state is stored on it.
     * @return string|false The complete message, "" for a close frame, or false
     *                      when there is nothing to deliver (control frame,
     *                      incomplete frame, non-final fragment, invalid frame).
     */
    protected function deframe($message, &$user) {
        $headers = $this->extractHeaders($message);
        $pongReply = false;
        $willClose = false;
        switch($headers['opcode']) {
            case 0:
            case 1:
            case 2:
                break;
            case 8:
                // The caller disconnects once it sees this flag.
                $user->hasSentClose = true;
                return "";
            case 9:
                $pongReply = true;
                break;
            case 10:
                break;
            default:
                $willClose = true; // unknown opcode: frame is dropped
                break;
        }
        if ($this->checkRSVBits($headers,$user)) {
            return false;
        }
        if ($willClose) {
            return false;
        }
        $framePayload = $this->extractPayload($message,$headers);
        if ($headers['length'] > strlen($framePayload)) {
            // Frame not fully received yet; split_packet() retries with more data.
            $user->handlingPartialPacket = true;
            $user->partialBuffer = $message;
            return false;
        }
        // Unmask this frame only: earlier fragments are already decoded and
        // every frame carries its own masking key.
        $framePayload = $this->applyMask($headers,$framePayload);
        if ($pongReply) {
            // A pong echoes the decoded application data of the ping.
            $reply = $this->frame($framePayload,$user,'pong');
            socket_write($user->socket,$reply,strlen($reply));
            return false;
        }
        if ($headers['opcode'] == 10) {
            return false; // pongs are control frames, not application messages
        }
        $payload = $user->partialMessage . $framePayload;
        if ($headers['fin']) {
            $user->partialMessage = "";
            return $payload;
        }
        $user->partialMessage = $payload;
        return false;
    }

    /**
     * Parse the header of a frame.
     *
     * @param string $message At least the complete frame header.
     * @return array fin and hasmask are booleans, rsv1..rsv3 are one-byte strings
     *               (as checkRSVBits() expects), opcode and length are numbers,
     *               mask is the 4-byte masking key or "".
     */
    protected function extractHeaders($message) {
        $first  = ord($message[0]);
        $second = ord($message[1]);
        $header = array(
            // Real booleans: the previous one-byte strings were always truthy
            // ("\x00" is a non-empty string), so every frame looked final and masked.
            'fin'     => ($first & 128) !== 0,
            'rsv1'    => $message[0] & chr(64),
            'rsv2'    => $message[0] & chr(32),
            'rsv3'    => $message[0] & chr(16),
            'opcode'  => $first & 15,
            'hasmask' => ($second & 128) !== 0,
            'length'  => $second & 127,
            'mask'    => "");
        $maskOffset = 2;
        if ($header['length'] == 126) {
            $header['length'] = ord($message[2]) * 256 + ord($message[3]);
            $maskOffset = 4;
        }elseif ($header['length'] == 127) {
            // 64-bit big-endian length, accumulated byte by byte.
            $extended = 0;
            for ($i = 2; $i <= 9; $i++) {
                $extended = $extended * 256 + ord($message[$i]);
            }
            $header['length'] = $extended;
            $maskOffset = 10;
        }
        if ($header['hasmask']) {
            $header['mask'] = (string) substr($message, $maskOffset, 4);
        }
        return $header;
    }

    /**
     * Return the (still masked) payload bytes of a frame.
     */
    protected function extractPayload($message,$headers) {
        return (string) substr($message, $this->calcoffset($headers));
    }

    /**
     * XOR the payload with the repeated masking key; unmasked payloads are
     * returned unchanged.
     */
    protected function applyMask($headers,$payload) {
        $mask = (string) $headers['mask'];
        if (!$headers['hasmask'] || $mask === '') {
            return $payload;
        }
        $payloadLength = strlen($payload);
        if ($payloadLength === 0) {
            return $payload;
        }
        $repeats = (int) ceil($payloadLength / strlen($mask));
        $effectiveMask = substr(str_repeat($mask, $repeats), 0, $payloadLength);
        return $effectiveMask ^ $payload;
    }

    /**
     * Return true when any RSV bit is set. Override this method if you are
     * using an extension where the RSV bits are used.
     */
    protected function checkRSVBits($headers,$user) {
        if (ord($headers['rsv1']) + ord($headers['rsv2']) + ord($headers['rsv3']) > 0) {
            return true;
        }
        return false;
    }

    /**
     * Debug helper: hex dump with ": " after every 8th byte and a line break
     * after every 32nd byte.
     */
    protected function strtohex($str) {
        $strout = "";
        for ($i = 0; $i < strlen($str); $i++) {
            $strout .= sprintf('%02x', ord($str[$i])) . " ";
            $column = $i % 32;
            if ($column == 7 || $column == 15 || $column == 23) {
                $strout .= ": ";
            }
            if ($column == 31) {
                $strout .= "\n";
            }
        }
        return $strout . "\n";
    }

    /**
     * Debug helper. All output statements are commented out, so this
     * currently prints nothing.
     */
    protected function printHeaders($headers) {
        //echo "Array\n(\n";
        foreach ($headers as $key => $value) {
            if ($key == 'length' || $key == 'opcode') {
                //echo "\t[$key] => $value\n\n";
            }else {
                //echo "\t[$key] => ".$this->strtohex($value)."\n";
            }
        }
        //echo ")\n";
    }

    /**
     * Printable label for a socket in log messages. Resources keep their usual
     * "Resource id #N" text; objects (PHP 8 Socket) cannot be cast to string.
     */
    private function socketLabel($socket) {
        if (is_resource($socket)) {
            return (string) $socket;
        }
        if (is_object($socket)) {
            return get_class($socket) . '#' . spl_object_hash($socket);
        }
        return gettype($socket);
    }
}
 
这里有详细的注释,自己消化吧,拿来就可以用,不懂的地方请百度或谷歌吧
这个socket也可以用来上传文件,但要复写process这个函数 下面会给出上传文件的具体实现
 
然后就是具体应用的实现类ArticleMonitor.php
<?php


namespace App\Knowledge;



/**
 * Application layer of the server: records article views and reading progress.
 *
 * Every message is expected to be a JSON object with at least "articleId".
 * The first valid message of a connection reports a "view_article_detail"
 * event; every later message reports a "view_article_time" event and is
 * acknowledged to the client.
 */
class ArticleMonitor extends Websockets
{

    /**
     * Handle one decoded message from a client.
     *
     * @param object $user    Connection state of the sender.
     * @param string $message Raw message, expected to be JSON.
     * @return string|null '' for rejected and first messages, null otherwise.
     */
    protected function process ($user, $message) {
        $data = json_decode($message, 1);
        if (!is_array($data) || empty($data['articleId'])) {
            $this->disconnect($user->socket);
            // Stop here: without this return the code below went on to use
            // the invalid payload of a user that was just disconnected.
            return '';
        }
        if(empty($user->articleId)){
            // First message of this connection: count the page view once.
            $this->monitorArticle($data);
            $user->articleId = $data['articleId'];
            $this->users[$user->id] = $user;
            return '';
        }
        $this->viewArticle($user, $data);
        $this->stdout($data['articleId']);
        $this->send($user,'root@www.upvup.com#>success');
    }

    /** Nothing to do when a client has completed the handshake. */
    protected function connected ($user) {
    }

    /** Nothing to do when a client has gone away. */
    protected function closed ($user) {
    }

    /**
     * Report a "view_article_detail" event to the counting API.
     *
     * @param array $data Decoded client message.
     */
    protected function monitorArticle($data){
        $pdata = [
            'event_id' => 'view_article_detail',
            'object_id'=> $data['articleId'],
            'user_id'  => empty($data['userId']) ? 0 : $data['userId'],
            'param1'   => empty($data['columnId']) ? 0 : $data['columnId'],
            'param2'   => 0,
            'bundle_id'=> 'www.upvup.com',
            'uuid'     => empty($data['openid']) ? '' : 'openid:'.$data['openid'],
        ];
        // NOTE(review): curl() and DOMAIN_API are project helpers defined
        // outside this file; the second argument presumably selects POST.
        curl(DOMAIN_API.'article/count', 1, $pdata);
    }

    /**
     * Report a "view_article_time" event: position, seconds since the
     * connection was created and the peer address.
     *
     * @param object $user Connection state of the sender.
     * @param array  $data Decoded client message.
     */
    protected function viewArticle($user, $data){
        // The position comes from the client: accept numbers only and keep
        // them between 0 and the existing upper limit of 100.
        $pos = (isset($data['pos']) && is_numeric($data['pos'])) ? $data['pos'] + 0 : 0;
        $pos = max(0, min(100, $pos));

        $addr = $port = '';
        socket_getpeername($user->socket, $addr, $port);
        $pdata = [
            'event_id' => 'view_article_time',
            'object_id'=> $data['articleId'],
            'user_id'  => empty($data['userId']) ? 0 : $data['userId'],
            'param1'   => $user->id,
            'param2'   => $pos,
            'param3'   => time() - $user->cennectTime,
            'bundle_id'=> 'www.upvup.com',
            'uuid'     => empty($data['openid']) ? '' : 'openid:'.$data['openid'],
            'ip'       => $addr,
        ];
        curl(DOMAIN_API.'article/count', 1, $pdata);
    }

}
 
这里复写了process, connected, closed 三个函数
主要就是process函数这里是业务实现的细节了
 
 
最后就是守护进程类Daemon.php
<?php
/**
 * php 守户进程
 * User: jorsh20180419
 */

namespace App\Knowledge;


/**
 * Turns the current CLI process into a daemon (double fork + setsid).
 */
class Daemon{
    /**
     * Detach the running process from its terminal.
     *
     * The calling process and the intermediate child both exit(0); only the
     * grandchild returns from this method.
     *
     * @throws \RuntimeException when fork() or setsid() fails.
     */
    public function init(){
        // First fork: the parent exits, so the child is no longer tied to the
        // process that was started from the terminal.
        $pid = pcntl_fork();
        if ($pid == -1){
            // Fully qualified: inside this namespace a bare "Exception"
            // refers to a class that does not exist.
            throw new \RuntimeException('fork子进程失败');
        }elseif ($pid > 0){
            exit(0);
        }

        // Start a new session so the process has no controlling terminal.
        $sid = posix_setsid();
        if ($sid == -1) {
            throw new \RuntimeException('setsid fail');
        }

        // Second fork: a session leader could open a terminal again, so hand
        // over to a grandchild that is not a session leader.
        $pid = pcntl_fork();
        if ($pid == -1) {
            throw new \RuntimeException('fork子进程失败');
        } elseif ($pid > 0) {
            exit(0);
        }

        // The daemon has no terminal, so the standard streams are closed and
        // then reopened on /dev/null, which makes later echo/print calls
        // harmless. All three are reopened, in this order: the system hands out
        // the lowest free descriptor, so otherwise the next socket opened by
        // the process could end up as descriptor 2 (stderr).
        // The handles are globals so they stay open after this method returns.
        global $STDIN, $STDOUT, $STDERR;
        fclose(STDIN);
        fclose(STDOUT);
        fclose(STDERR);
        $STDIN  = fopen('/dev/null', 'r');
        $STDOUT = fopen('/dev/null', 'w');
        $STDERR = fopen('/dev/null', 'w');

        // Leave the inherited working directory so it is no longer kept busy.
        chdir('/');
        umask(0); // clear the file mode creation mask
    }

}
 
这里的注释已经非常详细了,不懂在下面留言吧
 
到这里服务端已经算是完成了,那么我们可以开始跑了
在跑之前我们看看laravel的artisan命令了,
 
路径如下App\Console\Commands\
在这里我们创建一个WSCommand.php
代码如下:
<?php

namespace App\Console\Commands;


use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use App\Knowledge\ArticleMonitor;
use App\Knowledge\Daemon;

/**
 * Artisan command that controls the article-monitor WebSocket server:
 * php artisan ws:action start|stop|restart
 */
class WSCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'ws:action {action}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'ws webserver command';


    /**
     * Create a new command instance.
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return mixed 1 for an unknown action, otherwise null.
     */
    public function handle()
    {
        $action = $this->argument('action');
        switch ($action) {
            case 'start':
                $this->start();
                break;
            case 'stop':
                $this->stop();
                break;
            case 'restart':
                $this->restart();
                break;
            default:
                // Previously an unknown action was ignored without any output.
                $this->error("Unknown action: {$action} (expected start, stop or restart)");
                return 1;
        }
    }

    /**
     * Start the WebSocket server on 0.0.0.0:8001.
     *
     * On Linux the process daemonizes first. run() loops forever, so under
     * normal operation this method does not return.
     */
    private function start()
    {
        if (strtoupper(substr(PHP_OS, 0, 5)) === 'LINUX') {
            $daemon = new Daemon();
            $daemon->init();
        }
        try {
            $WS = new ArticleMonitor("0.0.0.0","8001");
            $WS->run();
        }
        catch (\Exception $e) {
            // Fully qualified: a bare "Exception" in this namespace never
            // matches. Report the failure and do not claim success.
            $this->error($e->getMessage());
            return;
        }
        $this->info('start ok');
    }

    /**
     * Stop the WebSocket server. Not implemented yet.
     */
    private function stop(){
        $this->warn('ws:action stop is not implemented');
    }

    /**
     * Restart the WebSocket server. Not implemented yet.
     */
    private function restart(){
        $this->warn('ws:action restart is not implemented');
    }
}
 
这个也没有啥好说的,看代码吧
 
 
然后是App\Console\Kernel.php里加入
// Commands registered with artisan (the property declaration needs its
// terminating semicolon, otherwise the class does not parse).
protected $commands = [
	Commands\WSCommand::class,
];

/**
 * Run "ws:action start" every five minutes.
 */
protected function schedule(Schedule $schedule){
	$schedule->command('ws:action start')->everyFiveMinutes();
}
 
以上操作完成后,登录到服务器的laravel项目目录里执行命令
php artisan ws:action start
然后看看端口是否打开, 我们上面使用的是8001端口, 根据业务自己改
netstat -an|grep 8001
tcp        0      0 0.0.0.0:8001            0.0.0.0:*               LISTEN
到此服务端已经OK了.
那我们来做WEB端或其他的客户端, 本文是WEB端 H5 WebSocket
<script type="text/javascript">
	// ws: the WebSocket connection; wsmonitor: id of the reporting interval timer.
	var ws,wsmonitor;
	var wsurl = 'ws://你的域名.com:8001';
	//var wsurl = 'wss://你的域名.com/wss'; // SSL variant: note the /wss path, which the nginx reverse proxy forwards to the WebSocket server
	$(function(){
		ws = new WebSocket(wsurl);
		ws.onopen = function() {
			console.log('websocketed');
			// Report the reader's position to the server every 3 seconds.
			wsmonitor = setInterval(function(){
				var pos = 0; // placeholder: replace with the real reading-position lookup for your project
				// Guard the global so a missing `user` does not throw on every tick.
				var userId = (typeof user !== 'undefined' && user && user.id) || 0;
				var columnId = '';
				var openid = '';
				var data ={articleId:0, pos:pos, userId:userId, columnId:columnId, openid:openid};
				ws.send(JSON.stringify(data)); // JSON payload; handled server-side by the process function in ArticleMonitor.php
			}, 3000);

			//ws.send('hello www.upvup.com');

			// Logged as soon as the timer is scheduled, i.e. before the first payload goes out.
			console.log('send data ok');
		};
		ws.onmessage = function(res){
			// `res` is a MessageEvent; the payload is in `res.data`.
			console.log('recv data:'+res.data);
		};
		ws.onclose = function(e){
			// Stop reporting once the connection is gone.
			clearInterval(wsmonitor);
			console.log('websocket closed');
		};
		ws.onerror = function(e){
			clearInterval(wsmonitor);
			console.log('websocket error');
		};
	})

</script>
这里就是主要的js了
 
 
下来讲一下WSS的配置 
那么访问的就是var wsurl = 'wss://你的域名.com/wss'这个地址
我们来配置一下nginx的反向代理,让所有访问/wss的连接都转发到上面开启了服务的机器的端口
 
那么nginx的SSL配置我也贴出来供大家参考, 这里使用了负载均衡
# Backend pool for regular site traffic; used by `proxy_pass http://web;`.
upstream web {
    #ip_hash;
    server web1:80; # address (IP/host + port) of a backend web server; real values omitted by the author
    server web2:80;
    server web3:80;
}
# Backend pool for the WebSocket servers on port 8001; used by `proxy_pass http://wss;`.
upstream wss{
    server web1:8001;
    server web2:8001;
    server web3:8001;
}

# Plain-HTTP virtual host: proxies /wss to the WebSocket pool and everything else to the web pool.
server {
    listen 80;
    #listen 443 ssl;
    server_name XXXX.com;
    index index.html index.htm index.php;

	#add_header Access-Control-Allow-Origin http://XXXX.com;
    # Reverse-proxy configuration for the WebSocket endpoint
    location /wss {
        proxy_pass http://wss;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_set_header X-NginX-Proxy true;
    }
    # Regular site traffic, proxied to the web upstream
    location / {
        proxy_pass http://web;  # use the backend web servers
        proxy_redirect off;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        # NOTE(review): this server listens on plain HTTP yet reports https upstream - confirm this is intentional
        proxy_set_header X-Forwarded-Proto https;
        # Tune the parameters below to your own server hardware
        client_max_body_size 10m;
        client_body_buffer_size 128k;
        proxy_connect_timeout 90;
        proxy_send_timeout 90;
        proxy_read_timeout 90;
        proxy_buffer_size 4k;
        proxy_buffers 4 32k;
        proxy_busy_buffers_size 64k;
        proxy_temp_file_write_size 64k;
    }
    #access_log /var/log/nginx/news-access.log
}

# HTTPS virtual host: terminates TLS, proxies /wss to the WebSocket pool
# and everything else to the web pool.
server {
    # The `ssl` flag here enables TLS; the separate `ssl on;` directive is
    # obsolete and rejected by current nginx releases, so it is not used.
    listen 443 ssl;
    server_name news.caijingmobile.com;
    index index.html index.htm index.php;
    #add_header Access-Control-Allow-Origin https://XXXX.com;

    ssl_certificate /etc/tmp..../fullchain.pem; # path to the domain's SSL certificate; change to your own
    ssl_certificate_key /etc/tmp.../privkey.pem;
    #ssl_trusted_certificate /etc/tmp..../chain.pem;
    ssl_session_timeout 5m;
    # NOTE(review): TLSv1 and TLSv1.1 are deprecated; consider dropping them if no legacy clients remain
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2 ;
    #ssl_ciphers AESGCM:ALL:!DH:!EXPORT:!RC4:+HIGH:!MEDIUM:!LOW:!aNULL:!eNULL;
    #ssl_ciphers "HIGH:!aNULL:!MD5 or HIGH:!aNULL:!MD5:!3DES";
    ssl_ciphers "EECDH+CHACHA20:EECDH+CHACHA20-draft:EECDH+AES128:RSA+AES128:EECDH+AES256:RSA+AES256:EECDH+3DES:RSA+3DES:!MD5";
    ssl_prefer_server_ciphers on;
    add_header Strict-Transport-Security max-age=15768000;
    # web add
    #add_header Content-Security-Policy upgrade-insecure-requests;
    # Reverse-proxy configuration for the WebSocket endpoint (same as in the port-80 server)
    location /wss {
        proxy_pass http://wss;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header Host $http_host;
        proxy_set_header X-NginX-Proxy true;
    }
    # Regular site traffic, load-balanced across the web upstream
    location / {
        proxy_pass http://web;  # use the backend web servers
        proxy_redirect off;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto https;
        client_max_body_size 10m;
        client_body_buffer_size 128k;
        proxy_connect_timeout 90;
        proxy_send_timeout 90;
        proxy_read_timeout 90;
        proxy_buffer_size 4k;
        proxy_buffers 4 32k;
        proxy_busy_buffers_size 64k;
        proxy_temp_file_write_size 64k;
    }
    #access_log /var/log/nginx/news-access.log
}
 
这样就完了,可以在浏览器里对上面的代码进行调试。
 
 
           game over
 
 
 
 
 
 
 
举报
收藏0次 / 评论0
评论(0)
还可以输入 2000 个字符
还可以输入 2000 个字符
取消回复
举报×

还可以输入 264 字符

收藏(0)×