@@ -96,15 +96,13 @@ public function __construct() {
9696 public function onStart ($ serv )
9797 {
9898 for ($ i = 0 ; $ i < $ this ->pool_size ; $ i ++) {
99- $ db = new mysqli ;
100- $ db ->connect ($ this ->config ['host ' ],$ this ->config ['user ' ],$ this ->config ['pwd ' ],$ this ->config ['name ' ]);
99+ $ db = new swoole_mysql ;
100+ $ db ->connect (array ( ' host ' => $ this ->config ['host ' ],' user ' => $ this ->config ['user ' ],' password ' => $ this ->config ['pwd ' ],' database ' => $ this ->config ['name ' ]), array (& $ this , ' onSQLReady ' ) );
101101 //设置数据库编码
102- $ db ->query ("SET NAMES ' " .$ this ->config ['charset ' ]."' " );
103- $ db_sock = swoole_get_mysqli_sock ($ db );
104- swoole_event_add ($ db_sock , array (&$ this , 'onSQLReady ' ));
105- $ this ->idle_pool [] = array (
106- 'mysqli ' => $ db ,
107- 'db_sock ' => $ db_sock ,
102+ $ db ->query ("SET NAMES ' " .$ this ->config ['charset ' ]."' " ,array (&$ this , 'doQuery ' ));
103+ $ this ->idle_pool [$ i ] = array (
104+ 'db ' => $ db ,
105+ 'sock ' =>$ i ,
108106 'fd ' => 0 ,
109107 );
110108 }
@@ -117,9 +115,21 @@ public function onpipeMessage($serv, $src_worker_id, $data)
117115 //$this->idle_pool=json_decode($data,true);
118116
119117 }
120- public function onSQLReady ($ db_sock )
118+ public function onSQLReady ($ db , $ result )
121119 {
122- $ db_res = $ this ->busy_pool [$ db_sock ];
120+ if ($ result ){
121+ foreach ($ this ->idle_pool as $ k => $ v ) {
122+ if ($ v ['db ' ]==$ db ){
123+ array_unshift ($ this ->wait_queue ,$ db );
124+ $ this ->busy_pool [$ k ]=array (
125+ 'db ' => $ db ,
126+ 'sock ' =>$ k ,
127+ 'fd ' => 0 ,
128+ );
129+ }
130+ }
131+ }
132+ /*$db_res = $this->busy_pool[$db_sock];
123133 $mysqli = $db_res['mysqli'];
124134 $fd = $db_res['fd'];
125135 $data_select=array('status' =>'ok','error'=>0,'errormsg'=>'','result'=>'');
@@ -151,25 +161,25 @@ public function onSQLReady($db_sock)
151161 $req = array_shift($this->wait_queue);
152162 $this->doQuery($req['fd'], $req['sql']);
153163 }
154- }
164+ }*/
155165 }
156166
157167 public function onReceive ($ serv , $ fd , $ from_id , $ data )
158168 {
159169 if ($ this ->isasync ){
160- if (count ($ this ->idle_pool ) == 0 ) {
170+ // if (count($this->idle_pool) == 0) {
161171 //等待队列未满
162- if (count ($ this ->wait_queue ) < $ this ->wait_queue_max ) {
163- $ this ->wait_queue [] = array (
164- 'fd ' => $ fd ,
165- 'sql ' => $ data ,
166- );
167- } else {
168- $ this ->http ->send ($ fd , "request too many, Please try again later. " );
169- }
170- } else {
171- $ this ->doQuery ($ fd , $ data );
172- }
172+ // if (count($this->wait_queue) < $this->wait_queue_max) {
173+ // $this->wait_queue[] = array(
174+ // 'fd' => $fd,
175+ // 'sql' => $data,
176+ // );
177+ // } else {
178+ // $this->http->send($fd, "request too many, Please try again later.");
179+ // }
180+ // } else {
181+ $ this ->dosql ($ fd ,$ data );
182+ // }
173183 }else {
174184 if ($ this ->multiprocess ){
175185 $ result = $ this ->http ->task ($ data );
@@ -193,6 +203,26 @@ public function onReceive($serv, $fd, $from_id, $data)
193203 }
194204 }
195205
206+ //连接池策略
207+ public function dosql ($ fd ,$ data ){
208+ if (count ($ this ->wait_queue ) > $ this ->wait_queue_max ) {
209+ $ this ->http ->send ($ fd , "request too many, Please try again later. " );
210+ }else {
211+ if (count ($ this ->wait_queue ) > 0 ) {
212+ $ db =array_shift ($ this ->wait_queue );
213+ $ httpser =$ this ->http ;
214+ $ db ->query ($ data ,function ($ link ,$ result ) use ($ httpser ,$ fd ){
215+ if ($ result ){
216+ $ httpser ->send ($ fd ,json_encode ($ result ));
217+ array_unshift ($ this ->wait_queue ,$ db );
218+ }
219+ });
220+
221+ }
222+ }
223+
224+ }
225+
196226 public function onTask ($ serv , $ task_id , $ from_id , $ sql )
197227 {
198228 if (!self ::$ link ) {
@@ -225,28 +255,17 @@ public function onTask($serv, $task_id, $from_id, $sql)
225255 return $ data ;
226256 }
227257
228- public function doQuery ($ fd , $ sql )
258+ public function doQuery ($ link , $ result )
229259 {
260+ if ($ result ){
230261 //从空闲池中移除
231- $ db = array_pop ($ this ->idle_pool );
232- /**
233- * @var mysqli
234- */
235- $ mysqli = $ db ['mysqli ' ];
236- for ($ i = 0 ; $ i < 2 ; $ i ++) {
237- $ result = $ mysqli ->query ($ sql , MYSQLI_ASYNC );
238- if ($ result === false ) {
239- if ($ mysqli ->errno == 2013 or $ mysqli ->errno == 2006 ) {
240- $ mysqli ->close ();
241- $ r = $ mysqli ->connect ();
242- if ($ r === true ) continue ;
243- }
262+ foreach ($ this ->idle_pool as $ k => $ v ) {
263+ if ($ link ==$ v ['db ' ]){
264+ unset($ this ->busy_pool [$ k ]);
244265 }
245- break ;
246266 }
247- $ db ['fd ' ] = $ fd ;
248- //加入工作池中
249- $ this ->busy_pool [$ db ['db_sock ' ]] = $ db ;
267+ return $ result ;
268+ }
250269 }
251270 public function onFinish ($ serv , $ data )
252271 {
0 commit comments