From 58c5b0b08bf61b7253b3a169620e421528237536 Mon Sep 17 00:00:00 2001 From: Neil Young Date: Mon, 9 Mar 2015 16:09:36 -0500 Subject: [PATCH 1/2] Add heartbeat handling to work with Storm 0.9.3 --- lib/storm.php | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/lib/storm.php b/lib/storm.php index 86e1f84..2c2fc10 100644 --- a/lib/storm.php +++ b/lib/storm.php @@ -155,8 +155,12 @@ public function run() $command); $tuple = new Tuple($tupleMap['id'], $tupleMap['comp'], $tupleMap['stream'], $tupleMap['task'], $tupleMap['tuple']); - - $this->process($tuple); + + if ($tuple->task == -1 && $tuple->stream === '__heartbeat') { + $this->sync(); + } else { + $this->process($tuple); + } } } } @@ -233,6 +237,15 @@ protected function fail(Tuple $tuple) $this->sendCommand($command); } + + protected function sync() + { + $command = array( + 'command' => 'sync' + ); + + $this->sendCommand($command); + } } abstract class BasicBolt extends ShellBolt @@ -264,9 +277,13 @@ public function run() try { - $processed = $this->process($tuple); + if ($tuple->task == -1 && $tuple->stream === '__heartbeat') { + $this->sync(); + } else { + $processed = $this->process($tuple); - $this->ack($tuple); + $this->ack($tuple); + } } catch (BoltProcessException $e) { @@ -299,7 +316,7 @@ public function __construct() abstract protected function nextTuple(); abstract protected function ack($tuple_id); abstract protected function fail($tuple_id); - + public function run() { while (true) From 092a3a1179b522cac232d06000d432568c940dd2 Mon Sep 17 00:00:00 2001 From: unknown <明海区> Date: Thu, 17 Nov 2016 18:57:31 +0800 Subject: [PATCH 2/2] fix the debug mode and create file rule --- lib/storm.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/storm.php b/lib/storm.php index 2c2fc10..becb515 100644 --- a/lib/storm.php +++ b/lib/storm.php @@ -31,11 +31,11 @@ public function __construct($debug = false) $this->pid = getmypid(); $this->sendCommand(array( "pid" => $this->pid )); - $this->_debug = $debug; + $this->_DEBUG = $debug; if ($this->_DEBUG) { - $this->stormInc = fopen('/tmp/' . $this->pid . "_" . strtolower($_SERVER['argv'][0]) . '.txt', 'w+'); + $this->stormInc = fopen('/tmp/' . $this->pid . "_" . strtolower(basename($_SERVER['argv'][0])) . '.txt', 'w+'); } $handshake = $this->parseMessage( $this->waitForMessage() );