Работа в PHP по протоколу Stomp
В последнее время все более активно продвигается идея построения событийно-ориентированной архитектуры программного обеспечения. Такая архитектура предполагает, что её компоненты - отдельные сервисы, общаются между собой путем отправки асинхронных событий через общую шину ESB. Событие – это сообщение (например в терминологии AMQP), у которого есть название и набор параметров. Обычно событием является запрос какой-то информации, либо это может быть просто уведомление о чем-то заинтересованных сторон. В общем случае, событие отправляется безадресно, то есть без конкретных знаний, какой сервис должен получить данное событие. Конкретные же сервисы слушаю общую шину на предмет появления в них тех или иных событий или запросов с целью реагирования. Прелесть построения событийно-ориентированной архитектуры в том, что сервис-отправить может отправить событие в шину и, не дожидаясь ответа, приступить к выполнению других важных задач. Ответ будет получен позже также через шину.

Схема использования ESB
Шина обычно представляет собой набор очередей, внутри которых и находятся сообщения. В очередь можно как записать сообщение, так и получить его, удалив из очереди. Программное обеспечение, реализующее и управляющее очередями, предоставляющее API для работы с ними, называется брокером сообщений. Наиболее известные брокеры сообщений это ZeroMQ, ActiveMQ, RabbitMQ.

Очереди сообщений
В PHP для взаимодействия с брокерами сообщений обычно используется протокол Stomp. Уже давно существует PECL-расширение для PHP, реализующее программный интерфейс для работы по этому протоколу. Его-то и следует использовать. Инструкция по установке PECL-расширений
Как было описано выше, взаимодействие сервисов в контексте событийно-ориентированной архитектуры приложений сводится к двум задачам:
- возможность отправлять событие в общую шину;
- возможность слушать общую шину на предмет интересных и нужных данному компоненту событий.
Для решения этих задач можно воспользоваться довольно простой библиотекой stomp-utils. Установить можно с помощью команды composer:
composer require stomp-utils
Теперь можно его использовать. Для начала разберемся, как отправлять сообщения в шину - это более простая задача:
publish.php:
<?php
use TochkaIntegrationStomp{
StompClient,
Publisher
};
//указываем реквизиты для установки подключения к брокеру-сообщений по протоколу Stomp
$stompClient = new StompClient('tcp://localhost-smx:61613', 'admin', 'password');
//создаем объект, который умеет отправлять сообщения
$publisher = new Publisher($stompClient);
// формируем сообщение на отправку:
$requestContent = <<<REQ
<?xml version="1.0" encoding="UTF-8"?>
<request sender="test" timestamp="2016-12-07T07:27:26.503+00:00">
<data code="1234567" />
</request>
REQ;
// Указываем необходимые заголовки
$headers = [
'field' => 'test data'
];
// отправляем сообщение в очередь
$publisher->send('q.test.queue', $requestContent, $headers, uniqid());
Запустив скрипт publish.php отправляется сообщение в очередь. Все очень просто.
Более сложной задачей является получение сообщений из шины. Для этого необходимо знать, из какой-очереди нужно получать сообщения.
Для начала необходимо реализовать класс, который будет обрабатывать полученное сообщение. Он должен наследоваться от класса TochkaIntegrationStompBaseWorker и реализовывать метод handle:
TestWorker.php:
<?php
use TochkaIntegrationStompBaseWorker;
class TestWorker extends BaseWorker {
public function handle(): bool {
echo 'Processing: ' . print_r($this->mapper) . PHP_EOL;
return true;
}
}
Теперь необходимо реализовать класс, который будет слушать сообщения из очереди. Класс должен наследоваться от TochkaIntegrationStompListener. В классе нужно переопределить метод generateHandler, который должен возвращать объект-обработчик сообщения, который только что был написан:
TestListener.php:
<?php
use TochkaIntegrationStomp{
Listener,
FrameMapper,
BaseWorker
};
class TestListener extends Listener {
protected function generateHandler(FrameMapper $mapper): BaseWorker {
return new TestWorker($mapper);
}
}
Теперь можно написать скрипт, который будет слушать очередь:
listen.php:
<?php
require __DIR__ . '/../vendor/autoload.php';
require __DIR__ . '/TestWorker.php';
require __DIR__ . '/TestListener.php';
$stompClient = new TochkaIntegrationStompStompClient('tcp://localhost-smx:61613', 'admin', 'password');
$listener = new TestListener("q.test.queue", $stompClient);
$listener->pull();Скрипт listen.php работает постоянно подключен к очереди, и если в ней появится новое необработанное сообщение, скрипт обязательно его считает.
Comments