data is moved from HOT storage to COOL storage meaning that the original format of the data is preserved. * COLD --> data is moved from HOT or COOL storage to COLD storage -- which is TBD and is expected to be a CSV type * format using on of the AWS Storage-as-a-Service options. * WARM --> data is moved COLD to COOL storage. (so that it can be queried) * HOT --> data is moved COOL or COLD to HOT storage. (data de-archiving/recovery) * * For design notes, please refer to Jira case INF-188. * * This is a non-blocking RPC broker. Once a request is received from Namaste, we validate the request. If the * request passed validation and verification, then we'll immediately return a tracking GUID back to Namaste while * starting the migration process. The client has the responsibility to monitor the wareHousing record (created in * the ADMIN service database) progress and completion status. The GUID returned back to Namaste is the data * wareHousing record GUID. * * * @author mike@givingassistant.org * @version 1.0.0 * * HISTORY: * ======== * 04-10-18 mks INF-201: Original coding (begins) * 05-31-18 mks CORE-1011: update for new XML broker services configuration * 06-07-18 mks CORE-1013: remote-fetch event added * 07-09-18 mks CORE-1017: pedigree fetch event added * 07-28-20 mks DB-156: broker self-registration installed * */ use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Channel\AMQPChannel; use /** @noinspection PhpUnusedAliasInspection */ PhpAmqpLib\Message\AMQPMessage; use /** @noinspection PhpUnusedAliasInspection */ PhpAmqpLib\Exception\AMQPTimeoutException; pcntl_async_signals(true); // enable asynchronous signal handling (PHP 7.1) $myPid = getmypid(); $_REDIRECT = true; $topDir = dirname(__DIR__); $thisWatcher = basename(__FILE__); $thisWatcher = rtrim($thisWatcher, ".php"); // load the framework @require_once($topDir . '/config/sneakerstrap.inc'); // can't be constants b/c this loads the constants $res = 'DATW: '; // dat warehouse $childrenPidList = null; $pidDir = $topDir . DIR_PIDS; $eos = (isset($_SERVER['HTTP_USER_AGENT'])) ? '
' : PHP_EOL; // event management for children $whServiceSettings = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_WH]; $numberChildren = $whServiceSettings[CONFIG_BROKER_INSTANCES][CONFIG_BROKER_WH_BROKER]; $requestsPerInstance = (empty($whServiceSettings[CONFIG_BROKER_REQUEST_LIMIT])) ? NUMBER_C : $whServiceSettings[CONFIG_BROKER_REQUEST_LIMIT]; $numberChildren = ($numberChildren < 1) ? 1 : $numberChildren; // todo -- should this be = 2?? $runningBrokers = $numberChildren; $requestCounter = 0; $myRequestsPerInstance = 0; $startingMemory = 0; // create the root guid $groot = rtrim($res, COLON) . UDASH . guid(); // root guid consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_STARTUP, substr(basename(__FILE__), 0, -4), $groot)); consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_NUM_CHILD, substr(basename(__FILE__), 0, -4), $numberChildren)); /** @var gacErrorLogger $parentLog */ $parentLog = new gacErrorLogger(); // todo - validate the broker environment as declared in the XML config // get the location of the broker is supposed to be run $brokerLocation = ENV_SEGUNDO; if (!empty($argv) and !empty($argv[1])) { $brokerLocation = $argv[1]; } $errors = null; $file = rtrim(basename(__FILE__), DOT . FILE_TYPE_PHP); $service = ENV_SEGUNDO; if (!validateService($service, $errors)) { $hdr = sprintf(INFO_LOC, $file, __LINE__); $msg = sprintf(ERROR_SERVICE_REG, $file, $service); $parentLog->fatal($hdr . $msg); $parentLog->__destruct(); unset($parentLog); exit(1); } ////////////////////////////////////////////////////////////////////////////////// // set-up the replacement signal handler that will be called on a child's death // ////////////////////////////////////////////////////////////////////////////////// function sigHandler($_sig) { global $numberChildren; switch ($_sig) { case SIGCHLD : $numberChildren--; while (($pid = pcntl_wait($_sig, WNOHANG)) > 0) { @pcntl_wexitstatus($_sig); } break; } } pcntl_signal(SIGCLD, 'sigHandler'); ///////////////////////////////////////////////////////////////////////////////////////// // set-up the forking function so that it can be called initially or on a SIGCLD event // ///////////////////////////////////////////////////////////////////////////////////////// function forkMe() { global $thisWatcher, $eos, $res, $parentLog, $requestsPerInstance, $startingMemory, $myRequestsPerInstance, $groot, $file; $startingMemory = memory_get_usage(true); $myRequestsPerInstance = $requestsPerInstance + (mt_rand(0, 2) * 10) + mt_rand(1, 9); $thisPid = pcntl_fork(); switch ($thisPid) { case -1 : // error $cmsg = ERROR_FORK_FAILED . $thisWatcher; $parentLog->fatal($cmsg); die(getDateTime() . CON_ERROR . $res . $cmsg . $eos); break; case 0 : // child (broker daemon) try { // replace the sigcld signal handler pcntl_signal(SIGCLD, SIG_DFL); $thisPid = getmypid(); // create the child logger object /** @var gacErrorLogger $childLog */ $childLog = new gacErrorLogger(); // generate a child guid for the forked child... $childGUID = rtrim($res, COLON) . UDASH . guid(); // toss the childGUID unto cache because it does not propagate down to the callback method gasCache::sysAdd(($groot . UDASH . $thisPid), $childGUID); $queue = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_QUEUE_TAG] . BROKER_QUEUE_WH; /** @var AMQPStreamConnection $brokerConnection */ $brokerConnection = gasResourceManager::fetchResource(RESOURCE_SEGUNDO); if (is_null($brokerConnection)) { $childLog->fatal(ERROR_RESOURCE_404 . RESOURCE_SEGUNDO . COLON . BROKER_QUEUE_WH); consoleLog($res, CON_ERROR . ERROR_RESOURCE_404 . RESOURCE_SEGUNDO . COLON . BROKER_QUEUE_WH); exit(1); // shell-script exit value for fail } $brokerChannel = $brokerConnection->channel(); // params: queue name, passive, durable, exclusive, auto-delete $brokerChannel->queue_declare($queue, BROKER_QUEUE_DECLARE_PASSIVE, false, false, true); } catch (PhpAmqpLib\Exception\AMQPRuntimeException | Throwable $t) { $hdr = sprintf(INFO_LOC, $file, __LINE__); @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true); exit(1); } // register the child-spawn event $data = [ SYSTEM_EVENT_NAME => SYSEV_NAME_CHILD_REG, SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER, SYSTEM_EVENT_BROKER_ROOT_GUID => $groot, SYSTEM_EVENT_BROKER_GUID => $childGUID, SYSTEM_EVENT_KEY => SYSEV_CHILD_RPI, SYSTEM_EVENT_VAL => $myRequestsPerInstance, SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__ ]; @postSystemEvent($data, $childGUID, $childLog); register_shutdown_function(BROKER_SHUTDOWN_FUNCTION, $brokerChannel, $brokerConnection, $res); $callback = function($_request) { $startTime = gasStatic::doingTime(); $postNormalResponse = true; /** @var AMQPChannel $brokerChannel */ global $brokerChannel; /** @var AMQPStreamConnection $brokerConnection */ global $brokerConnection; global $requestCounter, $res, $eos, $myRequestsPerInstance, $startingMemory, $groot, $service, $file; $event = BROKER_QUEUE_M . '('; $requestCounter++; $aryRetData = null; $retData = null; $errorStack = []; $request = null; $eos = (isset($_SERVER['HTTP_USER_AGENT'])) ? '
' : PHP_EOL; $eventSuccess = false; $conMsg = ''; $eventGUID = guid(); $thisPid = getmypid(); $eventTimer = false; // certain events will toggle to true to log timer recording for the broker event $childGUID = gasCache::sysGet(($groot . UDASH . getmypid())); // set-up the call-back logger /** @var gacErrorLogger $callBackLog */ $callBackLog = new gacErrorLogger($eventGUID); try { if (!firstPassPayloadValidation($_request, $service, $msg, $request, $eventGUID)) { $conMsg = $msg; $callBackLog->info($msg); $aryRetData = buildReturnPayload([false, STATE_FAIL, null, $msg, null]); $event .= ERROR_DATA_VALIDATION_FIRST_PASS . ')'; } elseif (!validateMetaData($request, $errorStack)) { for ($index = 0, $last = count($errorStack); $index < $last; $index++) { $conMsg .= $errorStack[$index] . $eos; $callBackLog->error($errorStack[$index]); } $conMsg = rtrim($conMsg, $eos); $aryRetData = buildReturnPayload([false, STATE_META_ERROR, $errorStack, null, null]); $event .= ERROR_META_VALIDATION_SECOND_PASS . ')'; } else { $event .= $request[BROKER_REQUEST] . ')'; if (is_null($request)) { consoleLog($res, CON_ERROR, ERROR_REQUEST_404); } switch ($request[BROKER_REQUEST]) { case BROKER_REQUEST_SHUTDOWN : $_request->delivery_info[BROKER_CHANNEL]->basic_cancel($_request->delivery_info[BROKER_DELIVERY_TAG]); $conMsg = SUCCESS_SHUTDOWN; $aryRetData = buildReturnPayload([true, STATE_SUCCESS, null, BROKER_REQUEST_SHUTDOWN, null]); $eventSuccess = true; break; // test broker responsiveness case BROKER_REQUEST_PING : $conMsg = SUCCESS_PING . BROKER_QUEUE_WH; $aryRetData = buildReturnPayload([true, STATE_SUCCESS, null, (SUCCESS_PING . BROKER_QUEUE_WH), null]); $eventSuccess = true; break; case BROKER_REQUEST_PEDIGREE : $conMsg = SUCCESS_EVENT . BROKER_REQUEST_PEDIGREE; $aryRetData = buildReturnPayload([true, STATE_SUCCESS, null, gasConfig::getPedigree()]); $eventSuccess = true; break; case BROKER_REQUEST_WAREHOUSE : $eventSuccess = false; $eventTimer = false; $objMigrate = new gacMigrations($request[BROKER_DATA], $request[BROKER_META_DATA], EVENT_WAREHOUSE); if (!$objMigrate->status) { $conMsg = FAIL_EVENT . BROKER_REQUEST_WAREHOUSE; $aryRetData = buildReturnPayload([false, $objMigrate->state, $objMigrate->errorStack, null]); } else { $guid = $objMigrate->objWarehouseMeta->getColumn(DB_TOKEN); // validate return guid if (!validateGUID($guid)) { $conMsg = ERROR_EVENT . BROKER_REQUEST_WAREHOUSE; $aryRetData = buildReturnPayload([ false, FAIL_EVENT, $objMigrate->errorStack, ERROR_BROKER_REQUEST_FAILED]); } else { $conMsg = SUCCESS_EVENT . BROKER_REQUEST_WAREHOUSE; $aryRetData = buildReturnPayload([true, SUCCESS_EVENT, $objMigrate->errorStack, $guid]); $eventSuccess = true; } // send the guid back to the calling client now so we can resume the warehousing... postResponse($aryRetData, $_request, $callBackLog, $res); $postNormalResponse = false; // dive back into the objMigration class and perform the warehouse request if (!$objMigrate->whData()) { $conMsg = FAIL_EVENT . BROKER_REQUEST_WAREHOUSE; } else { $conMsg = SUCCESS_EVENT . BROKER_REQUEST_WAREHOUSE; $eventSuccess = true; } } break; case BROKER_REQUEST_REMOTE_FETCH : $eventTimer = true; $errors = []; /** @var gacMongoDB $tmpObj */ if (is_null($tmpObj = grabWidget($request[BROKER_META_DATA], '', $errors))) { foreach ($errors as $error) $callBackLog->error($error); } else { $tmpObj->_fetchRecords($request[BROKER_DATA]); if ($tmpObj->status) { $eventSuccess = true; $tmpObj->eventMessages[] = STRING_REC_COUNT_RET . $tmpObj->recordsReturned; $conMsg = SUCCESS_EVENT . BROKER_REQUEST_FETCH; $queryMeta = [ STRING_REC_COUNT_RET => $tmpObj->recordsReturned, STRING_REC_COUNT_TOT => $tmpObj->recordsInCollection ]; // recordsInQuery is a PDO thing so let's see if it exists in the class object if (isset($tmpObj->recordsInQuery) and $tmpObj->recordsInQuery) { $queryMeta[STRING_REC_COUNT_QUERY] = $tmpObj->recordsInQuery; } if (isset($request[BROKER_META_DATA][META_DONUT_FILTER]) and $request[BROKER_META_DATA][META_DONUT_FILTER] == 1) { $queryResults = $tmpObj->getData(); } elseif ($tmpObj->useCache or (isset($request[BROKER_META_DATA][META_DO_CACHE]) and $request[BROKER_META_DATA][META_DO_CACHE])) { // todo - this is supposed to return the list of cache keys, or the single reference cache key - fix! $queryResults = $tmpObj->cacheMap; } else { $queryResults = $tmpObj->getData(); } $retData = [STRING_QUERY_RESULTS => $queryResults, STRING_QUERY_DATA => $queryMeta]; $aryRetData = buildReturnPayload([true, STATE_SUCCESS, $tmpObj->eventMessages, $retData]); } else { $conMsg = FAIL_EVENT . BROKER_REQUEST_FETCH; $aryRetData = buildReturnPayload([false, $tmpObj->state, $tmpObj->eventMessages, null]); } if (is_object($tmpObj)) $tmpObj->__destruct(); unset($tmpObj); } break; default : $msg = ERROR_EVENT_404 . $request[BROKER_REQUEST]; $conMsg = $msg; $aryRetData = buildReturnPayload([false, STATE_DOES_NOT_EXIST, $msg, null]); break; } } } catch (Throwable $t) { $hdr = sprintf(INFO_LOC, $file, __LINE__); @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true); $aryRetData = buildReturnPayload([false, STATE_FRAMEWORK_FAIL, $t->getMessage(), $errorStack]); } // ensure we have a return-payload and a console message if (empty($aryRetData) and $postNormalResponse) { $msg = ERROR_NO_RET_DATA . '-' . __FILE__ . '-' . $request[BROKER_REQUEST]; $conMsg = BROKER_QUEUE_M . ' - ' . $msg; $aryRetData = buildReturnPayload([false, STATE_FRAMEWORK_FAIL, null, $msg, null]); } elseif ($eventSuccess and empty($conMsg)) { $callBackLog->warn(ERROR_NO_CON_MSG); $conMsg = $request[BROKER_REQUEST] . ' - ' . STATE_SUCCESS; } // prepare and send the return payload if we've not already sent it... if ($postNormalResponse) postResponse($aryRetData, $_request, $callBackLog, $res); // if the event processing failed, reject the message, otherwise ack removing it from the queue // todo: core-452: publish the event payload to the sysEvent broker to capture the failed event consoleLog($res, (($eventSuccess) ? CON_SUCCESS : CON_ERROR), $conMsg . sprintf(ERROR_EVENT_COUNT,$requestCounter, $myRequestsPerInstance)); unset($msg); // publish event metrics if we've toggled the switch on if ($eventTimer) { // get the broker-event processing time $eventTime = gasStatic::doingTime($startTime); $data = [ SYSTEM_EVENT_NAME => SYSEV_NAME_EVENT_TIMER, SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER, SYSTEM_EVENT_BROKER_ROOT_GUID => $groot, SYSTEM_EVENT_BROKER_GUID => $childGUID, DB_EVENT_GUID => $eventGUID, SYSTEM_EVENT_COUNT => $requestCounter, SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance, SYSTEM_EVENT_TIMER => $eventTime, SYSTEM_EVENT_BROKER_EVENT => $event, SYSTEM_EVENT_META_DATA => $request[BROKER_META_DATA], SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__ ]; if (!empty($childGUID)) $data[SYSTEM_EVENT_OGUID] = $childGUID; @postSystemEvent($data, $childGUID, $callBackLog); } // exit the child if we've reached the request limit if ($requestCounter >= $myRequestsPerInstance) { if (getmypid() == $thisPid) { $meta = [ META_SESSION_IP => STRING_SESSION_HOME, META_SESSION_DAEMON => 1, META_SESSION_MISC => INFO_BROKER_RECYCLE, META_EVENT_GUID => $eventGUID ]; $data = [ SYSTEM_EVENT_NAME => SYSEV_NAME_BROKER_RECYCLE, SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER, SYSTEM_EVENT_BROKER_ROOT_GUID => $groot, SYSTEM_EVENT_BROKER_GUID => $childGUID, DB_EVENT_GUID => $eventGUID, SYSTEM_EVENT_START => $startingMemory, SYSTEM_EVENT_PEAK => memory_get_peak_usage(true), SYSTEM_EVENT_END => memory_get_usage(true), SYSTEM_EVENT_BROKER_EVENT => $event, SYSTEM_EVENT_COUNT => $requestCounter, SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance, SYSTEM_EVENT_META_DATA => $meta, SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__ ]; @postSystemEvent($data, $eventGUID, $callBackLog); } consoleLog($res, CON_SYSTEM, INFO_BROKER_REQ_COUNT); if (is_object($brokerChannel)) $brokerChannel->close(); if (is_object($brokerConnection)) $brokerConnection->close(); exit(0); } }; consoleLog($res, CON_SYSTEM, sprintf(INFO_BROKER_QUEUE_ESTABLISHED, BROKER_QUEUE_WH, $thisPid, $myRequestsPerInstance)); $brokerChannel->basic_qos(null, 1, null); $brokerChannel->basic_consume($queue, '', false, false, false, false, $callback); while (count($brokerChannel->callbacks)) { try { $brokerChannel->wait(); } catch (Throwable $t) { $hdr = sprintf(INFO_LOC, $file, __LINE__); @handleExceptionMessaging($hdr, $t->getMessage(), $foo, true); } } break; case 1 : // parent // does nothing break; } return($thisPid); } for ($numBrokers = 0; $numBrokers < $runningBrokers; $numBrokers++) { $childrenPidList[] = forkMe(); } consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_PARENT_STARTED, count($childrenPidList), BROKER_QUEUE_WH)); // "register" the broker instantiation event $data = [ SYSTEM_EVENT_NAME => SYSEV_NAME_GROOT_REG, SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER, SYSTEM_EVENT_BROKER_ROOT_GUID => $groot, SYSTEM_EVENT_KEY => STRING_NUMBER_CHILDREN, SYSTEM_EVENT_VAL => $numberChildren, SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__, SYSTEM_EVENT_NOTES => BROKER_SYSEV_REG . rtrim($res, ": ") ]; @postSystemEvent($data, $groot, $parentLog); // the parent process continues to run, waking-up every second to monitor it's children... // when a child dies, it's death-rattle is caught and the child is replaced with a new process. while (count($childrenPidList)) { $lastPid = 0; $newPidList = null; $result = pcntl_waitpid(0, $status); // detect any sigchld from the parent-group if (in_array($result, $childrenPidList)) { $key = array_search($result, $childrenPidList); array_splice($childrenPidList, $key, 1); // process has already exited -- restart it $childrenPidList[] = forkMe(); } }