<?php
/**
 * brokerTemplate.txt
 *
 * This is the template file for brokers.  It holds all the PHP code to create a new broker client/service - all you
 * need to do is configure the broker instance to be unique to all the other already-existing brokers, and to add
 * the event handlers.
 *
 * Speaking of, the template comes with the default event handlers for ping and shutdown.  Comments, such as this one,
 * are added to key places in the code to alert you to lines that should be modified and suggestions, where possible
 * for the range of inputs available.
 *
 * Please observe the Namaste coding standards when adding comments and comment blocks as your comments will be used
 * to create Namaste system documentation for other programmers.
 *
 * @author  mike@givingassistant.org     <--- todo change/update to your email
 * @version 1.0                          <--- todo in what namaste version did this module first appear?
 *
 * HISTORY:
 * ========
 * 01-28-20     mks     DB-144: original coding
 *
 */
use /** @noinspection PhpUnusedAliasInspection */ PhpAmqpLib\Connection\AMQPStreamConnection;
use /** @noinspection PhpUnusedAliasInspection */ PhpAmqpLib\Channel\AMQPChannel;
use /** @noinspection PhpUnusedAliasInspection */ PhpAmqpLib\Message\AMQPMessage;
use /** @noinspection PhpUnusedAliasInspection */ PhpAmqpLib\Exception\AMQPTimeoutException;

pcntl_async_signals(true);                                      // enable asynchronous signal handling (PHP 7.1)
$myPid = getmypid();
$_REDIRECT = true;
$topDir = dirname(__DIR__);
// Strip the extension with pathinfo(): rtrim($name, ".php") treats ".php" as a SET of characters and keeps
// trimming any trailing '.', 'p' or 'h' (e.g. "pushApp.php" would become "pushA").
$thisWatcher = pathinfo(__FILE__, PATHINFO_FILENAME);

// load the framework
@require_once($topDir . '/config/sneakerstrap.inc');                  // can't be constants b/c this loads the constants
$res = 'XXXX: ';    // todo <-- change this 4-char field to something unique for the console log

// todo ------------------------------------------------------------------------------------------------------------------------------------
// todo -- if your broker requires its own configuration section (the example here uses the migration section), then
// todo -- you'll need to add a relevant section to the XML configuration -- otherwise, delete this section
// before we do anything, ensure we have a "migration" section in the configuration
if (!array_key_exists(CONFIG_MIGRATION, gasConfig::$settings)     // todo <--- change CONFIG_MIGRATION
        or empty(gasConfig::$settings[CONFIG_MIGRATION])          // todo <--- change CONFIG_MIGRATION
        or !is_array(gasConfig::$settings[CONFIG_MIGRATION])) {   // todo <--- change CONFIG_MIGRATION
    // XML config for migration is not loaded or is empty or malformed - exit immediately
    consoleLog($res, CON_SYSTEM, ERROR_CONFIG_RESOURCE_404 . STRING_MIGRATION_CONFIG);  // todo <--- change STRING_MIGRATION_CONFIG
    exit(1);
}
// todo ------------------------------------------------------------------------------------------------------------------------------------

$childrenPidList = null;
$pidDir = $topDir . DIR_PIDS;
$eos = (isset($_SERVER['HTTP_USER_AGENT'])) ? '<br />' : PHP_EOL;
// event management for children
$appServerConfig = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_APPSERVER];  // todo <--- change CONFIG_BROKER_APPSERVER
$numberChildren = $appServerConfig[CONFIG_BROKER_INSTANCES][CONFIG_BROKER_M_BROKER];       // todo <--- change CONFIG_BROKER_M_BROKER
$requestsPerInstance = (empty($appServerConfig[CONFIG_BROKER_REQUEST_LIMIT])) ? NUMBER_C : $appServerConfig[CONFIG_BROKER_REQUEST_LIMIT];
$numberChildren = ($numberChildren < 1) ? 1 : $numberChildren;  // todo -- should this be = 2??
$runningBrokers = $numberChildren;      // fixed copy of the configured child count, used to size the initial fork loop
$requestCounter = 0;
$myRequestsPerInstance = 0;
$startingMemory = 0;
// create the root guid
// NOTE(review): $res ends in a space, so rtrim($res, COLON) likely strips nothing if COLON is ':' -- the registration
// event at the bottom of this file uses rtrim($res, ": ") instead; confirm which prefix format is intended.
$groot = rtrim($res, COLON) . UDASH . guid();                 // root guid
consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_STARTUP, substr(basename(__FILE__), 0, -4), $groot));

/** @var gacErrorLogger $parentLog */
$parentLog = new gacErrorLogger();

// todo - validate the broker environment as declared in the XML config

// get the location where the broker is supposed to run; the first CLI argument overrides the default
$brokerLocation = ENV_PRIME;    // todo <--- change the environment
if (!empty($argv) and !empty($argv[1])) {
    $brokerLocation = $argv[1];
}
$errors = null;
// file name without its extension (pathinfo() instead of rtrim(), whose 2nd argument is a character mask, not a suffix)
$file = pathinfo(__FILE__, PATHINFO_FILENAME);
$service = ENV_ADMIN;

// the broker cannot run unless its service registers; log a fatal and exit with a failing status
if (!registerService($service)) {
    $hdr = sprintf(INFO_LOC, $file, __LINE__);
    $msg = sprintf(ERROR_SERVICE_REG, $file, $service);
    $parentLog->fatal($hdr . $msg);
    $parentLog->__destruct();
    unset($parentLog);
    exit(1);
}

//////////////////////////////////////////////////////////////////////////////////
// set-up the replacement signal handler that will be called on a child's death //
//////////////////////////////////////////////////////////////////////////////////
// declare( ticks = 1);
/**
 * Signal handler installed in the parent for SIGCHLD.
 *
 * Decrements the global $numberChildren once per delivered signal and then reaps, without blocking, every child
 * that has already terminated.  Signals other than SIGCHLD are ignored.
 *
 * NOTE(review): reaping here competes with the parent's own pcntl_waitpid() monitor loop at the bottom of this
 * file -- a child reaped by this handler is never reported to that loop.  Confirm which of the two is meant to
 * own child reaping.
 *
 * @param int $_sig  the signal number being delivered
 */
function sigHandler($_sig) {
    global $numberChildren;
    if ($_sig !== SIGCHLD) {
        return;
    }
    $numberChildren--;
    // pcntl_wait() writes the child's status into its first argument by reference, so use a dedicated
    // variable instead of overwriting the signal number
    $status = 0;
    while (pcntl_wait($status, WNOHANG) > 0) {
        // nothing to do with the exit status -- the call only collects the terminated child
    }
}
// SIGCHLD is the POSIX name; SIGCLD is an alias that is not defined on every platform
pcntl_signal(SIGCHLD, 'sigHandler');

/////////////////////////////////////////////////////////////////////////////////////////
// set-up the forking function so that it can be called initially or on a SIGCLD event //
/////////////////////////////////////////////////////////////////////////////////////////
/**
 * Forks one broker child process.
 *
 * Parent: returns immediately with the PID of the new child.
 * Child:  restores the default SIGCHLD handling, declares the broker queue, registers a child-spawn system event
 *         and then consumes requests from the queue until either the per-instance request limit is reached or the
 *         consumer is cancelled (shutdown request).  The child always terminates via exit() and never returns to
 *         the caller -- otherwise it would continue executing the parent's top-level code and fork children of
 *         its own.
 * Fork failure: logs a fatal, prints the error and exits with status 1.
 *
 * Reads/writes the globals listed in the global statement below; $startingMemory and $myRequestsPerInstance are
 * set just before the fork so the child inherits its own values.
 *
 * @return int  PID of the forked child (parent process only)
 */
function forkMe()
{
    global $thisWatcher, $eos, $res, $parentLog, $requestsPerInstance, $startingMemory, $myRequestsPerInstance, $groot;
    $startingMemory = memory_get_usage(true);
    // add a random offset (1..29) to the request limit so that sibling children do not all hit it simultaneously
    $myRequestsPerInstance = $requestsPerInstance + (mt_rand(0, 2) * 10) + mt_rand(1, 9);
    $thisPid = pcntl_fork();

    switch ($thisPid) {
        case -1 :   // error
            $cmsg = ERROR_FORK_FAILED . $thisWatcher;
            $parentLog->fatal($cmsg);
            // die('string') would exit with status 0; print the message and exit with a failing status instead
            echo getDateTime() . CON_ERROR . $res . $cmsg . $eos;
            exit(1);

        case 0 :    // child (broker daemon)
            // replace the sigchld signal handler (SIGCHLD is the portable name for SIGCLD)
            pcntl_signal(SIGCHLD, SIG_DFL);
            $thisPid = getmypid();

            // create the child logger object
            /** @var gacErrorLogger $childLog */
            $childLog = new gacErrorLogger();

            // generate a child guid for the forked child...
            $childGUID = rtrim($res, COLON) . UDASH . guid();

            // toss the childGUID unto cache because it does not propagate down to the callback method
            gasCache::sysAdd(($groot . UDASH . $thisPid), $childGUID);

            $queueTag = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_BROKER_QUEUE_TAG];
            $queue = $queueTag . BROKER_QUEUE_TBD; // todo <--- change BROKER_QUEUE_TBD

            /** @var AMQPStreamConnection $brokerConnection */
            $brokerConnection = gasResourceManager::fetchResource(RESOURCE_BROKER);
            if (is_null($brokerConnection)) {
                $childLog->fatal(ERROR_RESOURCE_404 . RESOURCE_BROKER);
                consoleLog($res, CON_ERROR, ERROR_RESOURCE_404 . RESOURCE_BROKER);
                exit(1);                                                    // shell-script exit value for fail
            }
            $brokerChannel = $brokerConnection->channel();
            try {
                // params:  queue name, passive, durable, exclusive, auto-delete
                $brokerChannel->queue_declare($queue, BROKER_QUEUE_DECLARE_PASSIVE, false, false, true);
            } catch (Throwable $e) {
                $childLog->fatal($e->getMessage());
                consoleLog($res, CON_ERROR, ERROR_BROKER_QUEUE_DECLARE . $queue);
                exit(1);
            }

            // register the child-spawn event
            $data = [
                SYSTEM_EVENT_NAME => SYSEV_NAME_CHILD_REG,
                SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                SYSTEM_EVENT_BROKER_GUID => $childGUID,
                SYSTEM_EVENT_KEY => SYSEV_CHILD_RPI,
                SYSTEM_EVENT_VAL => $myRequestsPerInstance,
                SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
            ];
            @postBrokerEvent($data, $childGUID, $childLog);
            // todo -- add a broker name to this event so we know which broker is registering

            register_shutdown_function(BROKER_SHUTDOWN_FUNCTION, $brokerChannel, $brokerConnection, $res);

            // Consumer callback: validates the request, dispatches the event, publishes the reply and recycles the
            // child once the request limit is reached.  The channel and connection are locals of forkMe(), not
            // globals, so they are captured with use() -- a "global" declaration would only see null.
            $callback = function($_request) use ($brokerChannel, $brokerConnection)
            {
                $startTime = gasStatic::doingTime();
                global $requestCounter, $res, $eos, $myRequestsPerInstance, $startingMemory, $groot, $service;
                // NOTE(review): this writes to the incoming message with array syntax; if $_request is a plain
                // AMQPMessage (no ArrayAccess) this line throws outside of the try block below -- confirm.
                $_request[STRING_SERVICE] = $service;
                $event = BROKER_QUEUE_TBD . '(';   // todo <--- change BROKER_QUEUE_TBD
                $requestCounter++;
                $aryRetData = null;
                $errorStack = [];
                $request = null;
                $eos = (isset($_SERVER['HTTP_USER_AGENT'])) ? '<br />' : PHP_EOL;
                $eventSuccess = false;
                $conMsg = '';
                $eventGUID = guid();
                $thisPid = getmypid();
                $eventTimer = false;   // certain events will toggle to true to log timer recording for the broker event
                $childGUID = gasCache::sysGet(($groot . UDASH . getmypid()));

                // set-up the call-back logger
                /** @var gacErrorLogger $callBackLog */
                $callBackLog = new gacErrorLogger($eventGUID);

                try {
                    // presumably $msg and $request are populated by reference by the validator -- verify against its signature
                    if (!firstPassPayloadValidation($_request, $msg, $request, $eventGUID)) {
                        $conMsg = $msg;
                        $callBackLog->info($msg);
                        $aryRetData = buildReturnPayload([false, STATE_FAIL, null, $msg, null]);
                        $event .= ERROR_DATA_VALIDATION_FIRST_PASS . ')';
                    } elseif (!validateMetaData($request, $errorStack)) {
                        foreach ($errorStack as $metaError) {
                            $callBackLog->error($metaError);
                        }
                        // join with implode(): rtrim($conMsg, $eos) would treat $eos as a character mask
                        $conMsg = implode($eos, $errorStack);
                        $aryRetData = buildReturnPayload([false, STATE_META_ERROR, $errorStack, null, null]);
                        $event .= ERROR_META_VALIDATION_SECOND_PASS . ')';
                    } else {
                        // check for a missing request before dereferencing it
                        if (is_null($request)) {
                            consoleLog($res, CON_ERROR, ERROR_REQUEST_404);
                        }
                        $requestName = $request[BROKER_REQUEST] ?? null;
                        $event .= $requestName . ')';

                        switch ($requestName) {
                            case BROKER_REQUEST_SHUTDOWN :
                                // cancelling the consumer empties the channel's callback list, which ends the wait loop below
                                /** @noinspection PhpUndefinedFieldInspection PhpUndefinedMethodInspection */
                                $_request->delivery_info[BROKER_CHANNEL]->basic_cancel($_request->delivery_info[BROKER_DELIVERY_TAG]);
                                $conMsg = SUCCESS_SHUTDOWN;
                                $aryRetData = buildReturnPayload([true, STATE_SUCCESS, null, BROKER_REQUEST_SHUTDOWN, null]);
                                $eventSuccess = true;
                            break;

                            // test broker responsiveness
                            case BROKER_REQUEST_PING :
                                $conMsg = SUCCESS_PING . BROKER_QUEUE_TBD;   // todo <--- change BROKER_QUEUE_TBD
                                $aryRetData = buildReturnPayload([true, STATE_SUCCESS, null, (SUCCESS_PING . BROKER_QUEUE_TBD), null]);  // todo <--- change BROKER_QUEUE_TBD
                                $eventSuccess = true;
                            break;

                            // todo <--- your events for this broker start here

                            default :
                                $msg = ERROR_EVENT_404 . $requestName;
                                $conMsg = $msg;
                                // NOTE(review): the other calls pass 5 elements with the message in the 4th slot;
                                // this one passes 4 with the message in the 3rd -- confirm against buildReturnPayload()
                                $aryRetData = buildReturnPayload([false, STATE_DOES_NOT_EXIST, $msg, null]);
                            break;
                        }
                    }
                } catch (Throwable $t) {
                    consoleLog($res, CON_SYSTEM, $t->getMessage());
                    $callBackLog->fatal($t->getMessage());
                    // NOTE(review): 4-element payload with message/error-stack order differing from the calls above -- confirm
                    $aryRetData = buildReturnPayload([false, STATE_FRAMEWORK_FAIL, $t->getMessage(), $errorStack]);
                }

                // ensure we have a return-payload and a console message
                if (empty($aryRetData)) {
                    // $request may still be null here (validation failure or exception), hence the null-coalescing reads
                    $msg = ERROR_NO_RET_DATA . '-' . __FILE__ . '-' . ($request[BROKER_REQUEST] ?? '');
                    $conMsg = BROKER_QUEUE_TBD . ' - ' . $msg;   // todo <--- change BROKER_QUEUE_TBD
                    $aryRetData = buildReturnPayload([false, STATE_FRAMEWORK_FAIL, null, $msg, null]);
                } elseif ($eventSuccess and empty($conMsg)) {
                    $callBackLog->warn(ERROR_NO_CON_MSG);
                    $conMsg = ($request[BROKER_REQUEST] ?? '') . ' - ' . STATE_SUCCESS;
                }

                // prepare the return payload...
                /** @noinspection PhpUndefinedMethodInspection */
                $msg = new AMQPMessage(gzcompress(json_encode($aryRetData)), array(BROKER_CORRELATION_ID => $_request->get(BROKER_CORRELATION_ID)));
                try {
                    // reply on the default exchange to the caller's reply-to queue, then ack the request
                    /** @noinspection PhpUndefinedMethodInspection */
                    $_request->delivery_info[BROKER_CHANNEL]->basic_publish($msg, '', $_request->get(BROKER_REPLY_TO));
                    /** @noinspection PhpUndefinedMethodInspection */
                    $_request->delivery_info[BROKER_CHANNEL]->basic_ack($_request->delivery_info[BROKER_DELIVERY_TAG]);
                } catch (Throwable $e) {
                    $logMsg = ERROR_BROKER_EXCEPTION . $e->getMessage();
                    $callBackLog->fatal($logMsg);
                    consoleLog($res, CON_ERROR, $logMsg);
                }

                // todo: core-452: publish the event payload to the sysEvent broker to capture the failed event

                consoleLog($res, (($eventSuccess) ? CON_SUCCESS : CON_ERROR), $conMsg . sprintf(ERROR_EVENT_COUNT, $requestCounter, $myRequestsPerInstance));
                unset($msg);

                // publish event metrics if we've toggled the switch on
                if ($eventTimer) {
                    // get the broker-event processing time
                    $eventTime = gasStatic::doingTime($startTime);
                    $data = [
                        SYSTEM_EVENT_NAME => SYSEV_NAME_EVENT_TIMER,
                        SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                        SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                        SYSTEM_EVENT_BROKER_GUID => $childGUID,
                        DB_EVENT_GUID => $eventGUID,
                        SYSTEM_EVENT_COUNT => $requestCounter,
                        SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
                        SYSTEM_EVENT_TIMER => $eventTime,
                        SYSTEM_EVENT_BROKER_EVENT => $event,
                        SYSTEM_EVENT_META_DATA => $request[BROKER_META_DATA] ?? null,
                        SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
                    ];
                    if (!empty($childGUID)) $data[SYSTEM_EVENT_OGUID] = $childGUID;
                    @postBrokerEvent($data, $childGUID, $callBackLog);
                }

                // exit the child if we've reached the request limit
                if ($requestCounter >= $myRequestsPerInstance) {
                    if (getmypid() == $thisPid) {
                        $meta = [
                            META_SESSION_IP => STRING_SESSION_HOME,
                            META_SESSION_DAEMON => 1,
                            META_SESSION_MISC => INFO_BROKER_RECYCLE,
                            META_EVENT_GUID => $eventGUID
                        ];
                        $data = [
                            SYSTEM_EVENT_NAME => SYSEV_NAME_BROKER_RECYCLE,
                            SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
                            SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
                            SYSTEM_EVENT_BROKER_GUID => $childGUID,
                            DB_EVENT_GUID => $eventGUID,
                            SYSTEM_EVENT_START => $startingMemory,
                            SYSTEM_EVENT_PEAK => memory_get_peak_usage(true),
                            SYSTEM_EVENT_END => memory_get_usage(true),
                            SYSTEM_EVENT_BROKER_EVENT => $event,
                            SYSTEM_EVENT_COUNT => $requestCounter,
                            SYSTEM_EVENT_COUNT_TOTAL => $myRequestsPerInstance,
                            SYSTEM_EVENT_META_DATA => $meta,
                            SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__
                        ];
                        @postBrokerEvent($data, $eventGUID, $callBackLog);
                    }
                    consoleLog($res, CON_SYSTEM, INFO_BROKER_REQ_COUNT);
                    try {
                        if (is_object($brokerChannel)) $brokerChannel->close();
                        if (is_object($brokerConnection)) $brokerConnection->close();
                    } catch (Throwable $t) {
                        // a failed close must not prevent the child from recycling cleanly
                        $callBackLog->warn($t->getMessage());
                    }
                    exit(0);
                }
            };
            consoleLog($res, CON_SYSTEM, sprintf(INFO_BROKER_QUEUE_ESTABLISHED, BROKER_QUEUE_TBD, $thisPid, $myRequestsPerInstance));  // todo <--- change BROKER_QUEUE_TBD
            // prefetch one message at a time
            $brokerChannel->basic_qos(null, 1, null);
            $brokerChannel->basic_consume($queue, '', false, false, false, false, $callback);
            while (count($brokerChannel->callbacks)) {
                $brokerChannel->wait();
            }
            // the consumer was cancelled (e.g. shutdown request): terminate the child here so that it never falls
            // back into the parent's top-level code; the registered shutdown function still runs on exit
            exit(0);

        default :   // parent: pcntl_fork() returned the child's PID -- nothing else to do
            break;
    }
    return($thisPid);
}

// spawn the configured number of broker children, remembering each child's PID
for ($numBrokers = 0; $numBrokers < $runningBrokers; $numBrokers++) {
    $childrenPidList[] = forkMe();
}
consoleLog($res, CON_SUCCESS, sprintf(INFO_BROKER_PARENT_STARTED, count($childrenPidList), BROKER_QUEUE_TBD));  // todo <--- change BROKER_QUEUE_TBD

// "register" the broker instantiation event
$data = [
    SYSTEM_EVENT_NAME => SYSEV_NAME_GROOT_REG,
    SYSTEM_EVENT_TYPE => SYSEV_TYPE_BROKER,
    SYSTEM_EVENT_BROKER_ROOT_GUID => $groot,
    SYSTEM_EVENT_KEY => STRING_NUMBER_CHILDREN,
    SYSTEM_EVENT_VAL => $numberChildren,
    SYSTEM_EVENT_CODE_LOC => basename(__FILE__) . COLON . __LINE__,
    SYSTEM_EVENT_NOTES => BROKER_SYSEV_REG . rtrim($res, ": ")
];
@postBrokerEvent($data, $groot, $parentLog);

// the parent process continues to run, blocking until one of its children changes state...
// when a child dies, its death-rattle is caught and the child is replaced with a new process.
while (count($childrenPidList)) {
    // block until any child in our process group exits; returns -1 when there is nothing to wait for
    $reapedPid = pcntl_waitpid(0, $status);

    // Reconcile the whole list rather than only the PID just returned: a child that was already collected
    // elsewhere (the SIGCHLD handler also reaps) would otherwise never be replaced, and a -1 return with no
    // children left would make this loop spin.  A non-blocking waitpid on a tracked PID returns 0 while that
    // child is still running, its PID if it has just exited, and -1 if it is no longer our child.
    foreach ($childrenPidList as $slot => $childPid) {
        $isGone = ($childPid === $reapedPid) || (pcntl_waitpid($childPid, $childStatus, WNOHANG) !== 0);
        if ($isGone) {
            // process has already exited -- restart it
            $childrenPidList[$slot] = forkMe();
        }
    }
}