My consumer class waits for messages from RabbitMQ:
<?php

namespace App\Infrastructure;

use App\Application\Interfaces\BankServiceInterface;
use App\Application\Interfaces\ConsumerInterface;
use App\Application\Interfaces\MailAgentInterface;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class Consumer implements ConsumerInterface
{
    private AMQPStreamConnection $connection;
    private AMQPChannel $channel;
    private BankServiceInterface $service;
    private MailAgentInterface $mailAgent;

    public function __construct(AMQPStreamConnection $connection, BankServiceInterface $service, MailAgentInterface $mailAgent)
    {
        $this->connection = $connection;
        $this->channel = $connection->channel();
        $this->service = $service;
        $this->mailAgent = $mailAgent;
    }

    public function runFromQueue(string $queueName): void
    {
        $this->channel->queue_declare(
            $queueName,
            false, // passive
            true,  // durable
            false, // exclusive
            false  // auto_delete
        );
        // Deliver at most one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);
        $this->channel->basic_consume(
            $queueName,
            '',    // consumer_tag (empty: generated automatically)
            false, // no_local
            false, // no_ack (manual acknowledgement expected)
            false, // exclusive
            false, // nowait
            $this->onConsume()
        );
        // Runs for as long as at least one consumer callback is registered.
        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
        $this->closeConnection();
    }

    public function closeConnection(): void
    {
        $this->channel->close();
        $this->connection->close();
    }

    private function onConsume()
    {
        return function ($request) {
            echo '<pre>';
            var_dump($request->getBody());
            echo '</pre>';
        };
    }
}
When I instantiate this class and run it, the request fails with 504 Gateway Time-out. But if I remove the getBody() call from onConsume():
private function onConsume()
{
    return function ($request) {
        echo '<pre>';
        var_dump($request);
        echo '</pre>';
    };
}
it works and prints the message object, with the message body in one of its properties.
I also get the 504 Gateway Time-out error if I try to access the body property of this object:
private function onConsume()
{
    return function ($request) {
        echo '<pre>';
        var_dump($request->body);
        echo '</pre>';
    };
}
It looks as if the loop in runFromQueue() never ends and the server kills the script by timeout. Next, I tried replacing the loop in runFromQueue() with the consume() method:
public function runFromQueue(string $queueName): void
{
    $this->channel->queue_declare(
        $queueName,
        false,
        true,
        false,
        false
    );
    $this->channel->basic_qos(null, 1, null);
    $this->channel->basic_consume(
        $queueName,
        '',
        false,
        false,
        false,
        false,
        $this->onConsume()
    );
    try {
        $this->channel->consume();
    } catch (\Throwable $exception) {
        echo $exception->getMessage();
    }
    $this->closeConnection();
}
But the result is the same as with the loop: I get a 504 Gateway Time-out error if I call getBody() or access the body property, and it works if I simply print $request.
Then I made a simple modification. I added a new property to the class:
private $isMessageRead = true;
and used it as the loop condition:
public function runFromQueue(string $queueName): void
{
    $this->channel->queue_declare(
        $queueName,
        false,
        true,
        false,
        false
    );
    $this->channel->basic_qos(null, 1, null);
    $this->channel->basic_consume(
        $queueName,
        '',
        false,
        false,
        false,
        false,
        $this->onConsume()
    );
    while ($this->isMessageRead) {
        $this->channel->wait();
    }
    $this->closeConnection();
}
The value of this property is changed in onConsume():
private function onConsume()
{
    return function ($request) {
        echo '<pre>';
        var_dump($request->getBody());
        echo '</pre>';
        $this->isMessageRead = false;
    };
}
This works, but I am not satisfied with this solution. What is the problem? How can I do this without the isMessageRead property?
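One direction I am considering, as a sketch rather than a confirmed fix: the loop condition count($this->channel->callbacks) stays non-zero for as long as the consumer is registered, so the loop may simply never end inside a web request. If that is the cause, cancelling the consumer from inside the callback should let the loop finish without a flag. The sketch below assumes php-amqplib 3.x, where the message object exposes getBody(), ack(), getChannel() and getConsumerTag(); makeOneShotCallback is a hypothetical helper name, not part of the library. I have not verified it against my setup.

```php
<?php
// Sketch only: assumes php-amqplib 3.x message methods
// (getBody, ack, getChannel, getConsumerTag) and AMQPChannel::basic_cancel.

/**
 * Builds a consumer callback that handles one message and then cancels
 * its own consumer. makeOneShotCallback is a hypothetical helper name.
 */
function makeOneShotCallback(callable $handleBody): \Closure
{
    return function ($message) use ($handleBody): void {
        // Pass the raw message body to the caller-supplied handler.
        $handleBody($message->getBody());

        // The consumer is registered with no_ack = false, so acknowledge
        // explicitly; an unacknowledged message would be redelivered later.
        $message->ack();

        // Cancel this consumer. Once the cancel is confirmed, the channel
        // should drop the callback, so a loop on count($channel->callbacks)
        // (or consume()) has nothing left to wait for.
        $message->getChannel()->basic_cancel($message->getConsumerTag());
    };
}
```

In the class above, onConsume() could then return makeOneShotCallback(function (string $body) { var_dump($body); }); and the original while (count($this->channel->callbacks)) loop would stay unchanged. This only fits the case where a single message per request is wanted; a long-running consumer would normally run from the command line rather than behind a web server with a gateway timeout.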