952 days continuous production uptime, 40k+ tp/s single node. Original corpo Bitbucket history not included — clean archive commit.
520 lines
29 KiB
PHP
520 lines
29 KiB
PHP
<?php
|
|
/**
|
|
* adminBrokerIn.php -- the admin-in broker client
|
|
*
|
|
* this broker is part of the administrative-services suite - and is intended to "live" on the admin instance.
|
|
*
|
|
* the primary purpose of this broker is to accept incoming system, audit or journaling events. This is a direct
|
|
* broker (fire-n-forget) that does not publish a response to the event.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 06-15-17 mks original coding
|
|
* 08-16-17 mks CORE-500: cleaned-up some IDE warnings
|
|
* 08-21-17 mks CORE-500: completed coding for systemEvents->brokerEvents tracking
|
|
* 02-06-18 mks _INF-139: PHP 7.2 exception handling
|
|
* 05-31-18 mks CORE-1011: update for new XML broker services configuration
|
|
* 10-17-18 mks DB-72: audit event coding
|
|
* 02-11-19 mks DB-100: offloaded a chunk of broker-event code into core for smaller footprint
|
|
* 09-19-19 mks DB-136: better exception handling, moved log/metric code to respective brokers
|
|
* fixed console log message where auditIn event always generating error message on success
|
|
* 07-28-20 mks DB-156: broker self-registration installed
|
|
* 09-17-20 mks DB-168: updated service registration, updated exception handling to current standard
|
|
*
|
|
*/
|
|
//use PhpAmqpLib\Connection\AMQPStreamConnection;
|
|
use PhpAmqpLib\Channel\AMQPChannel;
|
|
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
|
use PhpAmqpLib\Exception\AMQPChannelClosedException;
|
|
use PhpAmqpLib\Exception\AMQPRuntimeException;
|
|
use PhpAmqpLib\Exception\AMQPTimeoutException;
|
|
|
|
//use PhpAmqpLib\Message\AMQPMessage;
|
|
//use PhpAmqpLib\Exception\AMQPTimeoutException;
|
|
|
|
// enable asynchronous signal handling (PHP 7.1)
pcntl_async_signals(true);
$_REDIRECT = true;
$topDir = dirname(__DIR__);

// load the framework -- the path can't be built from constants b/c this include is what loads the constants
@require_once($topDir . '/config/sneakerstrap.inc');
$res = 'ADMI: ';

// event management for children: instance count and per-child request limit come from the admin service config
$adminServiceConfig = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_ADMIN];
$numberChildren = $adminServiceConfig[CONFIG_BROKER_INSTANCES][CONFIG_ADMIN_BROKER_IN];
if (empty($adminServiceConfig[CONFIG_BROKER_REQUEST_LIMIT])) {
    // no limit configured -- fall back to the framework default
    $requestsPerInstance = NUMBER_C;
} else {
    $requestsPerInstance = $adminServiceConfig[CONFIG_BROKER_REQUEST_LIMIT];
}
if ($numberChildren < 1) {
    // always run at least one child -- todo -- should this be = 2??
    $numberChildren = 1;
}
$runningBrokers = $numberChildren;

// per-process counters; forkMe() and the consume callback reach these through `global`
$requestCounter = 0;
$myRequestsPerInstance = 0;
$startingMemory = 0;

// root guid: identifies this parent process in system events and cache keys
$groot = rtrim($res, COLON) . UDASH . guid();
consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_STARTUP, substr(basename(__FILE__), 0, -4), $groot));

$parentLog = new gacErrorLogger();
$errors = null;
$file = rtrim(basename(__FILE__), DOT . FILE_TYPE_PHP);
$service = ENV_ADMIN;

// refuse to start when the service fails validation
if (!validateService($service, $errors)) {
    $parentLog->fatal(sprintf(INFO_LOC, $file, __LINE__) . sprintf(ERROR_SERVICE_REG, $file, $service));
    $parentLog->__destruct();
    unset($parentLog);
    exit(1);
}
|
|
|
|
//////////////////////////////////////////////////////////////////////////////////
|
|
// set-up the replacement signal handler that will be called on a child's death //
|
|
//////////////////////////////////////////////////////////////////////////////////
|
|
//declare( ticks = 1);
|
|
/**
 * Signal handler for child-termination signals.
 *
 * On SIGCHLD the global child counter is decremented once and every child that has
 * already exited is reaped without blocking. Any other signal number is ignored.
 *
 * @param int $_sig the signal number being delivered
 */
function sigHandler($_sig) {
    global $numberChildren;

    // only child-termination signals are of interest here
    if ($_sig != SIGCHLD) {
        return;
    }

    $numberChildren--;

    // WNOHANG: collect whatever has already exited, never block inside the handler
    $status = $_sig;
    for ($reaped = pcntl_wait($status, WNOHANG); $reaped > 0; $reaped = pcntl_wait($status, WNOHANG)) {
        @pcntl_wexitstatus($status);
    }
}
|
|
// install sigHandler() in the parent; forkMe() resets this signal to SIG_DFL inside each child.
// NOTE(review): registered with SIGCLD while the handler switches on SIGCHLD -- these are presumably
// the same signal number on the target platform; confirm before porting.
pcntl_signal(SIGCLD, 'sigHandler');
|
|
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
// set-up the forking function so that it can be called initially or on a SIGCLD event //
|
|
/////////////////////////////////////////////////////////////////////////////////////////
|
|
/**
 * Fork one broker child and, inside the child, consume the admin-in queue until it is recycled.
 *
 * Parent process: returns the pid of the freshly forked child right away.
 * Child process:  caches its GUID, opens the broker channel, posts a child-registration system event,
 *                 attaches the consume callback and blocks in the consume loop. The child never
 *                 returns to the caller -- it exits when its request limit is reached (inside the
 *                 callback), on a start-up failure, or when the consume loop ends.
 * Fork failure:   logs a fatal message and die()s.
 *
 * The per-child request limit is the configured limit plus a random 0-29 so that children
 * do not all recycle on the same request count.
 *
 * @return int pid of the child (parent side only)
 */
function forkMe()
{
    global $thisWatcher, $eos, $res, $parentLog, $requestsPerInstance, $myRequestsPerInstance, $startingMemory, $groot;
    $myRequestsPerInstance = $requestsPerInstance + (mt_rand(0, 2) * 10) + mt_rand(0, 9);
    $startingMemory = memory_get_usage(true);

    $thisPid = pcntl_fork();

    switch ($thisPid) {
        case -1 : // error
            $cmsg = ERROR_FORK_FAILED . $thisWatcher;
            $parentLog->fatal($cmsg);
            die(getDateTime() . CON_ERROR . $res . $cmsg . $eos);
            break;
        case 0 : // child (broker daemon)
            // replace the sigcld signal handler
            pcntl_signal(SIGCLD, SIG_DFL);
            $thisPid = getmypid();

            $childGUID = rtrim($res, COLON) . UDASH . guid();

            try {
                // toss the childGUID unto cache because it does not propagate down to the callback method
                gasCache::sysAdd(($groot . UDASH . $thisPid), $childGUID);

                // create the child logger object
                /** @var gacErrorLogger $childLog */
                $childLog = new gacErrorLogger();

                $queueTag = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_QUEUE_TAG];

                $queue = $queueTag . BROKER_QUEUE_AI;
                /** @var AMQPStreamConnection $brokerConnection */
                $brokerConnection = gasResourceManager::fetchResource(RESOURCE_ADMIN);
                if (is_null($brokerConnection)) {
                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                    $childLog->fatal($hdr . ERROR_RESOURCE_404 . RESOURCE_ADMIN . COLON . BROKER_QUEUE_AI);
                    consoleLog($res, CON_ERROR,$hdr . ERROR_RESOURCE_404 . RESOURCE_ADMIN . COLON . BROKER_QUEUE_AI);
                    exit(1); // shell-script exit value for fail
                }
                $brokerChannel = $brokerConnection->channel();
                // $brokerChannel->queue_declare($queue, BROKER_QUEUE_DECLARE_PASSIVE, false, false, true);
                $brokerChannel->queue_declare($queue);
            } catch (Throwable $t) {
                // Throwable already covers the AMQP runtime/timeout exceptions and TypeError
                $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true);
                exit(1);
            }

            // register the broker child start-up as a system-event
            $data = [
                SYSTEM_EVENT_NAME => SYSEV_NAME_CHILD_REG,
                SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                SYSTEM_EVENT_BROKER_GUID => $childGUID,
                SYSTEM_EVENT_KEY => SYSEV_CHILD_RPI,
                SYSTEM_EVENT_VAL => $myRequestsPerInstance,
                SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
            ];
            @postSystemEvent($data, $childGUID, $childLog);

            register_shutdown_function(BROKER_SHUTDOWN_FUNCTION, $brokerChannel, $brokerConnection, $res);

            // consume callback: validates the payload, dispatches on the request type, logs the outcome,
            // optionally records a timing system-event and recycles the child at its request limit
            $callback = function($_request) {
                $startTime = gasStatic::doingTime();
                global $requestCounter, $res, $eos, $myRequestsPerInstance, $startingMemory, $groot, $service;
                // NOTE(review): $brokerChannel / $brokerConnection are locals of forkMe(), not globals, so
                // these two statements bind to nothing and the is_object() guards at recycle time are
                // always false -- clean-up is effectively left to the registered shutdown function.
                /** @var AMQPChannel $brokerChannel */
                global $brokerChannel;
                /** @var PhpAmqpLib\Connection\AMQPStreamConnection $brokerConnection */
                global $brokerConnection;

                $childGUID = gasCache::sysGet(($groot . UDASH . getmypid()));
                if (gasConfig::$settings[CONFIG_DEBUG]) {
                    consoleLog($res, CON_DEBUG, 'Child GUID: ' . $childGUID);
                    consoleLog($res,CON_DEBUG, 'root GUID: ' . $groot);
                }

                $requestCounter++;
                $returnData = null;
                $eventTimer = false;
                $request = null;
                $eventSuccess = false;
                $conMsg = '';
                $errorList = array();
                $thisPid = getmypid();
                $eventGUID = guid();
                $ogGUID = '';

                // set-up the call-back logger
                $callBackLog = new gacErrorLogger($eventGUID, false);
                if (!firstPassPayloadValidation($_request, $service, $msg, $request, $eventGUID)) {
                    $conMsg = $msg;
                    $callBackLog->info($msg);
                    $event = BROKER_QUEUE_AI . '(' . ERROR_DATA_VALIDATION_FIRST_PASS . ')';
                } elseif (!validateMetaData($request, $errorList)) {
                    for ($index = 0, $last = count($errorList); $index < $last; $index++) {
                        $conMsg .= $errorList[$index] . $eos;
                        $callBackLog->error($errorList[$index]);
                    }
                    $conMsg = rtrim($conMsg, $eos);
                    $event = BROKER_QUEUE_AI . '(' . ERROR_META_VALIDATION_SECOND_PASS . ')';
                } else {
                    $event = BROKER_QUEUE_AI . '(' . $request[BROKER_REQUEST] . ')';
                    switch ($request[BROKER_REQUEST]) {
                        case BROKER_REQUEST_SHUTDOWN :
                            // $_request->delivery_info[BROKER_CHANNEL]->basic_cancel($_request->delivery_info[BROKER_DELIVERY_TAG]);
                            $conMsg = SUCCESS_SHUTDOWN;
                            $eventSuccess = true;
                            break;

                        case BROKER_REQUEST_PING :
                            $conMsg = SUCCESS_PING . BROKER_QUEUE_AI;
                            $eventSuccess = true;
                            break;

                        case BROKER_REQUEST_CREATE :
                            $eventTimer = true;
                            $conMsg = '';
                            // validate that we have a data-template in meta
                            if (!isset($request[BROKER_META_DATA][META_TEMPLATE]) or empty($request[BROKER_META_DATA][META_TEMPLATE])) {
                                $conMsg = ERROR_TEMPLATE_FILE_404;
                            } elseif (!isset($request[BROKER_META_DATA][META_CLIENT]) or $request[BROKER_META_DATA][META_CLIENT] != CLIENT_SYSTEM) {
                                $conMsg = ERROR_BROKER_CLIENT_NOT_AUTH;
                            } else {
                                $bh = new gacBrokerHelper();
                                $eventSuccess = $bh->create($request, $aryRetData, $conMsg);
                                unset($bh);
                            }
                            break;

                        case BROKER_REQUEST_UPDATE :
                            $eventTimer = true;
                            $conMsg = '';
                            if (!isset($request[BROKER_META_DATA][META_TEMPLATE]) or empty($request[BROKER_META_DATA][META_TEMPLATE])) {
                                $conMsg = ERROR_TEMPLATE_FILE_404;
                            } elseif (!isset($request[BROKER_META_DATA][META_CLIENT]) or $request[BROKER_META_DATA][META_CLIENT] != CLIENT_SYSTEM) {
                                $conMsg = ERROR_BROKER_CLIENT_NOT_AUTH;
                            } else {
                                $bh = new gacBrokerHelper();
                                $eventSuccess = $bh->update($request, $aryRetData, $conMsg);
                                unset($bh);
                            }
                            break;

                        case BROKER_REQUEST_ADMIN_BROKER_EVENT:
                            $eventTimer = true; // set to true if you want to log the processing-time for an event
                            if (!isset($request[BROKER_DATA]) or empty($request[BROKER_DATA])) {
                                $msg = ERROR_DATA_MISSING_ARRAY . STRING_DATA;
                                $conMsg = $msg;
                                $callBackLog->data($msg);
                            } else {
                                if (isset($request[BROKER_META_DATA][META_EVENT_GUID])) {
                                    $ogGUID = $request[BROKER_META_DATA][META_EVENT_GUID];
                                }
                                // disable auditing so we don't get into an infinite loop creating a new system event record
                                $metaCopy = $request[BROKER_META_DATA];
                                $metaCopy[META_AUDIT_EVENT] = 1;
                                $tmpObj = new gacSystemEvents($metaCopy);
                                if ($tmpObj->status) {
                                    $tmpObj->_createRecord($request[BROKER_DATA]);
                                    if ($tmpObj->status) {
                                        $conMsg = SUCCESS_EVENT . BROKER_REQUEST_CREATE;
                                        $eventSuccess = true;
                                    } else {
                                        $conMsg = FAIL_EVENT . BROKER_REQUEST_CREATE;
                                    }
                                }
                                if (is_object($tmpObj)) $tmpObj->__destruct();
                                unset($tmpObj);
                            }
                            break;

                        // DB-72: Audit Event
                        case BROKER_REQUEST_ADMIN_AUDIT_CREATE :
                            $eventTimer = true;
                            $journalData = [];
                            $errorList = [];
                            $haveJournal = false;
                            if (!isset($request[BROKER_DATA]) or empty($request[BROKER_DATA])) {
                                $conMsg = ERROR_DATA_MISSING_ARRAY . STRING_DATA;
                                $callBackLog->data($conMsg);
                            } elseif (!isset($request[BROKER_DATA][SYSTEM_EVENT_DATA]) or empty($request[BROKER_DATA][SYSTEM_EVENT_DATA])) {
                                $conMsg = ERROR_DATA_MISSING_ARRAY . SYSTEM_EVENT_DATA;
                                $callBackLog->data($conMsg);
                            } else {
                                try {
                                    // instantiate a system event object
                                    $objSysEv = new gacSystemEvents($request[BROKER_META_DATA]);
                                    if (!$objSysEv->status) {
                                        $conMsg = ERROR_TEMPLATE_INSTANTIATE . TEMPLATE_CLASS_SYS_EVENTS;
                                        $callBackLog->error($conMsg);
                                    } else {
                                        $objSysEv->_createRecord([$request[BROKER_DATA][SYSTEM_EVENT_DATA]], DATA_AUDT);
                                        unset($request[BROKER_DATA][SYSTEM_EVENT_DATA]);
                                        // grab journaling data if it exists and set a flag
                                        if (array_key_exists(STRING_JOURNAL_DATA, $request[BROKER_DATA])) {
                                            $journalData = $request[BROKER_DATA][STRING_JOURNAL_DATA];
                                            unset($request[BROKER_DATA][STRING_JOURNAL_DATA]);
                                            $haveJournal = true;
                                        }
                                        if (!$objSysEv->status) {
                                            $conMsg = sprintf(ERROR_DATA_IMPORT, SYSTEM_EVENT_DATA, $objSysEv->class);
                                            $callBackLog->data($conMsg);
                                        } else {
                                            /** @var gacMongoDB $objAudit */
                                            if (is_null($objAudit = grabWidget($request[BROKER_META_DATA], '', $errorList))) {
                                                foreach ($errorList as $error)
                                                    $callBackLog->error($error);
                                            } else {
                                                $systemEventToken = $objSysEv->getColumn(DB_EVENT_GUID);
                                                $rc = $objAudit->launchAudit($request, $haveJournal, $systemEventToken, $journalData);
                                                $conMsg = ($rc) ? SUCCESS_AUDIT_EVENT : ERROR_AUDIT_GENERIC_FAIL;
                                                if (!$rc and count($objAudit->eventMessages)) {
                                                    consoleLog($res, CON_ERROR, ERROR_AUDIT_FAIL);
                                                    foreach ($objAudit->eventMessages as $errorMessage) {
                                                        consoleLog($res, CON_ERROR, $errorMessage);
                                                    }
                                                } elseif (!$rc) {
                                                    consoleLog($res, CON_ERROR, ERROR_AUDIT_FAILED);
                                                    $conMsg = ERROR_AUDIT_FAILED;
                                                } else {
                                                    $eventSuccess = true;
                                                    $conMsg = SUCCESS_EVENT . $request[BROKER_REQUEST];
                                                }
                                                if (is_object($objAudit)) $objAudit->__destruct();
                                                unset($objAudit);
                                            }
                                        }
                                        if (is_object($objSysEv)) $objSysEv->__destruct();
                                        unset($objSysEv);
                                    }
                                } catch (Throwable $t) {
                                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                                    $conMsg = ERROR_TYPE_EXCEPTION;
                                    $errorList[] = $conMsg;
                                    consoleLog($res, CON_ERROR, $hdr . $conMsg);
                                    consoleLog($res, CON_ERROR, $t->getMessage());
                                }
                            }
                            break;

                        case BROKER_REQUEST_NEW_SESSION :
                            $eventTimer = true;
                            if (!isset($request[BROKER_META_DATA][META_SESSION_GUID]) or empty($request[BROKER_META_DATA][META_SESSION_GUID])) {
                                $conMsg = sprintf(ERROR_META_FIELD_404, META_SESSION_GUID);
                            } elseif (!isset($request[BROKER_DATA][SYSTEM_EVENT_DURATION]) or empty($request[BROKER_DATA][SYSTEM_EVENT_DURATION])) {
                                // the key name is a sprintf argument (as in the META_SESSION_GUID branch above),
                                // not something to concatenate onto the format string
                                $conMsg = sprintf(ERROR_DATA_KEY_404, SYSTEM_EVENT_DURATION);
                            } elseif (!validateGUID($request[BROKER_META_DATA][META_SESSION_GUID])) {
                                $conMsg = ERROR_INVALID_GUID . $request[BROKER_META_DATA][META_SESSION_GUID];
                            } else {
                                $sessionToken = $request[BROKER_META_DATA][META_SESSION_GUID];
                                $duration = intval($request[BROKER_DATA][SYSTEM_EVENT_DURATION]);
                                $rc = gasStatic::createATJob($duration, $sessionToken);
                                if (!is_null($rc)) {
                                    // we successfully created the AT(1) job - update the sys-event record
                                    $tmpObj = new gacSystemEvents($request[BROKER_META_DATA]);
                                    if (!$tmpObj->status) {
                                        $conMsg = ERROR_TEMPLATE_INSTANTIATE . $request[BROKER_META_DATA][META_TEMPLATE];
                                    } else {
                                        // fetch the system event record
                                        $tmpObj->fetchRecordBySessionGUID($sessionToken);
                                        if ($tmpObj->status) {
                                            // update the system-event record with the AT results
                                            $query = [SYSTEM_EVENT_FK_SESSION_GUID => [OPERAND_NULL => [OPERATOR_EQ => [$tmpObj->getColumn(SYSTEM_EVENT_FK_SESSION_GUID)]]]];
                                            $update = [SYSTEM_EVENT_AT_RESULTS => $rc];
                                            $data = [ STRING_QUERY_DATA => $query, STRING_UPDATE_DATA => $update ];
                                            $tmpObj->_updateRecord($data);
                                            if ($tmpObj->status) {
                                                $eventSuccess = true;
                                                $conMsg = SUCCESS_EVENT . $request[BROKER_REQUEST];
                                            } else {
                                                $callBackLog->warn(ERROR_MDB_SYS_EVENT_UPDATE);
                                                consoleLog($res, CON_SYSTEM, ERROR_MDB_SYS_EVENT_UPDATE);
                                            }
                                        } else {
                                            $callBackLog->warn(ERROR_MDB_SYS_EVENT_SAVE);
                                            consoleLog($res, CON_SYSTEM, ERROR_MDB_SYS_EVENT_SAVE);
                                        }
                                    }
                                } else {
                                    $callBackLog->warn(ERROR_AT_SAVE);
                                    consoleLog($res, CON_SYSTEM, ERROR_AT_SAVE);
                                }
                            }
                            break;

                        case BROKER_REQUEST_ADMIN_CACHE_SMASH :
                            $eventTimer = true;
                            $errors = [];
                            if (!isset($request[BROKER_DATA]) or empty($request[BROKER_DATA])) {
                                $conMsg = ERROR_DATA_MISSING_ARRAY . STRING_DATA;
                                $callBackLog->data($conMsg);
                            } else {
                                try {
                                    // calls the cache-smash method and passes the list of guids
                                    if (!gasCache::smashCache($request[BROKER_DATA][STRING_DATA], $errors)) {
                                        consoleLog($res, CON_ERROR, ERROR_CACHE_SMASH_FAIL_USER);
                                    } else {
                                        consoleLog($res, CON_SUCCESS, SUCCESS_CACHE_SMASH);
                                        // flag the success so the generic ERROR_FINE_PICKLE fallback below is not
                                        // logged as an error right after a successful smash
                                        $eventSuccess = true;
                                    }
                                } catch (Throwable $t) {
                                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                                    $conMsg = ERROR_TYPE_EXCEPTION;
                                    $errorList[] = $conMsg;
                                    consoleLog($res, CON_ERROR, $hdr . $conMsg);
                                    consoleLog($res, CON_ERROR, $t->getMessage());
                                }
                            }
                            break;

                        default :
                            $conMsg = ERROR_BROKER_EVENT_UNKNOWN . $request[BROKER_REQUEST];
                            $callBackLog->warn(ERROR_BROKER_EVENT_UNKNOWN . $request[BROKER_REQUEST]);
                            // todo - not a supported event so log something dire
                            break;
                    }
                }
                // a failed event that produced no message of its own gets the generic fallback message
                if (!$eventSuccess and empty($conMsg)) {
                    $conMsg = ERROR_FINE_PICKLE;
                }
                if (!empty($conMsg)) {
                    consoleLog($res, (($eventSuccess) ? CON_SUCCESS : CON_ERROR), $conMsg . sprintf(ERROR_EVENT_COUNT, $requestCounter, $myRequestsPerInstance));
                }
                // $_request->delivery_info[BROKER_CHANNEL]->basic_ack($_request->delivery_info[BROKER_DELIVERY_TAG]);

                // get the broker-event processing time
                $eventTime = gasStatic::doingTime($startTime);

                // log a system-event for the event -- unlike the other system events, we're not going to submit
                // this one via a broker - which is standard but, instead, we're going to write the record out
                // directly since doing otherwise would cause an infinite loop in processing.
                if ($eventTime and $eventTimer) {
                    $data = [
                        SYSTEM_EVENT_NAME => SYSEV_NAME_EVENT_TIMER,
                        SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                        SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                        SYSTEM_EVENT_BROKER_GUID => $childGUID,
                        DB_EVENT_GUID => $eventGUID,
                        SYSTEM_EVENT_COUNT => $requestCounter,
                        SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
                        SYSTEM_EVENT_TIMER => $eventTime,
                        SYSTEM_EVENT_BROKER_EVENT => $event,
                        SYSTEM_EVENT_META_DATA => $request[BROKER_META_DATA],
                        SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
                    ];
                    if (!empty($ogGUID)) $data[SYSTEM_EVENT_OGUID] = $ogGUID;
                    @postSystemEvent($data, $eventGUID, $callBackLog);
                }

                // exit the child if we've reached the request limit
                if ($requestCounter >= $myRequestsPerInstance) {
                    if (getmypid() == $thisPid) {
                        $meta = [
                            META_SESSION_IP => STRING_SESSION_HOME,
                            META_SESSION_DAEMON => 1,
                            META_SESSION_MISC => INFO_BROKER_RECYCLE,
                            META_EVENT_GUID => $eventGUID
                        ];
                        $data = [
                            SYSTEM_EVENT_NAME => SYSEV_NAME_BROKER_RECYCLE,
                            SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                            SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                            SYSTEM_EVENT_BROKER_GUID => $childGUID,
                            DB_EVENT_GUID => $eventGUID,
                            SYSTEM_EVENT_START => $startingMemory,
                            SYSTEM_EVENT_PEAK => memory_get_peak_usage(true),
                            SYSTEM_EVENT_END => memory_get_usage(true),
                            SYSTEM_EVENT_BROKER_EVENT => $event,
                            SYSTEM_EVENT_COUNT => $requestCounter,
                            SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
                            SYSTEM_EVENT_META_DATA => $meta,
                            SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
                        ];
                        @postSystemEvent($data, $eventGUID, $callBackLog);
                        gasCache::sysDel(($groot . UDASH . $thisPid));
                    }
                    consoleLog($res, CON_SYSTEM, INFO_BROKER_REQ_COUNT);
                    if (is_object($brokerChannel)) $brokerChannel->close();
                    if (is_object($brokerConnection)) $brokerConnection->close();
                    exit(0);
                }
            };
            consoleLog($res, CON_SYSTEM, sprintf(INFO_BROKER_QUEUE_ESTABLISHED, BROKER_QUEUE_AI, $thisPid, $myRequestsPerInstance));
            $brokerChannel->basic_consume($queue, '', false, true, false, false, $callback);
            // NOTE(review): if wait() keeps throwing (e.g. the channel is gone) this loop relies on
            // handleExceptionMessaging() to stop the process; otherwise it would retry immediately -- confirm.
            while (count($brokerChannel->callbacks)) {
                try {
                    $brokerChannel->wait();
                } catch (Throwable $t) {
                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                    @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true);
                }
            }
            // the consume loop has ended: a child must never fall through and return into the parent's
            // fork/monitor code (it would start forking brokers of its own), so terminate here and let
            // the registered shutdown function release the channel and connection.
            exit(0);

        default : // parent -- pcntl_fork() hands the parent the child's pid
            // does nothing
            break;
    }
    return $thisPid;
}
|
|
|
|
// spawn the configured number of broker children, remembering each child's pid
$childrenPidList = [];
for ($numBrokers = 0; $numBrokers < $runningBrokers; $numBrokers++) {
    $childrenPidList[] = forkMe();
}
consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_PARENT_STARTED, count($childrenPidList), BROKER_QUEUE_AI));

// "register" the broker instantiation event
$data = [
    SYSTEM_EVENT_NAME => SYSEV_NAME_GROOT_REG,
    SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
    SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
    SYSTEM_EVENT_KEY => STRING_NUMBER_CHILDREN,
    SYSTEM_EVENT_VAL => $numberChildren,
    SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__,
    SYSTEM_EVENT_NOTES => BROKER_SYSEV_REG . rtrim($res, ": ")
];
@postSystemEvent($data, $groot, $parentLog);

// the parent process keeps running, blocked in waitpid until one of its children exits...
// when a child dies, its death-rattle is caught and the child is replaced with a new process.
while (count($childrenPidList)) {
    $result = pcntl_waitpid(0, $status);    // wait for any child in the parent's process group

    if ($result > 0) {
        // a child was reaped here -- if it is one of our brokers, swap its pid for a fresh fork
        $key = array_search($result, $childrenPidList, true);
        if ($key !== false) {
            array_splice($childrenPidList, $key, 1);
            // process has already exited -- restart it
            $childrenPidList[] = forkMe();
        }
        continue;
    }

    // waitpid failed: without handling, the loop would spin without ever replacing a child
    if (pcntl_get_last_error() === PCNTL_ECHILD) {
        // no children are left to wait for although pids are still tracked (they were reaped elsewhere,
        // e.g. by sigHandler) -- every tracked pid is stale, so respawn the whole set
        $lost = count($childrenPidList);
        $childrenPidList = [];
        for ($respawn = 0; $respawn < $lost; $respawn++) {
            $childrenPidList[] = forkMe();
        }
    } else {
        // interrupted or transient failure -- back off briefly before retrying
        usleep(100000);
    }
}