Files
namaste/brokers/sBroker.php
gramps 373ebc8c93 Archive: Namaste PHP AMQP framework v1.0 (2017-2020)
952 days continuous production uptime, 40k+ tp/s single node.
Original corpo Bitbucket history not included — clean archive commit.
2026-04-05 09:49:30 -07:00

382 lines
20 KiB
PHP

<?php
/**
* sBroker.php -- the tercero session (one-way) broker
*
* The session broker is a "system" broker designed to basically do one thing - handle requests to expire existing
* sessions on tercero from the admin service.
*
* This broker was created so as to not overly-burden the user-broker with administrative requests. The broker is
* a "fire-n-forget" broker meaning that no response is published (returned) to the calling client.
*
* If there is an error in processing, then we'll communicate that error back to admin by publishing a system event.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
*
* HISTORY:
* ========
* 10-02-20 mks DB-168: original coding
*
*
*/
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
// ---------------------------------------------------------------------------------
// Parent bootstrap: load the framework, read the broker configuration, seed the
// globals that forkMe() and the consumer callback pull in via `global`, and verify
// that this service is registered before any child is forked.
// ---------------------------------------------------------------------------------
pcntl_async_signals(true); // enable asynchronous signal handling (PHP 7.1)
$_REDIRECT = true; // NOTE(review): not read in this file -- presumably consumed by the bootstrap include; confirm
$topDir = dirname(__DIR__);
// load the framework
@require_once($topDir . '/config/sneakerstrap.inc'); // can't be constants b/c this loads the constants
$res = 'SESS: '; // console/log prefix for this broker
// event management for children
$appServerConfig = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_TERCERO];
$numberChildren = $appServerConfig[CONFIG_BROKER_INSTANCES][CONFIG_SESSION_BROKER];
// fall back to NUMBER_C when no per-instance request limit is configured
$requestsPerInstance = (empty($appServerConfig[CONFIG_BROKER_REQUEST_LIMIT])) ? NUMBER_C : $appServerConfig[CONFIG_BROKER_REQUEST_LIMIT];
$numberChildren = ($numberChildren < 1) ? 1 : $numberChildren; // todo -- should this be = 2?? (Yes, but only if prod)
$runningBrokers = $numberChildren; // snapshot: sigHandler() decrements $numberChildren, the fork loop uses this copy
$requestCounter = 0;
$myRequestsPerInstance = 0;
$startingMemory = 0;
$groot = rtrim($res, COLON) . UDASH . guid(); // root guid
consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_STARTUP, substr(basename(__FILE__), 0, -4), $groot));
$parentLog = new gacErrorLogger();
$errors = null;
// File stem without its extension. rtrim() takes a character mask rather than a suffix, so
// rtrim(basename(...), '.php') would also eat trailing 'p'/'h'/'.' characters of the stem itself;
// pathinfo() strips exactly one extension.
$file = pathinfo(__FILE__, PATHINFO_FILENAME);
$service = ENV_TERCERO;
if (!validateService($service, $errors)) {
    // service is not registered: log fatally, release the logger, and exit non-zero
    $hdr = sprintf(INFO_LOC, $file, __LINE__);
    $msg = sprintf(ERROR_SERVICE_REG, $file, $service);
    $parentLog->fatal($hdr . $msg);
    $parentLog->__destruct();
    unset($parentLog);
    exit(1);
}
//////////////////////////////////////////////////////////////////////////////////
// set-up the replacement signal handler that will be called on a child's death //
//////////////////////////////////////////////////////////////////////////////////
//declare( ticks = 1);
/**
 * Signal handler installed in the parent for child-death notifications.
 *
 * For SIGCHLD it decrements the global $numberChildren once per delivered signal and then
 * drains every already-exited child without blocking. Any other signal is ignored.
 *
 * NOTE(review): children reaped here are no longer visible to the pcntl_waitpid() loop at the
 * bottom of this file, so a child collected by this handler would not be re-forked -- confirm
 * whether that race matters in practice.
 *
 * @param int $_sig signal number being delivered
 */
function sigHandler($_sig) {
    global $numberChildren;

    if ($_sig != SIGCHLD) {
        return;
    }

    $numberChildren--;

    // non-blocking reap of all pending zombies; the exit status itself is discarded
    $status = $_sig;
    for (;;) {
        $reaped = pcntl_wait($status, WNOHANG);
        if ($reaped <= 0) {
            break;
        }
        @pcntl_wexitstatus($status);
    }
}
// Register under SIGCHLD, the POSIX name that sigHandler() matches on. SIGCLD is a System V
// alias that PHP only defines on platforms that provide it.
pcntl_signal(SIGCHLD, 'sigHandler');
/////////////////////////////////////////////////////////////////////////////////////////
// set-up the forking function so that it can be called initially or on a SIGCLD event //
/////////////////////////////////////////////////////////////////////////////////////////
/**
 * Forks one session-broker child and, inside the child, runs the AMQP consumer loop.
 *
 * Parent: returns the PID of the new child.
 * Child : connects to the tercero broker resource, declares the session queue, registers a
 *         shutdown function, consumes requests through the callback below, and exits. The
 *         child never returns to the caller, so it can never fall back into the parent's
 *         supervisor code.
 * Fork failure: logs fatally and terminates the process via die().
 *
 * Globals read: $thisWatcher, $eos, $res, $parentLog, $requestsPerInstance, $groot.
 * Globals written: $myRequestsPerInstance (request limit for the child about to be forked),
 *                  $startingMemory (memory_get_usage() snapshot taken just before the fork).
 *
 * @return int PID of the forked child (parent process only)
 */
function forkMe()
{
    global $thisWatcher, $eos, $res, $parentLog, $requestsPerInstance, $myRequestsPerInstance, $startingMemory, $groot;

    // add 0..29 of jitter to the configured limit (presumably so children do not all recycle together)
    $myRequestsPerInstance = $requestsPerInstance + (mt_rand(0, 2) * 10) + mt_rand(0, 9);
    $startingMemory = memory_get_usage(true);
    $thisPid = pcntl_fork();
    switch ($thisPid) {
        case -1 : // error
            $cmsg = ERROR_FORK_FAILED . $thisWatcher;
            $parentLog->fatal($cmsg);
            die(getDateTime() . CON_ERROR . $res . $cmsg . $eos);

        case 0 : // child (broker daemon)
            // the child must not inherit the parent's child-death handler
            pcntl_signal(SIGCHLD, SIG_DFL);
            $thisPid = getmypid();
            $childGUID = rtrim($res, COLON) . UDASH . guid();
            try {
                // toss the childGUID onto cache because it does not propagate down to the callback method
                gasCache::sysAdd(($groot . UDASH . $thisPid), $childGUID);
                // create the child logger object
                /** @var gacErrorLogger $childLog */
                $childLog = new gacErrorLogger();
                $queueTag = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_QUEUE_TAG];
                $queue = $queueTag . BROKER_QUEUE_S;
                /** @var AMQPStreamConnection $brokerConnection */
                $brokerConnection = gasResourceManager::fetchResource(RESOURCE_TERCERO);
                if (is_null($brokerConnection)) {
                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                    $childLog->fatal($hdr . ERROR_RESOURCE_404 . RESOURCE_TERCERO . COLON . BROKER_QUEUE_S);
                    consoleLog($res, CON_ERROR, $hdr . ERROR_RESOURCE_404 . RESOURCE_TERCERO . COLON . BROKER_QUEUE_S);
                    exit(1); // shell-script exit value for fail
                }
                $brokerChannel = $brokerConnection->channel();
                // $brokerChannel->queue_declare($queue, BROKER_QUEUE_DECLARE_PASSIVE, false, false, true);
                $brokerChannel->queue_declare($queue);
            } catch (AMQPRuntimeException | AMQPTimeoutException | Throwable $t) {
                $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true);
                exit(1);
            }

            // register the broker child start-up as a system-event
            $data = [
                SYSTEM_EVENT_NAME => SYSEV_NAME_CHILD_REG,
                SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                SYSTEM_EVENT_BROKER_GUID => $childGUID,
                SYSTEM_EVENT_KEY => SYSEV_CHILD_RPI,
                SYSTEM_EVENT_VAL => $myRequestsPerInstance,
                SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
            ];
            @postSystemEvent($data, $childGUID, $childLog);
            register_shutdown_function(BROKER_SHUTDOWN_FUNCTION, $brokerChannel, $brokerConnection, $res);

            /**
             * Consumer callback: validates the inbound message, dispatches on the request type,
             * logs the outcome to the console, and exits the child once the request limit is reached.
             * Nothing is published back to the caller (fire-and-forget queue).
             */
            $callback = function($_request) {
                $startTime = gasStatic::doingTime();
                global $requestCounter, $res, $eos, $myRequestsPerInstance, $startingMemory, $groot, $service;
                // NOTE(review): forkMe() assigns $brokerChannel/$brokerConnection as locals, not globals, so
                // unless something else defines globals of the same name these resolve to null here and the
                // explicit close() calls at recycle time are skipped. The registered shutdown function does
                // receive the real handles. Behavior intentionally left unchanged -- confirm before rewiring.
                /** @var AMQPChannel $brokerChannel */
                global $brokerChannel;
                /** @var PhpAmqpLib\Connection\AMQPStreamConnection $brokerConnection */
                global $brokerConnection;

                $file = basename(__FILE__);
                $childGUID = gasCache::sysGet(($groot . UDASH . getmypid()));
                if (gasConfig::$settings[CONFIG_DEBUG]) {
                    consoleLog($res, CON_DEBUG, 'Child GUID: ' . $childGUID);
                    consoleLog($res, CON_DEBUG, 'root GUID: ' . $groot);
                }
                $requestCounter++;
                $eventTimer = false; // never enabled in this broker, so the timer system-event below is skipped
                $request = null;
                $eventSuccess = false;
                $conMsg = '';
                $errorList = array();
                $thisPid = getmypid();
                $eventGUID = guid();
                $ogGUID = '';
                /** @var gacMongoDB $obj */
                $obj = null;
                // set-up the call-back logger
                $callBackLog = new gacErrorLogger($eventGUID, false);

                if (!firstPassPayloadValidation($_request, $service, $msg, $request, $eventGUID)) {
                    $conMsg = $msg;
                    $callBackLog->info($msg);
                    $event = BROKER_QUEUE_S . '(' . ERROR_DATA_VALIDATION_FIRST_PASS . ')';
                } elseif (!validateMetaData($request, $errorList)) {
                    foreach ($errorList as $validationError) {
                        $conMsg .= $validationError . $eos;
                        $callBackLog->error($validationError);
                    }
                    $conMsg = rtrim($conMsg, $eos);
                    $event = BROKER_QUEUE_S . '(' . ERROR_META_VALIDATION_SECOND_PASS . ')';
                } else {
                    $event = BROKER_QUEUE_S . '(' . $request[BROKER_REQUEST] . ')';
                    switch ($request[BROKER_REQUEST]) {
                        case BROKER_REQUEST_SHUTDOWN :
                            // $_request->delivery_info[BROKER_CHANNEL]->basic_cancel($_request->delivery_info[BROKER_DELIVERY_TAG]);
                            $conMsg = SUCCESS_SHUTDOWN;
                            $eventSuccess = true;
                            break;

                        case BROKER_REQUEST_PING :
                            $conMsg = SUCCESS_PING . BROKER_QUEUE_S;
                            $eventSuccess = true;
                            break;

                        case BROKER_REQUEST_EXPIRE_SESSION :
                            $errors = [];
                            /** var gacMongoDB $obj */
                            if (!is_null($obj = grabWidget($request[BROKER_META_DATA], '', $errors))) {
                                // we have widget - update the session record
                                /** @var gatSessions $template */
                                $template = $obj->template;
                                $bc = new gacWorkQueueClient($file . AT . __LINE__, BROKER_QUEUE_AI);
                                if (!$bc->status) {
                                    $hdr = sprintf(INFO_LOC, $file, __LINE__);
                                    @handleExceptionMessaging($hdr, ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_AI, $foo, true);
                                    $conMsg = FAIL_EVENT . $request[BROKER_REQUEST];
                                } elseif (!is_null($payload = $template->buildExpireSessionPayload($request[BROKER_DATA], $errors))) {
                                    $obj->_updateRecord($payload);
                                    if ($obj->status) {
                                        // publish a request back to admin to expire the system-event record indicating success in closing the session
                                        // (because both of these queues are type f-n-f...)
                                        $bc->call($template->buildCloseSysEventPayload($request[BROKER_DATA][STRING_GUID_KEY]));
                                        // successful session update - console log message and done (no return to client)
                                        $conMsg = SUCCESS_EVENT . $request[BROKER_REQUEST];
                                        $eventSuccess = true;
                                    } else {
                                        // create failed-session record for failure to update session record
                                        if (count($errors)) {
                                            foreach ($errors as $error) {
                                                $callBackLog->error($error);
                                            }
                                        }
                                        $conMsg = FAIL_EVENT . $request[BROKER_REQUEST];
                                        $data = [
                                            MONGO_FAILED_EVENT_GUID => $request[BROKER_DATA][STRING_GUID_KEY],
                                            MONGO_FAILED_EVENT_NAME => $request[BROKER_REQUEST],
                                            MONGO_FAILED_EVENT_DESC => basename(__FILE__) . AT . __LINE__,
                                            MONGO_FAILED_EVENT_SEV => ERROR_WARN
                                        ];
                                        $meta = [
                                            META_TEMPLATE => TEMPLATE_CLASS_FAILED_SESSIONS,
                                            META_CLIENT => CLIENT_SYSTEM,
                                            META_DO_CACHE => 0,
                                            META_SESSION_ID => $request[BROKER_DATA][STRING_GUID_KEY],
                                        ];
                                        // built in its own variable so the inbound $request stays intact for the
                                        // code further down that still reads it
                                        $failedSessionRequest = [
                                            BROKER_REQUEST => BROKER_REQUEST_CREATE,
                                            BROKER_DATA => [$data],
                                            BROKER_META_DATA => $meta
                                        ];
                                        $bc->call(gzcompress(json_encode($failedSessionRequest)));
                                        if (is_object($bc)) $bc->__destruct();
                                        unset($bc);
                                    }
                                } else {
                                    // we somehow failed to build the data payload based on the request data
                                    if (count($errors)) {
                                        foreach ($errors as $error) {
                                            $callBackLog->error($error);
                                        }
                                    }
                                    $conMsg = FAIL_EVENT . $request[BROKER_REQUEST];
                                }
                            } else {
                                // grabWidget() returned null: there is no object to attach messages to, so
                                // only the logger and the console message record the failure
                                $hdr = sprintf(INFO_LOC, $file, __LINE__);
                                $conMsg = ERROR_TEMPLATE_INSTANTIATE . $request[BROKER_META_DATA][META_TEMPLATE];
                                $callBackLog->warn($hdr . $conMsg);
                            }
                            break;

                        default :
                            $conMsg = ERROR_BROKER_EVENT_UNKNOWN . $request[BROKER_REQUEST];
                            $callBackLog->warn(ERROR_BROKER_EVENT_UNKNOWN . $request[BROKER_REQUEST]);
                            break;
                    }
                }

                if (!$eventSuccess and empty($conMsg)) {
                    $conMsg = ERROR_FINE_PICKLE;
                }
                if (!empty($conMsg)) {
                    consoleLog($res, (($eventSuccess) ? CON_SUCCESS : CON_ERROR), $conMsg . sprintf(ERROR_EVENT_COUNT, $requestCounter, $myRequestsPerInstance));
                }
                // $_request->delivery_info[BROKER_CHANNEL]->basic_ack($_request->delivery_info[BROKER_DELIVERY_TAG]);

                // get the broker-event processing time
                $eventTime = gasStatic::doingTime($startTime);
                // log a system-event for the event -- unlike the other system events, we're not going to submit
                // this one via a broker - which is standard but, instead, we're going to write the record out
                // directly since doing otherwise would cause an infinite loop in processing.
                if ($eventTime and $eventTimer) {
                    $data = [
                        SYSTEM_EVENT_NAME => SYSEV_NAME_EVENT_TIMER,
                        SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                        SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                        SYSTEM_EVENT_BROKER_GUID => $childGUID,
                        DB_EVENT_GUID => $eventGUID,
                        SYSTEM_EVENT_COUNT => $requestCounter,
                        SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
                        SYSTEM_EVENT_TIMER => $eventTime,
                        SYSTEM_EVENT_BROKER_EVENT => $event,
                        SYSTEM_EVENT_META_DATA => $request[BROKER_META_DATA],
                        SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
                    ];
                    if (!empty($ogGUID)) $data[SYSTEM_EVENT_OGUID] = $ogGUID;
                    @postSystemEvent($data, $eventGUID, $callBackLog);
                }

                // exit the child if we've reached the request limit
                if ($requestCounter >= $myRequestsPerInstance) {
                    if (getmypid() == $thisPid) {
                        $meta = [
                            META_SESSION_IP => STRING_SESSION_HOME,
                            META_SESSION_DAEMON => 1,
                            META_SESSION_MISC => INFO_BROKER_RECYCLE,
                            META_EVENT_GUID => $eventGUID
                        ];
                        $data = [
                            SYSTEM_EVENT_NAME => SYSEV_NAME_BROKER_RECYCLE,
                            SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                            SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                            SYSTEM_EVENT_BROKER_GUID => $childGUID,
                            DB_EVENT_GUID => $eventGUID,
                            SYSTEM_EVENT_START => $startingMemory,
                            SYSTEM_EVENT_PEAK => memory_get_peak_usage(true),
                            SYSTEM_EVENT_END => memory_get_usage(true),
                            SYSTEM_EVENT_BROKER_EVENT => $event,
                            SYSTEM_EVENT_COUNT => $requestCounter,
                            SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
                            SYSTEM_EVENT_META_DATA => $meta,
                            SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
                        ];
                        @postSystemEvent($data, $eventGUID, $callBackLog);
                        gasCache::sysDel(($groot . UDASH . $thisPid));
                    }
                    consoleLog($res, CON_SYSTEM, INFO_BROKER_REQ_COUNT);
                    if (is_object($brokerChannel)) $brokerChannel->close();
                    if (is_object($brokerConnection)) $brokerConnection->close();
                    exit(0);
                }
            };

            consoleLog($res, CON_SYSTEM, sprintf(INFO_BROKER_QUEUE_ESTABLISHED, BROKER_QUEUE_S, $thisPid, $myRequestsPerInstance));
            // 4th argument (no_ack) is true: messages are auto-acknowledged on delivery
            $brokerChannel->basic_consume($queue, '', false, true, false, false, $callback);
            // NOTE(review): exceptions from wait() are logged and the loop continues; if the channel is
            // permanently closed this would log in a tight loop -- confirm whether that can occur.
            while (count($brokerChannel->callbacks)) {
                try {
                    $brokerChannel->wait();
                } catch (AMQPChannelClosedException | Throwable $t) {
                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                    @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true);
                }
            }
            // The consumer has no callbacks left. Terminate here: returning would drop the child
            // back into the parent's top-level code, where it would start forking brokers itself.
            exit(0);

        default : // parent -- pcntl_fork() returned the child's PID (> 0)
            // does nothing
            break;
    }
    return($thisPid);
}
// launch the initial pool of broker children and remember their PIDs
$childrenPidList = [];
for ($numBrokers = 0; $numBrokers < $runningBrokers; $numBrokers++) {
    $childrenPidList[] = forkMe();
}
consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_PARENT_STARTED, count($childrenPidList), BROKER_QUEUE_S));
// "register" the broker instantiation event
$data = [
    SYSTEM_EVENT_NAME => SYSEV_NAME_GROOT_REG,
    SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
    SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
    SYSTEM_EVENT_KEY => STRING_NUMBER_CHILDREN,
    SYSTEM_EVENT_VAL => $numberChildren,
    SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__,
    SYSTEM_EVENT_NOTES => BROKER_SYSEV_REG . rtrim($res, ": ")
];
@postSystemEvent($data, $groot, $parentLog);
// the parent process continues to run, blocking until one of its children changes state...
// when a child dies, its death-rattle is caught and the child is replaced with a new process.
while (count($childrenPidList)) {
    $result = pcntl_waitpid(0, $status); // block until any child in the parent's process group exits
    if ($result <= 0) {
        // pcntl_waitpid() failed. An interrupted call (EINTR) is simply retried; for anything else
        // (e.g. ECHILD when nothing is left to wait for) back off for a second so this loop
        // cannot spin at full CPU.
        if (pcntl_get_last_error() !== PCNTL_EINTR) {
            sleep(1);
        }
        continue;
    }
    $key = array_search($result, $childrenPidList, true);
    if ($key !== false) {
        array_splice($childrenPidList, $key, 1);
        // process has already exited -- restart it
        $childrenPidList[] = forkMe();
    }
}