Files
namaste/classes/gacMigrations.class.inc
gramps 373ebc8c93 Archive: Namaste PHP AMQP framework v1.0 (2017-2020)
952 days continuous production uptime, 40k+ tp/s single node.
Original corpo Bitbucket history not included — clean archive commit.
2026-04-05 09:49:30 -07:00

3645 lines
179 KiB
PHP

<?php
/**
* gacMigrations -- class file
*
* this class is a new Namaste class and is intended for use only in the backend migration operation where we'll
* migrate all data from one database table or collection to a Namaste database table or collection.
*
* As of 3/25/18 the class also supports warehousing operations which is meant to import records either internal or
* external (to namaste) and move them into COOL or COLD storage. Cold storage is not implemented at this time until
* a schema is determined -- assuming we're writing to SaaS cold-storage, a new API will have to be integrated to
* handle COLD (and COLD->WARM) requests.
*
* Since the source is accessing, or has the capacity to access, production-level data, we need to create a new class
* outside of the Namaste framework that limits the capability of the class to read-only operations as a function of
* security and protecting against (un)intentional destructive queries against the source.
*
* The source data must be defined in the env.xml file - NOT in the namaste.xml file.
*
* The destination object is instantiated as a Namaste class since we'll be using the vetted procedures to move the
* source data into (potentially) a production database. In order to successfully migrate data into Namaste, the
* destination template file must have a pre-defined section ("migrationMap") defined that maps the source columns
* to the destination columns, including data-types (casting to a new type may happen because different schemas)
*
* All of the migration functionality will be initiated via this class and invoked by only by the broker event.
*
* Note that you also have the option to migrate data within the same schema.
*
* Remember -- the point of migration is to import data (all the records) from one table in a database defined
* externally to Namaste, to a database that is internal to Namaste.
*
* Supported Migrations:
* ---------------------
* 1. mysql -> mongo
* 2. mongo -> mysql
* 3. mysql -> mysql
* 4. mongo -> mongo
*
* Supported Warehousing:
* ----------------------
* 1. mysql (external) -> COOL
* 2. mysql (namaste) -> COOL
*
* To be completed:
* ----------------
* 1. mongo (external) -> COOL
* 2. mongo (namaste) -> COOL
* 3. COLD (all)
* 4. WARM (all)
*
* Note:
* -----
* Source data is accessed using non-destructive queries only for both migration and warehousing!
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* HISTORY:
* ========
* 01-24-18 mks _INF-139: original coding
* 03-22-18 mks CORE-852: mysql destination coding
* 05-25-18 mks _INF-188: completed code update for data warehousing
* 07-31-18 mks CORE-774: PHP7.2 exception handling
* 01-07-20 mks DB-150: PHP7.4 class member type casting
*
*/
class gacMigrations
{
// data is migrated by making calls to the respective objects for the source and destination data.
// public $objSource = null; // pointer to the source class object
public string $state; // a STATE container
public bool $status; // Boolean status container
public array $errorStack; // container for the class error-stack
public string $migrationReport; // container for the migration results report
private bool $testMode = false; // must be over-ridden by broker payload
private array $config; // container for the configuration XML
private array $allowedSchemas = [ // authority container for valid schemas
CONFIG_SCHEMA_MYSQL,
CONFIG_SCHEMA_MONGO
];
private array $brokerData; // container for a copy of the broker request data
private string $res = 'MGRA: '; // resource identifier for console logging
private object $resRemote; // pointer to the remote database resource
private bool $remoteAvailable = false; // quick indicator that confirms remote source availability
private object $widget; // pointer to the gacFactory destination widget
private string $schema; // identifies, by name, the source schema
private string $destinationSchema; // identifies, by name, the destination schema
private int $recordsInSourceTable; // count of the total # records in the source table
private array $mysqlSourceSchema; // used to store output from "desc table" mysql command
private array $mysqlFieldList; // list of mysql source field names
private array $mongoSourceSchema; // used to store mongo schema for the source table
private array $migrationMap; // stores the migration map found in the namaste source template
private string $migrationSortKey; // stores the sort-key that we'll sort SOURCE data by
private array $migrationStatusKV; // stores the status-key for the SOURCE data for soft-deletes
private int $coverage; // coverage as a percent of SOURCE data that maps successfully
private bool $filterSoftDeletes = false; // toggle for importing soft-deleted records
private gacMongoDB $objMigrate; // pointer to the Migration class object (system object)
public object $objWarehouseMeta; // container for the Warehouse class object (system object)
public object $objWarehouse; // container for the warehouse class object (storage object)
private array $migratedFields; // list of columns that will be migrated from SOURCE -> DEST
private array $omittedFields; // list of columns dropped in SOURCE during migration
private string $lastRecordWritten; // json-encoded string of the last record written
private int $droppedRecords = 0; // count of the number of records dropped during mapping
private int $recordLimit = 0; // XML config value = max num records fetched from SOURCE
private string $migrateStatus; // promoted objMigrate status value
private int $recordCount = 0; // counts the total number of records fetched from SOURCE
private array $nQuery; // container for the namaste query (array)
private string $query; // text container for query string sans skip-limit
private array $meta = []; // container for the broker event meta data
private gacErrorLogger $logger; // error logging object
private string $whereClause = ''; // wh where clause built during validation (1st pass)
private array $whSettings; // container for template warehouse array (settings)
private bool $remoteSource = false; // for WH, set to true if source is remote to Namaste\
private string $whSourceURI = ''; // for WH, the URI of the remote source resource if required
// I'm setting this to contain either the mongo or mysql configuration but, as of now, it's not in-use or accessed
private array $remoteSourceConfig; // container for the remote source configuration
private array $queryData; // container for remote date (query results)
private bool $noData = false; // indicator if a query successfully exec'd but ret'd no data
private string $eventType; // tells the current instantiation if WH or MIG request
private bool $isWebRequest = false; // determines if output is for CLI or HTML
// meta data payload for WH -> admin requests
private array $adminWHMetaData = [
META_CLIENT => CLIENT_SYSTEM,
META_TEMPLATE => TEMPLATE_CLASS_WAREHOUSE,
META_DONUT_FILTER => 1
];
// migration report output headers
private string $TestHeader =
'=================================================
= T E S T M O D E A C T I V E =
=================================================';
private string $TestFooter =
'=================================================
= Test Mode Means: =
= ---------------- =
= No data was written to the destination table. =
= =
= Record fetch from source was limited to the =
= number of records specified in the migration =
= configuration. =
= The data dump is provided so you can validate =
= data migration and schema changes. =
=================================================';
private string $TestFooterWH =
'=================================================
= Test Mode Means: =
= ---------------- =
= No data was deleted from the source table. =
= =
= Data was migrated to the destination WH table =
= successfully, but the original data has been =
= left un-touched because the request submitted =
= mandated test mode. =
=================================================';
private string $TestBody =
'=================================================
= TEST MODE RECORD DUMP =
=-----------------------------------------------=
= Check records to ensure that all data mapped =
= in the destination template was covered, and =
= that all data maintained data type. Validate =
= injected fields such as timestamps and GUIDs =
= that are required by Namaste. =
= NOTE: tokens are not generated in test mode =
=================================================';
private string $TestBodyWH =
'=================================================
= TEST MODE INFORMATION =
=-----------------------------------------------=
= Check the destination WH table to ensure that =
= the correct number of records reported were =
= copied to the destination table. =
= Validate that injected fields (timestamps, =
= GUIDS, etc.) were added. =
=================================================';
//
/**
* gacMigrations constructor.
*
* the class constructor requires the broker payload as an input parameter to the method. Within this array
* parameter, the following fields should be defined and, if they're not, then the request to instantiate will
* fail:
*
* 1. the source schema
* 2. the source table name
* 3. the destination schema
* 4. the destination table name
* 5. the name of the source status field, if it exists
* 6. the value of the deleted field that indicates the record has been deleted
* 7. partial validation as a boolean - if true, then records that partially pass validation, as opposed to
* passing ALL validation, are migrated
* 8. soft-deletes -- if set to true, then records that are soft-deleted will be migrated
* 9. migrationTestMode - a boolean that, if true, limits the number of records returned from the source
* collection and output is returned as a payload instead of being written to the database.
*
* The remaining parameters are provided/passed as part of the broker-data payload. If they are omitted, then
* the defaults are assumed. Parameters can be overridden either by the passed-parameter list or by adding
* the appropriate fields in the $_brokerData payload (param 1).
*
* About Partials:
* ---------------
* If the partials option is set to true (if defaults to false), then a record will be migrated if any of the
* fields are validated. Fields that fail validation may be dropped depending on the destination schema so it's
* NOT RECOMMENDED that you enable this option when importing into a mysql-schema... Data that is rejected
* as migration candidates will be returned in a list of primary keys to the client identifying them as records
* that failed to migrate along with a total-count of the number of records that failed to migrate.
*
* If the $_partials is not overridden (false), then any record that fails the migration requirement will cause the
* entire request to fail. Identifying information about the failed record will be returned to the calling client.
*
* About Deleted:
* --------------
* Submitting a request with this variable set to Boolean(true) requires that the source-status-file parameter
* be set, defining the column name in the source table for the status field AND the value of delete column that
* defines a deleted record. (In most cases, this should be a string.)
*
* In order to migrate soft-deleted records, the following conditions must align:
* a. the deleted parameter is set to Boolean(true)
* b. the source-table deleted-field name is defined and included in the payload
* c. the source-table deleted-field VALUE is defined and included in the payload
* If all of these conditions are met, and a record is found that have the specified value in the specified column
* while the deleted parameter is set to true, then the source soft-deleted record will be migrated.
*
* Validation:
* -----------
* The following checks are immediately made:
*
* 1. That the source and destination schema and tables exist and are valid
* 2. That the destination template exists
* 3. That the destination template can be instantiated
* 4. That the destination template contains a migration map
* 5. That we can connect to the source DB (defined in gasConfig)
* 6. That we can retrieve a sample/example record from the source
*
* If all this succeeds, then we'll continue processing. The call to migrate data from one schema to another is
* pretty much a one-shot deal in terms of class instantiation and data access. Any processing or validation
* failure will cause processing to immediately cease and return an appropriate error message.
*
* If we're writing data to a schema that supports transactions, then all of the data writes will be under a single
* transaction so that if a processing failure is raised, then all of the data will be backed out of the destination
* table.
*
* If the destination table does not support transactions, then we'll keep a list, internally, of all the new
* GUIDs (primary key identifies) that were created and, prior to returning control to the calling client, all
* of these "new" records will be hard-deleted from the collection.
*
* Return Payload:
* ---------------
* Data that we will return to the calling client will include:
*
* 1. a general success or failure message
* 2. if the request failed, we'll return an error stack showing the root cause of the failure
* 3. if the request succeeded, the following data report will generated and returned:
* 3a. the source schema information
* 3b. the destination schema information
* 3c. the number of records in the source table
* 3d. the number of records successfully moved to the new schema
* 3e. if partials are allowed, the number of partial records migrated
* 3f. if partials are allowed, a list of the partials migrated...OR...
* if partials are not allowed, a list of the partials not migrated
* 3g. if deleted is false, the number of records not migrated b/c deleted status
* 3h. if deleted is false, a list of the deleted records not migrated
* 3i. the number of records in the source table prior to the operation
* 3j. the number of records in the source table after the operation
* 3k. the coverage percent of individual records -- number of columns mapped / total number of columns
*
*
* @param array|null $_brokerData the data payload as received by the broker
* @param array|null $_meta the meta-data payload as received by the broker
* @param string $_rt the request type (either EVENT_MIGRATE or EVENT_WH_COOL|COLD|WARM|HOT
*
* HISTORY:
* ========
* 01-24-18 mks _INF-139: original coding
* 03-22-18 mks CORE-852: mysql destination coding
* 04-10-18 mks _INF-188: introduced warehousing (cool) functionality
* 10-04-18 mks DB-43: updated for HTML webApp (reporting in HTML)
*
*/
public function __construct(array $_brokerData = [], array $_meta = null, $_rt = EVENT_MIGRATE)
{
register_shutdown_function(array($this, '__destruct'));
// todo -- validate the meta-client field (make it required) CORE-858
// todo -- timer event, system event CORE-859
// set-up class member variables
$this->errorStack = [];
$this->state = STATE_DATA_ERROR;
$this->status = false;
if (!isset(gasConfig::$settings[CONFIG_MIGRATION]) or (empty(gasConfig::$settings[CONFIG_MIGRATION]))) {
$this->errorStack[] = ERROR_CONFIG_RESOURCE_404 . STRING_MIGRATION_CONFIG;
return;
}
if (isset($_meta[BROKER_XML_DATA]) and is_array($_meta[BROKER_XML_DATA])) {
$this->config = $_meta[BROKER_XML_DATA];
$this->isWebRequest = true;
} else {
$this->config = gasConfig::$settings[CONFIG_MIGRATION];
}
try {
switch ($_rt) {
case EVENT_MIGRATE :
// validate payload data that's been pulled into class vars - return control to calling client if false value
if (!$this->validateMigrationData($_brokerData)) return;
$this->brokerData = $_brokerData;
$_meta[META_TEMPLATE] = $this->brokerData[MIGRATION_DEST_TABLE];
$this->meta = $_meta;
$this->eventType = STRING_MIGRATION;
$this->doMigration();
break;
case EVENT_WAREHOUSE :
$this->brokerData = $_brokerData;
$this->meta = $_meta;
$this->logger = new gacErrorLogger($this->meta[META_EVENT_GUID]);
$this->eventType = STRING_WAREHOUSING;
$this->startWHRequest();
break;
default :
$msg = ERROR_EVENT_404 . $_rt;
$this->errorStack[] = $msg;
consoleLog($this->res, CON_ERROR, $msg);
break;
}
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
}
}
/**
* whData() -- public method
*
* This method is invoked from the whBroker event once validation has successfully completed and we've returned
* the wh-guid back to the calling client.
*
* This method does a quick check to confirm that critical data hasn't been dropped and is still present. Once
* that check has been passed, we invoke a private method (moveWHData()) to perform the actual data move/copy
* request.
*
* the method requires no input parameters and returns a boolean value to indicate processing success.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
*
* HISTORY:
* ========
* 04-26-18 mks _INF-188: original coding
*
*/
public function whData(): bool
{
$error = null;
// ensure we still have all the generated data
if (!isset($this->widget) or empty($this->widget))
$error[] = ERROR_WH_MISSING_WIDGET;
if (!isset($this->objWarehouseMeta) or empty($this->objWarehouseMeta))
$error[] = ERROR_WH_MISSING_WH_OBJ;
if (!isset($this->whSettings) or empty($this->whSettings))
$error[] = ERROR_WH_MISSING_SETTINGS;
if (!isset($this->brokerData) or empty($this->brokerData))
$error[] = ERROR_WH_MISSING_BROKER_DATA;
if (!isset($this->meta) or empty($this->meta))
$error[] = ERROR_WH_MISSING_META_DATA;
if (!isset($this->whereClause) or empty($this->whereClause))
$error[] = ERROR_WH_MISSING_WHERE_CLAUSE;
if (!is_null($error)) {
$this->errorStack = array_merge($this->errorStack, $error);
$this->state = STATE_DATA_ERROR;
$this->status = false;
return false;
}
// step 1: get the count of the number of records to be moved -- this becomes our LCV
if (!$this->getWHRecCount()) return false;
// return immediately if no records in the source satisfy the WH query
if ($this->recordCount == 0) {
$this->errorStack[] = INFO_WH_NO_QUALIFIED_DATA;
$this->state = STATE_NOT_FOUND;
$this->status = false;
return false;
}
// NOTE: the count of the number of records to wh is stored in $recordCount
// step 2: instantiate the WH data class object
$whMD = $this->meta;
$whMD[META_TEMPLATE] = TEMPLATE_CLASS_WHC1_PROD_REG;
$whMD[META_CLIENT] = CLIENT_SYSTEM;
$whErrors = [];
try {
$tmpObj = new gacFactory($whMD, FACTORY_EVENT_NEW_CLASS, '', $whErrors);
} catch (Throwable $t) {
$this->errorStack[] = ERROR_THROWABLE_EXCEPTION;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($t->getMessage());
else
consoleLog($this->res, CON_ERROR, $t->getMessage());
$this->state = STATE_FRAMEWORK_WARNING;
$this->status = false;
return false;
}
if (!$tmpObj->status) {
$error = ERROR_TEMPLATE_INSTANTIATE . TEMPLATE_CLASS_WHC1_PROD_REG;
$this->errorStack[] = $error;
if (is_array($whErrors)) $this->errorStack = array_merge($this->errorStack, $whErrors);
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($error);
else
consoleLog($this->res, CON_ERROR, $error);
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
$this->status = false;
return false;
}
$this->objWarehouse = $tmpObj->widget;
// set warehouse specific variables for create request
$this->objWarehouse->isWHRequest = true; // exceed record limit in broker payloads
$this->objWarehouse->whType = $this->brokerData[WH_TYPE]; // set the wh-type for resource management
$this->migrationMap = ($this->remoteSource) ? $this->objWarehouse->template->migrationMap : $this->widget->migrationConfig[STRING_MIGRATION_MAP];
if (is_object($tmpObj)) $tmpObj->__destruct();
unset($tmpObj, $whMD, $whErrors);
try {
// step 3: perform the fetch-put part of the warehousing request
if (!$this->processWHData()) return false;
// step 4: if not in test mode and source is not remote, then delete the source records
if (!isset($this->brokerData[MWH_TEST_MODE]) and !$this->remoteSource)
if (!$this->deleteWHSourceData()) return false;
// step 5: generate the WH report and save the meta data
$this->generateEventReport();
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
return true;
}
/**
* processWHData() -- private method
*
* This method is called by the whData() method as one of a series of calls to process a WH request.
*
* There are no input parameters to the method because the relevent data has already been stored within the
* class members:
*
* $this->brokerData :: contains the original payload request
* $this->config :: pulled from the xml section: <migration> which should only be defined in the env.xml file
* $this->meta :: copy of the broker meta data
* $this->nQuery :: the namaste (array) query built during validation
* $this->objWarehouse :: instantiation of the admin WH class object
* $this->recordCount :: count of the number of records to be warehoused (LCV)
* $this->remoteAvailable :: boolean telling me if the remote resource is available
* $this->resRemote :: defines the API resource to the remote schema, if source is remote
* $this->schema :: defines the source schema
* $this->whereClause :: string containing the query where-clause in dest-schema form
* $this->whSettings :: destination class template settings for warehousing
* $this->whSourceURI :: string containing the (FQ) warehouse URI
* $this->widget :: instantiation of the destination class object
* $this->widget->whTemplate :: name of the destination (WH) template
*
* The method contains the control loop (number of total records to warehouse / recordsFetchedPerRequest)
* and will iterate until the count of the denominator exceeds the numerator.
*
* This method builds the fetch (source data) query remotely for mySQL or mongo repos, and calls a method
* which, in turn, queries the remote repos directly via the URI set-up in the XML config. If the data to
* be warehoused is Namaste sourced, then we'll build a query/fetch request and submit the request to the
* appServer (Namaste) service.
*
* NOTE:
* -----
* If the data is remote sourced, the destination template must contain a migration map!
*
* Once we've fetched the source data, (and, if the source is remote to namaste, then we've mapped the
* remote-source data to the namaste WH data), we batch-save all of the records retrieved.
*
* Next, we update the meta-data record by incrementing the relevant counters, setting various status fields,
* and then submitting a broker request to the admin service to update the WH meta record. (This is done so
* that, in the event of a processing failure, we can resume the wh process.)
*
* If the process completes successfully, then we close the wh meta record.
*
* The method returns a boolean value which indicates success (all of the WH data was successfully processed)
* or a false if any failure was encountered. The class maintains an error history structure which will
* eventually be returned to the calling client to assist with diagnostics.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
* HISTORY:
* ========
* 05-16-18 mks _INF-188: original coding
*
*/
private function processWHData(): bool
{
$numRecsProcessed = 0;
$rad = 0;
$numRecsSkipped = 0;
$whRecordLimit = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_WH_RECS_PER_XFER];
try {
// ensure that we set the meta-override to the value of the record limit fetch (defined in XML)
if (!$this->widget->addMeta(META_LIMIT_OVERRIDE, $whRecordLimit)) {
$error = ERROR_DATA_META_REJECTED . META_LIMIT_OVERRIDE;
$this->errorStack[] = $error;
if ($this->widget->debug) $this->logger->debug($error);
$this->state = STATE_META_ERROR;
return false;
}
} catch (TypeError $t) {
$msg = ERROR_TYPE_EXCEPTION . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->error($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
// set warehouse specific variables for create request
$this->widget->isWHRequest = true; // exceed record limit in broker payloads
$this->widget->whType = $this->brokerData[WH_TYPE]; // set the wh-type for resource management
// main control loop for fetching data from the source
while ($numRecsProcessed < $this->recordCount) {
if ($this->remoteSource) {
// source data is remote - data can be directly accessed
if ($this->schema == CONFIG_SCHEMA_MYSQL) {
$query = 'SELECT /* ' . basename(__FILE__) . COLON . __LINE__ . ' */ *';
$query .= ' FROM ' . $this->brokerData[WH_REMOTE_TABLE];
$query .= ' WHERE ' . $this->whereClause;
$query .= ' LIMIT ' . $numRecsSkipped . ', ' . $whRecordLimit;
try {
if (!$this->issueRemoteSQLQuery($query)) {
// remote query failed to execute
$this->errorStack[] = ERROR_REMOTE_QUERY_FAIL;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn(ERROR_REMOTE_QUERY_FAIL);
else
consoleLog($this->res, CON_ERROR, ERROR_REMOTE_QUERY_FAIL);
$this->state = STATE_DB_ERROR;
return false;
}
} catch (TypeError $t) {
$error = ERROR_TYPE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($error);
else
consoleLog($this->res, CON_ERROR, $error);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
} elseif ($this->schema == CONFIG_SCHEMA_MONGO) {
// todo -- write this stub
if (isset($this->logger) and $this->logger->available) {
$this->logger->error(INFO_SHOULD_NOT_SEE_THIS);
} else {
consoleLog($this->res, CON_ERROR, INFO_SHOULD_NOT_SEE_THIS);
}
}
// we have data - but it needs to be processed through the migration map
$tData = $this->doDataMap($this->queryData);
if (!is_null($tData)) {
$this->queryData = $tData;
$this->recordCount = count($this->queryData);
} else {
$this->errorStack[] = ERROR_MIGRATION_MAPPING_FAILED;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn(ERROR_MIGRATION_MAPPING_FAILED);
else
consoleLog($this->res, CON_ERROR, ERROR_MIGRATION_MAPPING_FAILED);
$this->state = STATE_DATA_ERROR;
return false;
}
} else {
$bc = null;
try {
// source data must be requested via namaste
$bc = new gacBrokerClient(BROKER_QUEUE_R, __METHOD__ . AT . __LINE__);
if (!$bc->status) {
$error = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_R;
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($error);
else
consoleLog($this->res, CON_ERROR, $error);
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
if (is_object($bc)) {
$bc->__destruct();
unset($bc);
}
return false;
}
// build the request to the appServer broker
$request = [
BROKER_REQUEST => BROKER_REQUEST_FETCH,
BROKER_DATA => [
STRING_QUERY_DATA => $this->nQuery
],
BROKER_META_DATA => [
META_CLIENT => CLIENT_SYSTEM,
META_DONUT_FILTER => 1,
META_EVENT_GUID => $this->meta[META_EVENT_GUID],
META_TEMPLATE => $this->meta[META_TEMPLATE],
META_SKIP => $numRecsSkipped,
META_LIMIT_OVERRIDE => $whRecordLimit,
META_LIMIT => $whRecordLimit
]
];
$payload = gzcompress(json_encode($request));
$response = json_decode(gzuncompress($bc->call($payload)), true);
if (!$response[PAYLOAD_STATUS]) {
$error = sprintf(ERROR_BROKER_INTERNAL_CALL, BROKER_REQUEST_FETCH, BROKER_QUEUE_R);
$this->errorStack = array_merge($this->errorStack, $response[PAYLOAD_DIAGNOSTICS]);
$this->errorStack[] = $error;
if ($this->widget->debug) $this->logger->debug($error);
$this->state = $response[PAYLOAD_STATE];
if (is_object($bc)) {
$bc->__destruct();
unset($bc);
}
return false;
}
$this->queryData = $response[PAYLOAD_RESULTS][STRING_QUERY_RESULTS];
$this->recordCount = count($this->queryData);
} catch (Throwable $t) {
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($error);
else
consoleLog($this->res, CON_ERROR, $error);
$this->state = STATE_FRAMEWORK_WARNING;
if (is_object($bc)) {
$bc->__destruct();
unset($bc);
}
return false;
}
if (is_object($bc)) {
$bc->__destruct();
unset($bc);
}
}
try {
// invoke the widget's class create method to write the records (locally) to the db
$this->objWarehouse->_createRecord($this->queryData, DATA_WHSE);
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
if ($this->objWarehouse->status) {
// create succeeded -- update the meta record and record
$rowsInserted = $this->objWarehouse->recordsInserted;
$rowsDropped = $this->objWarehouse->recordsDropped;
// check the record count
if ($rowsDropped != ($this->recordCount - $rowsInserted)) {
$this->errorStack[] = ERROR_DATA_WH_INSERT_FAIL;
if ($this->widget->debug) $this->logger->debug(ERROR_DATA_WH_INSERT_FAIL);
$error = sprintf(ERROR_DATA_RECORD_COUNT, ($rowsDropped + $rowsInserted), $this->recordCount);
$this->errorStack[] = $error;
if ($this->widget->debug) $this->logger->debug($error);
$this->state = STATE_DATA_ERROR;
return false;
}
// inject WH'ing results the meta-data (WH) payload
try {
$rip = $this->objWarehouseMeta->getColumn(MWH_NUM_RECS_MOVED); // records in progress
if (is_null($rip)) $rip = 0;
$rip += $rowsInserted;
$rad = $this->objWarehouseMeta->getColumn(MWH_NUM_RECS_DROPPED); // records already dropped
if (is_null($rad)) $rad = 0;
$rad += $rowsDropped;
// create/update the wh meta record and the lcv
if (is_null($this->objWarehouseMeta->getColumn(DB_TOKEN))) {
// create the new wh meta record
if (!$this->updateWHMeta()) return false;
}
// we need to do an update payload for the wh-meta record update
$warehouseUpdateData = [
MWH_NUM_RECS_MOVED => $rip,
MWH_NUM_RECS_DROPPED => $rad,
MWH_LAST_REC_WRITTEN => json_encode($this->objWarehouse->getRecord($this->objWarehouse->count, false)),
DB_ACCESSED => time()
];
if (!$this->updateWHMeta($warehouseUpdateData)) return false;
$numRecsProcessed += $rip;
} catch (Throwable $t) {
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($error);
else
consoleLog($this->res, CON_ERROR, $error);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
} else {
try {
// the create failed - stop processing and save the meta WH record
$this->objWarehouseMeta->insertField(DB_STATUS, STATUS_FAILED);
$this->objWarehouseMeta->insertField(DB_ACCESSED, time());
$this->objWarehouseMeta->insertField(MWH_STOP_REASON, json_encode($this->errorStack));
@$this->updateWHMeta();
} catch (Throwable $t) {
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($error);
else
consoleLog($this->res, CON_ERROR, $error);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
$this->state = $this->objWarehouse->state;
$this->errorStack = array_merge($this->errorStack, $this->objWarehouse->eventMessages);
return false;
}
}
// close the wh meta record
try {
$warehouseUpdateData = [
DB_STATUS => STATUS_COMPLETE,
DB_ACCESSED => time()
];
@$this->updateWHMeta($warehouseUpdateData);
} catch (Throwable $t) {
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($error);
else
consoleLog($this->res, CON_ERROR, $error);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
$this->recordCount = $numRecsProcessed;
$this->droppedRecords = $rad;
return true;
}
/**
* getWHRecCount() -- private method
*
* this method is used to get the record count for the WH source query... in other words, we generate a count
* of the number of records that will be warehoused and assign that value to the member variable: $recordCount.
*
* The method requires no input parameters and returns a Boolean to indicate success or failure in processing.
*
* The main loop checks for the source schema and then checks to see if the source is external to Namaste. The
* query is build accordingly and submitted to the db classes for resolution.
*
* Any errors or exceptions-raised will cause a false value to be returned to the calling client.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
*
* HISTORY:
* ========
* 04-27-18 mks _INF-188: original coding
* 04-30-18 mks _INF-188: revised fetch sections for data local to namaste
*
*/
private function getWHRecCount(): bool
{
if (!$this->remoteSource) {
// this is a request internal to namaste -- WH methods exec on Segundo -- need to publish
// a broker event request to appServer (namaste) for the record count request.
$lTime = $this->brokerData[WH_FILTER_VALUES][0] . ' 00:00:00';
$rTime = $this->brokerData[WH_FILTER_VALUES][1] . ' 00:00:00';
// if the source data is mongo, convert to epoch time
$lTime = ($this->schema == STRING_MONGO) ? unPrettyTime($lTime) : $lTime;
$rTime = ($this->schema == STRING_MONGO) ? unPrettyTime($rTime) : $rTime;
// build the payload to fetch the query count for the WH query
$request = [
BROKER_REQUEST => BROKER_REQUEST_QUERY_COUNT,
BROKER_DATA => [
STRING_QUERY_DATA => [ DB_CREATED => [OPERAND_AND => [ OPERATOR_GT => [$lTime], OPERATOR_LT => [$rTime]]]]
],
BROKER_META_DATA => [
META_TEMPLATE => $this->meta[META_TEMPLATE],
META_CLIENT => CLIENT_SYSTEM,
META_EVENT_GUID => $this->meta[META_EVENT_GUID]
]
];
try {
// create a broker-request client to the namaste read-broker
$bc = new gacBrokerClient(BROKER_QUEUE_R, basename(__FILE__) . COLON_NS . __LINE__);
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . $t->getMessage();
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
if (!$bc->status) {
$error = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_R;
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($error);
else
consoleLog($this->res, CON_ERROR, $error);
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
return false;
}
// submit the request to the namaste read broker and process the return
$payload = gzcompress(json_encode($request));
$response = json_decode(gzuncompress($bc->call($payload)), true);
if (!$response[PAYLOAD_STATUS]) {
$this->errorStack = array_merge($this->errorStack, $response[PAYLOAD_DIAGNOSTICS]);
$this->state = $response[PAYLOAD_STATE];
return false;
}
try {
if (!$this->objWarehouseMeta->insertField(MWH_NUM_RECS_IN_QUERY, intval($response[PAYLOAD_RESULTS][0]))) {
$error = sprintf(ERROR_DATA_ADD_FAIL, MWH_NUM_RECS_SOURCE) . $this->objWarehouseMeta->class;
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($error);
else
consoleLog($this->res, CON_ERROR, $error);
$this->state = STATE_DATA_ERROR;
return false;
}
if (!$this->objWarehouseMeta->insertField(MWH_NUM_RECS_SOURCE, intval($response[PAYLOAD_RESULTS][1]))) {
$error = sprintf(ERROR_DATA_ADD_FAIL, MWH_NUM_RECS_IN_QUERY) . $this->objWarehouseMeta->class;
$this->errorStack[] = $error;
if ($this->objWarehouseMeta->debug) $this->objWarehouseMeta->logger->debug($error);
$this->state = STATE_DATA_ERROR;
return false;
}
} catch (TypeError $t) {
$msg = ERROR_TYPE_EXCEPTION . AT . basename(__FILE__) . COLON_NS . __LINE__;
$this->errorStack[] = $msg;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $t->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
$this->recordCount = $response[PAYLOAD_RESULTS][0];
$this->recordsInSourceTable = $response[PAYLOAD_RESULTS][1];
if (is_object($bc)) $bc->__destruct();
unset($bc);
} else {
if ($this->schema == STRING_MYSQL) {
if ($this->remoteSource) {
if (!isset($this->resRemote)) {
$error = sprintf(ERROR_REMOTE_RESOURCE_404, STRING_MYSQL);
$this->errorStack[] = $error;
if ($this->widget->debug) $this->logger->debug($error);
$this->state = STATE_DB_ERROR;
return false;
}
// if the source is remote (external to namaste) we can directly query
$query = 'SELECT /* ' . basename(__FILE__) . COLON . __LINE__ . ' */ COUNT(*) AS ' . STRING_NUM_RECS;
$query .= ' FROM ' . $this->brokerData[WH_REMOTE_TABLE];
$query .= ' WHERE ' . $this->whereClause;
$query2 = 'SELECT /* ' . __METHOD__ . AT . __LINE__ . ' */ count(*) AS ' . STRING_NUM_RECS . ' FROM ' . $this->brokerData[WH_REMOTE_TABLE];
$this->recordsInSourceTable = 0;
$this->recordCount = 0;
try {
if (!$this->issueRemoteSQLQuery($query, true, $this->recordCount)) {
$this->errorStack[] = ERROR_REMOTE_QUERY_FAIL;
$this->errorStack[] = STRING_QUERY . COLON . $query;
if ($this->widget->debug) {
$this->logger->debug(ERROR_REMOTE_QUERY_FAIL);
$this->logger->debug(STRING_QUERY . COLON . $query);
}
$this->state = STATE_DB_ERROR;
return false;
}
if (!$this->issueRemoteSQLQuery($query2, true, $this->recordsInSourceTable)) {
$this->errorStack[] = ERROR_REMOTE_QUERY_FAIL;
$this->errorStack[] = STRING_QUERY . COLON . $query2;
if ($this->widget->debug) {
$this->logger->debug(ERROR_REMOTE_QUERY_FAIL);
$this->logger->debug(STRING_QUERY . COLON . $query2);
}
$this->state = STATE_DB_ERROR;
return false;
}
} catch (Throwable $t) {
$msg = ERROR_TYPE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
}
} elseif ($this->schema == STRING_MONGO) { // todo -- this needs to be re-written to support $resRemote
try {
$this->recordCount = $this->widget->getQueryCount($this->widget->strQuery, (($this->remoteSource) ? $this->brokerData[WH_REMOTE_TABLE] : ''));
} catch (TypeError $t) {
$msg = ERROR_TYPE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
if ($this->recordCount == -1) {
$this->errorStack[] = ERROR_MDB_FETCH_COUNT_FAIL;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn(ERROR_MDB_FETCH_COUNT_FAIL);
else
consoleLog($this->res, CON_ERROR, ERROR_MDB_FETCH_COUNT_FAIL);
return false;
}
}
}
return true;
}
/**
* migrateData() -- private method
*
* This method handles the heavy lifting of pulling data from the source database, converting the data to the
* Namaste schema, and then writing the modified records out to the destination database.
*
* The method requires no input parameters and returns a Boolean value indicating success or failure in the
* overall migration.
*
* TEST MODE:
* ----------
* Test mode is supported in this function - if enabled in the meta payload, we'll do the following:
* -- limit the query to 10 records
* -- do not write output to the destination database
* -- do not close (mark as completed) the migration record
*
* Programmer Notes:
* -----------------
* The method, as written, is fairly limited in that it only accommodates data from a mySQL source to a mongoDB
* destination. The destination class is masked by virtue of it being a Namaste class but we're clearly
* handling it as a mongoDB object when we're fetching the results of the bulk-write. (Hint - overload this
* method name in the mySQL class.)
*
* The main processing happens in a simple loop where the number of processed records is compared to the number
* of records in the source table. The concern is that, for large tables, running out of memory during a
* sustained operation.
*
* As such, we're using the current (Migrations) class to post events to the admin service that keep track of the
* current migration in terms of the number of records processed. In case of an event crash, then we can resume
* processing because the migration class will look for the migration record on spin-up.
*
* Some considerations for the source query:
* -- evaluate the MIGRATION_TEST_MODE ($this->testMode) setting and limit the number of records from source
* to ten if we're in test-mode.
* -- we have the option for over-riding the record limit and will do so if the record limit value exceeds the
* Namaste record limit. Both are set in the framework XML.
* -- source query is ordered by sorting the data set based on the sort key defined in the migration template
* for the new class
*
* Once we've fetched the (first) data-set from source, we're going to map the source data to the destination
* schema using the migration map declared in the destination template. We'll also inject meta data such as the
* record GUID, created/accessed timestamps, etc.
*
* After we've converted the source data, we'll execute a bulk-write to the destination table. On successful
* return, update the migration record with the new processed count. If possible, also record the count of any
* records that may have been dropped due to processing or validation.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
*
* HISTORY:
* ========
* 02-15-18 mks INF-139: original code completed
* 03-23-18 mks CORE-852: adding mysql destination support
* 03-27-18 mks CORE-852: added the PDO processing for the migration report
* 09-01-18 mks DB-48: updated for new _createRecord() parameter that identifies data source
* 10-04-18 mks DB-43: support for the new webUI migration app: invoke the html migration report instead
*/
private function migrateData(): bool
{
// everything we need is already defined as a class member, except for the actual data to be moved
// pull data FROM mysql
if ($this->schema == STRING_MYSQL) {
$mappedResults = null;
if ($this->destinationSchema == STRING_MONGO) {
/** @var gacMongoDB $widget */
$widget = $this->widget;
} else {
/** @var gacPDO $widget */
$widget = $this->widget;
$widget->setMigrationBit(true);
}
$widget->setRecordLimit($this->recordLimit);
// build the query
$this->query = 'SELECT /* ' . basename(__FILE__) . COLON_NS . __LINE__ . ' */ * ';
$this->query .= 'FROM ' . $this->brokerData[MIGRATION_SOURCE_TABLE]. ' ';
// add a filter for soft-deleted messages
if (isset($this->brokerData[MIGRATION_FILTER_SOFT_DELETES]) and true === boolval($this->brokerData[MIGRATION_FILTER_SOFT_DELETES])) {
$statusKey = key($this->migrationStatusKV);
$statusValue = $this->migrationStatusKV[$statusKey];
$this->query .= 'WHERE ' . $statusKey . ' ';
if (is_numeric($statusValue)) {
$this->query .= '!= ' . $statusValue;
} else {
$this->query .= '<> "' . $statusValue . '"';
}
}
// add the sort discriminant
$this->query .= 'ORDER BY ' . $this->migrationSortKey . ' ASC ';
// start the main processing loop which is based on current vs. total record count in the source table
while ($this->recordCount < $this->recordsInSourceTable) {
// add the limit -- check if this is test mode, if so: use the test-mode record limit defined in XML
$limitString = 'LIMIT ' . $this->recordCount . ',';
$limitString .= ($this->testMode) ? $this->config[CONFIG_MIGRATION_NUM_TEST_RECS] : $this->recordLimit;
$results = null;
$query = $this->query . $limitString;
// populate migration data during first pass of processing
try {
if (!isset($this->data[MWH_QUERY])) {
if (!$this->objMigrate->insertField(MWH_QUERY, $query, 0)) {
// failed to add query to migrations payload
$msg = sprintf(ERROR_DATA_ADD_FAIL, STRING_QUERY, $this->objMigrate->class);
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->error($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
}
}
// exec the query
foreach ($this->resRemote->query($query) as $row) {
$results[] = $row;
}
$currentCount = count($results);
$this->recordCount += $currentCount;
// $this->skip += ($currentCount < $this->recordLimit) ? $currentCount : $this->recordLimit;
} catch (PDOException | Throwable $p) {
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_PDO_EXCEPTION . $this->query);
$this->errorStack[] = $msg;
$this->errorStack[] = $p->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($p->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $p->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
return false; // return control to calling client if the query failed
}
if (is_null($results)) {
// should not get to this point!!!
$this->state = STATE_RESOURCE_ERROR_PDO;
consoleLog($this->res, CON_SYSTEM, INFO_SHOULD_NOT_SEE_THIS . COLON . $query);
return false;
}
try {
// we have source data - next step is to map it via the migrationMap
$mappedResults = $this->doDataMap($results);
if (count($mappedResults) != $currentCount) {
$this->droppedRecords += $currentCount - count($mappedResults);
if (!$this->objMigrate->insertField(MWH_NUM_RECS_DROPPED, $this->droppedRecords, 0))
consoleLog($this->res, CON_ERROR, sprintf(ERROR_DATA_ADD_FAIL, MWH_NUM_RECS_DROPPED, $this->objMigrate->class));
}
// at this point, we've successfully mapped the data... if we're not in test mode, write the data
if ($this->testMode) {
// save the test records and break out of the loop processing
$widget->addData($mappedResults);
break;
} else {
$widget->_createRecord($mappedResults, DATA_MIGR);
if (!$widget->status) {
// todo -- pull data from the bulk-write object and compare count of records v. records written
$this->errorStack[] = ERROR_NOSQL_BC;
return false;
}
$widget->removeData(); // widget data successfully migrated (this batch) so reset to null
// branch processing here with functions depending on destination schema (CORE-852)
switch ($this->destinationSchema) {
case STRING_MONGO :
if (!$this->processMongoResults($widget, $mappedResults)) return false;
break;
case STRING_MYSQL :
if (!$this->processPDOResults($widget, $mappedResults)) return false;
$widget->recordsDropped = 0;
$widget->recordsInserted = 0;
break;
default:
// schema was pre-validated -- should never exec this code
$msg = ERROR_SCHEMA_NOT_SUPPORTED . COLON . $this->destinationSchema;
consoleLog($this->res, CON_ERROR, $msg);
$this->errorStack[] = $msg;
return false;
break;
}
}
// update/save the current migration record if the recordsMigrated < recordsInSource
if ($this->objMigrate->getColumn(MWH_NUM_RECS_MOVED) <= $this->objMigrate->getColumn(MWH_NUM_RECS_SOURCE)) {
if (!$this->updateMigrationMetaData()) return false;
}
} catch (Throwable $t) {
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
$this->errorStack[] = $msg;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $t->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
return false; // return control to calling client if the query failed
}
// console log progress-status message
$msg = sprintf(INFO_MIGRATION_RECORDS_MOVED,
($this->brokerData[MWH_SOURCE_SCHEMA] . COLON . $this->brokerData[MWH_SOURCE_TABLE]),
($this->brokerData[MWH_DEST_SCHEMA] . COLON . $this->brokerData[MWH_DEST_TABLE]),
$this->objMigrate->getColumn(MWH_NUM_RECS_MOVED), $this->recordsInSourceTable);
consoleLog($this->res, CON_SYSTEM, $msg);
} // end WHILE loop for processing migration records
// generate the migration report which is stored as a migration class member based on if the
// migration request originated from the migration webApp or from the cli
if (!$this->isWebRequest)
$this->generateEventReport();
else
$this->generateHTMLEventReport();
if (empty($this->migrationReport)) {
$this->errorStack[] = ERROR_MIGRATION_REPORT;
$this->migrationReport = ERROR_MIGRATION_REPORT;
$this->objMigrate[MWH_REPORT] = ERROR_MIGRATION_REPORT;
}
// update the status of $this->objMigrate data to completed, dateCompleted to now(), if not in test mode
if (!$this->testMode) {
try {
if (!$this->updateMigrationMetaData(STATUS_COMPLETE)) return false;
} catch (TypeError $t) {
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_TYPE_EXCEPTION . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
return false; // return control to calling client if the query failed
}
}
} else {
// source schema is not supported
$msg = sprintf(ERROR_SCHEMA_NOT_SUPPORTED, $this->schema);
$this->errorStack[] = $msg;
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_SCHEMA_ERROR;
return false;
}
return true;
}
/**
* generateHTMLEventReport() -- private method
*
* This method is called at the end of a migration event, if the migration event was launched from the new
* (and totally awesome) webUI migration app.
*
* The methods requires no input parameters and returns type void to the calling client
*
* The methods builds the migration report in HTML and assigns the completed report the the class member
* variable: $migrationReport.
*
* @author mike@givingassistant.org
* @version 1.0
*
* HISTORY:
* ========
* 10-05-18 mks DB-43: original coding
*
*/
private function generateHTMLEventReport(): void
{
$recPasses = 0;
$rpf = 'Max Recs Per Fetch: 0';
$eos = '<br>';
$this->migrateStatus = true;
$dropPartials = (isset($this->brokerData[MIGRATION_FILTER_PARTIALS]) and $this->brokerData[MIGRATION_FILTER_PARTIALS] == 1) ? 'dropped' : 'kept';
$softDeletes = (isset($this->brokerData[MIGRATION_FILTER_SOFT_DELETES]) and $this->brokerData[MIGRATION_FILTER_SOFT_DELETES] == 1) ? ON : OFF;
// start generating the HTML report
//
// first, generate the generic header dependent on test mode
$report = '<div class="modal-header">' . $eos;
$report .= '<h4 class="modal-title w-100" id="resultsModalLabel">Migration Results</h4>' . $eos;
$report .= '<button type="button" class="close" data-dismiss="modal" aria-label="Close">' . $eos;
$report .= '<span aria-hidden="true">&times;</span></button>' . $eos;
$report .= '</div>' . $eos;
$report .= '<div class="modal-body">' . $eos;
$report .= '<p class="text-danger">' . $eos;
//next, check to see if we're in test mode and, if so, generate the test mode header
if ($this->testMode) {
$report .= '<strong>Test Mode Active</strong></p>';
$report .= '<i class="fa fa-minus text-muted" aria-hidden="true">';
$report .= '<span class="text-muted">&nbsp;&nbsp;No data was deleted from the source table</span></i>' . $eos;
$report .= '<i class="fa fa-minus text-muted" aria-hidden="true">';
$report .= '<span class="text-muted">&nbsp;&nbsp;No data was written to the destination table</span></i><hr>' . $eos;
}
$report .= 'Record Partials are <strong>' . $dropPartials . '</strong>' . $eos;
$report .= 'Soft Delete Records <strong>' . $softDeletes . '</strong>' . $eos;
$report .= 'Source Repository: <strong>' . $this->config[$this->brokerData[MWH_SOURCE_SCHEMA]][STRING_HOST] . '</strong>' . $eos;
$report .= 'Source Schema: <strong>' . $this->brokerData[MWH_SOURCE_SCHEMA] . '</strong>' . $eos;
$report .= 'Source Table: <strong>' . $this->brokerData[MWH_SOURCE_TABLE] . '</strong>' . $eos;
$report .= 'Fetch Query: <strong>' . $this->query . '</strong>' . $eos;
$report .= 'Field Coverage: <strong>' . $this->coverage . '%' . '</strong>' . $eos;
$report .= 'Destination Repo: <strong>' . $this->widget->dbService . '</strong>' . $eos;
$report .= 'Destination Schema: <strong>' . $this->brokerData[MWH_DEST_SCHEMA] . '</strong>' . $eos;
$report .= 'Destination Table: <strong>' . $this->widget->collectionName . '</strong>' . $eos;
$report .= 'Records In Source: <strong>' . $this->recordsInSourceTable . '</strong>' . $eos;
$report .= '<strong>' . $rpf . '</strong>' . $eos;
if ($recPasses < 1) $recPasses = 1;
$report .= 'Est # Fetches: <strong>' . $recPasses . '</strong>' . $eos;
$report .= 'Num Recs Dropped: <strong>' . $this->droppedRecords . '</strong>' . $eos;
$report .= 'Num Records Moved: <strong>' . $this->recordCount . '</strong><hr>' . $eos;
if ($this->testMode) {
$report .= '<span class="text-danger">Records fetched for quality evaluation (0 Records actually moved)</span>' . $eos;
$report .= '<span class="text-muted"><i class="fa fa-minus text-muted" aria-hidden="true">';
$report .= '&nbsp;&nbsp;Ensure that record mapping is correct...</i></span>' . $eos;
$report .= '<span class="text-muted"><i class="fa fa-minus text-muted" aria-hidden="true">';
$report .= '&nbsp;&nbsp;Ensure that all fields were mapped...</i></span>' . $eos;
$report .= '<span class="text-muted"><i class="fa fa-minus text-muted" aria-hidden="true">';
$report .= '&nbsp;&nbsp;Were data types properly maintained...?</i></span>' . $eos;
$report .= '<span class="text-muted"><i class="fa fa-minus text-muted" aria-hidden="true">';
$report .= '&nbsp;&nbsp;Validate injected fields (timestamps, GUIDs)...</i></span>' . $eos;
$report .= $eos . $eos . '<pre>' . print_r($this->widget->getData(), true) . '</pre>' . $eos;
}
$report .= '</div>' . $eos; // end modal-body div
$report .= '<div class="modal-footer">' . $eos;
$report .= '<button type="button" class="btn btn-secondary btn-sm" data-dismiss="modal">Close</button>' . $eos;
$report .= '</div>' . $eos;
$this->migrationReport = $report;
}
/**
* generateEventReport() -- private method
*
* This method generates the migration report that's returned to the calling client as the success data payload
* in the broker event.
*
* We're going to actually publish a longer report in test mode since we're dumping all the testMode variables
* that were defined at the top of the file as well as var-dumping the 10 records that were pulled from the
* source and converted to Namaste records according to the data template.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
*
* HISTORY:
* ========
* 02-10-18 mks _INF-139: original coding completed
* 05-16-18 mks _INF-188: updated for warehousing
*
*/
private function generateEventReport(): void
{
$recPasses = 0;
$rpf = 'Max Recs Per Fetch: 0';
$this->migrateStatus = true;
$eos = (isset($_SERVER['HTTP_USER_AGENT'])) ? '<br>' : PHP_EOL;
$dropPartials = (isset($this->brokerData[MIGRATION_FILTER_PARTIALS]) and $this->brokerData[MIGRATION_FILTER_PARTIALS] == 1) ? 'dropped' : 'kept';
$softDeletes = (isset($this->brokerData[MIGRATION_FILTER_SOFT_DELETES]) and $this->brokerData[MIGRATION_FILTER_SOFT_DELETES] == 1) ? ON : OFF;
$report = (($this->testMode) ? $this->TestHeader : '') . $eos . $eos;
// todo -- option for html generation - for now, just return a text-only report CORE-860
// todo -- add the migration report to the migration record CORE-861
// $html = ($eos == PHP_EOL) ? false : true;
$report .= $this->eventType . ' Report' . $eos . '================' . $eos;
if ($this->eventType == STRING_MIGRATION) {
$report .= 'Record Partials are ' . $dropPartials . $eos;
$report .= 'Soft Delete Records ' . $softDeletes . $eos;
$report .= 'Source Repository: ' . $this->config[$this->brokerData[MIGRATION_SOURCE_SCHEMA]][STRING_HOST] . $eos;
$report .= 'Source Schema: ' . $this->brokerData[MWH_SOURCE_SCHEMA] . $eos;
$report .= 'Source Table: ' . $this->brokerData[MWH_SOURCE_TABLE] . $eos;
$report .= 'Fetch Query: ' . $this->query . $eos;
$report .= 'Field Coverage: ' . $this->coverage . '%' . $eos;
$report .= 'Destination Repo: ' . $this->widget->dbService . $eos;
$report .= 'Destination Schema: ' . $this->brokerData[MWH_DEST_SCHEMA] . $eos;
$report .= 'Destination Table: ' . $this->widget->collectionName . $eos;
} else {
if ($this->remoteSource) {
$report .= 'Source Repository: ' . $this->whSourceURI . $eos;
$report .= 'Source Schema: ' . $this->objWarehouse->template->schema . $eos;
$report .= 'Source Table: ' . $this->brokerData[WH_REMOTE_TABLE] . $eos;
$report .= 'Fetch Query: ' . $this->whereClause . $eos;
$recPasses = intval(floor($this->recordsInSourceTable / $this->objWarehouseMeta->getRecordLimit()));
$rpf = 'Max Recs Per Fetch: ' . $this->objWarehouseMeta->getRecordLimit() . $eos;
} else {
$recPasses = intval(floor($this->recordsInSourceTable / $this->recordLimit));
$report .= 'Source Repository: ' . $this->widget->template->service . $eos;
$report .= 'Source Schema: ' . $this->widget->template->schema . $eos;
$report .= 'Source Table: ' . $this->widget->template->collection . $this->widget->template->extension . $eos;
$report .= 'Fetch Query: ' . $this->widget->strQuery . $eos;
$report .= 'Query Variables: ' . json_encode($this->widget->queryVariables) . $eos;
$rpf = 'Max Recs Per Fetch: ' . $this->recordLimit . $eos;
}
$report .= 'Destination Repo: ' . $this->objWarehouse->dbService . $eos;
$report .= 'Destination Schema: ' . $this->objWarehouse->schema . $eos;
$report .= 'Destination Table: ' . $this->objWarehouse->collectionName . $this->objWarehouse->ext . $eos;
}
$report .= 'Records In Source: ' . $this->recordsInSourceTable . $eos;
$report .= $rpf . $eos;
if ($recPasses < 1) $recPasses = 1;
$report .= 'Est # Fetches: ' . $recPasses . $eos;
$report .= 'Num Recs Dropped: ' . $this->droppedRecords . $eos;
$report .= 'Num Records Moved: ' . $this->recordCount; // $eos added below
try {
if ($this->testMode and $this->eventType == STRING_MIGRATION) {
$report .= ' records fetched for quality evaluation (0 Records actually moved)';
$report .= $eos . $this->TestBody;
$report .= $eos . $eos . var_export($this->widget->getData(), true) . $eos;
$report .= $eos . $eos . $this->TestFooter . $eos . $eos;
} elseif ($this->testMode and $this->eventType == STRING_WAREHOUSING) {
$report .= $eos . $this->TestBodyWH;
$report .= $eos . $eos . var_export($this->objWarehouseMeta->getData(), true) . $eos;
$report .= $eos . $eos . $this->TestFooterWH . $eos . $eos;
}
// todo -- read the comments in the constructor for items to add to the report like "last record written"
$report .= $eos;
$this->migrationReport = $report;
// we update the meta record in the calling method if this is a migration event
if ($this->eventType == STRING_WAREHOUSING) {
if (!$this->updateWHMeta([MWH_REPORT => $report])) {
$error = ERROR_MDB_SYS_EVENT_SAVE . COLON . MWH_REPORT;
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available)
$this->logger->error($error);
else
consoleLog($this->res, CON_ERROR, $error);
}
}
} catch (Throwable $t) {
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
$this->errorStack[] = $msg;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $t->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
}
}
/**
* updateMigrationMetaData() -- private method
*
* This method has an optional input parameter, that is a valid STATUS field - use this parameter only when you
* want to update the status of the migration record. (CLOSED) Note that a status change request will only
* be recognized on a migration-record update; the status will be ignored when creating a new migration record.
*
* The method returns a boolean value indicating if the migration record was successfully updated or not - saved
* to the Admin service mongo collection.
*
* The first thing we do is pull in the current mig data object and look for a token - if one does not exist, then
* we'll assume this is a new-record creation and we'll call the admin-out broker's migrate-create event. On
* successful return from that event, we'll import the GUID and the dateCreated strings into the current data
* so that subsequent calls are treated as an update event.
*
* If this is an update event, parse the status, if provided, and update the relevant fields.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @param string $_status
* @return bool
*
*
* HISTORY:
* ========
* 02-14-18 mks INF-139: original coding
* 09-04-18 mks DB-48: Added migration report to the record data
*
*/
private function updateMigrationMetaData(string $_status = ''): bool
{
$updateStatus = false;
$response = array();
try {
// get the migration data into an stand-alone array
if (is_null($data = $this->objMigrate->getData())) return false;
} catch (TypeError $t) {
$msg = ERROR_TYPE_EXCEPTION . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
if (count($data) != 1) {
$msg = sprintf(ERROR_DATA_RECORD_COUNT, 1, count($data));
$this->errorStack[] = $msg;
if (gasConfig::$settings[CONFIG_DEBUG]) consoleLog($this->res, CON_ERROR, $msg);
return false;
}
// todo -- check the count of $data before assigning it, perhaps also validate that the key == 0
if (!array_key_exists(DB_TOKEN, $data[0])) {
// no GUID exists in the record => save the data
$request = [
BROKER_REQUEST => BROKER_REQUEST_ADMIN_MWH_EVENT_CREATE,
BROKER_DATA => $data,
BROKER_META_DATA => [
META_TEMPLATE => TEMPLATE_CLASS_MIGRATIONS, // todo -- remove this requirement CORE-863
META_CLIENT => CLIENT_SYSTEM
]
];
try {
if (!$this->publishMigrationEvent($request, $response)) return false;
// if we successfully created the new migration record, we need to inject the GUID and the createdDate
@$this->objMigrate->insertField(DB_TOKEN, $response[PAYLOAD_RESULTS][0][DB_TOKEN]);
@$this->objMigrate->insertField(DB_CREATED, $response[PAYLOAD_RESULTS][0][DB_CREATED]);
} catch (Throwable $t) {
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
$this->errorStack[] = $msg;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $t->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
} else {
// pull the data out of the indexed array and manipulate directly, strictly for cleaner code
$data = $data[0];
$updateData = [];
try {
// GUID exists in the record - update the status, if a value was passed to this method
if (!empty($_status)) {
switch ($_status) {
case STATUS_IN_PROGRESS :
case STATUS_COMPLETE :
@$this->objMigrate->insertField(DB_STATUS, $_status);
$updateData[DB_STATUS] = $_status;
$updateData[MWH_REPORT] = $this->migrationReport;
$updateStatus = true;
break;
default :
$msg = ERROR_MIGRATION_STATUS_INV . $_status;
$this->errorStack[] = $msg;
$this->state = STATE_STATUS;
return false;
break;
}
}
if ($updateStatus) {
$data[DB_STATUS] = $_status;
if ($_status == STATUS_COMPLETE) {
$ts = time();
$updateData[MWH_DATE_COMPLETED] = $ts;
$this->objMigrate->insertField(MWH_DATE_COMPLETED, $ts);
}
}
if (!is_null($this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED))) {
$updateData[MWH_NUM_RECS_DROPPED] = $this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED);
}
$updateData[MWH_NUM_RECS_MOVED] = $this->objMigrate->getColumn(MWH_NUM_RECS_MOVED);
$ts = time();
$updateData[DB_ACCESSED] = $ts;
$this->objMigrate->insertField(DB_ACCESSED, $ts);
// build the broker payload and submit
$query = [DB_TOKEN => [OPERAND_NULL => [OPERATOR_EQ => [$this->objMigrate->getColumn(DB_TOKEN)]]]];
$request = [
BROKER_REQUEST => BROKER_REQUEST_ADMIN_MWH_EVENT_UPDATE,
BROKER_DATA => [STRING_QUERY_DATA => $query, STRING_UPDATE_DATA => $updateData],
BROKER_META_DATA => [META_TEMPLATE => TEMPLATE_CLASS_MIGRATIONS, META_CLIENT => CLIENT_SYSTEM]
];
if (!$this->publishMigrationEvent($request)) return false;
} catch (Throwable $t) {
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
$this->errorStack[] = $msg;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $t->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
}
return true;
}
/**
* fetchMigrationMetaData() -- private method
*
* This method is responsible to fetching the migration meta data from the Namaste admin service. This class
* object, as a review, is really a data manager as opposed to a data-class. The admin collection in mongo for
* migration meta data is maintained by this class through it's member instantiation via normal channels, e.g.:
* the gacFactory, and assigns the migration class object to the class member: $objMigrate.
*
* When this method is called, it's assumed that the $objMigrate class object has already been instantiated. That
* the $objMigrate class has data is not guaranteed. It's also assumed that the member $brokerData has been
* populated with the request payload (query) data. If both are empty, return a Boolean(false) and set the
* state of this class to STATE_DATA_ERROR.
*
* First off, when we enter the method, we're going to instantiate a local member to as a brokerClient class
* object which will handle our request to the Namaste admin service for fetching the migration meta data. If
* this object fails to instantiate, we'll return immediately. We'll return a Boolean(false) value to the calling
* client and set the STATE of this class to a framework warning.
*
* Next, we're going to check to see if the current instantiation of $objMigrate already has data or not. If it
* does, the inference is that this is a migration in-progress. If so, we want to pull the already generated
* token from the data payload, and use that as the basis for the query to fetch the data. This covers the
* (hopefully unlikely) scenario recovering from a previous failure by loading the last save points.
*
* If the $objMigrate class data is empty, we're going to build the query from the broker payload data sent
* by the client. For first pass in a migration, we're expecting that no data will be returned.
*
* If the payload returned by the broker is out-of-bounds with respect to processing, (edge-case), then we'll
* set the class state to reflect an AMQP error.
*
* If data is returned, we need to evaluate the data to ensure that this is a current, or active, migration.
* In other words, the migration record has the following potential values for the status field:
*
* -- COMPLETE: a migration for this data was previously and successfully completed
* -- IN-PROGRESS: this is an active, on-going, migration
*
* Once a migration is in-progress, it remains in that state until completed. Migrations that have failed
* completely should only occur during development and all impacted data would have to be manually removed
* from the migration and target tables.
*
* If data is not returned, but the query was successful, we should have STATE_NOT_FOUND returned in the
* broker event call payload status. If this is the case, we'll still return a Boolean(true) to the calling
* client, but we'll set the state of the $objMigrate object to STATE_NOT_FOUND and this MUST be tested for
* by the calling client when Boolean(true) values are returned.
*
* If data was returned, but we're unable to load the data into the $objMigrate class, set the class state to
* DATA_TYPE_ERROR and return a Boolean(false).
*
* If data was returned, check the row count - if the row count exceeds one, return an error.
*
* If data was returned, check the status field - if the status is not IN-PROGRESS, return an error.
*
* Finally, if we reach the end of the method, post-data-processing, where we shouldn't ever hit (edge case), then
* we'll return a STATE_FAIL.
*
* Possible Return States:
* -----------------------
* STATE_SUCCESS -- desired, optimal state
* STATE_FRAMEWORK_WARNING -- unable to instantiate a broker client class
* STATE_DATA_ERROR -- missing both brokerData and the migration class object data
* -- more than one migration row was returned from the generic search query
* -- a record was returned without a count of the # records already moved, or the count of
* records moved is less than 1, or the count is greater than the previously-recorded
* total number of records in the source table
* STATE_AMQP_ERROR -- we received bad data, or a false-status payload from the admin broker
* STATE_DATA_TYPE_ERROR -- we were unable to load the returned data into the $objMigrate $data member
* STATE_NOT_FOUND -- the query was successful, but no migration record exists for the request
* STATE_STATUS -- the query returned a migration record not in an IN-PROGRESS status
*
* The method is considered a success under these, and only these conditions:
*
* return TRUE and STATE_SUCCESS
* return TRUE and STATE_NOT_FOUND
*
* Anything else is, and should be considered to be, an error.
*
* In all cases, when the method returns false, the errorStack for the class is populated with a relevant message.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
* HISTORY:
* ========
* 02-09-18 mks _INF-139: original code completed
*
*/
private function fetchMigrationMetaData(): bool
{
$this->state = STATE_SUCCESS;
if (is_null($bc = $this->getRabbitClient())) return false;
// get the current migration data, if it exists
$data = $this->objMigrate->getData();
if (!is_null($data)) $data = $data[0]; // should only be one record if there's data
// build the namaste query depending on what's in the data payload
if (!is_null($data) and array_key_exists((DB_TOKEN . $this->objMigrate->ext), $data)) {
// we're going to query by GUID
$query = [ DB_TOKEN => [ OPERAND_NULL => [ OPERATOR_EQ => [$data[(DB_TOKEN . $this->objMigrate->ext)]] ] ] ];
$sort = [ $this->migrationSortKey => STRING_SORT_ASC ];
} elseif (!is_null($data) and !empty($this->brokerData)) {
// we're going to fetch by brokerData (first pass) defined keys
$query = [ MWH_SOURCE_SCHEMA => [ OPERAND_NULL => [ OPERATOR_EQ => [ $this->objMigrate->getColumn(MWH_SOURCE_SCHEMA . $this->objMigrate->ext)]]],
MWH_SOURCE_TABLE => [ OPERAND_NULL => [ OPERATOR_EQ => [ $this->objMigrate->getColumn(MWH_SOURCE_TABLE . $this->objMigrate->ext)]]],
MWH_DEST_SCHEMA => [ OPERAND_NULL => [ OPERATOR_EQ => [ $this->objMigrate->getColumn(MWH_DEST_SCHEMA . $this->objMigrate->ext)]]],
MWH_DEST_TABLE => [ OPERAND_NULL => [ OPERATOR_EQ => [ $this->objMigrate->getColumn(MWH_DEST_TABLE . $this->objMigrate->ext)]]],
OPERAND_AND => null
];
$sort = [ $this->migrationSortKey => STRING_SORT_ASC ];
} else {
// we have neither brokerData in the current object, or migration data in the widget
$msg = ERROR_DATA_MISSING_ARRAY . STRING_MIGRATION_DATA;
$this->errorStack[] = $msg;
$this->state = STATE_DATA_ERROR;
return false;
}
// build the request payload
$request = [
BROKER_REQUEST => BROKER_REQUEST_ADMIN_MWH_EVENT_FETCH,
BROKER_DATA => [
STRING_QUERY_DATA => $query,
STRING_SORT_DATA => $sort,
],
BROKER_META_DATA => [
META_TEMPLATE => TEMPLATE_CLASS_MIGRATIONS,
META_CLIENT => CLIENT_SYSTEM
]
];
$payload = gzcompress(json_encode($request));
// submit the request and process results
$response = $bc->call($payload);
// release the rabbit-client object
if (is_object($bc)) $bc->__destruct();
unset($bc);
// process the response returned from the adminOut broker
$response = json_decode(gzuncompress($response), true);
if (!is_array($response) or !$response[PAYLOAD_STATUS]) {
$msg = ERROR_BROKER_RESPONSE_BAD;
$this->errorStack[] = $msg;
if (is_array($response[PAYLOAD_DIAGNOSTICS])) {
$this->errorStack = array_merge($this->errorStack, $response[PAYLOAD_DIAGNOSTICS]);
}
$this->state = STATE_AMQP_ERROR;
return false;
} elseif ($response[PAYLOAD_STATUS] and $response[PAYLOAD_STATE] == STATE_SUCCESS) {
// todo -- check for the number of records and check for the status!
if (!$this->objMigrate->addData($response[PAYLOAD_RESULTS])) {
if (is_array($this->objMigrate->eventMessages)) {
$this->errorStack = array_merge($this->objMigrate->eventMessages, $this->errorStack);
}
$this->state = STATE_DATA_TYPE_ERROR;
return false;
}
if ($this->objMigrate->count > 1) {
$this->state = STATE_DATA_ERROR;
$this->errorStack[] = sprintf(ERROR_DATA_ARRAY_COUNT, 1, MONGO_STRING_COUNT, $this->objMigrate->count);
return false;
}
if ($this->objMigrate->getColumn(DB_STATUS) != STATUS_IN_PROGRESS) {
$this->errorStack[] = sprintf(ERROR_INVALID_STATUS, STATUS_IN_PROGRESS, $this->objMigrate->getColumn(DB_STATUS));
$this->state = STATE_STATUS;
return false;
}
// populate the skip member
$this->recordCount = $this->objMigrate->getColumn(MWH_NUM_RECS_MOVED) + $this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED);
// $this->skip = $this->objMigrate->getColumn(MIGRATIONS_NUM_RECS_MOVED) + $this->objMigrate->getColumn(MIGRATIONS_NUM_RECS_DROPPED);
if (is_null($this->recordCount) or $this->recordCount < 0 or $this->recordCount > $this->recordsInSourceTable) {
// if (is_null($this->skip) or $this->skip < 0 or $this->skip > $this->recordsInSourceTable) {
$this->errorStack[] = ERROR_DATA_INCONSISTENT_COUNT;
$this->state = STATE_DATA_ERROR;
return false;
}
return true;
} elseif ($response[PAYLOAD_STATUS] and $response[PAYLOAD_STATE] == STATE_NOT_FOUND) {
$this->errorStack[] = INFO_QUERY_RETURNED_NO_DATA;
$this->objMigrate->state = STATE_NOT_FOUND;
$this->recordCount = 0;
$this->state = STATE_SUCCESS;
// $this->skip = 0;
return true;
}
// we should never execute past this point but, if we do, return an error...
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_UNKNOWN_EVENT;
$this->state = STATE_FAIL;
return false;
}
/**
* doDataMap() -- private method
*
* This method is a generic method meant to perform the mapping between the source and destination schemas.
*
* The method requires an input parameter which should be an array of records fetched from the source. For every
* record in source input array, do the following:
*
* 1. check to see if we're filtering out soft-deleted records and, if the current record's status field (as
* defined in the template migration member is set to the flagged value, then we're skip this record.
* 2. loop through each key->value pair in the migration map (field list) and, for Namaste reserved fields,
* inject the requisite data or, in the case of the time fields, force a conversion from MYSQL date stamp.
*
* At the end of processing the migration map, where each migrated field (value) was copied to a new key, append
* that record to the end of the result set.
*
* At the end of processing of the input records, return the result set to the calling client.
*
* Note that this method does not evaluate or validate any of the data returned from the source other than the
* conversion of date strings to epoch times and these values are copied into the new record as NEW values.
* (e.g.: the old values are preserved.)
*
* todo -- case conversion? e.g.: for incoming guids, should these be converted to UC?
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @param array $_sourceData
* @return array|null
*
*
* HISTORY:
* ========
* 02-02-18 mks _INF-139: original coding
*
*/
private function doDataMap(array $_sourceData): ?array
{
$resultSet = null;
/** @var gacMongoDB $widget */
$widget = $this->widget;
$ext = $widget->ext;
$meta = $widget->getMetaDataPayload();
$validStatusValues = [
STATUS_PENDING, STATUS_ACTIVE, STATUS_DELETED, STATUS_SUSPENDED, STATUS_EXPIRED, STATUS_REVOKED,
STATUS_LOCKED, STATUS_COMPLETE, STATUS_IN_PROGRESS, STATUS_ABANDONED, STATUS_INVALID, STATUS_VALID,
STATUS_REJECTED, STATUS_APPROVED, STATUS_FAILED, STATUS_CLOSED, STATUS_SENT, STATUS_REQUEST_SENT,
STATUS_INACTIVE, STATUS_NEW, STATUS_FAKE
];
foreach ($_sourceData as $record) {
$newRecord = null;
$skipRecord = false;
// skip this record if we're not importing soft-deleted records and current record is soft-deleted
if ($this->filterSoftDeletes and $record[key($this->migrationStatusKV)] == $this->migrationStatusKV[key($this->migrationStatusKV)]) {
$this->droppedRecords++;
continue;
}
// otherwise spin through the migration map fields and transfer/assign data
foreach ($this->migrationMap as $key => $value) {
switch ($key) {
case DB_CREATED :
case DB_ACCESSED :
$newRecord[$key . $ext] = (is_integer($record[$value])) ? $record[$value] : date('Y-m-d H:i:s', strtotime($record[$value]));
break;
case DB_EVENT_GUID :
if (!is_null($meta) and isset($meta[DB_EVENT_GUID])) {
$newRecord[$key . $ext] = $meta[DB_EVENT_GUID];
}
break;
case DB_TOKEN :
// do nothing -- prematurely inserting a guid will cause _createRecord() to fail
break;
case DB_STATUS :
if (!is_null($value)) {
if (in_array(strtoupper($record[$value]), $validStatusValues)) {
$newRecord[$key . $ext] = strtoupper($record[$value]);
} else {
$newRecord[$key . $ext] = $value;
}
} else {
$newRecord[DB_STATUS . $ext] = STATUS_ACTIVE;
}
break;
default :
if (!is_null($value)) {
if (!is_null($record[$value])) {
$newRecord[$key . $ext] = $record[$value];
} elseif (is_null($record[$value])) {
$newRecord[$key . $ext] = null;
} elseif (1 == $this->brokerData[MIGRATION_FILTER_PARTIALS]) {
$skipRecord = true;
$this->droppedRecords++;
}
}
break;
}
}
if (!is_null($newRecord) and !$skipRecord) {
$resultSet[] = $newRecord;
}
}
return $resultSet;
}
/**
* calculateCoverage() -- private method
*
* This method examines the source fields (schema) and compares it to the migration map in the destination template
* and, from that, calculates coverage as a percentage of each record (by column name) for the migration.
*
* Any fields that are dropped are done so solely by the fact that the field(s) do not appear in the target schema's
* migration map. Fields that are dropped are stored in the local member list: $omittedFields. Fields that are
* not dropped are stored in the local member list: $migratedFields.
*
* The coverage (percent) along with the field lists are used in the migration report that will be generated and
* returned to the requesting client.
*
* The method has no input parameters -- all non-derived values are culled from class member storage.
* The method returns a Boolean value indicating that the coverage was successfully calculated (returns a
* calculated percentage greater-than zero and less-than or equal-to 100.
*
* If the method returns a Boolean(false), you can assume that the method failed to calculate coverage and that
* further investigation as to why is warranted.
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
*
* HISTORY:
* ========
* 01-31-18 mks _INF-139: original coding
*
*/
private function calculateCoverage(): bool
{
$droppedCount = 0;
// loop through the migration map and look and each SOURCE field name
if ($this->schema == STRING_MYSQL) {
$totalCount = count($this->mysqlFieldList);
foreach ($this->mysqlFieldList as $fieldName) {
if (in_array($fieldName, $this->migrationMap)) {
$this->migratedFields[] = $fieldName;
} elseif (!in_array($fieldName,$this->omittedFields)) {
$this->omittedFields[] = $fieldName;
$droppedCount++;
}
}
} elseif ($this->schema == STRING_MONGO) {
// todo -- code this
$totalCount = count($this->mongoSourceSchema);
} else {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_SCHEMA_NOT_SUPPORTED, $this->schema);
return false;
}
// calculate the migration (record-fields) coverage
if (!$droppedCount and is_array($this->migratedFields) and count($this->migratedFields) == $totalCount) {
$this->coverage = 100;
} elseif ($droppedCount and count($this->migratedFields) < $totalCount) {
$this->coverage = intval(count($this->migratedFields) / $totalCount * 100);
} else {
$this->coverage = 0;
}
return boolval($this->coverage);
}
/**
* getNamasteObjects() -- private method
*
* This method, like the others, is invoked from the constructor method and requires the meta-data payload as
* a stored member variable.
*
* The method will instantiate two namaste class objects and assign both to different migrations-class member
* variables on success.
*
* The destination class is instantiated - whether or not this is a mongo or mysql class is handled by the
* template during instantiation.
*
* If the destination class fails to instantiate, then we return a false value immediately. Since the migration
* class error stack was passed to the gacFactory constructor, error messages are preserved.
*
* 2. The migration class is instantiated. This is a system-class that will keep track of the migration. As each
* "batch" of records is updated, the migration data is updated and saved providing us with a vector for recovery
* or roll-back of the migration request.
*
* If the migration class fails to instantiate, then we return a false value immediately. Otherwise, assuming that
* this is the first-iteration of this particular migration, we populate the migration class with initial data.
*
* If (to-do) we detect that this is resumption of a migration process, we'll need the migration guid in order
* to instantiate the migration classes $data member with the already-existing data.
*
* On successful instantiations of both classes, we'll return a Boolean(true) value to the calling client.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
*
* HISTORY:
* ========
* 01-30-18 mks _INF-139: original coding
* 03-22-18 mks CORE-852: removed a logic-dud
*
*/
private function getNamasteObjects(): bool
{
$retVal = false;
try {
// instantiate the destination object first
$tmpObj = new gacFactory($this->meta, FACTORY_EVENT_NEW_CLASS, '', $this->errorStack);
} catch (Throwable $t) {
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
$this->errorStack[] = $msg;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $t->getMessage());
}
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
return false;
}
// copy new Namaste data class if instantiation was successful
if ($tmpObj->status) {
// todo: validate that required fields are present in the template:
$this->widget = $tmpObj->widget;
$this->migrationMap = $tmpObj->widget->template->migrationMap;
$this->migrationSortKey = $tmpObj->widget->template->migrationSortKey;
$this->migrationStatusKV = $tmpObj->widget->template->migrationStatusKV;
$retVal = true;
}
if (is_object($tmpObj)) $tmpObj->__destruct();
unset($tmpObj);
// todo -- handle MIGRATION_RESUMPTION processing! (3/22/18: really? thought I tested this...)
if ($retVal) {
$retVal = false;
// build a fake meta for the migrations class importing the broker-event guid
$tmpMeta[META_TEMPLATE] = TEMPLATE_CLASS_MIGRATIONS;
// if the event guid is present, validate and copy
if (isset($_meta[META_EVENT_GUID]) and validateGUID($this->meta[META_EVENT_GUID])) {
$tmpMeta[META_EVENT_GUID] = $this->meta[META_EVENT_GUID];
}
try {
// instantiate the migration object
$tmpObj = new gacFactory($tmpMeta, FACTORY_EVENT_NEW_CLASS, '', $this->errorStack);
} catch (Throwable $t) {
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
$this->errorStack[] = $msg;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $t->getMessage());
}
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
return false;
}
// for a new migrate-class object, pre-populate the data prior to the first migration batch
if ($tmpObj->status) {
// validate that the template has the fields required for migration
if (!$this->validationTemplateMigrationData()) {
if (is_object($tmpObj)) $tmpObj->__destruct();
unset($tmpObj);
return false;
}
// extract the widget (class object) and pull data from the broker payload
/** @var gacMongoDB $widget */
$widget = $tmpObj->widget;
$data[MWH_SOURCE_TABLE] = $this->brokerData[MWH_SOURCE_TABLE];
$data[MWH_SOURCE_SCHEMA] = $this->brokerData[MWH_SOURCE_SCHEMA];
$data[MIGRATION_DEST_SCHEMA] = $this->brokerData[MIGRATION_DEST_SCHEMA];
$data[MWH_DEST_TABLE] = $this->brokerData[MWH_DEST_TABLE];
$data[MWH_NUM_RECS_SOURCE] = $this->recordsInSourceTable;
if (isset($this->meta[META_EVENT_GUID])) $data[META_EVENT_GUID] = $this->meta[META_EVENT_GUID];
$data[DB_STATUS] = STATUS_IN_PROGRESS;
$data[MWH_NUM_RECS_MOVED] = 0;
$data[MWH_NUM_RECS_DROPPED] = 0;
$data[MWH_DATE_STARTED] = time();
// let's see if a migrations-record for this move already exists in the db: (hint: migrations table
// is *always* stored in mongo...on the admin server!)
// validate and add the data to the widget
if ($widget->addData([$data])) {
// override the Namaste record limit with the migration record limit
try {
if ($widget->setRecordLimit($this->config[CONFIG_MIGRATION_RECORD_LIMIT])) {
// todo -- system event notification?
$msg = sprintf(INFO_RECORD_LIMIT_OVERRIDE, gasConfig::$settings[CONFIG_DATABASE][CONFIG_DATABASE_QUERY_RECORD_LIMIT], $this->config[CONFIG_MIGRATION_RECORD_LIMIT]);
consoleLog($this->res, CON_SUCCESS, $msg);
}
$this->recordLimit = $this->config[CONFIG_MIGRATION_RECORD_LIMIT];
} catch (Throwable $t) {
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
$this->errorStack[] = $msg;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $t->getMessage());
}
return false;
}
$retVal = true;
$this->objMigrate = $widget;
}
if (is_object($widget)) $widget->__destruct();
unset($widget);
}
if (is_object($tmpObj)) $tmpObj->__destruct();
unset($tmpObj);
}
return $retVal;
}
/**
* getTableBonafides() -- private method
*
* This method requires no input parameters because all necessary data has been assigned to class members prior
* to the invocation of this method. And the method returns a Boolean value to indicate if the request
* completed successfully or not.
*
* Based on the current request schema, we create a query for the source table, named in the broker-event request,
* and we pass that generic query to the source db schema/engine.
*
* If the target schema is mySQL, then we're going to issue two queries:
* 1. Get the table schema using "desc table-name" syntax
* 2. Get the record count using "select count(*)" syntax
*
* If the target schema is mongo, the best we can do is get the record count.
*
* The return results are stored in a class member variable.
*
* All db queries are exception trapped and, if raised, will add error messages to the member's error stack and
* output copies of the error to the console log.
*
*
* @author mike@givingassistant.org
* @version 1.00
*
* @return bool
*
*
* HISTORY:
* ========
* 01-25-18 mks _INF-139: original coding
*
*/
private function getTableBonafides(): bool
{
$status = false;
$this->state = STATE_DB_ERROR;
$this->recordsInSourceTable = intval(0);
$table = $this->brokerData[MIGRATION_SOURCE_TABLE];
if ($this->schema == STRING_MYSQL) {
/** @var PDO $dbLink */
$dbLink = $this->resRemote;
// first, get the schema...
$query = 'DESC /* ' . basename(__FILE__) . COLON_NS . __LINE__ . ' */ ' . $table;
try {
foreach ($dbLink->query($query) as $row) {
$this->mysqlSourceSchema[] = $row;
$this->mysqlFieldList[] = $row[STRING_FIELD];
}
} catch (PDOException | Throwable $p) {
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_PDO_EXCEPTION . $query);
$this->errorStack[] = $msg;
$this->errorStack[] = $p->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($p->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $p->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
return false; // return control to calling client if the query failed
}
// next, get the record count
$query = 'SELECT /* ' . basename(__FILE__) . COLON_NS . __LINE__ . ' */ count(*) AS rowCount ';
$query .= 'FROM ' . $table;
try {
// fetchColumn returns the value of a single column of a single row
$this->recordsInSourceTable = $dbLink->query($query)->fetchColumn();
$this->state = STATE_SUCCESS;
$status = true;
} catch (PDOException | Throwable $p) {
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_PDO_EXCEPTION, $query);
$this->errorStack[] = $msg;
$this->errorStack[] = $p->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
$this->logger->warn($p->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $p->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
}
} elseif ($this->schema == STRING_MONGO) {
// get the mongo table count
/** @var MongoDB\Driver\Manager $dbLink */
$dbLink = $this->resRemote;
$db = $this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE];
try {
$command = new MongoDB\Driver\Command(["count" => $table, "query" => []]);
$this->recordsInSourceTable = (int) $dbLink->executeCommand($db, $command);
$this->state = STATE_SUCCESS;
$status = true;
} catch (MongoDB\Driver\Exception\InvalidArgumentException | MongoDB\Driver\Exception\Exception | Throwable $e) {
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MONGO_EXCEPTION . $e->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
}
}
return $status;
}
/**
* remoteConnect() -- private method
*
* This method takes no input parameters and returns a boolean value indicating success or failure of being able
* to connect to the remote source database.
*
* This method handles all supported schemas which currently are:
*
* -- mysql
* -- mongo
*
* The class members have pre-loaded the XML configuration which identifies the resource location and, from this
* configuration we attempt to build the correct resource connector and then attempt to make a connection to the
* remote service.
*
* All connection requests are exception wrapped and trapped and errors will be logged to both the class error
* stack and the console log.
*
* Remote connection information, regardless of schema-type, are stored in class member variables on success.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
* HISTORY:
* ========
* 01-25-18 mks _INF-139: original coding
*
*/
private function remoteConnect(): bool
{
$this->remoteAvailable = false;
// build the resource DSN
if ($this->brokerData[MIGRATION_SOURCE_SCHEMA] == STRING_MYSQL or $this->schema == STRING_MYSQL) {
$db = 'dbname=' . $this->config[CONFIG_SCHEMA_MYSQL][CONFIG_DATABASE];
$pdoAttributes = [
PDO::ATTR_EMULATE_PREPARES => false,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC
];
$host = $this->config[CONFIG_SCHEMA_MYSQL][STRING_HOST];
$port = $this->config[CONFIG_SCHEMA_MYSQL][STRING_PORT];
$user = $this->config[CONFIG_SCHEMA_MYSQL][STRING_USER];
$pass = $this->config[CONFIG_SCHEMA_MYSQL][STRING_PASS];
// connect to remote mysql installation via PDO
$PDOConnectString = 'mysql:host=' . $host . COLON_NS . $port . SEMI . $db . SEMI . $this->config[CONFIG_SCHEMA_MYSQL][CONFIG_DATABASE_PDO_CHARSET];
if (gasConfig::$settings[CONFIG_DEBUG]) {
consoleLog($this->res, CON_DEBUG, 'PDO-Connection: ' . $PDOConnectString);
}
try {
$res = new PDO($PDOConnectString, $user, $pass, $pdoAttributes);
$this->resRemote =& $res;
$this->remoteAvailable = true;
$this->whSourceURI = $PDOConnectString;
} catch (PDOException | Throwable $e) {
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_PDO_CONNECT;
$this->errorStack[] = $msg;
$this->errorStack[] = $e->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($msg);
$this->logger->error($e->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $msg);
consoleLog($this->res, CON_ERROR, $e->getMessage());
}
}
} elseif ($this->brokerData[MIGRATION_SOURCE_SCHEMA] == STRING_MONGO or $this->schema == STRING_MONGO) {
$mongoDSN = MONGO_DSN;
if (1 === $this->config[CONFIG_SCHEMA_MONGO][CONFIG_USER_SECURITY]) {
// todo -- evaluate the connectivity of the DSN with COLON_NS as opposed to COLON
$mongoDSN .= $this->config[CONFIG_SCHEMA_MONGO][STRING_USER] . COLON_NS;
$mongoDSN .= $this->config[CONFIG_SCHEMA_MONGO][STRING_PASS] . AT;
$authSource = $this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_AUTH_SOURCE];
$mongoOptions[CONFIG_DATABASE_MONGODB_AUTH_SOURCE] = $authSource;
}
// get the heartbeat and readPreferences configuration
$mongoOptions[CONFIG_DATABASE_MONGODB_HB] = intval($this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_HB]);
$mongoOptions[CONFIG_DATABASE_MONGODB_RP] = $this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_RP];
// get the remote configuration: standAlone, replSet or shard?
switch ($this->config[CONFIG_SCHEMA_MONGO][CONFIG_INSTANCE_TYPE]) {
case CONFIG_INSTANCE_TYPE_INST :
// stand-alone instance
$mongoDSN .= $this->config[CONFIG_SCHEMA_MONGO][STRING_HOST] . COLON_NS;
$mongoDSN .= $this->config[CONFIG_SCHEMA_MONGO][STRING_PORT];
break;
case CONFIG_INSTANCE_TYPE_REPL :
// replication set
$mongoOptions[MONGO_REPL_SET] = $this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_REPLSET_NAME];
foreach ($this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_REPLSET_DSN][CONFIG_DATABASE_MONGODB_ADMIN_REPLSET_SET] as $node) {
$mongoDSN .= $node . COMMA_NS;
}
$mongoDSN = rtrim($mongoDSN, COMMA_NS);
break;
case CONFIG_INSTANCE_TYPE_SHARD :
foreach ($this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_SHARDING_MONGOS_NODES] as $node) {
$mongoDSN .= $node . COMMA_NS;
}
$mongoDSN = rtrim($mongoDSN, COMMA_NS);
break;
}
// connect to the remote mongo service
try {
$res = new MongoDB\Driver\Manager($mongoDSN, $mongoOptions);
if (!is_object($res)) {
consoleLog($this->res, CON_ERROR, ERROR_MONGO_CONNECT);
consoleLog($this->res, CON_ERROR, basename(__FILE__) . COLON_NS . __LINE__ . COLON . $mongoDSN);
} else {
$this->resRemote =& $res;
$this->remoteAvailable = true;
$this->whSourceURI = $mongoDSN;
}
} catch (MongoDB\Driver\Exception\ConnectionException |
MongoDB\Driver\Exception\InvalidArgumentException |
MongoDB\Driver\Exception\RuntimeException |
MongoDB\Driver\Exception\Exception |
Throwable $e) {
$msg = ERROR_MONGO_EXCEPTION . $e->getMessage();
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
}
} else {
// todo -- error: unknown schema
$msg = sprintf(ERROR_MIGRATION_SCHEMA_UNKNOWN, $this->brokerData[MIGRATION_SOURCE_SCHEMA]);
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . $msg;
consoleLog($this->res, CON_ERROR, $msg);
return false;
}
return $this->remoteAvailable;
}
/**
* validateMigrationData -- private method
*
* This method is called from the class constructor and required one input parameter:
*
* The method validates the broker-data payload which was stored in a class variable in the constructor. If
* any errors are found, the class error-stack container is updated with the error message.
*
* Once all the validation checks have run (with the exception of the check of there being NO data which returns
* a Boolean(false) immediately), then we check the current error stack and return a false if it has any events.
*
* The broker-data container is not modified in this method.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @param array $_data
* @return bool
*
*
* HISTORY:
* ========
* 01-24-18 mks _INF-139: original coding
*
*/
private function validateMigrationData(array $_data): bool
{
if (!is_array($_data) or empty($_data)) {
$this->errorStack[] = ERROR_MIGRATION_DATA;
return false;
}
// validate the broker data - ensure the minimum data is present and that optional data is the correct type
if (!array_key_exists(MIGRATION_SOURCE_SCHEMA, $_data)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_SOURCE_SCHEMA;
}
if (!array_key_exists(MIGRATION_SOURCE_TABLE, $_data)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_DEST_TABLE;
}
if (!array_key_exists(MIGRATION_DEST_SCHEMA, $_data)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_DEST_SCHEMA;
}
if (!array_key_exists(MIGRATION_DEST_TABLE, $_data)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_DEST_TABLE;
}
if (array_key_exists(MIGRATION_FILTER_SOFT_DELETES, $_data) and !is_bool(boolval($_data[MIGRATION_FILTER_SOFT_DELETES]))) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_MIGRATION_DATA_FIELD_TYPE, DATA_TYPE_BOOL, MIGRATION_FILTER_SOFT_DELETES, $_data[MIGRATION_FILTER_SOFT_DELETES]);
} elseif (array_key_exists(MIGRATION_FILTER_SOFT_DELETES, $_data) and true === boolval($_data[MIGRATION_FILTER_SOFT_DELETES])) {
$this->filterSoftDeletes = true;
}
// validate the source and destination schemas
if (!in_array($_data[MIGRATION_SOURCE_SCHEMA], $this->allowedSchemas)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_MIGRATION_SCHEMA_UNKNOWN, MIGRATION_SOURCE_SCHEMA);
}
if (!in_array($_data[MIGRATION_DEST_SCHEMA], $this->allowedSchemas)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_MIGRATION_SCHEMA_UNKNOWN, MIGRATION_DEST_SCHEMA);
}
// check for testMode
if (isset($_data[MIGRATION_TEST_MODE])) {
switch ($_data[MIGRATION_TEST_MODE]) {
case 1 :
case true :
$this->testMode = true;
// if we're missing the test record count in the XML, set the record limit to 10
if (!isset($this->config[CONFIG_MIGRATION_NUM_TEST_RECS]) or empty($this->config[CONFIG_MIGRATION_NUM_TEST_RECS])) {
$this->config[CONFIG_MIGRATION_NUM_TEST_RECS] = 10;
}
break;
case 0 :
case false :
$this->testMode = false;
break;
default :
$msg = sprintf(ERROR_MIGRATION_DATA_FIELD_TYPE, 'boolean', gettype($_data[MIGRATION_TEST_MODE]));
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . $msg;
break;
}
}
// return a boolean value indicating data validation success or failure
return (empty($this->errorStack));
}
/**
* validateTemplateMigrationData() -- private method
*
* This method is called from getNamasteObjects() right after the Namaste class template is successfully
* instantiated. The required template data has already been read-in and assigned to members within the
* current class.
*
* The method validates the three migration parameters from the destination template:
*
* 1. If the current request is filtering migrations based on soft-deletes, then we need to validate that
* the SOURCE status column is present in the current schema.
* 2. Validation on the mandatory SOURCE sort key (column) is done by ensuring that the column name also
* appears in the current schema.
* 3. That the migration map exists as a member element and is in array format and that every non-null value
* in the associative array exists in the current schema.
*
* Item 1 (above) is optional and will only proc if the MIGRATION_DELETED flag was set in the broker request
* data to Boolean(false).
*
* If Item 1 procs, then both the status and sort keys must be present in the current schema, as well as all
* value declared in the associative-array migration map, for the method to return a Boolean(true) value.
*
* Otherwise, Items 2 and 3 must be present in order to return a Boolean(true) value.
*
* Anything else will generate an error - an error event message will be recorded to the class error stack and
* a Boolean(false) value will be returned.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
*
* HISTORY:
* ========
* 02-01-16 mks _INF-139: original coding
*
*/
private function validationTemplateMigrationData(): bool
{
$rc = true; // return code (bool)
// basic validation - requiring all the stuffs to be present
if (!isset($this->migrationMap) or !is_array($this->migrationMap) or empty($this->migrationMap)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . STRING_MIGRATION_MAP;
$rc = false;
}
if (!isset($this->migrationSortKey) or empty($this->migrationSortKey)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_SOURCE_SORT_KEY;
$rc = false;
}
// status (array) required only if we're filtering soft-deleted records
if (array_key_exists(MIGRATION_FILTER_SOFT_DELETES, $this->brokerData) and true === boolval($this->brokerData[MIGRATION_FILTER_SOFT_DELETES])) {
if (!isset($this->migrationStatusKV) or empty($this->migrationStatusKV) or !is_array($this->migrationStatusKV)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_SOURCE_STATUS_KV;
$rc = false;
}
}
// validate that the named fields in the template exist in the current schema dump from the SOURCE
if ($rc and $this->schema == STRING_MYSQL) {
// if we're supporting filtering of soft-deletes, validate the status field
if (array_key_exists(MIGRATION_FILTER_SOFT_DELETES, $this->brokerData) and true === boolval($this->brokerData[MIGRATION_FILTER_SOFT_DELETES])) {
$rc = (false === array_search(key($this->migrationStatusKV), $this->mysqlFieldList)) ? false : true;
if (!$rc) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_DATA_INVALID_KEY, key($this->migrationStatusKV));
$this->filterSoftDeletes = false; // reset this value to default
}
}
// validate that the required sort field is in the schema
if ($rc) {
if (!in_array($this->migrationSortKey, $this->mysqlFieldList)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_SOURCE_SORT_KEY;
$rc = false;
}
}
// validate the non-null fields in the migration map as being present in the source mysql;
if ($rc) {
$goodFields = true;
foreach ($this->migrationMap as $key => $value) {
if ($value != null and !in_array($value, $this->mysqlFieldList)) {
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_MIGRATION_DATA_FIELD_UNK, $value);
$goodFields = false;
}
}
$rc = $goodFields;
}
} elseif ($this->schema == STRING_MONGO) {
$rc = true;
// todo -- write this code once we know what fetched mongo-schema looks like...if we can ever know...
} else {
// error: unsupported schema
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_SCHEMA_NOT_SUPPORTED, $this->mysqlSourceSchema);
$rc = false;
}
return $rc;
}
/**
* updateWHMetaData() -- private method
*
* This method has an optional input parameter:
*
* $_data -- used to update the destination record, an assoc array containing the k->v pairs of new columns
*
* The general assumption is that this method will be invoked from several places/times:
*
* -- when we're looping through the WH data -- each iteration will prompt a call to either create or update
* -- when we're done processing WH data and we want to close the record
*
* We instantiate a brokerClass object to the admin-out queue and publish the request.
*
* The method returns a boolean indicating the success/fail of the remote update request.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @param array|null $_data
* @return bool
*
*
* HISTORY:
* ========
* 05-23-18 mks _INF-188: original coding completed
*
*/
private function updateWHMeta(array $_data = null): bool
{
// if we passed data (update) use it - o/w, grab all the data for a new-record create
$data = (is_null($_data)) ? $this->objWarehouseMeta->getData() : $_data;
if (is_null($data)) {
$msg = ERROR_DATA_MISSING_ARRAY . STRING_WH_META_DATA;
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_DATA_ERROR;
return false;
}
try {
/** @var gacBrokerClient $bc */
$bc = $this->getRabbitClient();
} catch (TypeError $t) {
$msg = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_AO;
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_RESOURCE_ERROR_RABBIT;
if (is_object($bc)) {
$bc->__destruct();
unset($bc);
}
return false;
}
if (!$bc->status) {
$msg = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_AO;
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
if (is_object($bc)) {
$bc->__destruct();
unset($bc);
}
return false;
}
try {
// we have a broker client - create the request payload, submit the request, and parse the result
if (!isset($this->adminWHMetaData[META_EVENT_GUID]))
$this->adminWHMetaData[META_EVENT_GUID] = $this->objWarehouseMeta->getColumn(DB_EVENT_GUID);
if (!is_null($_data)) {
$payload = [
STRING_QUERY_DATA => [ DB_TOKEN => [ OPERAND_NULL => [ OPERATOR_EQ => [$this->objWarehouseMeta->getColumn(DB_TOKEN)]]]],
STRING_UPDATE_DATA => $data
];
$be = BROKER_REQUEST_ADMIN_MWH_EVENT_UPDATE;
} else {
$payload = $data;
$be = BROKER_REQUEST_ADMIN_MWH_EVENT_CREATE;
}
$request = [
BROKER_REQUEST => $be,
BROKER_DATA => $payload,
BROKER_META_DATA => $this->adminWHMetaData
];
$payload = gzcompress(json_encode($request));
$response = $bc->call($payload);
$response = json_decode(gzuncompress($response), true);
if (empty($response) or !is_array($response) or !$response[PAYLOAD_STATUS]) {
$msg = sprintf(ERROR_BROKER_INTERNAL_CALL, $request[BROKER_REQUEST], BROKER_QUEUE_AO);
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_FRAMEWORK_WARNING;
if (is_object($bc)) {
$bc->__destruct();
unset($bc);
}
return false;
}
$this->objWarehouseMeta->addData($response[PAYLOAD_RESULTS]);
} catch (Throwable $t) {
$msg = sprintf(ERROR_THROWABLE_EXCEPTION, $t->getMessage());
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available)
$this->logger->warn($msg);
else
consoleLog($this->res, CON_ERROR, $msg);
$this->state = STATE_DATA_ERROR;
if (is_object($bc)) {
$bc->__destruct();
unset($bc);
}
return false;
}
if (is_object($bc)) {
$bc->__destruct();
unset($bc);
}
return true;
}
/**
* getRabbitClient() -- private method
*
* This method is used by the class methods that need to publish migration-meta-data updates to the admin service.
*
* The method accepts an optional parameter - the queue designation which defaults to the AdminOut queue.
* Validation of the input-queue is handled during instantiation.
*
* All this method does is instantiate a new gacBrokerClient class object -- if the instantiation is successful,
* then the object is returned to the calling client. If not, a null is returned and error messages are generated
* and assigned/output.
*
* Function was created to reduce overall code footprint.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @param string $_queue
* @return gacBrokerClient|null
*
*
* HISTORY:
* ========
* 02-12-18 mks _INF-139: original coding
*
*/
private function getRabbitClient(string $_queue = BROKER_QUEUE_AO): ? gacBrokerClient
{
try {
// create a broker-client class for the query
$bc = new gacBrokerClient($_queue);
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
$this->state = STATE_FRAMEWORK_WARNING;
return null;
}
if (!is_object($bc) or !$bc->status) {
$msg = ERROR_BROKER_CLIENT_DECLARE . $_queue;
consoleLog($this->res, CON_SYSTEM, $msg);
$this->errorStack[] = $msg;
if (is_object($bc)) $bc->__destruct();
unset($bc);
$this->state = STATE_FRAMEWORK_WARNING;
return null;
}
return $bc;
}
/**
* publishMigrationEvent() -- private method
*
* This method requires two input parameters:
*
* $_request: the associative array we're going to morph into an AMQP message.
* $_response: a call-by-reference parameter that will return the broker payload received from the admin service
*
* There's no validation for the input data as any errors will be handled and recorded in the broker event.
*
* The method returns a boolean value indicating whether or not the request was successfully published and
* returned back a payload. (Note: The payload itself may contain errors but processing those errors is
* beyond the scope of this method.)
*
* We also instantiate a gacBrokerClient to handle the AMQP request submission to the Admin service.
*
* This method was written to reduce the code footprint.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @param array $_request
* @param array $_response
* @return bool
*
*
* HISTORY:
* ========
* 02-12-18 mks _INF-139: original coding
*
*/
private function publishMigrationEvent(array $_request, array &$_response = []): bool
{
$bc = null;
try {
// get the broker client we'll use to publish the update/save request to the remote admin service
if (is_null($bc = $this->getRabbitClient())) return false;
$payload = gzcompress(json_encode($_request));
$_response = json_decode(gzuncompress($bc->call($payload)), true);
if (!is_array($_response) or !$_response[PAYLOAD_STATUS]) {
$msg = ERROR_BROKER_RESPONSE_BAD;
$this->errorStack[] = $msg;
if (is_array($_response[PAYLOAD_DIAGNOSTICS])) {
$this->errorStack = array_merge($this->errorStack, $_response[PAYLOAD_DIAGNOSTICS]);
}
$this->state = STATE_AMQP_ERROR;
if (is_object($bc)) $bc->__destruct();
unset($bc);
return false;
}
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
$this->state = STATE_FRAMEWORK_WARNING;
if (is_object($bc)) $bc->__destruct();
unset($bc);
return false;
}
if (is_object($bc)) $bc->__destruct();
unset($bc);
return true;
}
/**
* processMongoResults() -- private method
*
* This method is a single-use/purpose method meant to calculate the results from a mongoDB bulk-write insert
* invoked as part of the create-record process during migration. It's only called from the migrateData()
* function and was broken out to reduce the calling function's code footprint.
*
* There are two required inputs to the method:
*
* $_widget - this is a copy of the class $widget that was created in the parent function
* $_mappedResults -- an array of the records submitted for insertion to mongoDB
*
* The algorithm is to fetch the bulk-write results from the widget (a gacMongoDB object) post _createRecord()
* and then extract data about the bulk-write operation (records created), and from data grab the last record
* written (in case we have have to resume processing), and check for dropped records. Data about the write
* event is posted to the migration (tracking) object.
*
* The method will return a Boolean(false) value if the bulk-write object returned null from the create method,
* or if we generated additional errors when parsing the return data via other method calls.
*
* Otherwise, the method returns a Boolean(true) value.
*
*
* @param gacMongoDB $_widget
* @param array $_mappedResults
* @return bool
*
*
* HISTORY:
* ========
* 02-18-18 mks _INF-139: original coding
* 03-27-18 mks CORE-852: corrected errors in method header comment block, fixed last-written-record fetch
*
*/
private function processMongoResults(gacMongoDB $_widget, array $_mappedResults): bool
{
// fetch number of records inserted in this pass and add it to the migration meta data
$rc = count($_mappedResults);
$esc = count($this->errorStack); // error stack count
try {
/** @var MongoDB\Driver\WriteResult $bwr */
$bwr = $_widget->getBWResults();
if (!is_null($bwr)) {
$rw = $bwr->getInsertedCount(); // count of Records Written
// if the record count is greater-than the count of records written....
if ($rc > $rw) {
// grab what we think is the last record written and encode it
$this->lastRecordWritten = json_encode($_mappedResults[($rc - 1)]);
// store it in the migration object
@$this->objMigrate->insertField(MWH_LAST_REC_WRITTEN, $this->lastRecordWritten, 0, $this->errorStack, $this->res);
// update the dropped record count
$dr = $this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED);
$tdr = $dr + ($rc - $rw); // total dropped record count
@$this->objMigrate->insertField(MWH_NUM_RECS_DROPPED, $tdr, 0, $this->errorStack, $this->res);
}
$cc = $this->objMigrate->getColumn(MWH_NUM_RECS_MOVED) + $rw;
@$this->objMigrate->insertField(MWH_NUM_RECS_MOVED, $cc, 0, $this->errorStack, $this->res);
} else {
$msg = sprintf(ERROR_DATA_OBJECT_EMPTY, STRING_BULK_WRITE);
$this->errorStack[] = $msg;
consoleLog($this->res, CON_SYSTEM, $msg);
return false;
}
unset($bwr);
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
$this->state = STATE_FRAMEWORK_WARNING;
}
// return if we got errors setting data in the migrate object
if ($esc < count($this->errorStack)) return false;
return true;
}
/**
* processPDOResults() -- private method
*
* This method is called at the end of a write-cycle during migration when mySQL is the destination schema. The
* method is meant to update the migration record, keeping track of the migration progress and metrics.
*
* There are two input parameters to this method:
*
* $_widget -- a copy of the destination class widget (Namaste data object)
* $_mappedResults -- an array containing the list of records inserted into mySQL
*
* If we wrote less records than what's contained in the data array, the assumption is that some records were
* dropped during processing. If that's the case, then we want to capture the number of records written, dropped,
* and the last written record and post all this information to the migration record.
*
* If no records were dropped, just update the current record count of total number of inserted objects in the
* current (ongoing) migration effort.
*
* The method returns a Boolean value that's false if we generated any errors during this collection process,
* otherwise a Boolean(true) is returned.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @param gacPDO $_widget
* @param array $_mappedResults
* @return bool
*/
private function processPDOResults(gacPDO $_widget, array $_mappedResults): bool
{
$rc = count($_mappedResults); // total record count
$esc = count($this->errorStack); // error stack count
$rw = $_widget->recordsInserted; // records written
try {
if ($rc > $rw) {
// a record was dropped - grab the last record in the array as potentially the last record written
$this->lastRecordWritten = json_encode($_mappedResults[($rc - 1)]);
// store it in the migration object
@$this->objMigrate->insertField(MWH_LAST_REC_WRITTEN, $this->lastRecordWritten, 0, $this->errorStack, $this->res);
// update the dropped record counter
$dr = $this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED);
$tdr = $dr + $_widget->recordsDropped; // total dropped records
@$this->objMigrate->insertField(MWH_NUM_RECS_DROPPED, $tdr, 0, $this->errorStack, $this->res);
}
$twr = $this->objMigrate->getColumn(MWH_NUM_RECS_MOVED) + $rw;
@$this->objMigrate->insertField(MWH_NUM_RECS_MOVED, $twr, 0, $this->errorStack, $this->res);
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
$this->state = STATE_FRAMEWORK_WARNING;
}
// return false if any new errors were generated
return ($esc < count($this->errorStack)) ? false : true;
}
/**
* doMigration() -- private method
*
* This was the old constructor code before we introduced warehousing to the class. Once we introduced warehousing
* we determine the event type in the constructor and invoke the controller function specific to the event.
*
* There is one input parameter to this method and that's an array containing the broker data as received by
* the migration broker.
*
* The method returns is embedded in the class member variables to the return type is void.
*
* If any of the migration processing steps, represented by the invoked method, fail, then we'll return immediately
* passing the results straight back to the broker event code.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* HISTORY:
* ========
* 04-10-18 mks _INF-188: code refactoring completed
*
*/
private function doMigration(): void
{
if (!isset($this->brokerData[MIGRATION_TEST_MODE])) $this->brokerData[MIGRATION_TEST_MODE] = 0;
$this->schema = $this->brokerData[MIGRATION_SOURCE_SCHEMA];
$this->destinationSchema = $this->brokerData[MIGRATION_DEST_SCHEMA];
$this->lastRecordWritten = null;
// reset state/status for the query
$this->state = STATE_VALIDATION_ERROR;
$this->status = false;
try {
// connect to the remote (source) schema service
if (!$this->remoteConnect()) return;
// get the count of the number of records to migrate (incidentally testing source params)
if (!$this->getTableBonafides()) return;
// instantiate the Namaste destination widget and the system migrations widget
if (!$this->getNamasteObjects()) return;
// make a call to the admin/migrations table and see if we're already done this migration or
// if it's a migration in-progress...
$this->fetchMigrationMetaData();
if ($this->state != STATE_SUCCESS) return;
$this->state = STATE_VALIDATION_ERROR;
// calculate the coverage of the migration
if (!$this->calculateCoverage()) return;
// call the method to do the data migration
if (!$this->migrateData()) return;
$this->state = STATE_SUCCESS;
$this->status = true;
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
$this->state = STATE_FRAMEWORK_WARNING;
}
}
/**
* startWHRequest() -- private method
*
* This method has no input parameters and returns no value -- status and values are stored within the class member
* and, as such, should be evaluated by the calling client on return.
*
* The method validates the request basics:
*
* 1. do we have a data payload?
* 2. do we have meta data?
* 3. can we load the data template?
* 4. does the template have WH settings
*
* At this stage, once the above has been validated, then we call the method to validate the request details.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
*
* HISTORY:
* ========
* 04-24-18 mks _INF-188: original coding
*
*/
private function startWHRequest(): void
{
$this->status = false;
$debug = gasConfig::$settings[CONFIG_DEBUG];
// first thing to do is validate that we have a data payload
if (empty($this->brokerData) or !isset($this->brokerData)) {
$msg = ERROR_DATA_ARRAY_EMPTY . COLON . BROKER_DATA;
$this->errorStack[] = $msg;
$this->state = STATE_DATA_ERROR;
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
return;
}
// next - validate the payload as a valid warehousing request
if (!isset($this->meta[META_TEMPLATE]) or empty($this->meta[META_TEMPLATE])) {
$msg = ERROR_META_CLIENT_404;
$this->errorStack[] = $msg;
$this->state = STATE_META_ERROR;
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
return;
}
// load the template
try {
/** @var gatProductRegistrations $template */
$tc = STRING_CLASS_GAT . $this->meta[META_TEMPLATE];
$template = new $tc();
} catch (Throwable $t) {
$this->errorStack[] = ERROR_EXCEPTION;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->error(ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
return;
}
$this->whSettings = $template->wareHouse;
if (!is_array($this->whSettings)) {
$msg = ERROR_DATA_ARRAY_EMPTY . COLON . STRING_WH_SETTINGS;
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
$this->errorStack[] = $msg;
$this->state = STATE_TEMPLATE_ERROR;
return;
}
try {
// data validation is complete -- let's validate the query against the class template settings
if (!$this->validateWHRequest($debug)) return;
} catch (TypeError $t) {
$msg = ERROR_TYPE_EXCEPTION . COLON . $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
}
}
/**
* validateWHRequest() -- private method
*
* This method is called by the constructor on a new warehousing request. The method takes a single, optional,
* input parameter which is the current debug state. (The class does not normally access the framework's debug
* setting which is why the default is set to false.)
*
* The method is responsible for first-pass validation of the WH request -- and, importantly, if this source
* data for the request is remote to Namaste, we confirm that we can create a resource connection to the remote
* service.
*
* The validation stubs just ensure that the WH request doesn't violate individual rules set in the destination
* class template.
*
* We instantiate two other Namaste classes in this method and assign them to the following members:
*
* $widget -- this is the local data class instantiation specified in the meta data META_TEMPLATE value
* $objWarehouse -- this is the admin data class that records the wh operation progress
*
* The ultimate objective of this method is to generate these two objects so that, when control is returned to
* the calling client, we can pass control back to the admin WH broker, which returns the task GUID back to
* the requesting client -- then the broker will invoke the processing method in this class to perform the
* actual warehousing operation without blocking the original client.
*
* The method returns a boolean value - if the request passes initial validation, then we return a boolean(true),
* otherwise, a boolean(false) is returned.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @param bool $_debug
* @return bool
*
*
* HISTORY:
* ========
* 04-24-16 mks _INF-188: original coding completed
* 01-08-20 mks DB-150: fixed bug in assignment for $mysqlConfig and $mongoConfig
*/
private function validateWHRequest(bool $_debug = false): bool
{
$skipCheck = false;
$mysqlConfig = false;
$mongoConfig = false;
// check 1: Does the destination data class support warehousing?
if (!$this->whSettings[WH_SUPPORTED]) {
$error = sprintf(ERROR_WH_CLASS_NOT_SUPPORTED, $this->meta[META_TEMPLATE]);
$this->errorStack[] = $error;
$this->state = STATE_DATA_ERROR;
if ($_debug) $this->logger->debug($error);
return false;
}
// check 2: Check if the source is remote (to Namaste) and, if so, is the class auth to fetch remote data
if (isset($this->brokerData[WH_SOURCE_IS_REMOTE]) and $this->brokerData[WH_SOURCE_IS_REMOTE]) {
if (!$this->whSettings[WH_REMOTE_SUPPORT]) {
$error = sprintf(ERROR_WH_REMOTE_SOURCE_NOT_AUTH, $this->meta[META_TEMPLATE]);
$this->errorStack[] = $error;
$this->state = STATE_DATA_ERROR;
if ($_debug) $this->logger->debug($error);
return false;
}
// check 2.1: if this is a remote request, ensure we have a remote table name in the payload
if (!isset($this->brokerData[WH_REMOTE_TABLE]) or empty($this->brokerData[WH_REMOTE_TABLE])) {
$this->errorStack[] = ERROR_WH_REMOTE_SOURCE_404;
if ($_debug) $this->logger->debug(ERROR_WH_REMOTE_SOURCE_404);
$this->state = STATE_DATA_ERROR;
return false;
}
$this->remoteSource = true; // set the remote-source flag which we'll check later
}
// check 3: if this is an automated (cron-launched) request, validate that automation is supported
if (isset($this->brokerData[WH_AUTOMATED]) and $this->brokerData[WH_AUTOMATED]) {
if (false == $this->whSettings[WH_AUTOMATED]) {
$error = sprintf(ERROR_WH_CRON_NOT_SUPPORTED, $this->meta[META_TEMPLATE]);
$this->errorStack[] = $error;
$this->state = STATE_DATA_ERROR;
if ($_debug) $this->logger->debug($error);
return false;
}
$skipCheck = true;
}
// check 4: if this is an ad-hoc request, validate that the class supports such requests
if (!$this->whSettings[WH_DYNAMIC] and !$skipCheck) {
$error = sprintf(ERROR_WH_DYNAMIC_NOT_SUPPORTED, $this->meta[META_TEMPLATE]);
$this->errorStack[] = $error;
$this->state = STATE_DATA_ERROR;
if ($_debug) $this->logger->debug($error);
return false;
}
// check 5: WH_FILTER_VAL is required, should be an indexed array with two valid short-dates
if (!isset($this->brokerData[WH_FILTER_VALUES])) {
$error = ERROR_DATA_KEY_404 . WH_FILTER_VALUES;
$this->errorStack[] = $error;
if ($this->widget->debug) $this->logger->debug($error);
$this->state = STATE_DATA_ERROR;
return false;
} elseif (!is_array($this->brokerData[WH_FILTER_VALUES])) {
$error = ERROR_DATA_ARRAY_NOT_ARRAY . WH_FILTER_VALUES;
$this->errorStack[] = $error;
if ($this->widget->debug) $this->logger->debug($error);
$this->state = STATE_DATA_ERROR;
return false;
} elseif (count($this->brokerData[WH_FILTER_VALUES]) != 2) {
$error = sprintf(ERROR_DATA_RECORD_COUNT, 2, count($this->brokerData[WH_FILTER_VALUES]));
$this->errorStack[] = $error;
if ($this->widget->debug) $this->logger->debug($error);
$this->state = STATE_DATA_ERROR;
return false;
} elseif (!validateMigrationDate($this->brokerData[WH_FILTER_VALUES][0])) {
$error = sprintf(ERROR_DATE_INVALID, $this->brokerData[WH_FILTER_VALUES][0]);
$this->errorStack[] = $error;
$this->state = STATE_DATA_ERROR;
if ($_debug) $this->logger->debug($error);
return false;
} elseif (!validateMigrationDate($this->brokerData[WH_FILTER_VALUES][1])) {
$error = sprintf(ERROR_DATE_INVALID, $this->brokerData[WH_FILTER_VALUES][1]);
$this->errorStack[] = $error;
$this->state = STATE_DATA_ERROR;
if ($_debug) $this->logger->debug($error);
return false;
}
// check 6: if this is a remote source, then check for the migration URI configuration to be present
if ($this->remoteSource) {
if (!isset(gasConfig::$settings[CONFIG_MIGRATION]) or empty(gasConfig::$settings[CONFIG_MIGRATION]) or !is_array(gasConfig::$settings[CONFIG_MIGRATION])) {
$error = ERROR_WH_REMOTE_MIG_CFG_404;
$this->errorStack[] = $error;
$this->state = STATE_DATA_ERROR;
if ($_debug) $this->logger->debug($error);
return false;
}
$mysqlConfig = (isset(gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MYSQL]) and is_array(gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MYSQL])) ? gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MYSQL] : new stdClass();
$mongoConfig = (isset(gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MONGO]) and is_array(gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MONGO])) ? gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MONGO] : new stdClass();
}
// check 7: if we have test-mode set in the meta, or we're in debug mode, override the warehouse default setting
if ((isset($this->brokerData[MWH_TEST_MODE]) and $this->brokerData[MWH_TEST_MODE]) or $_debug) {
$this->whSettings[WH_DELETE] = 'T';
}
// if we get to this point, the request is valid in terms of permissions/authorizations/scope,
// next, if an override query was submitted, we have to validate the query: (will it build?).
$widget = new gacFactory($this->meta, FACTORY_EVENT_NEW_CLASS, '', $this->errorStack);
if (!$widget->status) {
$error = ERROR_TEMPLATE_INSTANTIATE . $this->meta[META_TEMPLATE];
$this->errorStack[] = $error;
$this->state = STATE_TEMPLATE_ERROR;
if ($_debug) $this->logger->debug($error);
return false;
}
$this->widget = $widget->widget;
if (is_object($widget)) $widget->__destruct();
unset($widget);
// assign the source schema
switch ($this->widget->template->schema) {
case CONFIG_DATABASE_PDO :
case STRING_MYSQL:
$this->schema = STRING_MYSQL;
$lTime = $this->brokerData[WH_FILTER_VALUES][0] . TIME_ZEROS;
$rTime = $this->brokerData[WH_FILTER_VALUES][1] . TIME_ZEROS;
if ($this->remoteSource) {
$this->brokerData[MIGRATION_SOURCE_SCHEMA] = STRING_MYSQL;
$this->remoteSourceConfig = $mysqlConfig;
if (!$mysqlConfig) {
$this->errorStack[] = ERROR_WH_REMOTE_MYSQL_CFG_404;
$this->state = STATE_CONFIG_ERROR;
if ($_debug) $this->logger->debug(ERROR_WH_REMOTE_MYSQL_CFG_404);
return false;
}
}
break;
case CONFIG_DATABASE_MONGODB :
case STRING_MONGO :
case TEMPLATE_DB_MONGO :
$this->schema = STRING_MONGO;
// convert to epoch time if the source schema is mongo
$lTime = unPrettyTime($this->brokerData[WH_FILTER_VALUES][0] . TIME_ZEROS);
$rTime = unPrettyTime($this->brokerData[WH_FILTER_VALUES][1] . TIME_ZEROS);
if ($this->remoteSource) {
$this->brokerData[MIGRATION_SOURCE_SCHEMA] = STRING_MONGO;
$this->remoteSourceConfig = $mongoConfig;
if (!$mongoConfig) {
$this->errorStack[] = ERROR_WH_REMOTE_MONGO_CFG_404;
$this->state = STATE_CONFIG_ERROR;
if ($_debug) $this->logger->debug(ERROR_WH_REMOTE_MONGO_CFG_404);
return false;
}
}
break;
default :
$error = sprintf(ERROR_SCHEMA_NOT_SUPPORTED, $this->widget->template->schema);
$this->errorStack[] = $error;
if ($_debug) $this->logger->debug($error);
$this->state = STATE_DATA_ERROR;
return false;
break;
}
// using the payload query-filter value, create the source query
$field = ($this->remoteSource) ? $this->brokerData[WH_REMOTE_CDATE_FIELD] : DB_CREATED;
$query[$field][OPERAND_AND] = [ OPERATOR_GT => [$lTime], OPERATOR_LT => [$rTime]];
$this->nQuery = $query;
try {
// check that we can build the namaste (local) query
if (!$this->remoteSource and !$this->widget->_checkQuery($query)) {
$error = ERROR_DATA_QUERY_BUILD;
$this->errorStack[] = $error;
$this->state = STATE_TEMPLATE_ERROR;
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($error);
} else {
consoleLog($this->res, CON_ERROR, $error);
}
return false;
} elseif ($this->remoteSource and $this->schema == STRING_MYSQL) {
$query = $field . " > '" . $lTime . "' AND " . $field . " < '" . $rTime . "' ";
}
$this->whereClause = $query; // stash the built where-clause
$this->status = false; // reset from _checkQuery()
} catch (Throwable $t) {
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $error;
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($error);
} else {
consoleLog($this->res, CON_ERROR, $error);
}
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
// set the query limit for the source-fetch, overriding the framework defaults using undocumented feature
$this->meta[META_LIMIT_OVERRIDE] = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_WH_RECS_PER_XFER];
// load the migration map into the widget class member
if (!isset($this->widget->template->migrationMap)) {
$error = ERROR_MIGRATION_MAP_404 . COLON . $this->meta[META_TEMPLATE];
$this->errorStack[] = $error;
$this->state = STATE_TEMPLATE_ERROR;
if ($_debug) $this->logger->debug($error);
return false;
}
try {
// if source is remote, check the connection
if ($this->remoteSource and !$this->remoteConnect()) return false; // check val of remoteAvailable
// we need to return a GUID back to the event requester so that they're not blocked the entire time we're
// doing the actual warehousing operation.
// instantiate and populate the warehousing object that will contain the progress/final reports of the request.
$widget = new gacFactory([META_TEMPLATE => TEMPLATE_CLASS_WAREHOUSE], FACTORY_EVENT_NEW_CLASS, '', $this->errorStack);
if (!$widget->status) {
$error = ERROR_TEMPLATE_INSTANTIATE . TEMPLATE_CLASS_WAREHOUSE;
$this->errorStack[] = $error;
if ($_debug) $this->logger->debug($error);
$this->state = STATE_TEMPLATE_ERROR;
return false;
}
$this->objWarehouseMeta = $widget->widget;
if (is_object($widget)) $widget->__destruct();
unset($widget);
// build the wh meta data and add to the wh obj
$whData = [
MWH_SOURCE_SCHEMA => $this->schema,
MWH_SOURCE_TABLE => (($this->remoteSource) ? $this->brokerData[WH_REMOTE_TABLE] : $this->widget->collectionName),
MWH_DEST_SCHEMA => $this->schema,
MWH_DEST_TABLE => ($this->widget->template->whTemplate . $this->widget->template->extension),
MWH_QUERY => $this->widget->strQuery,
MWH_QUERY_DATA => json_encode([$lTime, $rTime]),
DB_EVENT_GUID => $this->meta[META_EVENT_GUID]
];
if (!$this->objWarehouseMeta->addData([$whData])) {
$error = ERROR_DATA_ARRAY_ADD . $this->objWarehouseMeta->class;
$this->errorStack[] = $error;
$this->state = STATE_FRAMEWORK_WARNING;
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($error);
} else {
consoleLog($this->res, CON_ERROR, $error);
}
return false;
}
// save the warehouse meta record so we can return the guid back to the appserver requestor
$this->updateWHMeta();
} catch (Throwable $t) {
$error = ERROR_THROWABLE_EXCEPTION . basename(__FILE__) . AT . __LINE__ . COLON;
$this->errorStack[] = $error;
$this->errorStack[] = $t->getMessage();
if (isset($this->logger) and $this->logger->available) {
$this->logger->error($error);
$this->logger->error($t->getMessage());
} else {
consoleLog($this->res, CON_ERROR, $error);
consoleLog($this->res, CON_ERROR, $t->getMessage());
}
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
$this->status = true;
$this->state = STATE_SUCCESS;
return true;
}
/**
* issueRemoteSQLQuery() -- private method
*
* This method is used to issue a remote, non-prepared, query to a remote mySQL/mariaDB resource.
*
* The method has three input parameters, one of which is required...
*
* $_query - the fully-qualified query string to be passed to the remote db for execution
* $_isCount - boolean value, defaulting to false, that indicates if we're exec'ing a remote count(*) query
* $_retVal - call-by-reference param, if isCount == true, the query results will be loaded here
*
* The method returns a Boolean value to indicate if the query successfully executed or not. It is not an
* indication that data was returned.
*
* If data is not returned (assume that the remote query exec'd successfully), then return a Boolean(true)
* value but set the member variable $noData == true to indicate the lack of data.
*
* If data is returned, set the $recordCount equal to the number of rows returned if this was a non-count query.
* If the query is a count(*) fetch, then populate $recordCount with that count value. Otherwise, the returned
* data is stored in the $queryData member variable.
*
* The client calls to this method should be wrapped in a TypeException handler.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @param string $_query
* @param bool $_isCount
* @param int $_retVal
* @return bool
*
* HISTORY:
* ========
* 05-03-18 mks _INF-188: original coding
* 05-23-18 mks _INF-188: added $_retVal param
*
*/
private function issueRemoteSQLQuery(string $_query, bool $_isCount = false, int &$_retVal = 0): bool
{
$this->noData = false;
$startTime = floatval(0);
try {
// start the query timer if supported
if ($this->widget->useTimers) $startTime = gasStatic::doingTime();
foreach ($this->resRemote->query($_query) as $row) {
$queryResults[] = $row;
}
// log the query time if supported
if ($this->widget->useTimers) $this->logger->metrics($_query, gasStatic::doingTime($startTime));
if (!empty($queryResults)) {
if ($_isCount) {
$_retVal = (!empty($queryResults)) ? $queryResults[0][STRING_NUM_RECS] : 0;
} else {
$this->recordCount = count($queryResults);
$this->queryData = $queryResults;
}
} else {
$this->noData = true;
$this->errorStack[] = ERROR_UT_QUERY_RETURNED_ZERO;
return true;
}
return true;
} catch (PDOException $p) {
$msg = sprintf(ERROR_PDO_EXCEPTION, $_query);
$this->errorStack[] = $msg;
$this->errorStack[] = $p->getMessage();
$this->logger->warn($msg);
$this->logger->warn($p->getMessage());
$this->state = STATE_DB_ERROR;
return false;
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
$this->state = STATE_FRAMEWORK_WARNING;
return false;
}
}
/**
* deleteWHSourceData() -- private method
*
* This method is called as part of the wareHousing process -- it is not called during data migration.
*
* The method should already have all internal members populated prior to invocation. As such, no input parameters
* are required. The method returns a Boolean value to indicate success or failure of the request and, as always,
* it is the responsibility of the calling client to check the return value.
*
* The method looks at the current schema of the source data ($this->widget) and if the source is remote, (as in:
* external to namaste), then builds the query to execute directly via the remote resource API. If the source data
* is local then we'll build a broker delete-event request and publish this request to the namaste write broker.
*
* the namaste-delete operation will take care of the delete request (in terms of soft/hard deletes) and will
* return a standard payload. On a successful remote event, we simple return -- otherwise the remote-request's
* diagnostics are integrated into the class error stack.
*
*
* @author mike@givingassistant.org
* @version 1.0
*
* @return bool
*
*
* HISTORY:
* ========
* 05-21-18 mks _INF-188: original coding completed
*
*/
private function deleteWHSourceData(): bool
{
switch ($this->widget->schema) {
case CONFIG_SCHEMA_MYSQL :
case CONFIG_DATABASE_PDO :
if ($this->remoteSource) {
// call exec-remote-sql method
$query = preg_replace('/\?/', "'" . $this->widget->queryVariables[0] . "'", $this->widget->strQuery, 1);
$query = preg_replace('/\?/', "'" . $this->widget->queryVariables[1] . "'", $query, 1);
$query = 'DELETE /* ' . basename(__FILE__) . AT . __LINE__ . ' */ FROM ' . $this->widget->collectionName . ' WHERE ' . $query;
$msg = INFO_SHOULD_NOT_SEE_THIS . COLON . $query;
if (isset($this->logger) and $this->logger->available) {
$this->logger->info($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
} else {
try {
// build a query to submit to namaste write broker
$bc = new gacBrokerClient(BROKER_QUEUE_W, __METHOD__ . AT . __LINE__);
if (!$bc->status) {
$error = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_W;
$this->errorStack[] = $error;
if ($this->widget->debug) $this->logger->debug($error);
$this->errorStack[] = ERROR_WH_DEL_REMOTE_RECS;
consoleLog($this->res, CON_SYSTEM, ERROR_WH_DEL_REMOTE_RECS);
$this->widget->logger->error(ERROR_WH_DEL_REMOTE_RECS);
if (is_object($bc)) $bc->__destruct();
unset($bc);
return false;
}
$payload = [
BROKER_REQUEST => BROKER_REQUEST_DELETE,
BROKER_DATA => [STRING_QUERY_DATA => $this->whereClause],
BROKER_META_DATA => $this->meta
];
$request = gzcompress(json_encode($payload));
$response = json_decode(gzuncompress($bc->call($request)), true);
if (!$response[PAYLOAD_STATUS]) {
$error = sprintf(ERROR_UT_BROKER_EVENT_FAIL, BROKER_REQUEST_DELETE, $response[PAYLOAD_STATE]);
if (is_array($response[PAYLOAD_DIAGNOSTICS])) $this->errorStack = array_merge($this->errorStack, $response[PAYLOAD_DIAGNOSTICS]);
$this->errorStack[] = $error;
if ($this->widget->debug) $this->logger->debug($error);
if (is_object($bc)) $bc->__destruct();
unset($bc);
return false;
}
if (is_object($bc)) $bc->__destruct();
unset($bc);
return true;
} catch (Throwable $t) {
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
$this->errorStack[] = $msg;
if (isset($this->logger) and $this->logger->available) {
$this->logger->warn($msg);
} else {
consoleLog($this->res, CON_ERROR, $msg);
}
return false;
}
}
break;
}
return true;
}
/**
* __destruct() -- public function
*
* class destructor
*
*
* @author mike@givingassistant.org
* @version 1.0
*
*
* HISTORY:
* ========
* 02-21-18 mks _INF-139: original coding
*
*/
public function __destruct()
{
// As of PHP 5.3.10 destructors are not run on shutdown caused by fatal errors.
//
// destructor is registered shut-down function in constructor -- so any recovery
// efforts should go in this method.
}
}