Files
namaste/scripts/startBrokers.php
gramps 373ebc8c93 Archive: Namaste PHP AMQP framework v1.0 (2017-2020)
952 days continuous production uptime, 40k+ tp/s single node.
Original corpo Bitbucket history not included — clean archive commit.
2026-04-05 09:49:30 -07:00

419 lines
20 KiB
PHP

<?php
/**
* startBrokers.php -- Namaste launch script
*
* this script is the broker-launch script for the Namaste framework.
*
* the script reads the current configuration for the brokers from the combined XML files and launches the
* appropriate number of brokers specified by the XML directives.
*
* script builds the launch command based on the provided configuration and then invokes a bash script to launch
* the brokers.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
*
* HISTORY:
* ========
* 06-12-17 mks original coding
* 06-15-17 mks added check for primeInstances to be populated before issuing exec command
* 11-30-17 mks CORE-594: added pre-check for running brokers; exit immediately if brokers already running
* 12-06-17 mks CORE-594: added chromium to fgrep stream for existing-broker check
* 03-13-18 mks CORE-764: allowing for broker non-starts if the number of children is not set, not-numeric,
* or not greater than zero - displays console message if any broker isn't started
* 05-31-18 mks CORE-1011: update for new XML broker services configuration
* console logging
* production checks, validate environment
* 06-08-18 mks CORE-1035: moved the service env registration to gasConfig class b/c PHP statics are weak af
* 07-09-18 mks CORE-1017: validating the XML config, ping brokers post-IPL to confirm successful start-up
* 07-24-18 mks CORE-1097: ensuring only services defined as isLocal are started during IPL
* 08-09-18 mks CORE-975: squelching PHP notices and fixing a variable (lcv) reference error
* 01-31-19 mks DB-107: corrected logging message showing repeat resource identifier
* 09-21-20 mks DB-168: update for new services configuration, suppressed notice message
* 11-02-20 mks DB-171: replaced checks for local with checks for service(s) being active instead
*
*/
// resource tag handed to consoleLog() as its first argument throughout this script
$res = 'STBR: ';
// Count the processes whose ps entry mentions "brokers", filtering out the grep itself, xdebug sessions,
// chromium/opera processes and this launch script.
$command = 'ps -ef | grep -i brokers | fgrep -v grep | fgrep -v xdebug | fgrep -v chrom | fgrep -v opera | fgrep -v startBrokers | wc -l';
// intval() turns a null shell_exec() result into 0, so no separate null test is needed
$results = intval(shell_exec($command));
if ($results > 0) {
    // bootstrap.inc has not been loaded yet, so consoleLog() is unavailable -- write straight to stdout
    echo PHP_EOL, sprintf('%d Namaste brokers are already running!', $results), PHP_EOL, PHP_EOL;
    exit(1);
}
// log file the launched brokers append their output to (see the exec() call in the launch loop)
$logFile = dirname(__DIR__) . '/logs/namaste.log';
// Fail loudly when the bootstrap is missing or unreadable: with the error-suppression operator on the
// require below, PHP versions before 8.0 would otherwise terminate the script without any message.
$bootstrapFile = dirname(__DIR__) . '/config/bootstrap.inc';
if (!is_readable($bootstrapFile)) {
    // consoleLog() is not available until the bootstrap has been loaded
    echo PHP_EOL . sprintf('Cannot read Namaste bootstrap file: %s', $bootstrapFile) . PHP_EOL . PHP_EOL;
    exit(1);
}
// the @ is kept on purpose so that diagnostics raised while the bootstrap executes stay suppressed as before
@require_once($bootstrapFile);
consoleLog($res, CON_SYSTEM, INFO_BROKERS_IPL . $logFile);
// NOTE(review): $eos is not defined in this script -- presumably set by bootstrap.inc; confirm
echo $eos . $eos;
$environments = null;
// rudimentary validation of selected XML settings: the configured environment must be one of the known ones
if (!in_array(gasConfig::$settings[CONFIG_ID][CONFIG_ID_ENV], [ ENV_PRODUCTION, ENV_QA, ENV_STAGING, ENV_DEVELOPMENT ])) {
    $msg = sprintf(CONFIG_XML_ENV_UNK, gasConfig::$settings[CONFIG_ID][CONFIG_ID_ENV]);
    consoleLog($res, CON_ERROR, $msg);
    exit(1);
}
consoleLog($res, CON_SYSTEM, INFO_FW_IPL);
// true once at least one broker of the service currently being processed has a positive instance count
$launch = false;
// NOTE(review): only referenced by the commented-out legacy launch code below
$secondaryInstallations = [ BROKER_SEGUNDO, BROKER_TERCERO, BROKER_ADMIN ];
// map of service name => active flag, as iterated by the loops in this script
$registeredServices = gasConfig::$settings[CONFIG_REGISTERED_SERVICES];
// Launch the brokers of every service that is flagged active; inactive services are skipped entirely.
// One launchBrokers.sh invocation is issued per service, with a "<broker><COLON_NS><service>" argument for each
// broker whose configured instance count is a positive number.
// note: $numInstances is only validated here -- launchBrokers.sh is what actually spins up the requisite
//       number of brokers as configured in the XML file.
// Any configuration error aborts the script with a non-zero exit status.
foreach ($registeredServices as $service => $isActive) {
    if (!$isActive) {
        continue;
    }
    // reset per service so that a service without launchable brokers does not inherit the flag from a
    // previously processed service and fire launchBrokers.sh with no broker arguments
    $launch = false;
    $command = 'bash ' . $topDir . DIR_SCRIPTS . '/launchBrokers.sh';
    // isset-guarded lookup: a missing section lands in the error branch without raising a notice first
    $thisConfig = isset(gasConfig::$settings[CONFIG_BROKER_SERVICES][$service])
        ? gasConfig::$settings[CONFIG_BROKER_SERVICES][$service]
        : null;
    if (empty($thisConfig)) {
        // we have an active service without a configuration section!
        consoleLog($res, CON_ERROR, ERROR_CONFIG_404 . COLON . $service);
        exit(1);
    }
    if (!isset($thisConfig[CONFIG_BROKER_INSTANCES])) {
        // error: instances setting for this service is not set
        consoleLog($res, CON_ERROR, ERROR_CONFIG_RESOURCE_404 . $service . AT . CONFIG_BROKER_INSTANCES);
        exit(1);
    }
    // an instances value that is set but not an array is skipped without launching anything (as before)
    if (is_array($thisConfig[CONFIG_BROKER_INSTANCES])) {
        foreach ($thisConfig[CONFIG_BROKER_INSTANCES] as $broker => $numInstances) {
            if (!is_numeric($numInstances)) {
                // error: value in XML is not numeric
                consoleLog($res, CON_ERROR, sprintf(ERROR_CONFIG_TYPE, CONFIG_BROKER_INSTANCES, gettype($numInstances)) . $numInstances);
                exit(1);
            }
            // numeric but zero or negative: the broker is simply left out of the launch command
            if (intval($numInstances) > 0) {
                $command .= ' ' . $broker . COLON_NS . $service;
                $launch = true;
            }
        }
    }
    if ($launch) {
        // run in the background with stdout appended to the log file so exec() returns immediately
        echo exec($command . " >> {$logFile} &");
        $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
        consoleLog($res, CON_SUCCESS, $hdr . $command);
    }
}
//$appServerConfig = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_APPSERVER];
//if (!empty($appServerConfig[CONFIG_BROKER_INSTANCES]) and intval(gasConfig::$settings[CONFIG_BROKER_APPSERVER][CONFIG_IS_LOCAL]) === 1) {
// foreach($appServerConfig[CONFIG_BROKER_INSTANCES] as $broker => $numInstances) {
// if (is_numeric($numInstances) and $numInstances > 0) {
// $command .= ' ' . $broker . ':' . ENV_APPSERVER;
// $launch = true;
// } else {
// $numInstances = (is_numeric($numInstances)) ? $numInstances : ERROR_STUB_NOTDEF;
// consoleLog($res, CON_ERROR, sprintf(ERROR_BROKER_IPL_FAIL, $broker, $numInstances));
// }
// }
// if ($launch) echo exec($command . ">> ${logFile} &");
//} elseif (intval(gasConfig::$settings[CONFIG_BROKER_APPSERVER][CONFIG_IS_LOCAL]) === 0) {
// consoleLog($res, CON_SYSTEM, sprintf(INFO_SERVICE_NOT_ENABLED, CONFIG_BROKER_APPSERVER));
//} elseif (empty($appServerConfig[CONFIG_BROKER_INSTANCES])) {
// consoleLog($res, CON_SYSTEM, ERROR_CONFIG_RESOURCE_404 . CONFIG_BROKER_APPSERVER . COLON_NS . CONFIG_BROKER_SERVICES);
//}
//
//$command = 'bash ' . $topDir . DIR_SCRIPTS . '/launchBrokers.sh';
//$launch = false;
//// launch the brokers specific to prime iff this is an active instance
//if (gasConfig::$settings[CONFIG_BROKER_APPSERVER][CONFIG_IS_LOCAL]) {
// if (!empty($appServerConfig[CONFIG_BROKER_PRIME_INSTANCES])) {
// foreach ($appServerConfig[CONFIG_BROKER_PRIME_INSTANCES] as $broker => $numInstances) {
// if (is_numeric($numInstances) and $numInstances > 0) {
// $command .= ' ' . $broker . ':' . BROKER_PRIME;
// $launch = true;
// } else {
// $numInstances = (is_numeric($numInstances)) ? $numInstances : ERROR_STUB_NOTDEF;
// consoleLog($res, CON_ERROR, sprintf(ERROR_BROKER_IPL_FAIL, $broker, $numInstances));
// }
// }
// if ($launch) {
// echo exec($command . ">> ${logFile} &");
// $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
// consoleLog($res, CON_SUCCESS, $hdr . $command);
// }
// }
//}
//
//$command = 'bash ' . $topDir . DIR_SCRIPTS . '/launchBrokers.sh';
//$launch = false;
//// start-up secondary instances
//$config = gasConfig::$settings[CONFIG_BROKER_SERVICES];
//foreach ($secondaryInstallations as $installation) {
// if (!empty($config[$installation]) and intval(gasConfig::$settings[$installation][CONFIG_IS_LOCAL]) === 1) {
// foreach ($config[$installation][CONFIG_BROKER_INSTANCES] as $broker => $numInstances) {
// if (is_numeric($numInstances) and $numInstances > 0) {
// $command .= ' ' . $broker . ':' . $installation;
// $launch = true;
// } else {
// $numInstances = (is_numeric($numInstances)) ? $numInstances : ERROR_STUB_NOTDEF;
// consoleLog($res, CON_ERROR, sprintf(ERROR_BROKER_IPL_FAIL, $broker, $numInstances));
// }
// }
// if ($launch) {
// echo exec($command . ">> ${logFile} &");
// $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
// consoleLog($res, CON_SUCCESS, $command);
// }
// } elseif (!isset($config[$installation][CONFIG_ACTIVE]) or intval(gasConfig::$settings[$installation][CONFIG_IS_LOCAL]) !== 1) {
// consoleLog($res, CON_SYSTEM, sprintf(INFO_SERVICE_NOT_ENABLED, $installation));
// }
//}
// core-1017 - ensuring the LOCAL brokers are active: announce the pause, then wait before the ping phase
consoleLog($res, CON_SYSTEM, INFO_IPL_REST);
sleep(2);
// meta-data section shared by both request payloads
$meta = [];
$meta[META_SYSTEM_NOTES] = STRING_ORIGIN_SB;
$meta[META_CLIENT] = CLIENT_SYSTEM;
$meta[META_SESSION_IP] = gethostname();
// PING request published to each broker by the health-check loop
$request = [];
$request[BROKER_REQUEST] = BROKER_REQUEST_PING;
$request[BROKER_DATA] = [];
$request[BROKER_META_DATA] = $meta;
// PEDIGREE request -- same envelope as the ping, different request type
$pedigreeRequest = [];
$pedigreeRequest[BROKER_REQUEST] = BROKER_REQUEST_PEDIGREE;
$pedigreeRequest[BROKER_DATA] = [];
$pedigreeRequest[BROKER_META_DATA] = $meta;
// brokerMap is an associative array where the key is the XML name of the broker and the value is the
// corresponding queue label, so that the correct broker client can be instantiated to test connectivity.
$brokerMap = [
    CONFIG_BROKER_R_BROKER  => BROKER_QUEUE_R,
    CONFIG_BROKER_W_BROKER  => BROKER_QUEUE_W,
    CONFIG_BROKER_M_BROKER  => BROKER_QUEUE_M,
    CONFIG_ADMIN_BROKER_IN  => BROKER_QUEUE_AI,
    CONFIG_ADMIN_BROKER_OUT => BROKER_QUEUE_AO,
    CONFIG_BROKER_WH_BROKER => BROKER_QUEUE_WH,
    CONFIG_LOG_BROKER       => BROKER_QUEUE_LOGS,
    CONFIG_SYSLOG_BROKER    => BROKER_QUEUE_SYSLOG,
    CONFIG_GRAPH_BROKER     => BROKER_QUEUE_GRAPHS,
    CONFIG_USER_BROKER      => BROKER_QUEUE_U,
    CONFIG_SESSION_BROKER   => BROKER_QUEUE_S,
    CONFIG_BROKER_C_BROKER  => BROKER_QUEUE_C,
];
// canned success response used when "pinging" a non-RPC broker/exchange, so that the console logging
// of the ping result has something to evaluate
$fakeResponse = [
    PAYLOAD_STATUS      => true,
    PAYLOAD_STATE       => STATE_SUCCESS,
    PAYLOAD_DIAGNOSTICS => null,
    PAYLOAD_RESULTS     => null,
];
//$fakeResponse = gzcompress(json_encode($fakeResponse));
// Post-launch health check (CORE-1017): for every active service, ping each broker that has a non-zero
// instance count and abort the IPL (exit status 1) on the first broker whose client cannot be instantiated
// or whose ping does not come back with a truthy status.
foreach ($registeredServices as $service => $isActive) {
    if ($isActive) {
        // tolerate a missing or non-array instances section instead of raising a foreach warning
        $instances = isset(gasConfig::$settings[CONFIG_BROKER_SERVICES][$service][CONFIG_BROKER_INSTANCES])
            ? gasConfig::$settings[CONFIG_BROKER_SERVICES][$service][CONFIG_BROKER_INSTANCES]
            : [];
        if (!is_array($instances)) {
            $instances = [];
        }
        foreach ($instances as $instance => $instanceCount) {
            // these two brokers are never pinged by this script
            if ($instance == CONFIG_ADMIN_BROKER_IN or $instance == CONFIG_SESSION_BROKER) continue;
            if (intval($instanceCount)) {
                // the three logging brokers are reached through the log client with a routing key;
                // everything else is pinged through an RPC broker client bound to its queue label
                $logs = true;
                switch ($instance) {
                    case CONFIG_LOG_BROKER :
                        $route = EXCHANGE_QUEUE_BINDING_ALL;
                        break;
                    case CONFIG_SYSLOG_BROKER :
                        $route = EXCHANGE_QUEUE_BINDING_LOGS;
                        break;
                    case CONFIG_GRAPH_BROKER :
                        $route = EXCHANGE_QUEUE_BINDING_METRICS;
                        break;
                    default :
                        $logs = false;
                        break;
                }
                // instantiate only the client that is actually needed for this broker
                if ($logs) {
                    $bc = new gacLogClient();
                } else {
                    // __METHOD__ is empty at file scope, so the file name is used for the caller tag
                    $bc = new gacBrokerClient($brokerMap[$instance], basename(__FILE__) . COLON_NS . __LINE__);
                }
                if (!$bc->status) {
                    $hdr = sprintf(INFO_LOC, basename(__FILE__), __LINE__);
                    $msg = $hdr . ERROR_FAILED_TO_INSTANTIATE . $brokerMap[$instance];
                    consoleLog($res, CON_ERROR, $msg);
                    consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
                    if (is_object($bc)) $bc->__destruct();
                    unset($bc);
                    exit(1);
                }
                // publish the AMQP >PING< event request
                $payload = gzcompress(json_encode($request));
                if ($logs) {
                    // non-RPC exchange: the return value is not used, the canned response stands in for it
                    $bc->call($payload, $route);
                    $response = $fakeResponse;
                } else {
                    $reply = $bc->call($payload); // rpc queue
                    // only a non-empty string can be a compressed reply; anything else leaves $response null
                    $response = null;
                    if (is_string($reply) and $reply !== '') {
                        $inflated = gzuncompress($reply);
                        if ($inflated !== false) {
                            $response = json_decode($inflated, true);
                        }
                    }
                }
                // a reply that is missing, undecodable or carries a falsy status is a failed ping
                if (!is_array($response) or empty($response[PAYLOAD_STATUS])) {
                    $msg = sprintf(ERROR_IPL_BROKER_PING, $brokerMap[$instance]);
                    consoleLog($res, CON_ERROR, $msg);
                    consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
                    if (is_object($bc)) $bc->__destruct();
                    unset($bc);
                    exit(1);
                } else {
                    consoleLog($res, CON_SUCCESS, sprintf(INFO_IPL_BROKER_SUCCESS, $brokerMap[$instance]));
                }
                if (is_object($bc)) $bc->__destruct();
                unset($bc);
            }
            // if ($service == RESOURCE_BROKER and !empty($config[$service][CONFIG_BROKER_PRIME_INSTANCES])) {
            // foreach (gasConfig::$settings[CONFIG_BROKER_SERVICES][$service][CONFIG_BROKER_PRIME_INSTANCES] as $primeService => $primeCount) {
            // if ($primeCount and array_key_exists($primeService, $brokerMap)) {
            // $bc = new gacBrokerClient($brokerMap[$primeService], basename(__METHOD__) . COLON_NS . __LINE__);
            // if (!$bc->status) {
            // $msg = ERROR_FAILED_TO_INSTANTIATE . $brokerMap[$primeService];
            // consoleLog($res, CON_ERROR, $msg);
            // consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
            // if (is_object($bc)) $bc->__destruct();
            // unset($bc);
            // exit(1);
            // }
            // }
            // // publish the AMQP >PING< event request
            // $response = $bc->call(gzcompress(json_encode($request))); // rpc queue
            // $response = json_decode(gzuncompress($response), true);
            // if ($response[PAYLOAD_STATUS] != true) {
            // $msg = sprintf(ERROR_IPL_BROKER_PING, $broker[$primeService]);
            // consoleLog($res, CON_ERROR, $msg);
            // consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
            // if (is_object($bc)) $bc->__destruct();
            // unset($bc);
            // exit(1);
            // } else {
            // consoleLog($res, CON_SUCCESS, sprintf(INFO_IPL_BROKER_SUCCESS, $brokerMap[$primeService]));
            // }
            // if (is_object($bc)) $bc->__destruct();
            // unset($bc);
            // }
            // }
        }
    }
}
// PEDIGREE Requests match the remote configurations to the local config (version number, release info, etc.)
// This ensures that all of the remote repos are on the same source-code branch...
// get the namaste (master) config (always required)
//$bc = new gacBrokerClient(BROKER_QUEUE_W, __METHOD__ . AT . __LINE__);
//if (!$bc->status) {
// $msg = ERROR_FAILED_TO_INSTANTIATE . BROKER_QUEUE_W;
// consoleLog($res, CON_ERROR, $msg);
// consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
// if (is_object($bc)) $bc->__destruct();
// unset($bc);
// exit(1);
//} else {
// $response = $bc->call(gzcompress(json_encode($pedigreeRequest)));
// $response = json_decode(gzuncompress($response), true);
// $namastePedigree = $response[PAYLOAD_RESULTS];
// if (is_object($bc)) $bc->__destruct();
// unset($bc);
//}
//// get the admin config (always required)
//$bc = new gacBrokerClient(BROKER_QUEUE_AO, __METHOD__ . AT . __LINE__);
//if (!$bc->status) {
// $msg = ERROR_FAILED_TO_INSTANTIATE . BROKER_QUEUE_AO;
// consoleLog($res, CON_ERROR, $msg);
// consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
// if (is_object($bc)) $bc->__destruct();
// unset($bc);
// exit(1);
//} else {
// $response = $bc->call(gzcompress(json_encode($pedigreeRequest)));
// $response = json_decode(gzuncompress($response), true);
// $adminPedigree = $response[PAYLOAD_RESULTS];
// if (is_object($bc)) $bc->__destruct();
// unset($bc);
//}
//
//// validate namaste config against the admin config
//if (serialize($adminPedigree) !== serialize($namastePedigree)) {
// $msg = sprintf(ERROR_IPL_CONFIG, ENV_APPSERVER, ENV_ADMIN);
// consoleLog($res, CON_ERROR, $msg);
// var_export($namastePedigree);
// consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
// exit(1);
//}
//
//// if segundo is local, get and validate it's pedigree...
//if (isset($registeredServices[CONFIG_BROKER_SEGUNDO]) and intval($registeredServices[CONFIG_BROKER_SEGUNDO]) == 1) {
// $bc = new gacBrokerClient(BROKER_QUEUE_WH, __METHOD__ . AT . __LINE__);
// if (!$bc->status) {
// $msg = ERROR_FAILED_TO_INSTANTIATE . BROKER_QUEUE_WH;
// consoleLog($res, CON_ERROR, $msg);
// consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
// if (is_object($bc)) $bc->__destruct();
// unset($bc);
// exit(1);
// } else {
// $response = $bc->call(gzcompress(json_encode($pedigreeRequest)));
// $response = json_decode(gzuncompress($response), true);
// $segundoPedigree = $response[PAYLOAD_RESULTS];
// if (is_object($bc)) $bc->__destruct();
// unset($bc);
// }
// if (serialize($namastePedigree) !== serialize($segundoPedigree)) {
// $msg = sprintf(ERROR_IPL_CONFIG, ENV_APPSERVER, ENV_SEGUNDO);
// consoleLog($res, CON_ERROR, $msg);
// var_export($namastePedigree);
// consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
// exit(1);
// }
//}
//
//// if tercero is active, get and validate it's pedigree...
//if (isset($activeServices[CONFIG_BROKER_TERCERO]) and intval($activeServices[CONFIG_BROKER_TERCERO]) == 1) {
// $bc = new gacBrokerClient(BROKER_QUEUE_TBD, __METHOD__ . AT . __LINE__);
// if (!$bc->status) {
// $msg = ERROR_FAILED_TO_INSTANTIATE . BROKER_QUEUE_TBD;
// consoleLog($res, CON_ERROR, $msg);
// consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
// if (is_object($bc)) $bc->__destruct();
// unset($bc);
// exit(1);
// } else {
// $response = $bc->call(gzcompress(json_encode($pedigreeRequest)));
// $response = json_decode(gzuncompress($response), true);
// $terceroPedigree = $response[PAYLOAD_RESULTS];
// if (is_object($bc)) $bc->__destruct();
// unset($bc);
// }
// if (serialize($namastePedigree) !== serialize($terceroPedigree)) {
// $msg = sprintf(ERROR_IPL_CONFIG, ENV_APPSERVER, ENV_TERCERO);
// consoleLog($res, CON_ERROR, $msg);
// var_export($namastePedigree);
// consoleLog($res, CON_ERROR, ERROR_IPL_STOP_BROKERS);
// exit(1);
// }
//}
// All pings passed: report the environment check, then one success line per service section that is
// flagged active. Every lookup is isset-guarded (as the tercero one already was) so that a section missing
// from the configuration is skipped quietly instead of raising an undefined-index diagnostic.
consoleLog($res, CON_SUCCESS, SUCCESS_IPL_ENV_CHECK);
if (isset(gasConfig::$settings[CONFIG_BROKER_APPSERVER][CONFIG_ACTIVE]) and intval(gasConfig::$settings[CONFIG_BROKER_APPSERVER][CONFIG_ACTIVE]) == 1) {
    consoleLog($res, CON_SUCCESS, INFO_IPL_APPSERVER_SUCCESS);
}
if (isset(gasConfig::$settings[CONFIG_BROKER_SEGUNDO][CONFIG_ACTIVE]) and intval(gasConfig::$settings[CONFIG_BROKER_SEGUNDO][CONFIG_ACTIVE]) == 1) {
    consoleLog($res, CON_SUCCESS, INFO_IPL_SEGUNDO_SUCCESS);
}
if (isset(gasConfig::$settings[CONFIG_BROKER_TERCERO][CONFIG_ACTIVE]) and intval(gasConfig::$settings[CONFIG_BROKER_TERCERO][CONFIG_ACTIVE]) == 1) {
    consoleLog($res, CON_SUCCESS, INFO_IPL_TERCERO_SUCCESS);
}
if (isset(gasConfig::$settings[CONFIG_ADMIN][CONFIG_ACTIVE]) and intval(gasConfig::$settings[CONFIG_ADMIN][CONFIG_ACTIVE]) == 1) {
    consoleLog($res, CON_SUCCESS, INFO_IPL_ADMIN_SUCCESS);
}