brokerEvents tracking
 * 02-06-18 mks _INF-139: PHP 7.2 exception handling
 * 05-31-18 mks CORE-1011: update for new XML broker services configuration
 * 10-17-18 mks DB-72: audit event coding
 * 02-11-19 mks DB-100: offloaded a chunk of broker-event code into core for smaller footprint
 * 09-19-19 mks DB-136: better exception handling, moved log/metric code to respective brokers
 *                     fixed console log message where auditIn event always generating error message on success
 * 07-28-20 mks DB-156: broker self-registration installed
 * 09-17-20 mks DB-168: updated service registration, updated exception handling to current standard
 *
 */
//use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPChannelClosedException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
//use PhpAmqpLib\Message\AMQPMessage;
//use PhpAmqpLib\Exception\AMQPTimeoutException;

pcntl_async_signals(true);      // enable asynchronous signal handling (PHP 7.1)
$_REDIRECT = true;
$topDir = dirname(__DIR__);

// load the framework
@require_once($topDir . '/config/sneakerstrap.inc');   // can't be constants b/c this loads the constants

$res = 'ADMI: ';

// event management for children
$adminServiceConfig = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_ADMIN];
$numberChildren = $adminServiceConfig[CONFIG_BROKER_INSTANCES][CONFIG_ADMIN_BROKER_IN];
$requestsPerInstance = (empty($adminServiceConfig[CONFIG_BROKER_REQUEST_LIMIT]))
    ? NUMBER_C
    : $adminServiceConfig[CONFIG_BROKER_REQUEST_LIMIT];
$numberChildren = ($numberChildren < 1) ? 1 : $numberChildren; // todo -- should this be = 2??
$runningBrokers = $numberChildren;
$requestCounter = 0;
$myRequestsPerInstance = 0;
$startingMemory = 0;
$groot = rtrim($res, COLON) . UDASH . guid();           // root guid

consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_STARTUP, substr(basename(__FILE__), 0, -4), $groot));

$parentLog = new gacErrorLogger();
$errors = null;
// NOTE(review): rtrim() strips a trailing *character set*, not a suffix -- verify this yields the
// intended name should the script ever be renamed.
$file = rtrim(basename(__FILE__), DOT . FILE_TYPE_PHP);
$service = ENV_ADMIN;

// refuse to start when the service does not validate
if (!validateService($service, $errors)) {
    $hdr = sprintf(INFO_LOC, $file, __LINE__);
    $msg = sprintf(ERROR_SERVICE_REG, $file, $service);
    $parentLog->fatal($hdr . $msg);
    $parentLog->__destruct();
    unset($parentLog);
    exit(1);
}


//////////////////////////////////////////////////////////////////////////////////
// set-up the replacement signal handler that will be called on a child's death //
//////////////////////////////////////////////////////////////////////////////////
//declare( ticks = 1);
/**
 * SIGCHLD handler for the parent process: decrements the global child counter.
 *
 * The handler deliberately does NOT reap the child.  Reaping is done exclusively by the blocking
 * pcntl_waitpid() in the supervisor loop at the bottom of this script; if the handler also called
 * pcntl_wait(), it could collect a dead child before the supervisor saw its pid, and that child
 * would then never be replaced.
 *
 * @param int $_sig  the signal number being delivered
 */
function sigHandler($_sig)
{
    global $numberChildren;

    if ($_sig == SIGCHLD) {
        $numberChildren--;
    }
}
pcntl_signal(SIGCLD, 'sigHandler');


/////////////////////////////////////////////////////////////////////////////////////////
// set-up the forking function so that it can be called initially or on a SIGCLD event //
/////////////////////////////////////////////////////////////////////////////////////////
/**
 * Forks one broker child.
 *
 * Parent: returns the pid reported by pcntl_fork() (the script dies if the fork fails).
 *
 * Child: fetches the admin broker resource, declares the queue, registers the child start-up as a
 * system event and consumes requests through the callback defined below.  The child always leaves
 * through exit() and never returns to the caller -- otherwise it would continue into the parent's
 * fork loop and supervisor loop.
 *
 * @return int  pid of the forked child (parent only)
 */
function forkMe()
{
    global $thisWatcher, $eos, $res, $parentLog, $requestsPerInstance, $myRequestsPerInstance, $startingMemory, $groot;

    // per-child request limit: configured limit plus some random jitter
    $myRequestsPerInstance = $requestsPerInstance + (mt_rand(0, 2) * 10) + mt_rand(0, 9);
    $startingMemory = memory_get_usage(true);

    $thisPid = pcntl_fork();
    switch ($thisPid) {
        case -1 :
            // error
            $cmsg = ERROR_FORK_FAILED . $thisWatcher;
            $parentLog->fatal($cmsg);
            die(getDateTime() . CON_ERROR . $res . $cmsg . $eos);
            break;

        case 0 :
            // child (broker daemon)
            // replace the sigcld signal handler
            pcntl_signal(SIGCLD, SIG_DFL);
            $thisPid = getmypid();
            $childGUID = rtrim($res, COLON) . UDASH . guid();

            try {
                // toss the childGUID unto cache because it does not propagate down to the callback method
                gasCache::sysAdd(($groot . UDASH . $thisPid), $childGUID);

                // create the child logger object
                /** @var gacErrorLogger $childLog */
                $childLog = new gacErrorLogger();

                $queueTag = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_QUEUE_TAG];
                $queue = $queueTag . BROKER_QUEUE_AI;

                /** @var AMQPStreamConnection $brokerConnection */
                $brokerConnection = gasResourceManager::fetchResource(RESOURCE_ADMIN);
                if (is_null($brokerConnection)) {
                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                    $childLog->fatal($hdr . ERROR_RESOURCE_404 . RESOURCE_ADMIN . COLON . BROKER_QUEUE_AI);
                    consoleLog($res, CON_ERROR, $hdr . ERROR_RESOURCE_404 . RESOURCE_ADMIN . COLON . BROKER_QUEUE_AI);
                    exit(1); // shell-script exit value for fail
                }

                /** @var AMQPChannel $brokerChannel */
                $brokerChannel = $brokerConnection->channel();
                // $brokerChannel->queue_declare($queue, BROKER_QUEUE_DECLARE_PASSIVE, false, false, true);
                $brokerChannel->queue_declare($queue);
            } catch (Throwable $t) {
                // Throwable also covers the AMQP runtime/timeout exceptions and TypeError
                $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true);
                exit(1);
            }

            // register the broker child start-up as a system-event
            $data = [
                SYSTEM_EVENT_NAME => SYSEV_NAME_CHILD_REG,
                SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                SYSTEM_EVENT_BROKER_GUID => $childGUID,
                SYSTEM_EVENT_KEY => SYSEV_CHILD_RPI,
                SYSTEM_EVENT_VAL => $myRequestsPerInstance,
                SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
            ];
            @postSystemEvent($data, $childGUID, $childLog);

            register_shutdown_function(BROKER_SHUTDOWN_FUNCTION, $brokerChannel, $brokerConnection, $res);

            // The channel and connection are locals of forkMe(), not globals, so they have to be
            // captured explicitly for the recycle code at the bottom of the callback to see them.
            $callback = function($_request) use ($brokerChannel, $brokerConnection) {
                $startTime = gasStatic::doingTime();

                global $requestCounter, $res, $eos, $myRequestsPerInstance, $startingMemory, $groot, $service;

                $childGUID = gasCache::sysGet(($groot . UDASH . getmypid()));
                if (gasConfig::$settings[CONFIG_DEBUG]) {
                    consoleLog($res, CON_DEBUG, 'Child GUID: ' . $childGUID);
                    consoleLog($res, CON_DEBUG, 'root GUID: ' . $groot);
                }

                $requestCounter++;
                $eventTimer = false;        // set to true to log the processing-time for an event
                $request = null;
                $eventSuccess = false;
                $conMsg = '';
                $errorList = array();
                $thisPid = getmypid();
                $eventGUID = guid();
                $ogGUID = '';

                // set-up the call-back logger
                $callBackLog = new gacErrorLogger($eventGUID, false);

                if (!firstPassPayloadValidation($_request, $service, $msg, $request, $eventGUID)) {
                    $conMsg = $msg;
                    $callBackLog->info($msg);
                    $event = BROKER_QUEUE_AI . '(' . ERROR_DATA_VALIDATION_FIRST_PASS . ')';
                } elseif (!validateMetaData($request, $errorList)) {
                    foreach ($errorList as $metaError) {
                        $conMsg .= $metaError . $eos;
                        $callBackLog->error($metaError);
                    }
                    $conMsg = rtrim($conMsg, $eos);
                    $event = BROKER_QUEUE_AI . '(' . ERROR_META_VALIDATION_SECOND_PASS . ')';
                } else {
                    $event = BROKER_QUEUE_AI . '(' . $request[BROKER_REQUEST] . ')';
                    switch ($request[BROKER_REQUEST]) {
                        case BROKER_REQUEST_SHUTDOWN :
                            // $_request->delivery_info[BROKER_CHANNEL]->basic_cancel($_request->delivery_info[BROKER_DELIVERY_TAG]);
                            $conMsg = SUCCESS_SHUTDOWN;
                            $eventSuccess = true;
                            break;

                        case BROKER_REQUEST_PING :
                            $conMsg = SUCCESS_PING . BROKER_QUEUE_AI;
                            $eventSuccess = true;
                            break;

                        case BROKER_REQUEST_CREATE :
                            $eventTimer = true;
                            $conMsg = '';
                            // validate that we have a data-template in meta and that the client is the system client
                            if (empty($request[BROKER_META_DATA][META_TEMPLATE])) {
                                $conMsg = ERROR_TEMPLATE_FILE_404;
                            } elseif (!isset($request[BROKER_META_DATA][META_CLIENT]) or $request[BROKER_META_DATA][META_CLIENT] != CLIENT_SYSTEM) {
                                $conMsg = ERROR_BROKER_CLIENT_NOT_AUTH;
                            } else {
                                $bh = new gacBrokerHelper();
                                $eventSuccess = $bh->create($request, $aryRetData, $conMsg);
                                unset($bh);
                            }
                            break;

                        case BROKER_REQUEST_UPDATE :
                            $eventTimer = true;
                            $conMsg = '';
                            // same pre-conditions as the create event
                            if (empty($request[BROKER_META_DATA][META_TEMPLATE])) {
                                $conMsg = ERROR_TEMPLATE_FILE_404;
                            } elseif (!isset($request[BROKER_META_DATA][META_CLIENT]) or $request[BROKER_META_DATA][META_CLIENT] != CLIENT_SYSTEM) {
                                $conMsg = ERROR_BROKER_CLIENT_NOT_AUTH;
                            } else {
                                $bh = new gacBrokerHelper();
                                $eventSuccess = $bh->update($request, $aryRetData, $conMsg);
                                unset($bh);
                            }
                            break;

                        case BROKER_REQUEST_ADMIN_BROKER_EVENT:
                            $eventTimer = true;
                            if (empty($request[BROKER_DATA])) {
                                $msg = ERROR_DATA_MISSING_ARRAY . STRING_DATA;
                                $conMsg = $msg;
                                $callBackLog->data($msg);
                            } else {
                                if (isset($request[BROKER_META_DATA][META_EVENT_GUID])) {
                                    $ogGUID = $request[BROKER_META_DATA][META_EVENT_GUID];
                                }
                                // disable auditing so we don't get into an infinite loop creating a new system event record
                                $metaCopy = $request[BROKER_META_DATA];
                                $metaCopy[META_AUDIT_EVENT] = 1;
                                $tmpObj = new gacSystemEvents($metaCopy);
                                if ($tmpObj->status) {
                                    $tmpObj->_createRecord($request[BROKER_DATA]);
                                    if ($tmpObj->status) {
                                        $conMsg = SUCCESS_EVENT . BROKER_REQUEST_CREATE;
                                        $eventSuccess = true;
                                    } else {
                                        $conMsg = FAIL_EVENT . BROKER_REQUEST_CREATE;
                                    }
                                }
                                if (is_object($tmpObj)) $tmpObj->__destruct();
                                unset($tmpObj);
                            }
                            break;

                        // DB-72: Audit Event
                        case BROKER_REQUEST_ADMIN_AUDIT_CREATE :
                            $eventTimer = true;
                            $journalData = [];
                            $errorList = [];
                            $haveJournal = false;
                            if (empty($request[BROKER_DATA])) {
                                $conMsg = ERROR_DATA_MISSING_ARRAY . STRING_DATA;
                                $callBackLog->data($conMsg);
                            } elseif (empty($request[BROKER_DATA][SYSTEM_EVENT_DATA])) {
                                $conMsg = ERROR_DATA_MISSING_ARRAY . SYSTEM_EVENT_DATA;
                                $callBackLog->data($conMsg);
                            } else {
                                try {
                                    // instantiate a system event object
                                    $objSysEv = new gacSystemEvents($request[BROKER_META_DATA]);
                                    if (!$objSysEv->status) {
                                        $conMsg = ERROR_TEMPLATE_INSTANTIATE . TEMPLATE_CLASS_SYS_EVENTS;
                                        $callBackLog->error($conMsg);
                                    } else {
                                        $objSysEv->_createRecord([$request[BROKER_DATA][SYSTEM_EVENT_DATA]], DATA_AUDT);
                                        unset($request[BROKER_DATA][SYSTEM_EVENT_DATA]);
                                        // grab journaling data if it exists and set a flag
                                        if (array_key_exists(STRING_JOURNAL_DATA, $request[BROKER_DATA])) {
                                            $journalData = $request[BROKER_DATA][STRING_JOURNAL_DATA];
                                            unset($request[BROKER_DATA][STRING_JOURNAL_DATA]);
                                            $haveJournal = true;
                                        }
                                        if (!$objSysEv->status) {
                                            $conMsg = sprintf(ERROR_DATA_IMPORT, SYSTEM_EVENT_DATA, $objSysEv->class);
                                            $callBackLog->data($conMsg);
                                        } else {
                                            /** @var gacMongoDB $objAudit */
                                            if (is_null($objAudit = grabWidget($request[BROKER_META_DATA], '', $errorList))) {
                                                foreach ($errorList as $error) $callBackLog->error($error);
                                            } else {
                                                $systemEventToken = $objSysEv->getColumn(DB_EVENT_GUID);
                                                $rc = $objAudit->launchAudit($request, $haveJournal, $systemEventToken, $journalData);
                                                $conMsg = ($rc) ? SUCCESS_AUDIT_EVENT : ERROR_AUDIT_GENERIC_FAIL;
                                                if (!$rc and count($objAudit->eventMessages)) {
                                                    consoleLog($res, CON_ERROR, ERROR_AUDIT_FAIL);
                                                    foreach ($objAudit->eventMessages as $errorMessage) {
                                                        consoleLog($res, CON_ERROR, $errorMessage);
                                                    }
                                                } elseif (!$rc) {
                                                    consoleLog($res, CON_ERROR, ERROR_AUDIT_FAILED);
                                                    $conMsg = ERROR_AUDIT_FAILED;
                                                } else {
                                                    $eventSuccess = true;
                                                    $conMsg = SUCCESS_EVENT . $request[BROKER_REQUEST];
                                                }
                                                if (is_object($objAudit)) $objAudit->__destruct();
                                                unset($objAudit);
                                            }
                                        }
                                        if (is_object($objSysEv)) $objSysEv->__destruct();
                                        unset($objSysEv);
                                    }
                                } catch (Throwable $t) {
                                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                                    $conMsg = ERROR_TYPE_EXCEPTION;
                                    $errorList[] = $conMsg;
                                    consoleLog($res, CON_ERROR, $hdr . $conMsg);
                                    consoleLog($res, CON_ERROR, $t->getMessage());
                                }
                            }
                            break;

                        case BROKER_REQUEST_NEW_SESSION :
                            $eventTimer = true;
                            if (empty($request[BROKER_META_DATA][META_SESSION_GUID])) {
                                $conMsg = sprintf(ERROR_META_FIELD_404, META_SESSION_GUID);
                            } elseif (empty($request[BROKER_DATA][SYSTEM_EVENT_DURATION])) {
                                // the key name is a sprintf argument (as in the branch above), not part of the format
                                // NOTE(review): assumes ERROR_DATA_KEY_404 carries a placeholder -- confirm
                                $conMsg = sprintf(ERROR_DATA_KEY_404, SYSTEM_EVENT_DURATION);
                            } elseif (!validateGUID($request[BROKER_META_DATA][META_SESSION_GUID])) {
                                $conMsg = ERROR_INVALID_GUID . $request[BROKER_META_DATA][META_SESSION_GUID];
                            } else {
                                $sessionToken = $request[BROKER_META_DATA][META_SESSION_GUID];
                                $duration = intval($request[BROKER_DATA][SYSTEM_EVENT_DURATION]);
                                $rc = gasStatic::createATJob($duration, $sessionToken);
                                if (!is_null($rc)) {
                                    // we successfully created the AT(1) job - update the sys-event record
                                    $tmpObj = new gacSystemEvents($request[BROKER_META_DATA]);
                                    if (!$tmpObj->status) {
                                        $conMsg = ERROR_TEMPLATE_INSTANTIATE . $request[BROKER_META_DATA][META_TEMPLATE];
                                    } else {
                                        // fetch the system event record
                                        $tmpObj->fetchRecordBySessionGUID($sessionToken);
                                        if ($tmpObj->status) {
                                            // update the system-event record with the AT results
                                            $query = [SYSTEM_EVENT_FK_SESSION_GUID => [OPERAND_NULL => [OPERATOR_EQ => [$tmpObj->getColumn(SYSTEM_EVENT_FK_SESSION_GUID)]]]];
                                            $update = [SYSTEM_EVENT_AT_RESULTS => $rc];
                                            $data = [
                                                STRING_QUERY_DATA => $query,
                                                STRING_UPDATE_DATA => $update
                                            ];
                                            $tmpObj->_updateRecord($data);
                                            if ($tmpObj->status) {
                                                $eventSuccess = true;
                                                $conMsg = SUCCESS_EVENT . $request[BROKER_REQUEST];
                                            } else {
                                                $callBackLog->warn(ERROR_MDB_SYS_EVENT_UPDATE);
                                                consoleLog($res, CON_SYSTEM, ERROR_MDB_SYS_EVENT_UPDATE);
                                            }
                                        } else {
                                            $callBackLog->warn(ERROR_MDB_SYS_EVENT_SAVE);
                                            consoleLog($res, CON_SYSTEM, ERROR_MDB_SYS_EVENT_SAVE);
                                        }
                                    }
                                } else {
                                    $callBackLog->warn(ERROR_AT_SAVE);
                                    consoleLog($res, CON_SYSTEM, ERROR_AT_SAVE);
                                }
                            }
                            break;

                        case BROKER_REQUEST_ADMIN_CACHE_SMASH :
                            $eventTimer = true;
                            $errors = [];
                            if (empty($request[BROKER_DATA])) {
                                $conMsg = ERROR_DATA_MISSING_ARRAY . STRING_DATA;
                                $callBackLog->data($conMsg);
                            } else {
                                try {
                                    // calls the cache-smash method and passes the list of guids; the outcome is
                                    // recorded in $eventSuccess/$conMsg so the common console log after the switch
                                    // reports it once, at the right level, instead of a generic error after success
                                    if (gasCache::smashCache($request[BROKER_DATA][STRING_DATA], $errors)) {
                                        $eventSuccess = true;
                                        $conMsg = SUCCESS_CACHE_SMASH;
                                    } else {
                                        $conMsg = ERROR_CACHE_SMASH_FAIL_USER;
                                    }
                                } catch (Throwable $t) {
                                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                                    $conMsg = ERROR_TYPE_EXCEPTION;
                                    $errorList[] = $conMsg;
                                    consoleLog($res, CON_ERROR, $hdr . $conMsg);
                                    consoleLog($res, CON_ERROR, $t->getMessage());
                                }
                            }
                            break;

                        default :
                            $conMsg = ERROR_BROKER_EVENT_UNKNOWN . $request[BROKER_REQUEST];
                            $callBackLog->warn(ERROR_BROKER_EVENT_UNKNOWN . $request[BROKER_REQUEST]);
                            // todo - not a supported event so log something dire
                            break;
                    }
                }

                // a failed event that produced no message still gets reported
                if (!$eventSuccess and empty($conMsg)) {
                    $conMsg = ERROR_FINE_PICKLE;
                }
                if (!empty($conMsg)) {
                    consoleLog(
                        $res,
                        (($eventSuccess) ? CON_SUCCESS : CON_ERROR),
                        $conMsg . sprintf(ERROR_EVENT_COUNT, $requestCounter, $myRequestsPerInstance)
                    );
                }
                // $_request->delivery_info[BROKER_CHANNEL]->basic_ack($_request->delivery_info[BROKER_DELIVERY_TAG]);

                // get the broker-event processing time
                $eventTime = gasStatic::doingTime($startTime);

                // log a system-event for the event -- unlike the other system events, we're not going to submit
                // this one via a broker - which is standard but, instead, we're going to write the record out
                // directly since doing otherwise would cause an infinite loop in processing.
                if ($eventTime and $eventTimer) {
                    $data = [
                        SYSTEM_EVENT_NAME => SYSEV_NAME_EVENT_TIMER,
                        SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                        SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                        SYSTEM_EVENT_BROKER_GUID => $childGUID,
                        DB_EVENT_GUID => $eventGUID,
                        SYSTEM_EVENT_COUNT => $requestCounter,
                        SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
                        SYSTEM_EVENT_TIMER => $eventTime,
                        SYSTEM_EVENT_BROKER_EVENT => $event,
                        SYSTEM_EVENT_META_DATA => $request[BROKER_META_DATA],
                        SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
                    ];
                    if (!empty($ogGUID)) $data[SYSTEM_EVENT_OGUID] = $ogGUID;
                    @postSystemEvent($data, $eventGUID, $callBackLog);
                }

                // exit the child if we've reached the request limit
                if ($requestCounter >= $myRequestsPerInstance) {
                    if (getmypid() == $thisPid) {
                        $meta = [
                            META_SESSION_IP => STRING_SESSION_HOME,
                            META_SESSION_DAEMON => 1,
                            META_SESSION_MISC => INFO_BROKER_RECYCLE,
                            META_EVENT_GUID => $eventGUID
                        ];
                        $data = [
                            SYSTEM_EVENT_NAME => SYSEV_NAME_BROKER_RECYCLE,
                            SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                            SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                            SYSTEM_EVENT_BROKER_GUID => $childGUID,
                            DB_EVENT_GUID => $eventGUID,
                            SYSTEM_EVENT_START => $startingMemory,
                            SYSTEM_EVENT_PEAK => memory_get_peak_usage(true),
                            SYSTEM_EVENT_END => memory_get_usage(true),
                            SYSTEM_EVENT_BROKER_EVENT => $event,
                            SYSTEM_EVENT_COUNT => $requestCounter,
                            SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
                            SYSTEM_EVENT_META_DATA => $meta,
                            SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
                        ];
                        @postSystemEvent($data, $eventGUID, $callBackLog);
                        gasCache::sysDel(($groot . UDASH . $thisPid));
                    }
                    consoleLog($res, CON_SYSTEM, INFO_BROKER_REQ_COUNT);
                    if (is_object($brokerChannel)) $brokerChannel->close();
                    if (is_object($brokerConnection)) $brokerConnection->close();
                    exit(0);
                }
            };

            consoleLog($res, CON_SYSTEM, sprintf(INFO_BROKER_QUEUE_ESTABLISHED, BROKER_QUEUE_AI, $thisPid, $myRequestsPerInstance));

            $brokerChannel->basic_consume($queue, '', false, true, false, false, $callback);
            while (count($brokerChannel->callbacks)) {
                try {
                    $brokerChannel->wait();
                } catch (AMQPChannelClosedException $t) {
                    // a closed channel cannot be waited on again -- report it and let the parent respawn us
                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                    @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true);
                    exit(1);
                } catch (Throwable $t) {
                    // anything else: report it and keep consuming
                    $hdr = basename(__FILE__) . AT . __LINE__ . COLON;
                    @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true);
                }
            }

            // no consumer left on the channel: the child must end here rather than return into
            // the parent's fork/supervisor code
            exit(0);

        default :
            // parent: pcntl_fork() returned the child's pid -- nothing more to do
            break;
    }
    return($thisPid);
}


// start the configured number of broker children
$childrenPidList = [];
for ($numBrokers = 0; $numBrokers < $runningBrokers; $numBrokers++) {
    $childrenPidList[] = forkMe();
}
consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_PARENT_STARTED, count($childrenPidList), BROKER_QUEUE_AI));

// "register" the broker instantiation event
$data = [
    SYSTEM_EVENT_NAME => SYSEV_NAME_GROOT_REG,
    SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
    SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
    SYSTEM_EVENT_KEY => STRING_NUMBER_CHILDREN,
    SYSTEM_EVENT_VAL => $numberChildren,
    SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__,
    SYSTEM_EVENT_NOTES => BROKER_SYSEV_REG . rtrim($res, ": ")
];
@postSystemEvent($data, $groot, $parentLog);

// the parent process continues to run, blocking in pcntl_waitpid() to monitor its children...
// when a child dies, its death-rattle is caught and the child is replaced with a new process.
while (count($childrenPidList)) {
    $result = pcntl_waitpid(0, $status); // detect any sigchld from the parent-group
    $key = array_search($result, $childrenPidList, true);
    if ($key !== false) {
        array_splice($childrenPidList, $key, 1);
        // process has already exited -- restart it
        $childrenPidList[] = forkMe();
    }
}