Archive: Namaste PHP AMQP framework v1.0 (2017-2020)
952 days continuous production uptime, 40k+ tp/s single node. Original corpo Bitbucket history not included — clean archive commit.
This commit is contained in:
575
brokers/rBroker.php
Normal file
575
brokers/rBroker.php
Normal file
@@ -0,0 +1,575 @@
|
||||
<?php
|
||||
/**
|
||||
* readBroker (rBroker.php) -- persistent (daemon) PHP application program
|
||||
*
|
||||
* This is a forking-broker. Which means that, upon execution, the broker program will iteratively start-up a
|
||||
* specific number of child-processes (XML config) and then, as the parent process, will monitor each child.
|
||||
* On a child's death, the signal is trapped via a replacement, custom, signal handler, and a replacement child
|
||||
* is restarted.
|
||||
*
|
||||
* Children are only allowed to execute a finite number of broker events (XML config) before they self-terminate and
|
||||
* are re-incarnated by the parent. This is to mitigate the memory leaks inherent in PHP as PHP applications were
|
||||
* never intended to be used as TSR programs.
|
||||
*
|
||||
* NOTES:
|
||||
* ------
|
||||
* - only the parent PID is written to the PID directory. This feature is for a monitoring program that will restart
|
||||
* the parent broker, but will not monitor/restart the children; only the parent daemon may re-incarnate new children
|
||||
* - custom signal handler for trapping SIGCLD and updating the global child counter
|
||||
* - signals sent to a child (other than SIGKILL) are diverted to the shutDown event RMQ resources are freed
|
||||
*
|
||||
*
|
||||
* @author mike@givingassistant.org
|
||||
* @version 1.0
|
||||
*
|
||||
* HISTORY:
|
||||
* ========
|
||||
* 06-14-17 mks original coding
|
||||
* 08-24-17 mks CORE-500: broker events
|
||||
* 03-14-18 mks CORE-833: fetch event tests for recordsInQuery class member being set before including in the
|
||||
* return-data payload b/c recordsInQuery is only a PDO thing
|
||||
* 05-31-18 mks CORE-1011: update for new XML broker services configuration
|
||||
* 01-03-19 mks DB-78: fixed bug in fetch event where pre-supplied event-GUID value was being over-written
|
||||
* 01-29-20 mks DB-145: router code for tercero requests added to default section in $callback method
|
||||
* 04-03-20 mks ECI-107: added sub-collection fetch, exception trapping on wait(), IDE directives cleaned-up
|
||||
* 07-28-20 mks DB-156: broker self-registration installed
|
||||
*
|
||||
*/
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use PhpAmqpLib\Channel\AMQPChannel;
|
||||
use PhpAmqpLib\Exception\AMQPChannelClosedException;
|
||||
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
|
||||
use PhpAmqpLib\Exception\AMQPRuntimeException;
|
||||
use PhpAmqpLib\Exception\AMQPTimeoutException;
|
||||
use PhpAmqpLib\Message\AMQPMessage;
|
||||
|
||||
pcntl_async_signals(true); // enable asynchronous signal handling (PHP 7.1)
|
||||
$myPid = getmypid();
|
||||
$_REDIRECT = true; // all output to logfile
|
||||
$topDir = dirname(__DIR__);
|
||||
$thisWatcher = basename(__FILE__);
|
||||
$thisWatcher = rtrim($thisWatcher, ".php");
|
||||
|
||||
// load the framework
|
||||
@require_once($topDir . '/config/sneakerstrap.inc'); // can't be constants b/c this loads the constants
|
||||
|
||||
$childrenPidList = null; // contains list of the pids of the children spawned by this watcher
|
||||
$pidDir = $topDir . DIR_PIDS;
|
||||
$eos = (isset($_SERVER['HTTP_USER_AGENT'])) ? '<br />' : PHP_EOL;
|
||||
$res = 'RBRK: ';
|
||||
$appServerConfig = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_APPSERVER];
|
||||
$numberChildren = $appServerConfig[CONFIG_BROKER_INSTANCES][CONFIG_BROKER_R_BROKER];
|
||||
$requestsPerInstance = (empty($appServerConfig[CONFIG_BROKER_REQUEST_LIMIT])) ? NUMBER_C : $appServerConfig[CONFIG_BROKER_REQUEST_LIMIT];
|
||||
$numberChildren = ($numberChildren < 1) ? 1 : $numberChildren;
|
||||
$runningBrokers = $numberChildren;
|
||||
$myRequestsPerInstance = 0;
|
||||
$startingMemory = 0;
|
||||
$file = basename(__FILE__);
|
||||
// create the root guid
|
||||
$groot = rtrim($res, COLON) . UDASH . guid(); // root guid
|
||||
consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_STARTUP, substr(basename(__FILE__), 0, -4), $groot));
|
||||
|
||||
/** @var gacErrorLogger $parentLog */
|
||||
$parentLog = new gacErrorLogger();
|
||||
|
||||
// get the location of the broker is supposed to be run
|
||||
$brokerLocation = ENV_APPSERVER;
|
||||
if (!empty($argv) and !empty($argv[1])) {
|
||||
$brokerLocation = $argv[1];
|
||||
}
|
||||
$errors = null;
|
||||
$file = rtrim(basename(__FILE__), DOT . FILE_TYPE_PHP);
|
||||
$service = CONFIG_BROKER_APPSERVER;
|
||||
|
||||
if (!validateService($service, $errors)) {
|
||||
$hdr = sprintf(INFO_LOC, $file, __LINE__);
|
||||
$msg = sprintf(ERROR_SERVICE_REG, $file, $service);
|
||||
$parentLog->fatal($hdr . $msg);
|
||||
$parentLog->__destruct();
|
||||
unset($parentLog);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// set-up the replacement signal handler that will be called on a child's death
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
//declare( ticks = 1);
|
||||
function sigHandler($_sig) {
|
||||
global $numberChildren;
|
||||
switch ($_sig) {
|
||||
case SIGCHLD :
|
||||
$numberChildren--;
|
||||
while (($pid = pcntl_wait($_sig, WNOHANG)) > 0) {
|
||||
@pcntl_wexitstatus($_sig);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
pcntl_signal(SIGCLD, 'sigHandler');
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////
|
||||
// set-up the forking function so that it can be called initially or on a SIGCLD event
|
||||
//////////////////////////////////////////////////////////////////////////////////////
|
||||
function forkMe()
|
||||
{
|
||||
global $thisWatcher, $eos, $res, $parentLog, $requestsPerInstance, $myRequestsPerInstance, $startingMemory, $groot, $file;
|
||||
$myRequestsPerInstance = $requestsPerInstance + (mt_rand(0, 2) * 10) + mt_rand(1, 9);
|
||||
$startingMemory = memory_get_usage(true);
|
||||
$thisPid = pcntl_fork();
|
||||
|
||||
switch ($thisPid) {
|
||||
case -1 : // error!!!
|
||||
$cmsg = ERROR_FORK_FAILED . $thisWatcher;
|
||||
$parentLog->fatal($cmsg);
|
||||
die(getDateTime() . CON_ERROR . $res . $cmsg . $eos);
|
||||
break;
|
||||
case 0 : // child (broker daemon)
|
||||
// remove the signal handlers in the child code
|
||||
pcntl_signal(SIGCLD, SIG_DFL);
|
||||
$thisPid = getmypid();
|
||||
|
||||
try {
|
||||
// set-up the child error logger
|
||||
$childLog = new gacErrorLogger();
|
||||
|
||||
// generate a child guid for the forked child...
|
||||
$childGUID = rtrim($res, COLON) . UDASH . guid();
|
||||
|
||||
// toss the childGUID unto cache because it does not propagate down to the callback method
|
||||
gasCache::sysAdd(($groot . UDASH . $thisPid), $childGUID);
|
||||
|
||||
// ---- broker code begins ---- //
|
||||
$queueTag = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_QUEUE_TAG];
|
||||
//$exchange = BROKER_EXCHANGE_RO;
|
||||
$queue = $queueTag . BROKER_QUEUE_R;
|
||||
/** @var AMQPStreamConnection $brokerConnection */
|
||||
$brokerConnection = gasResourceManager::fetchResource(RESOURCE_BROKER);
|
||||
if (is_null($brokerConnection)) {
|
||||
$childLog->fatal(ERROR_RESOURCE_404 . RESOURCE_BROKER);
|
||||
consoleLog($res, CON_ERROR, ERROR_RESOURCE_404 . RESOURCE_BROKER);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
/** @var AMQPChannel $brokerChannel */
|
||||
$brokerChannel = $brokerConnection->channel();
|
||||
|
||||
// set up the RPC queue for RO service
|
||||
// params: queue name, passive, durable, exclusive, auto-delete
|
||||
//$brokerChannel->queue_declare($queue, BROKER_QUEUE_DECLARE_PASSIVE, false, false, true);
|
||||
$brokerChannel->queue_declare($queue, BROKER_QUEUE_DECLARE_PASSIVE, false, false, true);
|
||||
} catch (PhpAmqpLib\Exception\AMQPRuntimeException | Throwable | TypeError $t) {
|
||||
$hdr = sprintf(INFO_LOC, $file, __LINE__);
|
||||
@handleExceptionMessaging($hdr, $t->getMessage(), $foo, true);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
// register the child-spawn event
|
||||
$data = [
|
||||
SYSTEM_EVENT_NAME => SYSEV_NAME_CHILD_REG,
|
||||
SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
|
||||
SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
|
||||
SYSTEM_EVENT_BROKER_GUID => $childGUID,
|
||||
SYSTEM_EVENT_KEY => SYSEV_CHILD_RPI,
|
||||
SYSTEM_EVENT_VAL => $myRequestsPerInstance,
|
||||
SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
|
||||
];
|
||||
@postSystemEvent($data, $childGUID, $childLog);
|
||||
|
||||
register_shutdown_function(BROKER_SHUTDOWN_FUNCTION, $brokerChannel, $brokerConnection, $res);
|
||||
|
||||
$callback = function($_request)
|
||||
{
|
||||
$startTime = gasStatic::doingTime();
|
||||
/** @var AMQPChannel $brokerChannel */
|
||||
global $brokerChannel;
|
||||
/** @var AMQPConnection $brokerConnection */
|
||||
global $brokerConnection;
|
||||
global $requestCounter, $groot, $res, $eos, $myRequestsPerInstance, $startingMemory, $service;
|
||||
$requestCounter++;
|
||||
$aryRetData = null;
|
||||
$retData = null;
|
||||
$request = null;
|
||||
$errorList = [];
|
||||
$eos = (isset($_SERVER['HTTP_USER_AGENT'])) ? '<br />' : PHP_EOL;
|
||||
$eventSuccess = false;
|
||||
$conMsg = '';
|
||||
$eventGUID = guid();
|
||||
$thisPid = getmypid();
|
||||
$eventTimer = false; // certain events will toggle to true to log timer recording for the broker event
|
||||
$childGUID = gasCache::sysGet(($groot . UDASH . getmypid()));
|
||||
|
||||
// set-up the callBack log; logger object for the callback function
|
||||
$callBackLog = new gacErrorLogger($eventGUID);
|
||||
|
||||
if (!firstPassPayloadValidation($_request, $service, $msg, $request, $eventGUID)) {
|
||||
$conMsg = $msg;
|
||||
$aryRetData = buildReturnPayload([false, STATE_FAIL, $msg, null, null]);
|
||||
$callBackLog->info($msg);
|
||||
$event = BROKER_QUEUE_R . '(' . ERROR_DATA_VALIDATION_FIRST_PASS . ')';
|
||||
} elseif (!validateMetaData($request, $errorList)) {
|
||||
if (count($errorList) == 0) {
|
||||
$callBackLog->error(ERROR_DATA_META_REJECTED . STRING_UNKNOWN);
|
||||
$errorList[] = ERROR_DATA_META_REJECTED . STRING_UNKNOWN;
|
||||
$conMsg = FAIL_EVENT . $request[BROKER_REQUEST];
|
||||
} else {
|
||||
for ($index = 0, $last = count($errorList); $index < $last; $index++) {
|
||||
$conMsg .= $errorList[$index] . $eos;
|
||||
$callBackLog->error($errorList[$index]);
|
||||
}
|
||||
$conMsg = rtrim($conMsg, $eos);
|
||||
}
|
||||
$aryRetData = buildReturnPayload([false, STATE_META_ERROR, $errorList, null]);
|
||||
$event = BROKER_QUEUE_R . '(' . ERROR_META_VALIDATION_SECOND_PASS . ')';
|
||||
} else {
|
||||
$event = BROKER_QUEUE_R . '(' . $request[BROKER_REQUEST] . ')';
|
||||
if (is_null($request)) consoleLog($res, CON_ERROR, ERROR_BROKER_REQUEST_404);
|
||||
|
||||
switch ($request[BROKER_REQUEST]) {
|
||||
case BROKER_REQUEST_SHUTDOWN :
|
||||
$_request->delivery_info[BROKER_CHANNEL]->basic_cancel($_request->delivery_info[BROKER_DELIVERY_TAG]);
|
||||
$conMsg = SUCCESS_SHUTDOWN . BROKER_QUEUE_R;
|
||||
$aryRetData = buildReturnPayload([true, STATE_SUCCESS, null, BROKER_REQUEST_SHUTDOWN]);
|
||||
$eventSuccess = false;
|
||||
break;
|
||||
|
||||
case BROKER_REQUEST_PING :
|
||||
$conMsg = SUCCESS_PING . BROKER_QUEUE_R;
|
||||
$aryRetData = buildReturnPayload([true, STATE_SUCCESS, null, SUCCESS_PING . BROKER_QUEUE_R]);
|
||||
$eventSuccess = true;
|
||||
break;
|
||||
|
||||
// request class schema map
|
||||
case BROKER_REQUEST_SCHEMA :
|
||||
$eventTimer = true;
|
||||
if (empty($request[BROKER_META_DATA]) or empty($request[BROKER_META_DATA][META_TEMPLATE])) {
|
||||
$conMsg = ERROR_DATA_404;
|
||||
$aryRetData = buildReturnPayload([false, STATE_DATA_ERROR, null, ERROR_DATA_404]);
|
||||
} else {
|
||||
$obj = null;
|
||||
$errorList = array();
|
||||
$processObject = false;
|
||||
// instantiate the new template class and return a schema report...
|
||||
try {
|
||||
// cant instantiate remove-service objects in production, so we'll inject an skip
|
||||
// directive for the env-check...
|
||||
$obj = new gacFactory($request[BROKER_META_DATA], FACTORY_EVENT_SCHEMA_REQUEST, '', $errorList);
|
||||
$processObject = true;
|
||||
} catch (TypeError $e) {
|
||||
$callBackLog->mirror = true;
|
||||
$callBackLog->warn($e->getMessage());
|
||||
$callBackLog->mirror = false;
|
||||
$conMsg = ERROR_EXCEPTION;
|
||||
$aryRetData = buildReturnPayload([false, STATE_FRAMEWORK_FAIL, [$msg], null]);
|
||||
}
|
||||
if ($processObject) {
|
||||
if (!$obj->status) {
|
||||
$msg = ERROR_CLASS_SCHEMA_404 . COLON . $request[BROKER_META_DATA][META_TEMPLATE];
|
||||
$conMsg = $msg;
|
||||
$callBackLog->error($msg);
|
||||
$errorList[] = $msg;
|
||||
if (!empty($obj->eventMessages)) $errorList = array_merge($errorList, $obj->eventMessages);
|
||||
$aryRetData = buildReturnPayload([false, STATE_TEMPLATE_ERROR, $errorList, null]);
|
||||
} else {
|
||||
$eventSuccess = true;
|
||||
$conMsg = SUCCESS_EVENT . $request[BROKER_REQUEST];
|
||||
$aryRetData = buildReturnPayload([true, STATE_SUCCESS, null, $obj->schema]);
|
||||
}
|
||||
}
|
||||
if (is_object($obj)) $obj->__destruct();
|
||||
unset($obj);
|
||||
}
|
||||
break;
|
||||
|
||||
case BROKER_REQUEST_FETCH :
|
||||
$eventTimer = true;
|
||||
if (!isset($request[BROKER_META_DATA][META_TEMPLATE]) or empty($request[BROKER_META_DATA][META_TEMPLATE])) {
|
||||
$conMsg = ERROR_TEMPLATE_FILE_404;
|
||||
$aryRetData = buildReturnPayload([false, STATE_META_ERROR, ERROR_TEMPLATE_FILE_404, BROKER_REQUEST_CREATE]);
|
||||
} else {
|
||||
// invoke the broker-helper to execute the fetch request
|
||||
$bh = new gacBrokerHelper();
|
||||
$eventSuccess = $bh->fetch($request, $aryRetData, $conMsg);
|
||||
unset($bh);
|
||||
}
|
||||
break;
|
||||
|
||||
case BROKER_REQUEST_SUBC_FETCH :
|
||||
if (!isset($request[BROKER_META_DATA][META_TEMPLATE]) or empty($request[BROKER_META_DATA][META_TEMPLATE])) {
|
||||
$conMsg = ERROR_TEMPLATE_FILE_404;
|
||||
$aryRetData = buildReturnPayload([false, STATE_DATA_ERROR, ERROR_TEMPLATE_FILE_404, BROKER_REQUEST_SUBC_FETCH]);
|
||||
} elseif (!isset($request[BROKER_DATA][STRING_SUBC_COL]) or empty($request[BROKER_DATA][STRING_SUBC_COL])) {
|
||||
$conMsg = ERROR_DATA_KEY_404 . STRING_SUBC_COL;
|
||||
$aryRetData = buildReturnPayload([false, STATE_DATA_ERROR, $conMsg, BROKER_REQUEST_SUBC_FETCH]);
|
||||
} elseif (!isset($request[BROKER_DATA][STRING_SUBC_DATA]) or empty($request[BROKER_DATA][STRING_SUBC_DATA])) {
|
||||
$conMsg = ERROR_DATA_KEY_404 . STRING_SUBC_DATA;
|
||||
$aryRetData = buildReturnPayload([false, STATE_DATA_ERROR, $conMsg, BROKER_REQUEST_SUBC_FETCH]);
|
||||
} else {
|
||||
$errors = [];
|
||||
/** @var gacMongoDB $objClass */
|
||||
if (is_null($objClass = grabWidget($request[BROKER_META_DATA], '', $errorList))) {
|
||||
foreach ($errorList as $error)
|
||||
$callBackLog->error($error);
|
||||
} else {
|
||||
// todo -- for now, sub-collection fetches are only allowed on appServer
|
||||
// check that this is a mongo object, exit if it is not
|
||||
if ($objClass->schema != TEMPLATE_DB_MONGO) {
|
||||
$conMsg = ERROR_SCHEMA_MISMATCH . $request[BROKER_META_DATA][META_TEMPLATE];
|
||||
$errors[] = $conMsg;
|
||||
$errors[] = INFO_SCHEMA . $objClass->schema;
|
||||
$aryRetData = buildReturnPayload([false, STATE_FAIL, $errors, null]);
|
||||
} else {
|
||||
$objClass->fetchSubCollectionRecord($request[BROKER_DATA]);
|
||||
if (!$objClass->status) {
|
||||
$conMsg = ERROR_SUBC_FETCH;
|
||||
$aryRetData = buildReturnPayload([ false, $objClass->state, $objClass->eventMessages, null]);
|
||||
} else {
|
||||
$eventSuccess = true;
|
||||
$conMsg = SUCCESS_EVENT . BROKER_REQUEST_SUBC_FETCH;
|
||||
$queryMeta = [
|
||||
STRING_REC_COUNT_RET => $objClass->recordsReturned,
|
||||
STRING_REC_COUNT_QUERY => $objClass->recordsInQuery,
|
||||
STRING_REC_COUNT_TOT => $objClass->recordsInCollection
|
||||
];
|
||||
if ($objClass->state == STATE_NOT_FOUND and $objClass->count == 0) {
|
||||
$retData = [STRING_QUERY_RESULTS => null, STRING_QUERY_DATA => $queryMeta];
|
||||
} else {
|
||||
// cacheMapping call
|
||||
if (!gasCache::mapOutboundPayload($objClass, $errors)) {
|
||||
$queryResults = $objClass->getData();
|
||||
} else {
|
||||
// cache mapping succeeded - return the cache key
|
||||
$queryResults = $objClass->getCK();
|
||||
}
|
||||
$retData = [STRING_QUERY_RESULTS => $queryResults, STRING_QUERY_DATA => $queryMeta];
|
||||
}
|
||||
$aryRetData = buildReturnPayload([true, $objClass->state, $objClass->eventMessages, $retData]);
|
||||
}
|
||||
}
|
||||
if (is_object($objClass)) $objClass->__destruct();
|
||||
unset($objClass);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case BROKER_REQUEST_QUERY_COUNT :
|
||||
$eventTimer = true;
|
||||
if (!isset($request[BROKER_META_DATA][META_TEMPLATE]) or empty($request[BROKER_META_DATA][META_TEMPLATE])) {
|
||||
$conMsg = ERROR_TEMPLATE_FILE_404;
|
||||
$aryRetData = buildReturnPayload([false, STATE_META_ERROR, ERROR_TEMPLATE_FILE_404, BROKER_REQUEST_CREATE]);
|
||||
} else {
|
||||
$errors = [];
|
||||
/** @var gacMongoDB $objClass */
|
||||
if (is_null($objClass = grabWidget($request[BROKER_META_DATA], '', $errors))) {
|
||||
foreach ($errors as $error)
|
||||
$callBackLog->error($error);
|
||||
} else {
|
||||
if (!$objClass->_getQC($request[BROKER_DATA])) {
|
||||
$conMsg = FAIL_EVENT . BROKER_REQUEST_QUERY_COUNT;
|
||||
$aryRetData = buildReturnPayload([ false, $objClass->state, $objClass->eventMessages, null]);
|
||||
} else {
|
||||
$eventSuccess = true;
|
||||
$conMsg = SUCCESS_EVENT . BROKER_REQUEST_QUERY_COUNT;
|
||||
$aryRetData = buildReturnPayload([ true, STATE_SUCCESS, $objClass->eventMessages, [$objClass->recordsInQuery, $objClass->recordsInCollection ] ] );
|
||||
}
|
||||
if (is_object($objClass)) $objClass->__destruct();
|
||||
unset($objClass);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
case BROKER_REQUEST_TERCERO :
|
||||
$eventTimer = true;
|
||||
// just as a reminder, we don't check for the existence of META_TEMPLATE in the validateMetaData()
|
||||
// function because not all events require it - hence the seemingly repetitive check in the event code.
|
||||
if (!isset($request[BROKER_META_DATA][META_TEMPLATE]) or empty($request[BROKER_META_DATA][META_TEMPLATE])) {
|
||||
$conMsg = ERROR_TEMPLATE_FILE_404;
|
||||
$aryRetData = buildReturnPayload([false, STATE_META_ERROR, ERROR_TEMPLATE_FILE_404, null]);
|
||||
} elseif (!isset($request[OLD_REQUEST]) or empty($request[OLD_REQUEST])) {
|
||||
$conMsg = ERROR_REQUEST_404 . COLON . OLD_REQUEST;
|
||||
$aryRetData = buildReturnPayload([false, STATE_DATA_ERROR, $conMsg, null]);
|
||||
} else {
|
||||
// this is a request for tercero - replace the event, instantiate a tercero client,
|
||||
// and cross-service publish the request and return the response back to the caller
|
||||
$bc = new gacBrokerClient(BROKER_QUEUE_U, sprintf(INFO_LOC, basename(__FILE__), __LINE__));
|
||||
if (!$bc->status) {
|
||||
$conMsg = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_U;
|
||||
$aryRetData = buildReturnPayload([ false, STATE_FRAMEWORK_WARNING, $conMsg, null]);
|
||||
} else {
|
||||
$request[BROKER_REQUEST] = $request[OLD_REQUEST];
|
||||
$aryRetData = json_decode(gzuncompress($bc->call(gzcompress(json_encode($request)))),true);
|
||||
if ($aryRetData[PAYLOAD_STATUS]) {
|
||||
$eventSuccess = true;
|
||||
$conMsg = SUCCESS_EVENT . $request[OLD_REQUEST] . ' for ' . BROKER_TERCERO;
|
||||
} else {
|
||||
$conMsg = FAIL_EVENT . $request[OLD_REQUEST] . ' for ' . BROKER_TERCERO;
|
||||
}
|
||||
}
|
||||
if (is_object($bc)) $bc->__destruct();
|
||||
unset($bc);
|
||||
}
|
||||
break;
|
||||
|
||||
default :
|
||||
// check for user template in meta payload and, if exists, publish the request to the user
|
||||
// and pass the return payload back to the requesting client
|
||||
if (isset($request[BROKER_META_DATA][META_TEMPLATE]) and $request[BROKER_META_DATA][META_TEMPLATE] == TEMPLATE_CLASS_USERS) {
|
||||
$ubc = new gacBrokerClient(BROKER_QUEUE_U, basename(__FILE__) . AT . __LINE__);
|
||||
if (!$ubc->status) {
|
||||
$msg = sprintf(INFO_LOC, basename(__FILE__), __LINE__) . ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_U;
|
||||
$conMsg = $msg;
|
||||
$aryRetData = buildReturnPayload([false, STATE_FRAMEWORK_FAIL, null, $msg]);
|
||||
} else {
|
||||
$response = $ubc->call($request);
|
||||
$response = json_decode(gzuncompress($response), true);
|
||||
$aryRetData = buildReturnPayload([$response[PAYLOAD_STATUS], $response[PAYLOAD_STATE], $response[PAYLOAD_DIAGNOSTICS], $response[PAYLOAD_RESULTS]]);
|
||||
if ($response[PAYLOAD_STATUS]) {
|
||||
$conMsg = SUCCESS_EVENT;
|
||||
$eventSuccess = true;
|
||||
} else $conMsg = FAIL_EVENT;
|
||||
$conMsg .= $request[BROKER_REQUEST];
|
||||
if (is_object($ubc)) $ubc->__destruct();
|
||||
unset($ubc);
|
||||
}
|
||||
} else {
|
||||
$msg = ERROR_EVENT_404 . $request[BROKER_REQUEST];
|
||||
$conMsg = $msg;
|
||||
$aryRetData = buildReturnPayload([false, STATE_DOES_NOT_EXIST, null, $msg]);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// make doubly-damn sure we have a return-payload and a console message
|
||||
if (empty($aryRetData)) {
|
||||
$msg = ERROR_NO_RET_DATA;
|
||||
$conMsg = BROKER_QUEUE_R . ' - ' . $msg;
|
||||
$aryRetData = buildReturnPayload([false, STATE_FRAMEWORK_FAIL, null, $msg, null]);
|
||||
} elseif ($eventSuccess and empty($conMsg)) {
|
||||
$callBackLog->warn(ERROR_NO_CON_MSG);
|
||||
$conMsg = $request[BROKER_REQUEST] . ' - ' . STATE_SUCCESS;
|
||||
}
|
||||
|
||||
// prepare the return payload...
|
||||
// $eventSuccess = false;
|
||||
try {
|
||||
/** @noinspection PhpUndefinedMethodInspection */
|
||||
$msg = new AMQPMessage(gzcompress(json_encode($aryRetData)), array(BROKER_CORRELATION_ID => $_request->get(BROKER_CORRELATION_ID)));
|
||||
/** @noinspection PhpUndefinedMethodInspection */
|
||||
$_request->delivery_info[BROKER_CHANNEL]->basic_publish($msg, '', $_request->get(BROKER_REPLY_TO));
|
||||
$_request->delivery_info[BROKER_CHANNEL]->basic_ack($_request->delivery_info[BROKER_DELIVERY_TAG]);
|
||||
} catch (AMQPTimeoutException | TypeError | AMQPRuntimeException | Throwable $e) {
|
||||
$m = $e->getMessage();
|
||||
$callBackLog->fatal($m);
|
||||
consoleLog($res, CON_ERROR, $m);
|
||||
}
|
||||
|
||||
// if the event processing failed, we want to publish the failed event to the admin queue
|
||||
// if (!$eventSuccess) {
|
||||
// todo - CORE-452 - publish the event(payload) to the admin queue to capture the failed event
|
||||
// }
|
||||
|
||||
unset($msg);
|
||||
consoleLog($res, (($eventSuccess) ? CON_SUCCESS : CON_ERROR), $conMsg . sprintf(ERROR_EVENT_COUNT,$requestCounter, $myRequestsPerInstance));
|
||||
|
||||
// publish event metrics if we've toggled the switch on
|
||||
if ($eventTimer) {
|
||||
// get the broker-event processing time
|
||||
$eventTime = gasStatic::doingTime($startTime);
|
||||
$data = [
|
||||
SYSTEM_EVENT_NAME => SYSEV_NAME_EVENT_TIMER,
|
||||
SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
|
||||
SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
|
||||
SYSTEM_EVENT_BROKER_GUID => $childGUID,
|
||||
DB_EVENT_GUID => $eventGUID,
|
||||
SYSTEM_EVENT_COUNT => $requestCounter,
|
||||
SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
|
||||
SYSTEM_EVENT_TIMER => $eventTime,
|
||||
SYSTEM_EVENT_BROKER_EVENT => $event,
|
||||
SYSTEM_EVENT_META_DATA => $request[BROKER_META_DATA],
|
||||
SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
|
||||
];
|
||||
if (!empty($childGUID)) $data[SYSTEM_EVENT_OGUID] = $childGUID;
|
||||
@postSystemEvent($data, $childGUID, $callBackLog);
|
||||
}
|
||||
|
||||
if ($requestCounter >= $myRequestsPerInstance) {
|
||||
if (getmypid() == $thisPid) {
|
||||
$meta = [
|
||||
META_SESSION_IP => STRING_SESSION_HOME,
|
||||
META_SESSION_DAEMON => 1,
|
||||
META_SESSION_MISC => INFO_BROKER_RECYCLE,
|
||||
META_EVENT_GUID => $eventGUID
|
||||
];
|
||||
$data = [
|
||||
SYSTEM_EVENT_NAME => SYSEV_NAME_BROKER_RECYCLE,
|
||||
SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
|
||||
SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
|
||||
SYSTEM_EVENT_BROKER_GUID => $childGUID,
|
||||
DB_EVENT_GUID => $eventGUID,
|
||||
SYSTEM_EVENT_START => $startingMemory,
|
||||
SYSTEM_EVENT_PEAK => memory_get_peak_usage(true),
|
||||
SYSTEM_EVENT_END => memory_get_usage(true),
|
||||
SYSTEM_EVENT_BROKER_EVENT => $event,
|
||||
SYSTEM_EVENT_COUNT => $requestCounter,
|
||||
SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
|
||||
SYSTEM_EVENT_META_DATA => $meta,
|
||||
SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
|
||||
];
|
||||
@postSystemEvent($data, $eventGUID, $callBackLog);
|
||||
}
|
||||
consoleLog($res, CON_SYSTEM, INFO_BROKER_REQ_COUNT);
|
||||
if (is_object($brokerChannel)) $brokerChannel->close();
|
||||
if (is_object($brokerConnection)) $brokerConnection->disconnect(); // changed from close() which DNE
|
||||
exit(0);
|
||||
}
|
||||
};
|
||||
consoleLog($res, CON_SYSTEM, sprintf(INFO_BROKER_QUEUE_ESTABLISHED, BROKER_QUEUE_R, $thisPid, $myRequestsPerInstance));
|
||||
$brokerChannel->basic_qos(null, 1, null);
|
||||
$brokerChannel->basic_consume($queue, '', false, false, false, false, $callback);
|
||||
while (count($brokerChannel->callbacks)) {
|
||||
try {
|
||||
$brokerChannel->wait();
|
||||
} catch (AMQPChannelClosedException | AMQPInvalidArgumentException | AMQPRuntimeException| Throwable $t) {
|
||||
$hdr = sprintf(INFO_LOC, basename(__FILE__), __LINE__);
|
||||
@handleExceptionMessaging($hdr, $t->getMessage(), $foo, true);
|
||||
}
|
||||
}
|
||||
// ---- broker code ends ---- //
|
||||
break;
|
||||
case 1 : // parent
|
||||
// do nothing
|
||||
break;
|
||||
}
|
||||
return($thisPid);
|
||||
}
|
||||
|
||||
for ($numBrokers = 0; $numBrokers < $runningBrokers; $numBrokers++) {
|
||||
$childrenPidList[] = forkMe();
|
||||
}
|
||||
consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_PARENT_STARTED, count($childrenPidList), BROKER_QUEUE_R));
|
||||
|
||||
// "register" the broker instantiation event
|
||||
$data = [
|
||||
SYSTEM_EVENT_NAME => SYSEV_NAME_GROOT_REG,
|
||||
SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
|
||||
SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
|
||||
SYSTEM_EVENT_KEY => STRING_NUMBER_CHILDREN,
|
||||
SYSTEM_EVENT_VAL => $numberChildren,
|
||||
SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__,
|
||||
SYSTEM_EVENT_NOTES => BROKER_SYSEV_REG . rtrim($res, ": ")
|
||||
];
|
||||
@postSystemEvent($data, $groot, $parentLog);
|
||||
|
||||
// the parent process continues to run...it looks for any of the children in it's process group to die...
|
||||
// when a child dies, it's death-rattle is caught and the child is replaced with a new process.
|
||||
while (count($childrenPidList)) {
|
||||
$lastPid = 0;
|
||||
$newPidList = null;
|
||||
$result = pcntl_waitpid(0, $status); // detect any sigchld from the parent-group
|
||||
if (in_array($result, $childrenPidList)) {
|
||||
$key = array_search($result, $childrenPidList);
|
||||
array_splice($childrenPidList, $key, 1);
|
||||
// process has already exited -- restart it
|
||||
$childrenPidList[] = forkMe();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user