Skip to content

Commit

Permalink
add streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
christian-rogobete committed Dec 18, 2021
1 parent 8783cc5 commit 619b10f
Show file tree
Hide file tree
Showing 2 changed files with 322 additions and 5 deletions.
249 changes: 244 additions & 5 deletions Soneso/StellarSDK/StellarSDK.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@

namespace Soneso\StellarSDK;

use Exception;

use GuzzleHttp\Client;
use GuzzleHttp\Exception\GuzzleException;
use GuzzleHttp\Psr7\Request;
use GuzzleHttp\Exception\ServerException;
use Soneso\StellarSDK\Exceptions\HorizonRequestException;
use Soneso\StellarSDK\Requests\AccountsRequestBuilder;
use Soneso\StellarSDK\Requests\AssetsRequestBuilder;
Expand All @@ -24,8 +24,6 @@
use Soneso\StellarSDK\Requests\OperationsRequestBuilder;
use Soneso\StellarSDK\Requests\OrderBookRequestBuilder;
use Soneso\StellarSDK\Requests\PaymentsRequestBuilder;
use Soneso\StellarSDK\Requests\RequestBuilder;
use Soneso\StellarSDK\Requests\RequestType;
use Soneso\StellarSDK\Requests\RootRequestBuilder;
use Soneso\StellarSDK\Requests\StrictReceivePathsRequestBuilder;
use Soneso\StellarSDK\Requests\StrictSendPathsRequestBuilder;
Expand All @@ -35,12 +33,17 @@
use Soneso\StellarSDK\Requests\TransactionsRequestBuilder;
use Soneso\StellarSDK\Responses\Account\AccountResponse;
use Soneso\StellarSDK\Responses\ClaimableBalances\ClaimableBalanceResponse;
use Soneso\StellarSDK\Responses\Effects\EffectResponse;
use Soneso\StellarSDK\Responses\FeeStats\FeeStatsResponse;
use Soneso\StellarSDK\Responses\Ledger\LedgerResponse;
use Soneso\StellarSDK\Responses\LiquidityPools\LiquidityPoolResponse;
use Soneso\StellarSDK\Responses\Offers\OfferResponse;
use Soneso\StellarSDK\Responses\Operations\AccountMergeOperationResponse;
use Soneso\StellarSDK\Responses\Operations\CreateAccountOperationResponse;
use Soneso\StellarSDK\Responses\Operations\OperationResponse;
use Soneso\StellarSDK\Responses\ResponseHandler;
use Soneso\StellarSDK\Responses\Operations\PathPaymentStrictReceiveOperationResponse;
use Soneso\StellarSDK\Responses\Operations\PathPaymentStrictSendOperationResponse;
use Soneso\StellarSDK\Responses\Operations\PaymentOperationResponse;
use Soneso\StellarSDK\Responses\Root\RootResponse;
use Soneso\StellarSDK\Responses\Transaction\SubmitTransactionResponse;
use Soneso\StellarSDK\Responses\Transaction\TransactionResponse;
Expand Down Expand Up @@ -271,4 +274,240 @@ public function submitTransaction(AbstractTransaction $transaction) : SubmitTran
$builder->setTransaction($transaction);
return $builder->execute();
}

/**
* @param string $relativeUrl
* @param callable $callback
* @param $retryOnServerException bool If true, ignore ServerException errors and retry
* @throws GuzzleException
*/
public function getAndStream(string $relativeUrl, callable $callback, bool $retryOnServerException = true) : void
{
while (true) {
try {
$response = $this->httpClient->get($relativeUrl, [
'stream' => true,
'read_timeout' => null,
'headers' => [
'Accept' => 'text/event-stream',
]
]);

$body = $response->getBody();

while (!$body->eof()) {
$line = '';

$char = null;
while ($char != "\n") {
$line .= $char;
$char = $body->read(1);
}

// Ignore empty lines
if (!$line) continue;

// Ignore "data: hello" handshake
if (str_starts_with($line, 'data: "hello"')) continue;

// Ignore lines that don't start with "data: "
$sentinel = 'data: ';
if (!str_starts_with($line, $sentinel)) continue;

// Remove sentinel prefix
$json = substr($line, strlen($sentinel));

$decoded = json_decode($json, true);
if ($decoded) {
$callback($decoded);
}
}

}
catch (ServerException $e) {
if (!$retryOnServerException) throw $e;

// Delay for a bit before trying again
sleep(10);
}
}
}

/**
* Streams Payment or CreateAccount objects to $callback
*
* $callback should have arguments:
* OperationResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->streamPayments('now', function(OperationResponse $payment) {
* printf('Payment operation id %s' . PHP_EOL, $payment->getOperationId());
* });
*
* @param string $sinceCursor
* @param callable|null $callback
* @throws GuzzleException
*/
public function streamPayments(string $sinceCursor = 'now', callable $callback = null)
{
$url = '/payments';
$params = [];

if ($sinceCursor) $params['cursor'] = $sinceCursor;

if ($params) {
$url .= '?' . http_build_query($params);
}

$this->getAndStream($url, function($rawData) use ($callback) {
if (isset($rawData['type'])){
$parsedObject = match ($rawData['type']) {
'create_account' => CreateAccountOperationResponse::fromJson($rawData),
'payment' => PaymentOperationResponse::fromJson($rawData),
'account_merge' => AccountMergeOperationResponse::fromJson($rawData),
'path_payment_strict_send' => PathPaymentStrictSendOperationResponse::fromJson($rawData),
'path_payment_strict_receive' => PathPaymentStrictReceiveOperationResponse::fromJson($rawData)
};
$callback($parsedObject);
}
});
}

/**
* Streams Effect objects to $callback
*
* $callback should have arguments:
* EffectResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->streamEffects('now', function(EffectResponse $effect) {
* printf('Effect type: %s' . PHP_EOL, $effect->getEffectType());
* });
*
* @param string $sinceCursor
* @param callable|null $callback
* @throws GuzzleException
*/
public function streamEffects(string $sinceCursor = 'now', callable $callback = null)
{
$url = '/effects';
$params = [];

if ($sinceCursor) $params['cursor'] = $sinceCursor;

if ($params) {
$url .= '?' . http_build_query($params);
}

$this->getAndStream($url, function($rawData) use ($callback) {
$parsedObject = EffectResponse::fromJson($rawData);
$callback($parsedObject);
});
}

/**
* Streams Ledger objects to $callback
*
* $callback should have arguments:
* LedgerResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->streamLedgers('now', function(LedgerResponse $ledger) {
* printf('Ledger closed at: %s' . PHP_EOL, $ledger->getCreatedAt());
* });
*
* @param string $sinceCursor
* @param callable|null $callback
* @throws GuzzleException
*/
public function streamLedgers(string $sinceCursor = 'now', callable $callback = null)
{
$url = '/ledgers';
$params = [];

if ($sinceCursor) $params['cursor'] = $sinceCursor;

if ($params) {
$url .= '?' . http_build_query($params);
}

$this->getAndStream($url, function($rawData) use ($callback) {
$parsedObject = LedgerResponse::fromJson($rawData);
$callback($parsedObject);
});
}

/**
* Streams Operation objects to $callback
*
* $callback should have arguments:
* OperationResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->streamOperations('now', function(OperationResponse $operation) {
* printf('Operation id %s' . PHP_EOL, $operation->getOperationId());
* });
*
* @param string $sinceCursor
* @param callable|null $callback
* @throws GuzzleException
*/
public function streamOperations(string $sinceCursor = 'now', callable $callback = null)
{
$url = '/operations';
$params = [];

if ($sinceCursor) $params['cursor'] = $sinceCursor;

if ($params) {
$url .= '?' . http_build_query($params);
}

$this->getAndStream($url, function($rawData) use ($callback) {
$parsedObject = OperationResponse::fromJson($rawData);
$callback($parsedObject);
});
}

/**
* Streams Transaction objects to $callback
*
* $callback should have arguments:
* TransactionResponse
*
* For example:
*
* $sdk = StellarSDK::getTestNetInstance();
* $sdk->streamTransactions('now', function(TransactionResponse $transaction) {
* printf('Transaction Hash %s' . PHP_EOL, $transaction->getHash());
* });
*
* @param string $sinceCursor
* @param callable|null $callback
* @throws GuzzleException
*/
public function streamTransactions(string $sinceCursor = 'now', callable $callback = null)
{
$url = '/transactions';
$params = [];

if ($sinceCursor) $params['cursor'] = $sinceCursor;

if ($params) {
$url .= '?' . http_build_query($params);
}

$this->getAndStream($url, function($rawData) use ($callback) {
$parsedObject = TransactionResponse::fromJson($rawData);
$callback($parsedObject);
});
}
}
78 changes: 78 additions & 0 deletions Soneso/StellarSDKTests/QueryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

namespace StellarSDKTests;

use Exception;
use PHPUnit\Framework\TestCase;
use Soneso\StellarSDK\Asset;
use Soneso\StellarSDK\AssetTypeCreditAlphanum12;
Expand All @@ -15,6 +16,10 @@
use Soneso\StellarSDK\Crypto\KeyPair;
use Soneso\StellarSDK\ManageBuyOfferOperationBuilder;
use Soneso\StellarSDK\Network;
use Soneso\StellarSDK\Responses\Effects\EffectResponse;
use Soneso\StellarSDK\Responses\Ledger\LedgerResponse;
use Soneso\StellarSDK\Responses\Operations\CreateAccountOperationResponse;
use Soneso\StellarSDK\Responses\Operations\OperationResponse;
use Soneso\StellarSDK\SetOptionsOperationBuilder;
use Soneso\StellarSDK\StellarSDK;
use Soneso\StellarSDK\TransactionBuilder;
Expand Down Expand Up @@ -300,5 +305,78 @@ public function testPaging(): void
$count = $prev->getOperations()->count();
$this->assertEquals($response->getOperations()->toArray()[0]->getOperationId(), $prev->getOperations()->toArray()[$count - 1]->getOperationId());
}

public function testStreamPayments(): void
{
$sdk = StellarSDK::getTestNetInstance();
$found = false;
try {
$sdk->streamPayments('now', function(OperationResponse $payment) {
printf('Payment operation %s id %s' . PHP_EOL, get_class($payment), $payment->getOperationId());
if ($payment instanceof CreateAccountOperationResponse) {
throw new Exception("stop");
}
});
} catch (Exception $e) {
if ($e->getMessage() == "stop") {
$found = true;
}
}
$this->assertTrue($found);
}

public function testStreamOperations(): void
{
$sdk = StellarSDK::getTestNetInstance();
$found = false;
try {
$sdk->streamOperations('now', function(OperationResponse $operation) {
printf('Operation id %s' . PHP_EOL, $operation->getOperationId());
if ($operation instanceof CreateAccountOperationResponse) {
throw new Exception("stop");
}
});
} catch (Exception $e) {
if ($e->getMessage() == "stop") {
$found = true;
}
}
$this->assertTrue($found);
}

public function testStreamLedgers(): void
{
$sdk = StellarSDK::getTestNetInstance();
$found = false;
try {
$sdk->streamLedgers('now', function(LedgerResponse $ledger) {
printf('Ledger sequence %s' . PHP_EOL, $ledger->getSequence()->toString());
throw new Exception("stop");
});
} catch (Exception $e) {
if ($e->getMessage() == "stop") {
$found = true;
}
}
$this->assertTrue($found);
}

public function testStreamEffects(): void
{
$sdk = StellarSDK::getTestNetInstance();
$found = false;
try {
$sdk->streamEffects('now', function(EffectResponse $effect) {
printf('Effect id %s' . PHP_EOL, $effect->getEffectId());
throw new Exception("stop");
});
} catch (Exception $e) {
if ($e->getMessage() == "stop") {
$found = true;
}
}
$this->assertTrue($found);
}

}

0 comments on commit 619b10f

Please sign in to comment.