PHP code example of miquido / observable

1. Go to this page and download the miquido/observable library. Choose the download type "require".

2. Extract the ZIP file and open the index.php.

3. Add this code to the index.php.
    
        
<?php
require_once('vendor/autoload.php');

/* Start to develop here. Best regards https://php-download.com/ */

    

miquido / observable example snippets




use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Observer;

// create an observable stream from an array:
// $stream is an object that implements Miquido\Observable\ObservableInterface
$stream = FromArray::create([1, 2, 3, 4, 5]);

// then you can subscribe to the stream using Observer (both parameters are optional):
$stream->subscribe(new Observer(
    function (int $i): void { /* this callback will be called 5 times, with the consecutive values 1, 2, 3, 4, 5 */ }, 
    function (): void { /* this callback will be called once, after every item in the stream has been emitted */ }
));

// alternatively - if you are only interested in the items in the stream, you can pass just a callback 
$stream->subscribe(function (int $i): void {
    // do something with numbers
});




use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// Source stream of the integers 1..5; every pipeline below is built from it.
$stream = FromArray::create([1, 2, 3, 4, 5]);

// Map applies the callback to each item: here every number is squared.
$squareStream = $stream->pipe(new Operator\Map(function (int $i): int {
    return $i * $i;
}));

// Sum adds up the items of the stream it is piped from.
$sumStream = $stream->pipe(new Operator\Sum());
$squareSumStream = $squareStream->pipe(new Operator\Sum());
// NOTE(review): despite its name, no Sum operator is applied to $tripleSumStream.
// The pipeline cubes each number and then filters out the cubes divisible by 3,
// so it presumably emits 1, 8, 64, 125 (27 is dropped). Nothing subscribes to it in this snippet.
$tripleSumStream = $stream
    ->pipe(new Operator\Map(function (int $i): int {
        return $i ** 3;
    }))
    ->pipe(new Operator\Filter(function (int $i): bool {
        return $i % 3 > 0;
    }));

$squareStream->subscribe(function ($i) {}); // called 5 times with consecutive: 1, 4, 9, 16, 25
$squareSumStream->subscribe(function ($i) {}); // called once with a number 55
$sumStream->subscribe(function ($i) {}); // called once with a number 15



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;


// Source stream of the integers 1..6.
$stream = FromArray::create([1, 2, 3, 4, 5, 6]);
$stream
    // the first pipe raises each number to the power of 3,
    ->pipe(new Operator\Map(function (int $i): int {
        return $i ** 3;
    }))
    // then BufferCount(3) receives the stream of numbers: 1, 8, 27, 64, 125, 216,
    // holds the items until it has received 3 values, then releases an array with those three values
    ->pipe(new Operator\BufferCount(3))
    // the next pipe receives two arrays of three numbers: [1, 8, 27], [64, 125, 216] and returns the sum of each group
    ->pipe(new Operator\Map(function (array $numbers): int {
        return array_sum($numbers);
    }))
    ->subscribe(function (int $i): void {
        // the subscriber callback is called twice, with the numbers: 36, 405
    });



use Miquido\Observable\Subject\Subject;
use Miquido\Observable\Operator;

// let's create a Subject
$words = new Subject();

// because it is an observable, you can pipe and subscribe to the data
$words
    ->pipe(new Operator\Map(function (string $word): string {
        return \strtoupper($word);
    }))
    ->subscribe(function (string $word): void {
        // receives upper-cased words
    });

$words
    // capitalise the first letter of every word
    ->pipe(new Operator\Map('ucfirst'))
    // join all words into one string; the initial value '' leaves a leading space
    // in the result, which the 'trim' step below removes
    ->pipe(new Operator\Reduce(
        function (string $sentenceInProgress, string $word): string {
            return \sprintf('%s %s', $sentenceInProgress, $word);
        },
        ''
    ))
    ->pipe(new Operator\Map('trim'))
    ->subscribe(function (string $sentence): void {
        // receives a sentence made of all the words in the stream
    });

// And because a Subject is also an observer, you can push new items to the stream.
$words->next('lorem');
$words->next('ipsum');
$words->next('dolor');
$words->next('sit');
$words->next('amet');

// complete will send a "complete" notification to all observers and will remove observers from the subject
$words->complete();

/**
 * In this example:
 * - the first subscriber will receive 5 items: 'LOREM', 'IPSUM', 'DOLOR', 'SIT' and 'AMET'
 * - the second subscriber will receive one item: 'Lorem Ipsum Dolor Sit Amet'
 */




use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// ArrayCount example: each item of the source stream is itself an array,
// and the subscriber receives the number of elements of each of those arrays.
$stream = FromArray::create([
    [1, 2],
    [3, 4, 5] 
]);
$stream
    ->pipe(new Operator\ArrayCount())
    ->subscribe(function (int $count): void {
        // called twice with values: 2 and 3
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// BufferCount example: the six source items reach the subscriber
// grouped into arrays of three.
$stream = FromArray::create([1, 2, 3, 4, 5, 6]);
$stream
    ->pipe(new Operator\BufferCount(3))
    ->subscribe(function (array $values): void {
        // called twice with values: [1, 2, 3] and [4, 5, 6]
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// BufferUniqueCount example: the source contains repeated values (1 and 5).
// Judging by the expected output below, repeated values are not added to the
// buffer again, so each emitted array holds three distinct values.
$stream = FromArray::create([1, 1, 2, 1, 3, 4, 5, 5, 6]);
$stream
    ->pipe(new Operator\BufferUniqueCount(3))
    ->subscribe(function (array $values): void {
        // called twice with values: [1, 2, 3] and [4, 5, 6]
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// Count example: the subscriber receives a single value, the number of
// items in the source stream (repeated values are counted too).
$stream = FromArray::create([1, 1, 2, 1, 3, 4, 5, 5, 6]);
$stream
    ->pipe(new Operator\Count())
    ->subscribe(function (int $count): void {
        // called once with value: 9
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// Filter example: only the items for which the callback returns true
// reach the subscriber; here those are the even numbers.
$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Filter(function (int $number): bool {
        return $number % 2 === 0;
    }))
    ->subscribe(function (int $number): void {
        // called twice with values: 2, 4
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// Flat example: each item of the source stream is an array, and the
// subscriber receives the elements of those arrays one by one.
$stream = FromArray::create([
    [1, 2],
    [3, 4, 5]
]);
$stream
    ->pipe(new Operator\Flat())
    ->subscribe(function (int $number): void {
        // called 5 times with values: 1, 2, 3, 4, 5
        var_dump($number);
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// Let example: the callback is invoked for each item but returns nothing;
// the subscriber still receives the original items unchanged.
$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Let(function (int $number): void {
        // do something with this number, no need to return anything
    }))
    ->subscribe(function (int $number): void {
        // called 5 times with values: 1, 2, 3, 4, 5
        var_dump($number);
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// Map example: every word is replaced by its length in bytes.
$stream = FromArray::create(['lorem', 'ipsum', 'dolor', 'sit', 'amet']);
$stream
    ->pipe(new Operator\Map(function (string $word): int {
        return \strlen($word);
    }))
    ->subscribe(function (int $length): void {
        // called 5 times with values: 5, 5, 5, 3, 4
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// Reduce example: the callback receives the accumulated value and the next
// item; 0 is the initial accumulator. Only the final result is emitted.
$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Reduce(
        function (int $sum, int $number): int {
            return $sum + $number;
        },
        0
    ))
    ->subscribe(function (int $sum): void {
        // called once with value 15
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// Scan example: same callback and initial value (0) as a Reduce, but the
// subscriber receives every intermediate running total, not just the last one.
$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Scan(
        function (int $sum, int $number): int {
            return $sum + $number;
        },
        0
    ))
    ->subscribe(function (int $sum): void {
        // called 5 times with values: 1, 3, 6, 10, 15
    });



use Miquido\Observable\Stream\FromArray;
use Miquido\Observable\Operator;

// Sum example: the subscriber receives a single value, the total of all
// items in the source stream.
$stream = FromArray::create([1, 2, 3, 4, 5]);
$stream
    ->pipe(new Operator\Sum())
    ->subscribe(function (int $sum): void {
        // called once with value 15
    });