@@ -74,6 +74,13 @@ class RequestHandler
7474 */
7575 private $ slave ;
7676
77+ /**
78+ * Contains the content from 'X-PPM-Restart'
79+ *
80+ * @var string
81+ */
82+ private $ restartMode ;
83+
7784 private $ redirectionTries = 0 ;
7885 private $ incomingBuffer = '' ;
7986 private $ lastOutgoingData = '' ; // Used to track abnormal responses
@@ -171,12 +178,12 @@ public function getNextSlave()
171178 private function createErrorResponse ($ code , $ text )
172179 {
173180 return \sprintf (
174- 'HTTP/1.1 %s ' . "\n" .
175- 'Date: %s ' . "\n" .
176- 'Content-Type: text/plain ' . "\n" .
177- 'Content-Length: %s ' . "\n" .
178- "\n" .
179- '%s ' ,
181+ 'HTTP/1.1 %s ' . "\n" .
182+ 'Date: %s ' . "\n" .
183+ 'Content-Type: text/plain ' . "\n" .
184+ 'Content-Length: %s ' . "\n" .
185+ "\n" .
186+ '%s ' ,
180187 $ code ,
181188 \gmdate ('D, d M Y H:i:s T ' ),
182189 \strlen ($ text ),
@@ -246,10 +253,22 @@ public function slaveConnected(ConnectionInterface $connection)
246253 // keep track of the last sent data to detect if slave exited abnormally
247254 $ this ->connection ->on ('data ' , function ($ data ) {
248255 $ this ->lastOutgoingData = $ data ;
249- });
250256
251- // relay data to client
252- $ this ->connection ->pipe ($ this ->incoming , ['end ' => false ]);
257+ // relay data to client
258+ if (stripos ($ data , 'X-PPM-Restart: worker ' ) !== false ) {
259+ $ this ->restartMode = 'worker ' ;
260+ }
261+
262+ if (stripos ($ data , 'X-PPM-Restart: all ' ) !== false ) {
263+ $ this ->restartMode = 'all ' ;
264+ }
265+
266+ if ($ this ->restartMode ) {
267+ $ data = $ this ->removeHeader ($ data , 'X-PPM-Restart ' );
268+ }
269+
270+ $ this ->incoming ->write ($ data );
271+ });
253272 }
254273
255274 /**
@@ -328,6 +347,24 @@ public function slaveClosed()
328347 $ this ->output ->writeln (\sprintf ('Restart worker #%d because it reached memory limit of %d ' , $ this ->slave ->getPort (), $ memoryLimit ));
329348 $ connection ->close ();
330349 }
350+ if ($ this ->restartMode === 'worker ' ) {
351+ $ this ->slave ->close ();
352+ $ this ->output ->writeln (sprintf ('Restart worker #%d because "X-PPM-Worker" Header with content "worker" was send ' , $ this ->slave ->getPort ()));
353+ $ connection ->close ();
354+
355+ $ this ->restartMode = '' ;
356+ }
357+
358+ if ($ this ->restartMode === 'all ' ) {
359+ foreach ($ this ->slaves ->getSlaves () as $ slave ) {
360+ $ slave ->getConnection ()->close ();
361+ $ slave ->close ();
362+
363+ $ this ->output ->writeln (sprintf ('Restart worker #%d because "X-PPM-Worker" Header with content "all" was send ' , $ slave ->getPort ()));
364+ }
365+
366+ $ this ->restartMode = '' ;
367+ }
331368 }
332369 }
333370
@@ -348,7 +385,7 @@ public function slaveConnectFailed(\Exception $e)
348385 $ this ->verboseTimer (function ($ took ) use ($ e ) {
349386 return \sprintf (
350387 '<error>Connection to worker %d failed. Try #%d, took %.3fs ' .
351- '(timeout %ds). Error message: [%d] %s</error> ' ,
388+ '(timeout %ds). Error message: [%d] %s</error> ' ,
352389 $ this ->slave ->getPort (),
353390 $ this ->redirectionTries ,
354391 $ took ,
@@ -395,6 +432,18 @@ protected function isHeaderEnd($buffer)
395432 return false !== \strpos ($ buffer , "\r\n\r\n" );
396433 }
397434
435+ protected function removeHeader ($ header , $ headerToRemove )
436+ {
437+ $ result = $ header ;
438+
439+ if (false !== $ headerPosition = stripos ($ result , $ headerToRemove . ': ' )) {
440+ $ length = strpos (substr ($ header , $ headerPosition ), "\r\n" );
441+ $ result = substr_replace ($ result , '' , $ headerPosition , $ length );
442+ }
443+
444+ return $ result ;
445+ }
446+
398447 /**
399448 * Replaces or injects header
400449 *
0 commit comments