Skip to content

Commit

Permalink
refactor streaming & add example
Browse files Browse the repository at this point in the history
  • Loading branch information
christian-rogobete committed Dec 27, 2021
1 parent f0be99a commit b268199
Show file tree
Hide file tree
Showing 11 changed files with 354 additions and 186 deletions.
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,27 @@ foreach ($operationsPage->getOperations() as $payment) {
```
You can use:`limit`, `order`, and `cursor` to customize the query. Get the most recent payments for accounts, ledgers and transactions.

Horizon has SSE support for push data. You can use it like this:
```php
$accountId = "GCDBA6GFGEHAMVAMRL6R2733EXUENJ35EMYNA2LE7WWJPVANORVC4UNA";

$sdk->payments()->forAccount($accountId)->cursor("now")->stream(function(OperationResponse $response) {
if ($response instanceof PaymentOperationResponse) {
switch ($response->getAsset()->getType()) {
case Asset::TYPE_NATIVE:
printf("Payment of %s XLM from %s received.", $response->getAmount(), $response->getSourceAccount());
break;
default:
printf("Payment of %s %s from %s received.", $response->getAmount(), $response->getAsset()->getCode(), $response->getSourceAccount());
}
if (floatval($response->getAmount()) > 0.5) {
exit;
}
}
});
```
see also [stream payments example](examples/stream_payments.md)

#### 3.3 Check others

Just like payments, you can check `assets`, `transactions`, `effects`, `offers`, `operations`, `ledgers` etc.
Expand Down Expand Up @@ -192,5 +213,6 @@ if ($response->isSuccessful()) {
| [Allow trust](examples/allow_trust.md) | Updates the authorized flag of an existing trustline. | [Allow trust](https://www.stellar.org/developers/learn/concepts/list-of-operations.html#allow-trust) and [Assets documentation](https://www.stellar.org/developers/learn/concepts/assets.html) |
| [Fee bump transaction](examples/fee_bump.md) | Fee bump transactions allow an arbitrary account to pay the fee for a transaction.| [Fee bump transactions](https://github.com/stellar/stellar-protocol/blob/master/core/cap-0015.md)|
| [Muxed accounts](examples/muxed_account_payment.md) | In this example we will see how to use a muxed account in a payment operation.| [First-class multiplexed accounts](https://github.com/stellar/stellar-protocol/blob/master/core/cap-0027.md)|
| [Stream payments](examples/stream_payments.md) | Listens for payments received by a given account.| [Streaming](https://developers.stellar.org/api/introduction/streaming/) |

More examples can be found in the [tests](https://github.com/Soneso/stellar-php-sdk/tree/main/Soneso/StellarSDKTests).
26 changes: 26 additions & 0 deletions Soneso/StellarSDK/Requests/EffectsRequestBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
namespace Soneso\StellarSDK\Requests;

use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use Soneso\StellarSDK\Exceptions\HorizonRequestException;
use Soneso\StellarSDK\Responses\Effects\EffectResponse;
use Soneso\StellarSDK\Responses\Effects\EffectsPageResponse;

class EffectsRequestBuilder extends RequestBuilder
Expand Down Expand Up @@ -114,4 +116,28 @@ public function request(string $url): EffectsPageResponse {
public function execute() : EffectsPageResponse {
return $this->request($this->buildUrl());
}

/**
* Streams Effect objects to $callback
*
* $callback should have arguments:
* EffectResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->effects()->cursor("now")->stream(function(EffectResponse $effect) {
* printf('Effect type: %s' . PHP_EOL, $effect->getEffectType());
* });
*
* @param callable|null $callback
* @throws GuzzleException
*/
public function stream(callable $callback = null)
{
$this->getAndStream($this->buildUrl(), function($rawData) use ($callback) {
$parsedObject = EffectResponse::fromJson($rawData);
$callback($parsedObject);
});
}
}
25 changes: 25 additions & 0 deletions Soneso/StellarSDK/Requests/LedgersRequestBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
namespace Soneso\StellarSDK\Requests;

use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use Soneso\StellarSDK\Exceptions\HorizonRequestException;
use Soneso\StellarSDK\Responses\Ledger\LedgerResponse;
use Soneso\StellarSDK\Responses\Ledger\LedgersPageResponse;
Expand Down Expand Up @@ -74,4 +75,28 @@ public function request(string $url): LedgersPageResponse {
public function execute() : LedgersPageResponse {
return $this->request($this->buildUrl());
}

/**
* Streams Ledger objects to $callback
*
* $callback should have arguments:
* LedgerResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->ledgers()->cursor("now")->stream(function(LedgerResponse $ledger) {
* printf('Ledger closed at: %s' . PHP_EOL, $ledger->getCreatedAt());
* });
*
* @param callable|null $callback
* @throws GuzzleException
*/
public function stream(callable $callback = null)
{
$this->getAndStream($this->buildUrl(), function($rawData) use ($callback) {
$parsedObject = LedgerResponse::fromJson($rawData);
$callback($parsedObject);
});
}
}
25 changes: 25 additions & 0 deletions Soneso/StellarSDK/Requests/OperationsRequestBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
namespace Soneso\StellarSDK\Requests;

use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use Soneso\StellarSDK\Exceptions\HorizonRequestException;
use Soneso\StellarSDK\Responses\Operations\OperationResponse;
use Soneso\StellarSDK\Responses\Operations\OperationsPageResponse;
Expand Down Expand Up @@ -159,4 +160,28 @@ public function request(string $url): OperationsPageResponse {
public function execute() : OperationsPageResponse {
return $this->request($this->buildUrl());
}

/**
* Streams Operation objects to $callback
*
* $callback should have arguments:
* OperationResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->operations()->cursor("now")->stream(function(OperationResponse $operation) {
* printf('Operation id %s' . PHP_EOL, $operation->getOperationId());
* });
*
* @param callable|null $callback
* @throws GuzzleException
*/
public function stream(callable $callback = null)
{
$this->getAndStream($this->buildUrl(), function($rawData) use ($callback) {
$parsedObject = OperationResponse::fromJson($rawData);
$callback($parsedObject);
});
}
}
39 changes: 39 additions & 0 deletions Soneso/StellarSDK/Requests/PaymentsRequestBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
namespace Soneso\StellarSDK\Requests;

use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use Soneso\StellarSDK\Exceptions\HorizonRequestException;
use Soneso\StellarSDK\Responses\Operations\AccountMergeOperationResponse;
use Soneso\StellarSDK\Responses\Operations\CreateAccountOperationResponse;
use Soneso\StellarSDK\Responses\Operations\OperationsPageResponse;
use Soneso\StellarSDK\Responses\Operations\PathPaymentStrictReceiveOperationResponse;
use Soneso\StellarSDK\Responses\Operations\PathPaymentStrictSendOperationResponse;
use Soneso\StellarSDK\Responses\Operations\PaymentOperationResponse;
use Soneso\StellarSDK\Responses\Payments\PaymentsPageResponse;

class PaymentsRequestBuilder extends RequestBuilder
Expand Down Expand Up @@ -112,4 +118,37 @@ public function request(string $url): OperationsPageResponse {
public function execute() : OperationsPageResponse {
return $this->request($this->buildUrl());
}

/**
* Streams Payment or CreateAccount objects to $callback
*
* $callback should have arguments:
* OperationResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->payments()->cursor("now")->stream(function(OperationResponse $payment) {
* printf('Payment operation id %s' . PHP_EOL, $payment->getOperationId());
* });
*
* @param callable|null $callback
* @throws GuzzleException
*/
public function stream(callable $callback = null)
{
$this->getAndStream($this->buildUrl(), function($rawData) use ($callback) {
if (isset($rawData['type'])){
$parsedObject = match ($rawData['type']) {
'create_account' => CreateAccountOperationResponse::fromJson($rawData),
'payment' => PaymentOperationResponse::fromJson($rawData),
'account_merge' => AccountMergeOperationResponse::fromJson($rawData),
'path_payment_strict_send' => PathPaymentStrictSendOperationResponse::fromJson($rawData),
'path_payment_strict_receive' => PathPaymentStrictReceiveOperationResponse::fromJson($rawData)
};
$callback($parsedObject);
}
});
}

}
58 changes: 58 additions & 0 deletions Soneso/StellarSDK/Requests/RequestBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use GuzzleHttp\Exception\ServerException;
use GuzzleHttp\Psr7\Request;
use RuntimeException;
use Soneso\StellarSDK\Exceptions\HorizonRequestException;
Expand Down Expand Up @@ -115,4 +116,61 @@ public function executeRequest(string $url, string $requestType, ?string $reques
* @throws HorizonRequestException
*/
public abstract function execute() : Response;

/**
* @param string $relativeUrl
* @param callable $callback
* @param $retryOnServerException bool If true, ignore ServerException errors and retry
* @throws GuzzleException
*/
public function getAndStream(string $relativeUrl, callable $callback, bool $retryOnServerException = true) : void
{
while (true) {
try {
$response = $this->httpClient->get($relativeUrl, [
'stream' => true,
'read_timeout' => null,
'headers' => [
'Accept' => 'text/event-stream',
]
]);

$body = $response->getBody();

while (!$body->eof()) {
$line = '';

$char = null;
while ($char != "\n") {
$line .= $char;
$char = $body->read(1);
}

// Ignore empty lines
if (!$line) continue;

// Ignore "data: hello" handshake
if (str_starts_with($line, 'data: "hello"')) continue;

// Ignore lines that don't start with "data: "
$sentinel = 'data: ';
if (!str_starts_with($line, $sentinel)) continue;

// Remove sentinel prefix
$json = substr($line, strlen($sentinel));

$decoded = json_decode($json, true);
if ($decoded) {
$callback($decoded);
}
}
}
catch (ServerException $e) {
if (!$retryOnServerException) throw $e;

// Delay for a bit before trying again
sleep(10);
}
}
}
}
26 changes: 26 additions & 0 deletions Soneso/StellarSDK/Requests/TransactionsRequestBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Soneso\StellarSDK\Requests;

use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use Soneso\StellarSDK\Exceptions\HorizonRequestException;
use Soneso\StellarSDK\Responses\Transaction\TransactionResponse;
use Soneso\StellarSDK\Responses\Transaction\TransactionsPageResponse;
Expand Down Expand Up @@ -126,4 +127,29 @@ public function request(string $url): TransactionsPageResponse
public function execute() : TransactionsPageResponse {
return $this->request($this->buildUrl());
}


/**
* Streams Transaction objects to $callback
*
* $callback should have arguments:
* TransactionResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->transactions()->cursor("now")->stream(function(TransactionResponse $transaction) {
* printf('Transaction Hash %s' . PHP_EOL, $transaction->getHash());
* });
*
* @param callable|null $callback
* @throws GuzzleException
*/
public function stream(callable $callback = null)
{
$this->getAndStream($this->buildUrl(), function($rawData) use ($callback) {
$parsedObject = TransactionResponse::fromJson($rawData);
$callback($parsedObject);
});
}
}
Loading

0 comments on commit b268199

Please sign in to comment.