1
0
Fork 0
mirror of synced 2025-04-05 06:03:34 +03:00

message handlers for symfony messenger

This commit is contained in:
Akolzin Dmitry 2021-03-26 17:48:04 +03:00
parent 7516ec60cb
commit b8c160dea4
8 changed files with 349 additions and 1 deletions

View file

@ -53,6 +53,11 @@ class Configuration implements ConfigurationInterface
->end()
->end()
->end()
->arrayNode('messenger')
->children()
->scalarNode('message_handler')->isRequired()->defaultValue('simple_console_runner')->end()
->end()
->end()
->end();
return $treeBuilder;

View file

@ -4,6 +4,7 @@ namespace RetailCrm\ServiceBundle\DependencyInjection;
use RetailCrm\ServiceBundle\ArgumentResolver\CallbackValueResolver;
use RetailCrm\ServiceBundle\ArgumentResolver\ClientValueResolver;
use RetailCrm\ServiceBundle\Messenger\MessageHandler;
use RetailCrm\ServiceBundle\Response\ErrorJsonResponseFactory;
use RetailCrm\ServiceBundle\Security\CallbackClientAuthenticator;
use RetailCrm\ServiceBundle\Security\FrontApiClientAuthenticator;
@ -49,6 +50,11 @@ class RetailCrmServiceExtension extends Extension
$config['request_schema']['client']['serializer']
);
$container->setParameter(
'retail_crm_service.messenger.message_handler',
$config['messenger']['message_handler']
);
$container
->register(SymfonySerializerAdapter::class)
->setAutowired(true);
@ -90,5 +96,23 @@ class RetailCrmServiceExtension extends Extension
$container
->register(FrontApiClientAuthenticator::class)
->setAutowired(true);
$container
->register(MessageHandler\SimpleConsoleRunner::class)
->setAutowired(true);
$container->setAlias('simple_console_runner', MessageHandler\SimpleConsoleRunner::class);
$container
->register(MessageHandler\InNewProcessRunner::class)
->setAutowired(true);
$container->setAlias('in_new_process_runner', MessageHandler\InNewProcessRunner::class);
$container
->register(MessageHandler::class)
->addTag('messenger.message_handler')
->setArguments([
new Reference($container->getParameter('retail_crm_service.messenger.message_handler'))
])
->setAutowired(true);
}
}

99
Messenger/Message.php Normal file
View file

@ -0,0 +1,99 @@
<?php
namespace RetailCrm\ServiceBundle\Messenger;
/**
* Class Message
*
* @package RetailCrm\ServiceBundle\Messenger
*/
abstract class Message
{
/** @var string */
protected $commandName;
/** @var array */
protected $options = [];
/** @var array */
protected $arguments = [];
/**
* @return string
*/
public function getCommandName(): string
{
return $this->commandName;
}
/**
* @param string $commandName
*/
public function setCommandName(string $commandName): void
{
$this->commandName = $commandName;
}
/**
* @return array
*/
public function getOptions(): array
{
return $this->options;
}
/**
* @param array $options
*/
public function setOptions(array $options): void
{
$this->options = $options;
}
/**
* @return array
*/
public function getArguments(): array
{
return $this->arguments;
}
/**
* @param array $arguments
*/
public function setArguments(array $arguments): void
{
$this->arguments = $arguments;
}
/**
* @param string $key
* @param string $value
*/
public function addOption(string $key, string $value): void
{
$this->options[$key] = $value;
}
/**
* @param string $key
* @param string $value
*/
public function addArgument(string $key, string $value): void
{
$this->arguments[$key] = $value;
}
/**
* @return array
*/
public function getFormattedOptions(): array
{
$options = [];
foreach ($this->options as $option => $value) {
$options['--' . $option] = $value;
}
return $options;
}
}

View file

@ -0,0 +1,40 @@
<?php
namespace RetailCrm\ServiceBundle\Messenger;
use RetailCrm\ServiceBundle\Messenger\MessageHandler\JobRunner;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Exception;
/**
* Class MessageHandler
*
* @package RetailCrm\ServiceBundle\Messenger
*/
class MessageHandler implements MessageHandlerInterface
{
/**
* @var JobRunner
*/
private $runner;
/**
* CommandQueueHandler constructor.
*
* @param JobRunner $runner
*/
public function __construct(JobRunner $runner)
{
$this->runner = $runner;
}
/**
* @param Message $message
*
* @throws Exception
*/
public function __invoke(Message $message): void
{
$this->runner->run($message);
}
}

View file

@ -0,0 +1,96 @@
<?php
namespace RetailCrm\ServiceBundle\Messenger\MessageHandler;
use Psr\Log\LoggerInterface;
use RetailCrm\ServiceBundle\Messenger\Message;
use Symfony\Component\HttpKernel\KernelInterface;
use Symfony\Component\Process\Exception\ProcessTimedOutException;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Process;
/**
* Class InNewProcessRunner
*
* @package RetailCrm\ServiceBundle\Messenger\MessageHandler
*/
class InNewProcessRunner implements JobRunner
{
/** @var int Default timeout for process */
public const DEFAULT_TIMEOUT = 3600;
/**
* @var LoggerInterface
*/
private $logger;
/**
* @var KernelInterface
*/
private $kernel;
/**
* CommandQueueHandler constructor.
*
* @param LoggerInterface $logger
* @param KernelInterface $kernel
*/
public function __construct(LoggerInterface $logger, KernelInterface $kernel)
{
$this->logger = $logger;
$this->kernel = $kernel;
}
/**
* {@inheritdoc}
*/
public function run(Message $message): void
{
$phpBinaryPath = (new PhpExecutableFinder)->find();
$consoleCommand = [
'php' => $phpBinaryPath ?: 'php',
'console' => sprintf('%s/bin/console', $this->kernel->getContainer()->getParameter('kernel.project_dir')),
'command' => $message->getCommandName()
];
$process = new Process(
array_merge(
array_values($consoleCommand),
array_values($message->getArguments()),
array_values($this->getOptions($message)),
)
);
try {
$process
->setTimeout(static::DEFAULT_TIMEOUT)
->run(static function(string $type, string $buffer) {
echo $buffer;
})
;
} catch (ProcessTimedOutException $processTimedOutException) {
$this->logger->error(
sprintf(
'Process "%s" killed after %d seconds of execution',
$processTimedOutException->getProcess()->getCommandLine(),
$processTimedOutException->getProcess()->getTimeout()
)
);
}
}
/**
* @param Message $message
*
* @return array
*/
private function getOptions(Message $message): array
{
$options = [];
foreach ($message->getFormattedOptions() as $option => $value) {
$options[] = $option . '=' . $value;
}
return $options;
}
}

View file

@ -0,0 +1,18 @@
<?php
namespace RetailCrm\ServiceBundle\Messenger\MessageHandler;
use RetailCrm\ServiceBundle\Messenger\Message;
/**
* Interface JobRunner
*
* @package RetailCrm\ServiceBundle\Messenger\MessageHandler
*/
interface JobRunner
{
/**
* @param Message $message
*/
public function run(Message $message): void;
}

View file

@ -0,0 +1,63 @@
<?php
namespace RetailCrm\ServiceBundle\Messenger\MessageHandler;
use Psr\Log\LoggerInterface;
use RetailCrm\ServiceBundle\Messenger\Message;
use Symfony\Bundle\FrameworkBundle\Console\Application;
use Symfony\Component\Console\Input\ArrayInput;
use Symfony\Component\Console\Output\BufferedOutput;
use Symfony\Component\HttpKernel\KernelInterface;
use Exception;
/**
* Class SimpleConsoleRunner
*
* @package RetailCrm\ServiceBundle\Messenger\MessageHandler
*/
class SimpleConsoleRunner implements JobRunner
{
/**
* @var LoggerInterface
*/
private $logger;
/**
* @var KernelInterface
*/
private $kernel;
/**
* CommandQueueHandler constructor.
*
* @param LoggerInterface $logger
* @param KernelInterface $kernel
*/
public function __construct(LoggerInterface $logger, KernelInterface $kernel)
{
$this->logger = $logger;
$this->kernel = $kernel;
}
/**
* {@inheritdoc}
*/
public function run(Message $message): void
{
$application = new Application($this->kernel);
$application->setAutoExit(false);
$input = new ArrayInput(
array_merge(
['command' => $message->getCommandName()],
$message->getFormattedOptions(),
$message->getArguments()
)
);
$output = new BufferedOutput();
$application->run($input, $output);
echo $output->fetch();
}
}

View file

@ -16,7 +16,10 @@
"symfony/serializer": "^5.2",
"symfony/http-kernel": "^4.0|^5.0",
"symfony/validator": "^4.0|^5.0",
"symfony/security-guard": "^4.0|^5.0"
"symfony/security-guard": "^4.0|^5.0",
"symfony/console": "^5.2",
"symfony/messenger": "^5.2",
"symfony/process": "^5.2"
},
"autoload": {
"psr-4": {