forked from victorycodedev/metaapi-cloud-php-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming_example.php
More file actions
150 lines (123 loc) · 5.63 KB
/
streaming_example.php
File metadata and controls
150 lines (123 loc) · 5.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
<?php
require_once __DIR__ . '/../../vendor/autoload.php';
use Oyi77\MetaapiCloudPhpSdk\MetaApi\MetaApi;
use Oyi77\MetaapiCloudPhpSdk\MetaApi\Streaming\SynchronizationListener;
/**
* MetaApi Streaming Connection Example
*
* Demonstrates how to:
* - Connect to MetaApi via streaming (continuous synchronization)
* - Access terminal state (positions, orders, account info, specifications, prices)
* - Subscribe to market data (quotes, candles, ticks, books)
* - Use typed synchronization listeners for real-time events
* - Access history storage
*/
$token = getenv('TOKEN') ?: '<put your token here>';
$accountId = getenv('ACCOUNT_ID') ?: '<put your account ID here>';
$symbol = getenv('SYMBOL') ?: 'EURUSD';
// Custom synchronization listener
class QuoteListener implements SynchronizationListener
{
private string $symbol;
public function __construct(string $symbol)
{
$this->symbol = $symbol;
}
public function onSymbolPriceUpdated(int $instanceIndex, array $price): void
{
if (($price['symbol'] ?? '') === $this->symbol) {
echo "[{$this->symbol}] Price: bid={$price['bid']}, ask={$price['ask']}\n";
}
}
public function onCandlesUpdated(int $instanceIndex, array $candles, ?float $equity = null, ?float $margin = null, ?float $freeMargin = null, ?float $marginLevel = null): void
{
foreach ($candles as $candle) {
if (($candle['symbol'] ?? '') === $this->symbol) {
echo "[{$this->symbol}] Candle: time={$candle['time']}, open={$candle['open']}, high={$candle['high']}, low={$candle['low']}, close={$candle['close']}\n";
}
}
}
public function onTicksUpdated(int $instanceIndex, array $ticks, ?float $equity = null, ?float $margin = null, ?float $freeMargin = null, ?float $marginLevel = null): void
{
foreach ($ticks as $tick) {
if (($tick['symbol'] ?? '') === $this->symbol) {
echo "[{$this->symbol}] Tick: time={$tick['time']}, bid={$tick['bid']}, ask={$tick['ask']}\n";
}
}
}
public function onBooksUpdated(int $instanceIndex, array $books, ?float $equity = null, ?float $margin = null, ?float $freeMargin = null, ?float $marginLevel = null): void
{
foreach ($books as $book) {
if (($book['symbol'] ?? '') === $this->symbol) {
echo "[{$this->symbol}] Order book updated\n";
}
}
}
public function onSubscriptionDowngraded(int $instanceIndex, string $symbol, ?array $updates = null, ?array $unsubscriptions = null): void
{
echo "Market data subscriptions for {$symbol} were downgraded by the server due to rate limits\n";
}
public function onPositionsUpdated(int $instanceIndex, array $positions): void {}
public function onOrdersUpdated(int $instanceIndex, array $orders): void {}
public function onHistoryOrdersUpdated(int $instanceIndex, array $historyOrders): void {}
public function onDealsUpdated(int $instanceIndex, array $deals): void {}
}
try {
// Create MetaApi instance
$metaApi = new MetaApi($token, [
'region' => 'new-york',
'domain' => 'agiliumtrade.agiliumtrade.ai',
]);
// Get streaming connection
$connection = $metaApi->streamingConnection($accountId);
// Add typed listener
$quoteListener = new QuoteListener($symbol);
$connection->addTypedSynchronizationListener($quoteListener);
// Connect to websocket
echo "Connecting to MetaApi streaming...\n";
$connection->connect(10.0);
// Wait for synchronization
echo "Waiting for synchronization...\n";
$connection->waitSynchronized(300);
echo "Synchronized!\n\n";
// Access terminal state
echo "Terminal State:\n";
echo "Connected: " . ($connection->terminalState->connected ? 'yes' : 'no') . "\n";
echo "Connected to broker: " . ($connection->terminalState->connectedToBroker ? 'yes' : 'no') . "\n";
echo "Account info: " . json_encode($connection->terminalState->accountInformation) . "\n";
echo "Positions: " . count($connection->terminalState->positions) . "\n";
echo "Orders: " . count($connection->terminalState->orders) . "\n\n";
// Access history storage
echo "History Storage:\n";
echo "Order sync finished: " . ($connection->historyStorage->isOrderSynchronizationFinished() ? 'yes' : 'no') . "\n";
echo "Deal sync finished: " . ($connection->historyStorage->isDealSynchronizationFinished() ? 'yes' : 'no') . "\n";
echo "History orders: " . count($connection->historyStorage->getHistoryOrders()) . "\n";
echo "Deals: " . count($connection->historyStorage->getDeals()) . "\n\n";
// Subscribe to market data
echo "Subscribing to {$symbol} market data...\n";
$connection->subscribeToMarketData($symbol, [
['type' => 'quotes', 'intervalInMilliseconds' => 5000],
['type' => 'candles', 'timeframe' => '1m', 'intervalInMilliseconds' => 10000],
['type' => 'ticks'],
]);
echo "Subscribed!\n\n";
// Get current price from terminal state
$price = $connection->terminalState->price($symbol);
if ($price) {
echo "Current {$symbol} price: " . json_encode($price) . "\n\n";
}
// Poll for events for 30 seconds
echo "Receiving market data for 30 seconds...\n";
$endTime = time() + 30;
while (time() < $endTime) {
$connection->poll(0.25);
}
// Unsubscribe and close
echo "\nUnsubscribing from market data...\n";
$connection->unsubscribeFromMarketData($symbol);
$connection->close();
echo "Connection closed.\n";
} catch (Exception $e) {
echo "Error: " . $e->getMessage() . "\n";
echo $e->getTraceAsString() . "\n";
}