952 days continuous production uptime, 40k+ tp/s single node. Original corpo Bitbucket history not included — clean archive commit.
3645 lines
179 KiB
PHP
3645 lines
179 KiB
PHP
<?php
|
|
/**
|
|
* gacMigrations -- class file
|
|
*
|
|
* this class is a new Namaste class and is intended for use only in the backend migration operation where we'll
|
|
* migrate all data from one database table or collection to a Namaste database table or collection.
|
|
*
|
|
* As of 3/25/18 the class also supports warehousing operations which is meant to import records either internal or
|
|
* external (to namaste) and move them into COOL or COLD storage. Cold storage is not implemented at this time until
|
|
* a schema is determined -- assuming we're writing to SaaS cold-storage, a new API will have to be integrated to
|
|
* handle COLD (and COLD->WARM) requests.
|
|
*
|
|
* Since the source is accessing, or has the capacity to access, production-level data, we need to create a new class
|
|
* outside of the Namaste framework that limits the capability of the class to read-only operations as a function of
|
|
* security and protecting against (un)intentional destructive queries against the source.
|
|
*
|
|
* The source data must be defined in the env.xml file - NOT in the namaste.xml file.
|
|
*
|
|
* The destination object is instantiated as a Namaste class since we'll be using the vetted procedures to move the
|
|
* source data into (potentially) a production database. In order to successfully migrate data into Namaste, the
|
|
* destination template file must have a pre-defined section ("migrationMap") defined that maps the source columns
|
|
* to the destination columns, including data-types (casting to a new type may happen because different schemas)
|
|
*
|
|
* All of the migration functionality will be initiated via this class and invoked by only by the broker event.
|
|
*
|
|
* Note that you also have the option to migrate data within the same schema.
|
|
*
|
|
* Remember -- the point of migration is to import data (all the records) from one table in a database defined
|
|
* externally to Namaste, to a database that is internal to Namaste.
|
|
*
|
|
* Supported Migrations:
|
|
* ---------------------
|
|
* 1. mysql -> mongo
|
|
* 2. mongo -> mysql
|
|
* 3. mysql -> mysql
|
|
* 4. mongo -> mongo
|
|
*
|
|
* Supported Warehousing:
|
|
* ----------------------
|
|
* 1. mysql (external) -> COOL
|
|
* 2. mysql (namaste) -> COOL
|
|
*
|
|
* To be completed:
|
|
* ----------------
|
|
* 1. mongo (external) -> COOL
|
|
* 2. mongo (namaste) -> COOL
|
|
* 3. COLD (all)
|
|
* 4. WARM (all)
|
|
*
|
|
* Note:
|
|
* -----
|
|
* Source data is accessed using non-destructive queries only for both migration and warehousing!
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 01-24-18 mks _INF-139: original coding
|
|
* 03-22-18 mks CORE-852: mysql destination coding
|
|
* 05-25-18 mks _INF-188: completed code update for data warehousing
|
|
* 07-31-18 mks CORE-774: PHP7.2 exception handling
|
|
* 01-07-20 mks DB-150: PHP7.4 class member type casting
|
|
*
|
|
*/
|
|
class gacMigrations
|
|
{
|
|
// data is migrated by making calls to the respective objects for the source and destination data.
|
|
// public $objSource = null; // pointer to the source class object
|
|
public string $state; // a STATE container
|
|
public bool $status; // Boolean status container
|
|
public array $errorStack; // container for the class error-stack
|
|
public string $migrationReport; // container for the migration results report
|
|
private bool $testMode = false; // must be over-ridden by broker payload
|
|
private array $config; // container for the configuration XML
|
|
private array $allowedSchemas = [ // authority container for valid schemas
|
|
CONFIG_SCHEMA_MYSQL,
|
|
CONFIG_SCHEMA_MONGO
|
|
];
|
|
private array $brokerData; // container for a copy of the broker request data
|
|
private string $res = 'MGRA: '; // resource identifier for console logging
|
|
private object $resRemote; // pointer to the remote database resource
|
|
private bool $remoteAvailable = false; // quick indicator that confirms remote source availability
|
|
private object $widget; // pointer to the gacFactory destination widget
|
|
private string $schema; // identifies, by name, the source schema
|
|
private string $destinationSchema; // identifies, by name, the destination schema
|
|
private int $recordsInSourceTable; // count of the total # records in the source table
|
|
private array $mysqlSourceSchema; // used to store output from "desc table" mysql command
|
|
private array $mysqlFieldList; // list of mysql source field names
|
|
private array $mongoSourceSchema; // used to store mongo schema for the source table
|
|
private array $migrationMap; // stores the migration map found in the namaste source template
|
|
private string $migrationSortKey; // stores the sort-key that we'll sort SOURCE data by
|
|
private array $migrationStatusKV; // stores the status-key for the SOURCE data for soft-deletes
|
|
private int $coverage; // coverage as a percent of SOURCE data that maps successfully
|
|
private bool $filterSoftDeletes = false; // toggle for importing soft-deleted records
|
|
private gacMongoDB $objMigrate; // pointer to the Migration class object (system object)
|
|
public object $objWarehouseMeta; // container for the Warehouse class object (system object)
|
|
public object $objWarehouse; // container for the warehouse class object (storage object)
|
|
private array $migratedFields; // list of columns that will be migrated from SOURCE -> DEST
|
|
private array $omittedFields; // list of columns dropped in SOURCE during migration
|
|
private string $lastRecordWritten; // json-encoded string of the last record written
|
|
private int $droppedRecords = 0; // count of the number of records dropped during mapping
|
|
private int $recordLimit = 0; // XML config value = max num records fetched from SOURCE
|
|
private string $migrateStatus; // promoted objMigrate status value
|
|
private int $recordCount = 0; // counts the total number of records fetched from SOURCE
|
|
private array $nQuery; // container for the namaste query (array)
|
|
private string $query; // text container for query string sans skip-limit
|
|
private array $meta = []; // container for the broker event meta data
|
|
private gacErrorLogger $logger; // error logging object
|
|
private string $whereClause = ''; // wh where clause built during validation (1st pass)
|
|
private array $whSettings; // container for template warehouse array (settings)
|
|
private bool $remoteSource = false; // for WH, set to true if source is remote to Namaste\
|
|
private string $whSourceURI = ''; // for WH, the URI of the remote source resource if required
|
|
// I'm setting this to contain either the mongo or mysql configuration but, as of now, it's not in-use or accessed
|
|
private array $remoteSourceConfig; // container for the remote source configuration
|
|
private array $queryData; // container for remote date (query results)
|
|
private bool $noData = false; // indicator if a query successfully exec'd but ret'd no data
|
|
private string $eventType; // tells the current instantiation if WH or MIG request
|
|
private bool $isWebRequest = false; // determines if output is for CLI or HTML
|
|
|
|
// meta data payload for WH -> admin requests
|
|
private array $adminWHMetaData = [
|
|
META_CLIENT => CLIENT_SYSTEM,
|
|
META_TEMPLATE => TEMPLATE_CLASS_WAREHOUSE,
|
|
META_DONUT_FILTER => 1
|
|
];
|
|
|
|
|
|
// migration report output headers
|
|
private string $TestHeader =
|
|
'=================================================
|
|
= T E S T M O D E A C T I V E =
|
|
=================================================';
|
|
private string $TestFooter =
|
|
'=================================================
|
|
= Test Mode Means: =
|
|
= ---------------- =
|
|
= No data was written to the destination table. =
|
|
= =
|
|
= Record fetch from source was limited to the =
|
|
= number of records specified in the migration =
|
|
= configuration. =
|
|
= The data dump is provided so you can validate =
|
|
= data migration and schema changes. =
|
|
=================================================';
|
|
private string $TestFooterWH =
|
|
'=================================================
|
|
= Test Mode Means: =
|
|
= ---------------- =
|
|
= No data was deleted from the source table. =
|
|
= =
|
|
= Data was migrated to the destination WH table =
|
|
= successfully, but the original data has been =
|
|
= left un-touched because the request submitted =
|
|
= mandated test mode. =
|
|
=================================================';
|
|
private string $TestBody =
|
|
'=================================================
|
|
= TEST MODE RECORD DUMP =
|
|
=-----------------------------------------------=
|
|
= Check records to ensure that all data mapped =
|
|
= in the destination template was covered, and =
|
|
= that all data maintained data type. Validate =
|
|
= injected fields such as timestamps and GUIDs =
|
|
= that are required by Namaste. =
|
|
= NOTE: tokens are not generated in test mode =
|
|
=================================================';
|
|
private string $TestBodyWH =
|
|
'=================================================
|
|
= TEST MODE INFORMATION =
|
|
=-----------------------------------------------=
|
|
= Check the destination WH table to ensure that =
|
|
= the correct number of records reported were =
|
|
= copied to the destination table. =
|
|
= Validate that injected fields (timestamps, =
|
|
= GUIDS, etc.) were added. =
|
|
=================================================';
|
|
|
|
|
|
//
|
|
/**
|
|
* gacMigrations constructor.
|
|
*
|
|
* the class constructor requires the broker payload as an input parameter to the method. Within this array
|
|
* parameter, the following fields should be defined and, if they're not, then the request to instantiate will
|
|
* fail:
|
|
*
|
|
* 1. the source schema
|
|
* 2. the source table name
|
|
* 3. the destination schema
|
|
* 4. the destination table name
|
|
* 5. the name of the source status field, if it exists
|
|
* 6. the value of the deleted field that indicates the record has been deleted
|
|
* 7. partial validation as a boolean - if true, then records that partially pass validation, as opposed to
|
|
* passing ALL validation, are migrated
|
|
* 8. soft-deletes -- if set to true, then records that are soft-deleted will be migrated
|
|
* 9. migrationTestMode - a boolean that, if true, limits the number of records returned from the source
|
|
* collection and output is returned as a payload instead of being written to the database.
|
|
*
|
|
* The remaining parameters are provided/passed as part of the broker-data payload. If they are omitted, then
|
|
* the defaults are assumed. Parameters can be overridden either by the passed-parameter list or by adding
|
|
* the appropriate fields in the $_brokerData payload (param 1).
|
|
*
|
|
* About Partials:
|
|
* ---------------
|
|
* If the partials option is set to true (if defaults to false), then a record will be migrated if any of the
|
|
* fields are validated. Fields that fail validation may be dropped depending on the destination schema so it's
|
|
* NOT RECOMMENDED that you enable this option when importing into a mysql-schema... Data that is rejected
|
|
* as migration candidates will be returned in a list of primary keys to the client identifying them as records
|
|
* that failed to migrate along with a total-count of the number of records that failed to migrate.
|
|
*
|
|
* If the $_partials is not overridden (false), then any record that fails the migration requirement will cause the
|
|
* entire request to fail. Identifying information about the failed record will be returned to the calling client.
|
|
*
|
|
* About Deleted:
|
|
* --------------
|
|
* Submitting a request with this variable set to Boolean(true) requires that the source-status-file parameter
|
|
* be set, defining the column name in the source table for the status field AND the value of delete column that
|
|
* defines a deleted record. (In most cases, this should be a string.)
|
|
*
|
|
* In order to migrate soft-deleted records, the following conditions must align:
|
|
* a. the deleted parameter is set to Boolean(true)
|
|
* b. the source-table deleted-field name is defined and included in the payload
|
|
* c. the source-table deleted-field VALUE is defined and included in the payload
|
|
* If all of these conditions are met, and a record is found that have the specified value in the specified column
|
|
* while the deleted parameter is set to true, then the source soft-deleted record will be migrated.
|
|
*
|
|
* Validation:
|
|
* -----------
|
|
* The following checks are immediately made:
|
|
*
|
|
* 1. That the source and destination schema and tables exist and are valid
|
|
* 2. That the destination template exists
|
|
* 3. That the destination template can be instantiated
|
|
* 4. That the destination template contains a migration map
|
|
* 5. That we can connect to the source DB (defined in gasConfig)
|
|
* 6. That we can retrieve a sample/example record from the source
|
|
*
|
|
* If all this succeeds, then we'll continue processing. The call to migrate data from one schema to another is
|
|
* pretty much a one-shot deal in terms of class instantiation and data access. Any processing or validation
|
|
* failure will cause processing to immediately cease and return an appropriate error message.
|
|
*
|
|
* If we're writing data to a schema that supports transactions, then all of the data writes will be under a single
|
|
* transaction so that if a processing failure is raised, then all of the data will be backed out of the destination
|
|
* table.
|
|
*
|
|
* If the destination table does not support transactions, then we'll keep a list, internally, of all the new
|
|
* GUIDs (primary key identifies) that were created and, prior to returning control to the calling client, all
|
|
* of these "new" records will be hard-deleted from the collection.
|
|
*
|
|
* Return Payload:
|
|
* ---------------
|
|
* Data that we will return to the calling client will include:
|
|
*
|
|
* 1. a general success or failure message
|
|
* 2. if the request failed, we'll return an error stack showing the root cause of the failure
|
|
* 3. if the request succeeded, the following data report will generated and returned:
|
|
* 3a. the source schema information
|
|
* 3b. the destination schema information
|
|
* 3c. the number of records in the source table
|
|
* 3d. the number of records successfully moved to the new schema
|
|
* 3e. if partials are allowed, the number of partial records migrated
|
|
* 3f. if partials are allowed, a list of the partials migrated...OR...
|
|
* if partials are not allowed, a list of the partials not migrated
|
|
* 3g. if deleted is false, the number of records not migrated b/c deleted status
|
|
* 3h. if deleted is false, a list of the deleted records not migrated
|
|
* 3i. the number of records in the source table prior to the operation
|
|
* 3j. the number of records in the source table after the operation
|
|
* 3k. the coverage percent of individual records -- number of columns mapped / total number of columns
|
|
*
|
|
*
|
|
* @param array|null $_brokerData the data payload as received by the broker
|
|
* @param array|null $_meta the meta-data payload as received by the broker
|
|
* @param string $_rt the request type (either EVENT_MIGRATE or EVENT_WH_COOL|COLD|WARM|HOT
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 01-24-18 mks _INF-139: original coding
|
|
* 03-22-18 mks CORE-852: mysql destination coding
|
|
* 04-10-18 mks _INF-188: introduced warehousing (cool) functionality
|
|
* 10-04-18 mks DB-43: updated for HTML webApp (reporting in HTML)
|
|
*
|
|
*/
|
|
public function __construct(array $_brokerData = [], array $_meta = null, $_rt = EVENT_MIGRATE)
|
|
{
|
|
register_shutdown_function(array($this, '__destruct'));
|
|
|
|
// todo -- validate the meta-client field (make it required) CORE-858
|
|
// todo -- timer event, system event CORE-859
|
|
|
|
// set-up class member variables
|
|
$this->errorStack = [];
|
|
$this->state = STATE_DATA_ERROR;
|
|
$this->status = false;
|
|
if (!isset(gasConfig::$settings[CONFIG_MIGRATION]) or (empty(gasConfig::$settings[CONFIG_MIGRATION]))) {
|
|
$this->errorStack[] = ERROR_CONFIG_RESOURCE_404 . STRING_MIGRATION_CONFIG;
|
|
return;
|
|
}
|
|
if (isset($_meta[BROKER_XML_DATA]) and is_array($_meta[BROKER_XML_DATA])) {
|
|
$this->config = $_meta[BROKER_XML_DATA];
|
|
$this->isWebRequest = true;
|
|
} else {
|
|
$this->config = gasConfig::$settings[CONFIG_MIGRATION];
|
|
}
|
|
try {
|
|
switch ($_rt) {
|
|
case EVENT_MIGRATE :
|
|
// validate payload data that's been pulled into class vars - return control to calling client if false value
|
|
if (!$this->validateMigrationData($_brokerData)) return;
|
|
$this->brokerData = $_brokerData;
|
|
$_meta[META_TEMPLATE] = $this->brokerData[MIGRATION_DEST_TABLE];
|
|
$this->meta = $_meta;
|
|
$this->eventType = STRING_MIGRATION;
|
|
$this->doMigration();
|
|
break;
|
|
case EVENT_WAREHOUSE :
|
|
$this->brokerData = $_brokerData;
|
|
$this->meta = $_meta;
|
|
$this->logger = new gacErrorLogger($this->meta[META_EVENT_GUID]);
|
|
$this->eventType = STRING_WAREHOUSING;
|
|
$this->startWHRequest();
|
|
break;
|
|
default :
|
|
$msg = ERROR_EVENT_404 . $_rt;
|
|
$this->errorStack[] = $msg;
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
break;
|
|
}
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* whData() -- public method
|
|
*
|
|
* This method is invoked from the whBroker event once validation has successfully completed and we've returned
|
|
* the wh-guid back to the calling client.
|
|
*
|
|
* This method does a quick check to confirm that critical data hasn't been dropped and is still present. Once
|
|
* that check has been passed, we invoke a private method (moveWHData()) to perform the actual data move/copy
|
|
* request.
|
|
*
|
|
* the method requires no input parameters and returns a boolean value to indicate processing success.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 04-26-18 mks _INF-188: original coding
|
|
*
|
|
*/
|
|
public function whData(): bool
|
|
{
|
|
$error = null;
|
|
|
|
// ensure we still have all the generated data
|
|
if (!isset($this->widget) or empty($this->widget))
|
|
$error[] = ERROR_WH_MISSING_WIDGET;
|
|
if (!isset($this->objWarehouseMeta) or empty($this->objWarehouseMeta))
|
|
$error[] = ERROR_WH_MISSING_WH_OBJ;
|
|
if (!isset($this->whSettings) or empty($this->whSettings))
|
|
$error[] = ERROR_WH_MISSING_SETTINGS;
|
|
if (!isset($this->brokerData) or empty($this->brokerData))
|
|
$error[] = ERROR_WH_MISSING_BROKER_DATA;
|
|
if (!isset($this->meta) or empty($this->meta))
|
|
$error[] = ERROR_WH_MISSING_META_DATA;
|
|
if (!isset($this->whereClause) or empty($this->whereClause))
|
|
$error[] = ERROR_WH_MISSING_WHERE_CLAUSE;
|
|
|
|
if (!is_null($error)) {
|
|
$this->errorStack = array_merge($this->errorStack, $error);
|
|
$this->state = STATE_DATA_ERROR;
|
|
$this->status = false;
|
|
return false;
|
|
}
|
|
|
|
// step 1: get the count of the number of records to be moved -- this becomes our LCV
|
|
if (!$this->getWHRecCount()) return false;
|
|
// return immediately if no records in the source satisfy the WH query
|
|
if ($this->recordCount == 0) {
|
|
$this->errorStack[] = INFO_WH_NO_QUALIFIED_DATA;
|
|
$this->state = STATE_NOT_FOUND;
|
|
$this->status = false;
|
|
return false;
|
|
}
|
|
// NOTE: the count of the number of records to wh is stored in $recordCount
|
|
|
|
// step 2: instantiate the WH data class object
|
|
$whMD = $this->meta;
|
|
$whMD[META_TEMPLATE] = TEMPLATE_CLASS_WHC1_PROD_REG;
|
|
$whMD[META_CLIENT] = CLIENT_SYSTEM;
|
|
|
|
$whErrors = [];
|
|
try {
|
|
$tmpObj = new gacFactory($whMD, FACTORY_EVENT_NEW_CLASS, '', $whErrors);
|
|
} catch (Throwable $t) {
|
|
$this->errorStack[] = ERROR_THROWABLE_EXCEPTION;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($t->getMessage());
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
$this->status = false;
|
|
return false;
|
|
}
|
|
if (!$tmpObj->status) {
|
|
$error = ERROR_TEMPLATE_INSTANTIATE . TEMPLATE_CLASS_WHC1_PROD_REG;
|
|
$this->errorStack[] = $error;
|
|
if (is_array($whErrors)) $this->errorStack = array_merge($this->errorStack, $whErrors);
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
|
|
$this->status = false;
|
|
return false;
|
|
}
|
|
$this->objWarehouse = $tmpObj->widget;
|
|
// set warehouse specific variables for create request
|
|
$this->objWarehouse->isWHRequest = true; // exceed record limit in broker payloads
|
|
$this->objWarehouse->whType = $this->brokerData[WH_TYPE]; // set the wh-type for resource management
|
|
|
|
$this->migrationMap = ($this->remoteSource) ? $this->objWarehouse->template->migrationMap : $this->widget->migrationConfig[STRING_MIGRATION_MAP];
|
|
|
|
if (is_object($tmpObj)) $tmpObj->__destruct();
|
|
unset($tmpObj, $whMD, $whErrors);
|
|
|
|
try {
|
|
// step 3: perform the fetch-put part of the warehousing request
|
|
if (!$this->processWHData()) return false;
|
|
|
|
// step 4: if not in test mode and source is not remote, then delete the source records
|
|
if (!isset($this->brokerData[MWH_TEST_MODE]) and !$this->remoteSource)
|
|
if (!$this->deleteWHSourceData()) return false;
|
|
|
|
// step 5: generate the WH report and save the meta data
|
|
$this->generateEventReport();
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* processWHData() -- private method
|
|
*
|
|
* This method is called by the whData() method as one of a series of calls to process a WH request.
|
|
*
|
|
* There are no input parameters to the method because the relevent data has already been stored within the
|
|
* class members:
|
|
*
|
|
* $this->brokerData :: contains the original payload request
|
|
* $this->config :: pulled from the xml section: <migration> which should only be defined in the env.xml file
|
|
* $this->meta :: copy of the broker meta data
|
|
* $this->nQuery :: the namaste (array) query built during validation
|
|
* $this->objWarehouse :: instantiation of the admin WH class object
|
|
* $this->recordCount :: count of the number of records to be warehoused (LCV)
|
|
* $this->remoteAvailable :: boolean telling me if the remote resource is available
|
|
* $this->resRemote :: defines the API resource to the remote schema, if source is remote
|
|
* $this->schema :: defines the source schema
|
|
* $this->whereClause :: string containing the query where-clause in dest-schema form
|
|
* $this->whSettings :: destination class template settings for warehousing
|
|
* $this->whSourceURI :: string containing the (FQ) warehouse URI
|
|
* $this->widget :: instantiation of the destination class object
|
|
* $this->widget->whTemplate :: name of the destination (WH) template
|
|
*
|
|
* The method contains the control loop (number of total records to warehouse / recordsFetchedPerRequest)
|
|
* and will iterate until the count of the denominator exceeds the numerator.
|
|
*
|
|
* This method builds the fetch (source data) query remotely for mySQL or mongo repos, and calls a method
|
|
* which, in turn, queries the remote repos directly via the URI set-up in the XML config. If the data to
|
|
* be warehoused is Namaste sourced, then we'll build a query/fetch request and submit the request to the
|
|
* appServer (Namaste) service.
|
|
*
|
|
* NOTE:
|
|
* -----
|
|
* If the data is remote sourced, the destination template must contain a migration map!
|
|
*
|
|
* Once we've fetched the source data, (and, if the source is remote to namaste, then we've mapped the
|
|
* remote-source data to the namaste WH data), we batch-save all of the records retrieved.
|
|
*
|
|
* Next, we update the meta-data record by incrementing the relevant counters, setting various status fields,
|
|
* and then submitting a broker request to the admin service to update the WH meta record. (This is done so
|
|
* that, in the event of a processing failure, we can resume the wh process.)
|
|
*
|
|
* If the process completes successfully, then we close the wh meta record.
|
|
*
|
|
* The method returns a boolean value which indicates success (all of the WH data was successfully processed)
|
|
* or a false if any failure was encountered. The class maintains an error history structure which will
|
|
* eventually be returned to the calling client to assist with diagnostics.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 05-16-18 mks _INF-188: original coding
|
|
*
|
|
*/
|
|
private function processWHData(): bool
|
|
{
|
|
$numRecsProcessed = 0;
|
|
$rad = 0;
|
|
$numRecsSkipped = 0;
|
|
$whRecordLimit = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_WH_RECS_PER_XFER];
|
|
|
|
try {
|
|
// ensure that we set the meta-override to the value of the record limit fetch (defined in XML)
|
|
if (!$this->widget->addMeta(META_LIMIT_OVERRIDE, $whRecordLimit)) {
|
|
$error = ERROR_DATA_META_REJECTED . META_LIMIT_OVERRIDE;
|
|
$this->errorStack[] = $error;
|
|
if ($this->widget->debug) $this->logger->debug($error);
|
|
$this->state = STATE_META_ERROR;
|
|
return false;
|
|
}
|
|
} catch (TypeError $t) {
|
|
$msg = ERROR_TYPE_EXCEPTION . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->error($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
// set warehouse specific variables for create request
|
|
$this->widget->isWHRequest = true; // exceed record limit in broker payloads
|
|
$this->widget->whType = $this->brokerData[WH_TYPE]; // set the wh-type for resource management
|
|
|
|
// main control loop for fetching data from the source
|
|
while ($numRecsProcessed < $this->recordCount) {
|
|
if ($this->remoteSource) {
|
|
// source data is remote - data can be directly accessed
|
|
if ($this->schema == CONFIG_SCHEMA_MYSQL) {
|
|
$query = 'SELECT /* ' . basename(__FILE__) . COLON . __LINE__ . ' */ *';
|
|
$query .= ' FROM ' . $this->brokerData[WH_REMOTE_TABLE];
|
|
$query .= ' WHERE ' . $this->whereClause;
|
|
$query .= ' LIMIT ' . $numRecsSkipped . ', ' . $whRecordLimit;
|
|
try {
|
|
if (!$this->issueRemoteSQLQuery($query)) {
|
|
// remote query failed to execute
|
|
$this->errorStack[] = ERROR_REMOTE_QUERY_FAIL;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn(ERROR_REMOTE_QUERY_FAIL);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, ERROR_REMOTE_QUERY_FAIL);
|
|
$this->state = STATE_DB_ERROR;
|
|
return false;
|
|
}
|
|
} catch (TypeError $t) {
|
|
$error = ERROR_TYPE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
} elseif ($this->schema == CONFIG_SCHEMA_MONGO) {
|
|
// todo -- write this stub
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error(INFO_SHOULD_NOT_SEE_THIS);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, INFO_SHOULD_NOT_SEE_THIS);
|
|
}
|
|
}
|
|
// we have data - but it needs to be processed through the migration map
|
|
$tData = $this->doDataMap($this->queryData);
|
|
if (!is_null($tData)) {
|
|
$this->queryData = $tData;
|
|
$this->recordCount = count($this->queryData);
|
|
} else {
|
|
$this->errorStack[] = ERROR_MIGRATION_MAPPING_FAILED;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn(ERROR_MIGRATION_MAPPING_FAILED);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, ERROR_MIGRATION_MAPPING_FAILED);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
}
|
|
} else {
|
|
$bc = null;
|
|
try {
|
|
// source data must be requested via namaste
|
|
$bc = new gacBrokerClient(BROKER_QUEUE_R, __METHOD__ . AT . __LINE__);
|
|
if (!$bc->status) {
|
|
$error = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_R;
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
|
|
if (is_object($bc)) {
|
|
$bc->__destruct();
|
|
unset($bc);
|
|
}
|
|
return false;
|
|
}
|
|
// build the request to the appServer broker
|
|
$request = [
|
|
BROKER_REQUEST => BROKER_REQUEST_FETCH,
|
|
BROKER_DATA => [
|
|
STRING_QUERY_DATA => $this->nQuery
|
|
],
|
|
BROKER_META_DATA => [
|
|
META_CLIENT => CLIENT_SYSTEM,
|
|
META_DONUT_FILTER => 1,
|
|
META_EVENT_GUID => $this->meta[META_EVENT_GUID],
|
|
META_TEMPLATE => $this->meta[META_TEMPLATE],
|
|
META_SKIP => $numRecsSkipped,
|
|
META_LIMIT_OVERRIDE => $whRecordLimit,
|
|
META_LIMIT => $whRecordLimit
|
|
]
|
|
];
|
|
$payload = gzcompress(json_encode($request));
|
|
|
|
$response = json_decode(gzuncompress($bc->call($payload)), true);
|
|
if (!$response[PAYLOAD_STATUS]) {
|
|
$error = sprintf(ERROR_BROKER_INTERNAL_CALL, BROKER_REQUEST_FETCH, BROKER_QUEUE_R);
|
|
$this->errorStack = array_merge($this->errorStack, $response[PAYLOAD_DIAGNOSTICS]);
|
|
$this->errorStack[] = $error;
|
|
if ($this->widget->debug) $this->logger->debug($error);
|
|
$this->state = $response[PAYLOAD_STATE];
|
|
if (is_object($bc)) {
|
|
$bc->__destruct();
|
|
unset($bc);
|
|
}
|
|
return false;
|
|
}
|
|
$this->queryData = $response[PAYLOAD_RESULTS][STRING_QUERY_RESULTS];
|
|
$this->recordCount = count($this->queryData);
|
|
} catch (Throwable $t) {
|
|
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
if (is_object($bc)) {
|
|
$bc->__destruct();
|
|
unset($bc);
|
|
}
|
|
return false;
|
|
}
|
|
if (is_object($bc)) {
|
|
$bc->__destruct();
|
|
unset($bc);
|
|
}
|
|
}
|
|
|
|
try {
|
|
// invoke the widget's class create method to write the records (locally) to the db
|
|
$this->objWarehouse->_createRecord($this->queryData, DATA_WHSE);
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
if ($this->objWarehouse->status) {
|
|
// create succeeded -- update the meta record and record
|
|
$rowsInserted = $this->objWarehouse->recordsInserted;
|
|
$rowsDropped = $this->objWarehouse->recordsDropped;
|
|
// check the record count
|
|
if ($rowsDropped != ($this->recordCount - $rowsInserted)) {
|
|
$this->errorStack[] = ERROR_DATA_WH_INSERT_FAIL;
|
|
if ($this->widget->debug) $this->logger->debug(ERROR_DATA_WH_INSERT_FAIL);
|
|
$error = sprintf(ERROR_DATA_RECORD_COUNT, ($rowsDropped + $rowsInserted), $this->recordCount);
|
|
$this->errorStack[] = $error;
|
|
if ($this->widget->debug) $this->logger->debug($error);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
}
|
|
|
|
// inject WH'ing results the meta-data (WH) payload
|
|
try {
|
|
$rip = $this->objWarehouseMeta->getColumn(MWH_NUM_RECS_MOVED); // records in progress
|
|
if (is_null($rip)) $rip = 0;
|
|
$rip += $rowsInserted;
|
|
$rad = $this->objWarehouseMeta->getColumn(MWH_NUM_RECS_DROPPED); // records already dropped
|
|
if (is_null($rad)) $rad = 0;
|
|
$rad += $rowsDropped;
|
|
// create/update the wh meta record and the lcv
|
|
if (is_null($this->objWarehouseMeta->getColumn(DB_TOKEN))) {
|
|
// create the new wh meta record
|
|
if (!$this->updateWHMeta()) return false;
|
|
}
|
|
// we need to do an update payload for the wh-meta record update
|
|
$warehouseUpdateData = [
|
|
MWH_NUM_RECS_MOVED => $rip,
|
|
MWH_NUM_RECS_DROPPED => $rad,
|
|
MWH_LAST_REC_WRITTEN => json_encode($this->objWarehouse->getRecord($this->objWarehouse->count, false)),
|
|
DB_ACCESSED => time()
|
|
];
|
|
if (!$this->updateWHMeta($warehouseUpdateData)) return false;
|
|
$numRecsProcessed += $rip;
|
|
} catch (Throwable $t) {
|
|
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
} else {
|
|
try {
|
|
// the create failed - stop processing and save the meta WH record
|
|
$this->objWarehouseMeta->insertField(DB_STATUS, STATUS_FAILED);
|
|
$this->objWarehouseMeta->insertField(DB_ACCESSED, time());
|
|
$this->objWarehouseMeta->insertField(MWH_STOP_REASON, json_encode($this->errorStack));
|
|
@$this->updateWHMeta();
|
|
} catch (Throwable $t) {
|
|
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
$this->state = $this->objWarehouse->state;
|
|
$this->errorStack = array_merge($this->errorStack, $this->objWarehouse->eventMessages);
|
|
return false;
|
|
}
|
|
}
|
|
// close the wh meta record
|
|
try {
|
|
$warehouseUpdateData = [
|
|
DB_STATUS => STATUS_COMPLETE,
|
|
DB_ACCESSED => time()
|
|
];
|
|
@$this->updateWHMeta($warehouseUpdateData);
|
|
} catch (Throwable $t) {
|
|
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
$this->recordCount = $numRecsProcessed;
|
|
$this->droppedRecords = $rad;
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* getWHRecCount() -- private method
|
|
*
|
|
* this method is used to get the record count for the WH source query... in other words, we generate a count
|
|
* of the number of records that will be warehoused and assign that value to the member variable: $recordCount.
|
|
*
|
|
* The method requires no input parameters and returns a Boolean to indicate success or failure in processing.
|
|
*
|
|
* The main loop checks for the source schema and then checks to see if the source is external to Namaste. The
|
|
* query is build accordingly and submitted to the db classes for resolution.
|
|
*
|
|
* Any errors or exceptions-raised will cause a false value to be returned to the calling client.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 04-27-18 mks _INF-188: original coding
|
|
* 04-30-18 mks _INF-188: revised fetch sections for data local to namaste
|
|
*
|
|
*/
|
|
private function getWHRecCount(): bool
|
|
{
|
|
if (!$this->remoteSource) {
|
|
// this is a request internal to namaste -- WH methods exec on Segundo -- need to publish
|
|
// a broker event request to appServer (namaste) for the record count request.
|
|
$lTime = $this->brokerData[WH_FILTER_VALUES][0] . ' 00:00:00';
|
|
$rTime = $this->brokerData[WH_FILTER_VALUES][1] . ' 00:00:00';
|
|
|
|
// if the source data is mongo, convert to epoch time
|
|
$lTime = ($this->schema == STRING_MONGO) ? unPrettyTime($lTime) : $lTime;
|
|
$rTime = ($this->schema == STRING_MONGO) ? unPrettyTime($rTime) : $rTime;
|
|
|
|
// build the payload to fetch the query count for the WH query
|
|
$request = [
|
|
BROKER_REQUEST => BROKER_REQUEST_QUERY_COUNT,
|
|
BROKER_DATA => [
|
|
STRING_QUERY_DATA => [ DB_CREATED => [OPERAND_AND => [ OPERATOR_GT => [$lTime], OPERATOR_LT => [$rTime]]]]
|
|
],
|
|
BROKER_META_DATA => [
|
|
META_TEMPLATE => $this->meta[META_TEMPLATE],
|
|
META_CLIENT => CLIENT_SYSTEM,
|
|
META_EVENT_GUID => $this->meta[META_EVENT_GUID]
|
|
]
|
|
];
|
|
|
|
try {
|
|
// create a broker-request client to the namaste read-broker
|
|
$bc = new gacBrokerClient(BROKER_QUEUE_R, basename(__FILE__) . COLON_NS . __LINE__);
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
if (!$bc->status) {
|
|
$error = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_R;
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
|
|
return false;
|
|
}
|
|
|
|
// submit the request to the namaste read broker and process the return
|
|
$payload = gzcompress(json_encode($request));
|
|
$response = json_decode(gzuncompress($bc->call($payload)), true);
|
|
if (!$response[PAYLOAD_STATUS]) {
|
|
$this->errorStack = array_merge($this->errorStack, $response[PAYLOAD_DIAGNOSTICS]);
|
|
$this->state = $response[PAYLOAD_STATE];
|
|
return false;
|
|
}
|
|
try {
|
|
if (!$this->objWarehouseMeta->insertField(MWH_NUM_RECS_IN_QUERY, intval($response[PAYLOAD_RESULTS][0]))) {
|
|
$error = sprintf(ERROR_DATA_ADD_FAIL, MWH_NUM_RECS_SOURCE) . $this->objWarehouseMeta->class;
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
}
|
|
if (!$this->objWarehouseMeta->insertField(MWH_NUM_RECS_SOURCE, intval($response[PAYLOAD_RESULTS][1]))) {
|
|
$error = sprintf(ERROR_DATA_ADD_FAIL, MWH_NUM_RECS_IN_QUERY) . $this->objWarehouseMeta->class;
|
|
$this->errorStack[] = $error;
|
|
if ($this->objWarehouseMeta->debug) $this->objWarehouseMeta->logger->debug($error);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
}
|
|
} catch (TypeError $t) {
|
|
$msg = ERROR_TYPE_EXCEPTION . AT . basename(__FILE__) . COLON_NS . __LINE__;
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
$this->recordCount = $response[PAYLOAD_RESULTS][0];
|
|
$this->recordsInSourceTable = $response[PAYLOAD_RESULTS][1];
|
|
if (is_object($bc)) $bc->__destruct();
|
|
unset($bc);
|
|
} else {
|
|
if ($this->schema == STRING_MYSQL) {
|
|
if ($this->remoteSource) {
|
|
if (!isset($this->resRemote)) {
|
|
$error = sprintf(ERROR_REMOTE_RESOURCE_404, STRING_MYSQL);
|
|
$this->errorStack[] = $error;
|
|
if ($this->widget->debug) $this->logger->debug($error);
|
|
$this->state = STATE_DB_ERROR;
|
|
return false;
|
|
}
|
|
// if the source is remote (external to namaste) we can directly query
|
|
$query = 'SELECT /* ' . basename(__FILE__) . COLON . __LINE__ . ' */ COUNT(*) AS ' . STRING_NUM_RECS;
|
|
$query .= ' FROM ' . $this->brokerData[WH_REMOTE_TABLE];
|
|
$query .= ' WHERE ' . $this->whereClause;
|
|
$query2 = 'SELECT /* ' . __METHOD__ . AT . __LINE__ . ' */ count(*) AS ' . STRING_NUM_RECS . ' FROM ' . $this->brokerData[WH_REMOTE_TABLE];
|
|
$this->recordsInSourceTable = 0;
|
|
$this->recordCount = 0;
|
|
try {
|
|
if (!$this->issueRemoteSQLQuery($query, true, $this->recordCount)) {
|
|
$this->errorStack[] = ERROR_REMOTE_QUERY_FAIL;
|
|
$this->errorStack[] = STRING_QUERY . COLON . $query;
|
|
if ($this->widget->debug) {
|
|
$this->logger->debug(ERROR_REMOTE_QUERY_FAIL);
|
|
$this->logger->debug(STRING_QUERY . COLON . $query);
|
|
}
|
|
$this->state = STATE_DB_ERROR;
|
|
return false;
|
|
}
|
|
if (!$this->issueRemoteSQLQuery($query2, true, $this->recordsInSourceTable)) {
|
|
$this->errorStack[] = ERROR_REMOTE_QUERY_FAIL;
|
|
$this->errorStack[] = STRING_QUERY . COLON . $query2;
|
|
if ($this->widget->debug) {
|
|
$this->logger->debug(ERROR_REMOTE_QUERY_FAIL);
|
|
$this->logger->debug(STRING_QUERY . COLON . $query2);
|
|
}
|
|
$this->state = STATE_DB_ERROR;
|
|
return false;
|
|
}
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_TYPE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
}
|
|
} elseif ($this->schema == STRING_MONGO) { // todo -- this needs to be re-written to support $resRemote
|
|
try {
|
|
$this->recordCount = $this->widget->getQueryCount($this->widget->strQuery, (($this->remoteSource) ? $this->brokerData[WH_REMOTE_TABLE] : ''));
|
|
} catch (TypeError $t) {
|
|
$msg = ERROR_TYPE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
if ($this->recordCount == -1) {
|
|
$this->errorStack[] = ERROR_MDB_FETCH_COUNT_FAIL;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn(ERROR_MDB_FETCH_COUNT_FAIL);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, ERROR_MDB_FETCH_COUNT_FAIL);
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* migrateData() -- private method
|
|
*
|
|
* This method handles the heavy lifting of pulling data from the source database, converting the data to the
|
|
* Namaste schema, and then writing the modified records out to the destination database.
|
|
*
|
|
* The method requires no input parameters and returns a Boolean value indicating success or failure in the
|
|
* overall migration.
|
|
*
|
|
* TEST MODE:
|
|
* ----------
|
|
* Test mode is supported in this function - if enabled in the meta payload, we'll do the following:
|
|
* -- limit the query to 10 records
|
|
* -- do not write output to the destination database
|
|
* -- do not close (mark as completed) the migration record
|
|
*
|
|
* Programmer Notes:
|
|
* -----------------
|
|
* The method, as written, is fairly limited in that it only accommodates data from a mySQL source to a mongoDB
|
|
* destination. The destination class is masked by virtue of it being a Namaste class but we're clearly
|
|
* handling it as a mongoDB object when we're fetching the results of the bulk-write. (Hint - overload this
|
|
* method name in the mySQL class.)
|
|
*
|
|
* The main processing happens in a simple loop where the number of processed records is compared to the number
|
|
* of records in the source table. The concern is that, for large tables, running out of memory during a
|
|
* sustained operation.
|
|
*
|
|
* As such, we're using the current (Migrations) class to post events to the admin service that keep track of the
|
|
* current migration in terms of the number of records processed. In case of an event crash, then we can resume
|
|
* processing because the migration class will look for the migration record on spin-up.
|
|
*
|
|
* Some considerations for the source query:
|
|
* -- evaluate the MIGRATION_TEST_MODE ($this->testMode) setting and limit the number of records from source
|
|
* to ten if we're in test-mode.
|
|
* -- we have the option for over-riding the record limit and will do so if the record limit value exceeds the
|
|
* Namaste record limit. Both are set in the framework XML.
|
|
* -- source query is ordered by sorting the data set based on the sort key defined in the migration template
|
|
* for the new class
|
|
*
|
|
* Once we've fetched the (first) data-set from source, we're going to map the source data to the destination
|
|
* schema using the migration map declared in the destination template. We'll also inject meta data such as the
|
|
* record GUID, created/accessed timestamps, etc.
|
|
*
|
|
* After we've converted the source data, we'll execute a bulk-write to the destination table. On successful
|
|
* return, update the migration record with the new processed count. If possible, also record the count of any
|
|
* records that may have been dropped due to processing or validation.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-15-18 mks INF-139: original code completed
|
|
* 03-23-18 mks CORE-852: adding mysql destination support
|
|
* 03-27-18 mks CORE-852: added the PDO processing for the migration report
|
|
* 09-01-18 mks DB-48: updated for new _createRecord() parameter that identifies data source
|
|
* 10-04-18 mks DB-43: support for the new webUI migration app: invoke the html migration report instead
|
|
*/
|
|
private function migrateData(): bool
|
|
{
|
|
// everything we need is already defined as a class member, except for the actual data to be moved
|
|
|
|
// pull data FROM mysql
|
|
if ($this->schema == STRING_MYSQL) {
|
|
$mappedResults = null;
|
|
if ($this->destinationSchema == STRING_MONGO) {
|
|
/** @var gacMongoDB $widget */
|
|
$widget = $this->widget;
|
|
} else {
|
|
/** @var gacPDO $widget */
|
|
$widget = $this->widget;
|
|
$widget->setMigrationBit(true);
|
|
}
|
|
$widget->setRecordLimit($this->recordLimit);
|
|
// build the query
|
|
$this->query = 'SELECT /* ' . basename(__FILE__) . COLON_NS . __LINE__ . ' */ * ';
|
|
$this->query .= 'FROM ' . $this->brokerData[MIGRATION_SOURCE_TABLE]. ' ';
|
|
// add a filter for soft-deleted messages
|
|
if (isset($this->brokerData[MIGRATION_FILTER_SOFT_DELETES]) and true === boolval($this->brokerData[MIGRATION_FILTER_SOFT_DELETES])) {
|
|
$statusKey = key($this->migrationStatusKV);
|
|
$statusValue = $this->migrationStatusKV[$statusKey];
|
|
$this->query .= 'WHERE ' . $statusKey . ' ';
|
|
if (is_numeric($statusValue)) {
|
|
$this->query .= '!= ' . $statusValue;
|
|
} else {
|
|
$this->query .= '<> "' . $statusValue . '"';
|
|
}
|
|
}
|
|
// add the sort discriminant
|
|
$this->query .= 'ORDER BY ' . $this->migrationSortKey . ' ASC ';
|
|
|
|
// start the main processing loop which is based on current vs. total record count in the source table
|
|
while ($this->recordCount < $this->recordsInSourceTable) {
|
|
// add the limit -- check if this is test mode, if so: use the test-mode record limit defined in XML
|
|
$limitString = 'LIMIT ' . $this->recordCount . ',';
|
|
$limitString .= ($this->testMode) ? $this->config[CONFIG_MIGRATION_NUM_TEST_RECS] : $this->recordLimit;
|
|
$results = null;
|
|
$query = $this->query . $limitString;
|
|
// populate migration data during first pass of processing
|
|
try {
|
|
if (!isset($this->data[MWH_QUERY])) {
|
|
if (!$this->objMigrate->insertField(MWH_QUERY, $query, 0)) {
|
|
// failed to add query to migrations payload
|
|
$msg = sprintf(ERROR_DATA_ADD_FAIL, STRING_QUERY, $this->objMigrate->class);
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->error($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
}
|
|
// exec the query
|
|
foreach ($this->resRemote->query($query) as $row) {
|
|
$results[] = $row;
|
|
}
|
|
$currentCount = count($results);
|
|
$this->recordCount += $currentCount;
|
|
// $this->skip += ($currentCount < $this->recordLimit) ? $currentCount : $this->recordLimit;
|
|
} catch (PDOException | Throwable $p) {
|
|
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_PDO_EXCEPTION . $this->query);
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $p->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($p->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $p->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false; // return control to calling client if the query failed
|
|
}
|
|
|
|
if (is_null($results)) {
|
|
// should not get to this point!!!
|
|
$this->state = STATE_RESOURCE_ERROR_PDO;
|
|
consoleLog($this->res, CON_SYSTEM, INFO_SHOULD_NOT_SEE_THIS . COLON . $query);
|
|
return false;
|
|
}
|
|
|
|
try {
|
|
// we have source data - next step is to map it via the migrationMap
|
|
$mappedResults = $this->doDataMap($results);
|
|
if (count($mappedResults) != $currentCount) {
|
|
$this->droppedRecords += $currentCount - count($mappedResults);
|
|
if (!$this->objMigrate->insertField(MWH_NUM_RECS_DROPPED, $this->droppedRecords, 0))
|
|
consoleLog($this->res, CON_ERROR, sprintf(ERROR_DATA_ADD_FAIL, MWH_NUM_RECS_DROPPED, $this->objMigrate->class));
|
|
}
|
|
|
|
// at this point, we've successfully mapped the data... if we're not in test mode, write the data
|
|
if ($this->testMode) {
|
|
// save the test records and break out of the loop processing
|
|
$widget->addData($mappedResults);
|
|
break;
|
|
} else {
|
|
$widget->_createRecord($mappedResults, DATA_MIGR);
|
|
if (!$widget->status) {
|
|
// todo -- pull data from the bulk-write object and compare count of records v. records written
|
|
$this->errorStack[] = ERROR_NOSQL_BC;
|
|
return false;
|
|
}
|
|
$widget->removeData(); // widget data successfully migrated (this batch) so reset to null
|
|
|
|
// branch processing here with functions depending on destination schema (CORE-852)
|
|
switch ($this->destinationSchema) {
|
|
case STRING_MONGO :
|
|
if (!$this->processMongoResults($widget, $mappedResults)) return false;
|
|
break;
|
|
|
|
case STRING_MYSQL :
|
|
if (!$this->processPDOResults($widget, $mappedResults)) return false;
|
|
$widget->recordsDropped = 0;
|
|
$widget->recordsInserted = 0;
|
|
break;
|
|
|
|
default:
|
|
// schema was pre-validated -- should never exec this code
|
|
$msg = ERROR_SCHEMA_NOT_SUPPORTED . COLON . $this->destinationSchema;
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->errorStack[] = $msg;
|
|
return false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
// update/save the current migration record if the recordsMigrated < recordsInSource
|
|
if ($this->objMigrate->getColumn(MWH_NUM_RECS_MOVED) <= $this->objMigrate->getColumn(MWH_NUM_RECS_SOURCE)) {
|
|
if (!$this->updateMigrationMetaData()) return false;
|
|
}
|
|
} catch (Throwable $t) {
|
|
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false; // return control to calling client if the query failed
|
|
}
|
|
|
|
// console log progress-status message
|
|
$msg = sprintf(INFO_MIGRATION_RECORDS_MOVED,
|
|
($this->brokerData[MWH_SOURCE_SCHEMA] . COLON . $this->brokerData[MWH_SOURCE_TABLE]),
|
|
($this->brokerData[MWH_DEST_SCHEMA] . COLON . $this->brokerData[MWH_DEST_TABLE]),
|
|
$this->objMigrate->getColumn(MWH_NUM_RECS_MOVED), $this->recordsInSourceTable);
|
|
consoleLog($this->res, CON_SYSTEM, $msg);
|
|
} // end WHILE loop for processing migration records
|
|
|
|
// generate the migration report which is stored as a migration class member based on if the
|
|
// migration request originated from the migration webApp or from the cli
|
|
if (!$this->isWebRequest)
|
|
$this->generateEventReport();
|
|
else
|
|
$this->generateHTMLEventReport();
|
|
|
|
if (empty($this->migrationReport)) {
|
|
$this->errorStack[] = ERROR_MIGRATION_REPORT;
|
|
$this->migrationReport = ERROR_MIGRATION_REPORT;
|
|
$this->objMigrate[MWH_REPORT] = ERROR_MIGRATION_REPORT;
|
|
}
|
|
|
|
// update the status of $this->objMigrate data to completed, dateCompleted to now(), if not in test mode
|
|
if (!$this->testMode) {
|
|
try {
|
|
if (!$this->updateMigrationMetaData(STATUS_COMPLETE)) return false;
|
|
} catch (TypeError $t) {
|
|
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_TYPE_EXCEPTION . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false; // return control to calling client if the query failed
|
|
}
|
|
}
|
|
} else {
|
|
// source schema is not supported
|
|
$msg = sprintf(ERROR_SCHEMA_NOT_SUPPORTED, $this->schema);
|
|
$this->errorStack[] = $msg;
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_SCHEMA_ERROR;
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* generateHTMLEventReport() -- private method
|
|
*
|
|
* This method is called at the end of a migration event, if the migration event was launched from the new
|
|
* (and totally awesome) webUI migration app.
|
|
*
|
|
* The methods requires no input parameters and returns type void to the calling client
|
|
*
|
|
* The methods builds the migration report in HTML and assigns the completed report the the class member
|
|
* variable: $migrationReport.
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 10-05-18 mks DB-43: original coding
|
|
*
|
|
*/
|
|
private function generateHTMLEventReport(): void
|
|
{
|
|
$recPasses = 0;
|
|
$rpf = 'Max Recs Per Fetch: 0';
|
|
$eos = '<br>';
|
|
$this->migrateStatus = true;
|
|
$dropPartials = (isset($this->brokerData[MIGRATION_FILTER_PARTIALS]) and $this->brokerData[MIGRATION_FILTER_PARTIALS] == 1) ? 'dropped' : 'kept';
|
|
$softDeletes = (isset($this->brokerData[MIGRATION_FILTER_SOFT_DELETES]) and $this->brokerData[MIGRATION_FILTER_SOFT_DELETES] == 1) ? ON : OFF;
|
|
|
|
// start generating the HTML report
|
|
//
|
|
// first, generate the generic header dependent on test mode
|
|
$report = '<div class="modal-header">' . $eos;
|
|
$report .= '<h4 class="modal-title w-100" id="resultsModalLabel">Migration Results</h4>' . $eos;
|
|
$report .= '<button type="button" class="close" data-dismiss="modal" aria-label="Close">' . $eos;
|
|
$report .= '<span aria-hidden="true">×</span></button>' . $eos;
|
|
$report .= '</div>' . $eos;
|
|
$report .= '<div class="modal-body">' . $eos;
|
|
$report .= '<p class="text-danger">' . $eos;
|
|
//next, check to see if we're in test mode and, if so, generate the test mode header
|
|
if ($this->testMode) {
|
|
$report .= '<strong>Test Mode Active</strong></p>';
|
|
$report .= '<i class="fa fa-minus text-muted" aria-hidden="true">';
|
|
$report .= '<span class="text-muted"> No data was deleted from the source table</span></i>' . $eos;
|
|
$report .= '<i class="fa fa-minus text-muted" aria-hidden="true">';
|
|
$report .= '<span class="text-muted"> No data was written to the destination table</span></i><hr>' . $eos;
|
|
}
|
|
$report .= 'Record Partials are <strong>' . $dropPartials . '</strong>' . $eos;
|
|
$report .= 'Soft Delete Records <strong>' . $softDeletes . '</strong>' . $eos;
|
|
$report .= 'Source Repository: <strong>' . $this->config[$this->brokerData[MWH_SOURCE_SCHEMA]][STRING_HOST] . '</strong>' . $eos;
|
|
$report .= 'Source Schema: <strong>' . $this->brokerData[MWH_SOURCE_SCHEMA] . '</strong>' . $eos;
|
|
$report .= 'Source Table: <strong>' . $this->brokerData[MWH_SOURCE_TABLE] . '</strong>' . $eos;
|
|
$report .= 'Fetch Query: <strong>' . $this->query . '</strong>' . $eos;
|
|
$report .= 'Field Coverage: <strong>' . $this->coverage . '%' . '</strong>' . $eos;
|
|
$report .= 'Destination Repo: <strong>' . $this->widget->dbService . '</strong>' . $eos;
|
|
$report .= 'Destination Schema: <strong>' . $this->brokerData[MWH_DEST_SCHEMA] . '</strong>' . $eos;
|
|
$report .= 'Destination Table: <strong>' . $this->widget->collectionName . '</strong>' . $eos;
|
|
|
|
$report .= 'Records In Source: <strong>' . $this->recordsInSourceTable . '</strong>' . $eos;
|
|
$report .= '<strong>' . $rpf . '</strong>' . $eos;
|
|
if ($recPasses < 1) $recPasses = 1;
|
|
$report .= 'Est # Fetches: <strong>' . $recPasses . '</strong>' . $eos;
|
|
$report .= 'Num Recs Dropped: <strong>' . $this->droppedRecords . '</strong>' . $eos;
|
|
$report .= 'Num Records Moved: <strong>' . $this->recordCount . '</strong><hr>' . $eos;
|
|
|
|
if ($this->testMode) {
|
|
$report .= '<span class="text-danger">Records fetched for quality evaluation (0 Records actually moved)</span>' . $eos;
|
|
$report .= '<span class="text-muted"><i class="fa fa-minus text-muted" aria-hidden="true">';
|
|
$report .= ' Ensure that record mapping is correct...</i></span>' . $eos;
|
|
$report .= '<span class="text-muted"><i class="fa fa-minus text-muted" aria-hidden="true">';
|
|
$report .= ' Ensure that all fields were mapped...</i></span>' . $eos;
|
|
$report .= '<span class="text-muted"><i class="fa fa-minus text-muted" aria-hidden="true">';
|
|
$report .= ' Were data types properly maintained...?</i></span>' . $eos;
|
|
$report .= '<span class="text-muted"><i class="fa fa-minus text-muted" aria-hidden="true">';
|
|
$report .= ' Validate injected fields (timestamps, GUIDs)...</i></span>' . $eos;
|
|
$report .= $eos . $eos . '<pre>' . print_r($this->widget->getData(), true) . '</pre>' . $eos;
|
|
}
|
|
$report .= '</div>' . $eos; // end modal-body div
|
|
$report .= '<div class="modal-footer">' . $eos;
|
|
$report .= '<button type="button" class="btn btn-secondary btn-sm" data-dismiss="modal">Close</button>' . $eos;
|
|
$report .= '</div>' . $eos;
|
|
|
|
$this->migrationReport = $report;
|
|
}
|
|
|
|
|
|
/**
|
|
* generateEventReport() -- private method
|
|
*
|
|
* This method generates the migration report that's returned to the calling client as the success data payload
|
|
* in the broker event.
|
|
*
|
|
* We're going to actually publish a longer report in test mode since we're dumping all the testMode variables
|
|
* that were defined at the top of the file as well as var-dumping the 10 records that were pulled from the
|
|
* source and converted to Namaste records according to the data template.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-10-18 mks _INF-139: original coding completed
|
|
* 05-16-18 mks _INF-188: updated for warehousing
|
|
*
|
|
*/
|
|
private function generateEventReport(): void
|
|
{
|
|
$recPasses = 0;
|
|
$rpf = 'Max Recs Per Fetch: 0';
|
|
$this->migrateStatus = true;
|
|
$eos = (isset($_SERVER['HTTP_USER_AGENT'])) ? '<br>' : PHP_EOL;
|
|
$dropPartials = (isset($this->brokerData[MIGRATION_FILTER_PARTIALS]) and $this->brokerData[MIGRATION_FILTER_PARTIALS] == 1) ? 'dropped' : 'kept';
|
|
$softDeletes = (isset($this->brokerData[MIGRATION_FILTER_SOFT_DELETES]) and $this->brokerData[MIGRATION_FILTER_SOFT_DELETES] == 1) ? ON : OFF;
|
|
$report = (($this->testMode) ? $this->TestHeader : '') . $eos . $eos;
|
|
// todo -- option for html generation - for now, just return a text-only report CORE-860
|
|
// todo -- add the migration report to the migration record CORE-861
|
|
// $html = ($eos == PHP_EOL) ? false : true;
|
|
$report .= $this->eventType . ' Report' . $eos . '================' . $eos;
|
|
if ($this->eventType == STRING_MIGRATION) {
|
|
$report .= 'Record Partials are ' . $dropPartials . $eos;
|
|
$report .= 'Soft Delete Records ' . $softDeletes . $eos;
|
|
$report .= 'Source Repository: ' . $this->config[$this->brokerData[MIGRATION_SOURCE_SCHEMA]][STRING_HOST] . $eos;
|
|
$report .= 'Source Schema: ' . $this->brokerData[MWH_SOURCE_SCHEMA] . $eos;
|
|
$report .= 'Source Table: ' . $this->brokerData[MWH_SOURCE_TABLE] . $eos;
|
|
$report .= 'Fetch Query: ' . $this->query . $eos;
|
|
$report .= 'Field Coverage: ' . $this->coverage . '%' . $eos;
|
|
$report .= 'Destination Repo: ' . $this->widget->dbService . $eos;
|
|
$report .= 'Destination Schema: ' . $this->brokerData[MWH_DEST_SCHEMA] . $eos;
|
|
$report .= 'Destination Table: ' . $this->widget->collectionName . $eos;
|
|
} else {
|
|
if ($this->remoteSource) {
|
|
$report .= 'Source Repository: ' . $this->whSourceURI . $eos;
|
|
$report .= 'Source Schema: ' . $this->objWarehouse->template->schema . $eos;
|
|
$report .= 'Source Table: ' . $this->brokerData[WH_REMOTE_TABLE] . $eos;
|
|
$report .= 'Fetch Query: ' . $this->whereClause . $eos;
|
|
$recPasses = intval(floor($this->recordsInSourceTable / $this->objWarehouseMeta->getRecordLimit()));
|
|
$rpf = 'Max Recs Per Fetch: ' . $this->objWarehouseMeta->getRecordLimit() . $eos;
|
|
} else {
|
|
$recPasses = intval(floor($this->recordsInSourceTable / $this->recordLimit));
|
|
$report .= 'Source Repository: ' . $this->widget->template->service . $eos;
|
|
$report .= 'Source Schema: ' . $this->widget->template->schema . $eos;
|
|
$report .= 'Source Table: ' . $this->widget->template->collection . $this->widget->template->extension . $eos;
|
|
$report .= 'Fetch Query: ' . $this->widget->strQuery . $eos;
|
|
$report .= 'Query Variables: ' . json_encode($this->widget->queryVariables) . $eos;
|
|
$rpf = 'Max Recs Per Fetch: ' . $this->recordLimit . $eos;
|
|
}
|
|
$report .= 'Destination Repo: ' . $this->objWarehouse->dbService . $eos;
|
|
$report .= 'Destination Schema: ' . $this->objWarehouse->schema . $eos;
|
|
$report .= 'Destination Table: ' . $this->objWarehouse->collectionName . $this->objWarehouse->ext . $eos;
|
|
}
|
|
$report .= 'Records In Source: ' . $this->recordsInSourceTable . $eos;
|
|
$report .= $rpf . $eos;
|
|
if ($recPasses < 1) $recPasses = 1;
|
|
$report .= 'Est # Fetches: ' . $recPasses . $eos;
|
|
$report .= 'Num Recs Dropped: ' . $this->droppedRecords . $eos;
|
|
$report .= 'Num Records Moved: ' . $this->recordCount; // $eos added below
|
|
|
|
try {
|
|
if ($this->testMode and $this->eventType == STRING_MIGRATION) {
|
|
$report .= ' records fetched for quality evaluation (0 Records actually moved)';
|
|
$report .= $eos . $this->TestBody;
|
|
$report .= $eos . $eos . var_export($this->widget->getData(), true) . $eos;
|
|
$report .= $eos . $eos . $this->TestFooter . $eos . $eos;
|
|
} elseif ($this->testMode and $this->eventType == STRING_WAREHOUSING) {
|
|
$report .= $eos . $this->TestBodyWH;
|
|
$report .= $eos . $eos . var_export($this->objWarehouseMeta->getData(), true) . $eos;
|
|
$report .= $eos . $eos . $this->TestFooterWH . $eos . $eos;
|
|
}
|
|
|
|
// todo -- read the comments in the constructor for items to add to the report like "last record written"
|
|
$report .= $eos;
|
|
$this->migrationReport = $report;
|
|
// we update the meta record in the calling method if this is a migration event
|
|
if ($this->eventType == STRING_WAREHOUSING) {
|
|
if (!$this->updateWHMeta([MWH_REPORT => $report])) {
|
|
$error = ERROR_MDB_SYS_EVENT_SAVE . COLON . MWH_REPORT;
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->error($error);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
}
|
|
}
|
|
} catch (Throwable $t) {
|
|
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* updateMigrationMetaData() -- private method
|
|
*
|
|
* This method has an optional input parameter, that is a valid STATUS field - use this parameter only when you
|
|
* want to update the status of the migration record. (CLOSED) Note that a status change request will only
|
|
* be recognized on a migration-record update; the status will be ignored when creating a new migration record.
|
|
*
|
|
* The method returns a boolean value indicating if the migration record was successfully updated or not - saved
|
|
* to the Admin service mongo collection.
|
|
*
|
|
* The first thing we do is pull in the current mig data object and look for a token - if one does not exist, then
|
|
* we'll assume this is a new-record creation and we'll call the admin-out broker's migrate-create event. On
|
|
* successful return from that event, we'll import the GUID and the dateCreated strings into the current data
|
|
* so that subsequent calls are treated as an update event.
|
|
*
|
|
* If this is an update event, parse the status, if provided, and update the relevant fields.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @param string $_status
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-14-18 mks INF-139: original coding
|
|
* 09-04-18 mks DB-48: Added migration report to the record data
|
|
*
|
|
*/
|
|
private function updateMigrationMetaData(string $_status = ''): bool
|
|
{
|
|
$updateStatus = false;
|
|
$response = array();
|
|
|
|
try {
|
|
// get the migration data into an stand-alone array
|
|
if (is_null($data = $this->objMigrate->getData())) return false;
|
|
} catch (TypeError $t) {
|
|
$msg = ERROR_TYPE_EXCEPTION . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
if (count($data) != 1) {
|
|
$msg = sprintf(ERROR_DATA_RECORD_COUNT, 1, count($data));
|
|
$this->errorStack[] = $msg;
|
|
if (gasConfig::$settings[CONFIG_DEBUG]) consoleLog($this->res, CON_ERROR, $msg);
|
|
return false;
|
|
}
|
|
// todo -- check the count of $data before assigning it, perhaps also validate that the key == 0
|
|
|
|
if (!array_key_exists(DB_TOKEN, $data[0])) {
|
|
// no GUID exists in the record => save the data
|
|
$request = [
|
|
BROKER_REQUEST => BROKER_REQUEST_ADMIN_MWH_EVENT_CREATE,
|
|
BROKER_DATA => $data,
|
|
BROKER_META_DATA => [
|
|
META_TEMPLATE => TEMPLATE_CLASS_MIGRATIONS, // todo -- remove this requirement CORE-863
|
|
META_CLIENT => CLIENT_SYSTEM
|
|
]
|
|
];
|
|
try {
|
|
if (!$this->publishMigrationEvent($request, $response)) return false;
|
|
|
|
// if we successfully created the new migration record, we need to inject the GUID and the createdDate
|
|
@$this->objMigrate->insertField(DB_TOKEN, $response[PAYLOAD_RESULTS][0][DB_TOKEN]);
|
|
@$this->objMigrate->insertField(DB_CREATED, $response[PAYLOAD_RESULTS][0][DB_CREATED]);
|
|
} catch (Throwable $t) {
|
|
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
} else {
|
|
// pull the data out of the indexed array and manipulate directly, strictly for cleaner code
|
|
$data = $data[0];
|
|
$updateData = [];
|
|
try {
|
|
// GUID exists in the record - update the status, if a value was passed to this method
|
|
if (!empty($_status)) {
|
|
switch ($_status) {
|
|
case STATUS_IN_PROGRESS :
|
|
case STATUS_COMPLETE :
|
|
@$this->objMigrate->insertField(DB_STATUS, $_status);
|
|
$updateData[DB_STATUS] = $_status;
|
|
$updateData[MWH_REPORT] = $this->migrationReport;
|
|
$updateStatus = true;
|
|
break;
|
|
|
|
default :
|
|
$msg = ERROR_MIGRATION_STATUS_INV . $_status;
|
|
$this->errorStack[] = $msg;
|
|
$this->state = STATE_STATUS;
|
|
return false;
|
|
break;
|
|
}
|
|
}
|
|
if ($updateStatus) {
|
|
$data[DB_STATUS] = $_status;
|
|
if ($_status == STATUS_COMPLETE) {
|
|
$ts = time();
|
|
$updateData[MWH_DATE_COMPLETED] = $ts;
|
|
$this->objMigrate->insertField(MWH_DATE_COMPLETED, $ts);
|
|
}
|
|
}
|
|
if (!is_null($this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED))) {
|
|
$updateData[MWH_NUM_RECS_DROPPED] = $this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED);
|
|
}
|
|
$updateData[MWH_NUM_RECS_MOVED] = $this->objMigrate->getColumn(MWH_NUM_RECS_MOVED);
|
|
$ts = time();
|
|
$updateData[DB_ACCESSED] = $ts;
|
|
$this->objMigrate->insertField(DB_ACCESSED, $ts);
|
|
|
|
// build the broker payload and submit
|
|
$query = [DB_TOKEN => [OPERAND_NULL => [OPERATOR_EQ => [$this->objMigrate->getColumn(DB_TOKEN)]]]];
|
|
$request = [
|
|
BROKER_REQUEST => BROKER_REQUEST_ADMIN_MWH_EVENT_UPDATE,
|
|
BROKER_DATA => [STRING_QUERY_DATA => $query, STRING_UPDATE_DATA => $updateData],
|
|
BROKER_META_DATA => [META_TEMPLATE => TEMPLATE_CLASS_MIGRATIONS, META_CLIENT => CLIENT_SYSTEM]
|
|
];
|
|
if (!$this->publishMigrationEvent($request)) return false;
|
|
} catch (Throwable $t) {
|
|
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* fetchMigrationMetaData() -- private method
|
|
*
|
|
* This method is responsible to fetching the migration meta data from the Namaste admin service. This class
|
|
* object, as a review, is really a data manager as opposed to a data-class. The admin collection in mongo for
|
|
* migration meta data is maintained by this class through it's member instantiation via normal channels, e.g.:
|
|
* the gacFactory, and assigns the migration class object to the class member: $objMigrate.
|
|
*
|
|
* When this method is called, it's assumed that the $objMigrate class object has already been instantiated. That
|
|
* the $objMigrate class has data is not guaranteed. It's also assumed that the member $brokerData has been
|
|
* populated with the request payload (query) data. If both are empty, return a Boolean(false) and set the
|
|
* state of this class to STATE_DATA_ERROR.
|
|
*
|
|
* First off, when we enter the method, we're going to instantiate a local member to as a brokerClient class
|
|
* object which will handle our request to the Namaste admin service for fetching the migration meta data. If
|
|
* this object fails to instantiate, we'll return immediately. We'll return a Boolean(false) value to the calling
|
|
* client and set the STATE of this class to a framework warning.
|
|
*
|
|
* Next, we're going to check to see if the current instantiation of $objMigrate already has data or not. If it
|
|
* does, the inference is that this is a migration in-progress. If so, we want to pull the already generated
|
|
* token from the data payload, and use that as the basis for the query to fetch the data. This covers the
|
|
* (hopefully unlikely) scenario recovering from a previous failure by loading the last save points.
|
|
*
|
|
* If the $objMigrate class data is empty, we're going to build the query from the broker payload data sent
|
|
* by the client. For first pass in a migration, we're expecting that no data will be returned.
|
|
*
|
|
* If the payload returned by the broker is out-of-bounds with respect to processing, (edge-case), then we'll
|
|
* set the class state to reflect an AMQP error.
|
|
*
|
|
* If data is returned, we need to evaluate the data to ensure that this is a current, or active, migration.
|
|
* In other words, the migration record has the following potential values for the status field:
|
|
*
|
|
* -- COMPLETE: a migration for this data was previously and successfully completed
|
|
* -- IN-PROGRESS: this is an active, on-going, migration
|
|
*
|
|
* Once a migration is in-progress, it remains in that state until completed. Migrations that have failed
|
|
* completely should only occur during development and all impacted data would have to be manually removed
|
|
* from the migration and target tables.
|
|
*
|
|
* If data is not returned, but the query was successful, we should have STATE_NOT_FOUND returned in the
|
|
* broker event call payload status. If this is the case, we'll still return a Boolean(true) to the calling
|
|
* client, but we'll set the state of the $objMigrate object to STATE_NOT_FOUND and this MUST be tested for
|
|
* by the calling client when Boolean(true) values are returned.
|
|
*
|
|
* If data was returned, but we're unable to load the data into the $objMigrate class, set the class state to
|
|
* DATA_TYPE_ERROR and return a Boolean(false).
|
|
*
|
|
* If data was returned, check the row count - if the row count exceeds one, return an error.
|
|
*
|
|
* If data was returned, check the status field - if the status is not IN-PROGRESS, return an error.
|
|
*
|
|
* Finally, if we reach the end of the method, post-data-processing, where we shouldn't ever hit (edge case), then
|
|
* we'll return a STATE_FAIL.
|
|
*
|
|
* Possible Return States:
|
|
* -----------------------
|
|
* STATE_SUCCESS -- desired, optimal state
|
|
* STATE_FRAMEWORK_WARNING -- unable to instantiate a broker client class
|
|
* STATE_DATA_ERROR -- missing both brokerData and the migration class object data
|
|
* -- more than one migration row was returned from the generic search query
|
|
* -- a record was returned without a count of the # records already moved, or the count of
|
|
* records moved is less than 1, or the count is greater than the previously-recorded
|
|
* total number of records in the source table
|
|
* STATE_AMQP_ERROR -- we received bad data, or a false-status payload from the admin broker
|
|
* STATE_DATA_TYPE_ERROR -- we were unable to load the returned data into the $objMigrate $data member
|
|
* STATE_NOT_FOUND -- the query was successful, but no migration record exists for the request
|
|
* STATE_STATUS -- the query returned a migration record not in an IN-PROGRESS status
|
|
*
|
|
* The method is considered a success under these, and only these conditions:
|
|
*
|
|
* return TRUE and STATE_SUCCESS
|
|
* return TRUE and STATE_NOT_FOUND
|
|
*
|
|
* Anything else is, and should be considered to be, an error.
|
|
*
|
|
* In all cases, when the method returns false, the errorStack for the class is populated with a relevant message.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-09-18 mks _INF-139: original code completed
|
|
*
|
|
*/
|
|
private function fetchMigrationMetaData(): bool
|
|
{
|
|
$this->state = STATE_SUCCESS;
|
|
|
|
if (is_null($bc = $this->getRabbitClient())) return false;
|
|
|
|
// get the current migration data, if it exists
|
|
$data = $this->objMigrate->getData();
|
|
if (!is_null($data)) $data = $data[0]; // should only be one record if there's data
|
|
|
|
// build the namaste query depending on what's in the data payload
|
|
if (!is_null($data) and array_key_exists((DB_TOKEN . $this->objMigrate->ext), $data)) {
|
|
// we're going to query by GUID
|
|
$query = [ DB_TOKEN => [ OPERAND_NULL => [ OPERATOR_EQ => [$data[(DB_TOKEN . $this->objMigrate->ext)]] ] ] ];
|
|
$sort = [ $this->migrationSortKey => STRING_SORT_ASC ];
|
|
} elseif (!is_null($data) and !empty($this->brokerData)) {
|
|
// we're going to fetch by brokerData (first pass) defined keys
|
|
$query = [ MWH_SOURCE_SCHEMA => [ OPERAND_NULL => [ OPERATOR_EQ => [ $this->objMigrate->getColumn(MWH_SOURCE_SCHEMA . $this->objMigrate->ext)]]],
|
|
MWH_SOURCE_TABLE => [ OPERAND_NULL => [ OPERATOR_EQ => [ $this->objMigrate->getColumn(MWH_SOURCE_TABLE . $this->objMigrate->ext)]]],
|
|
MWH_DEST_SCHEMA => [ OPERAND_NULL => [ OPERATOR_EQ => [ $this->objMigrate->getColumn(MWH_DEST_SCHEMA . $this->objMigrate->ext)]]],
|
|
MWH_DEST_TABLE => [ OPERAND_NULL => [ OPERATOR_EQ => [ $this->objMigrate->getColumn(MWH_DEST_TABLE . $this->objMigrate->ext)]]],
|
|
OPERAND_AND => null
|
|
];
|
|
$sort = [ $this->migrationSortKey => STRING_SORT_ASC ];
|
|
} else {
|
|
// we have neither brokerData in the current object, or migration data in the widget
|
|
$msg = ERROR_DATA_MISSING_ARRAY . STRING_MIGRATION_DATA;
|
|
$this->errorStack[] = $msg;
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
}
|
|
// build the request payload
|
|
$request = [
|
|
BROKER_REQUEST => BROKER_REQUEST_ADMIN_MWH_EVENT_FETCH,
|
|
BROKER_DATA => [
|
|
STRING_QUERY_DATA => $query,
|
|
STRING_SORT_DATA => $sort,
|
|
],
|
|
BROKER_META_DATA => [
|
|
META_TEMPLATE => TEMPLATE_CLASS_MIGRATIONS,
|
|
META_CLIENT => CLIENT_SYSTEM
|
|
]
|
|
];
|
|
$payload = gzcompress(json_encode($request));
|
|
|
|
// submit the request and process results
|
|
$response = $bc->call($payload);
|
|
// release the rabbit-client object
|
|
if (is_object($bc)) $bc->__destruct();
|
|
unset($bc);
|
|
|
|
// process the response returned from the adminOut broker
|
|
$response = json_decode(gzuncompress($response), true);
|
|
if (!is_array($response) or !$response[PAYLOAD_STATUS]) {
|
|
$msg = ERROR_BROKER_RESPONSE_BAD;
|
|
$this->errorStack[] = $msg;
|
|
if (is_array($response[PAYLOAD_DIAGNOSTICS])) {
|
|
$this->errorStack = array_merge($this->errorStack, $response[PAYLOAD_DIAGNOSTICS]);
|
|
}
|
|
$this->state = STATE_AMQP_ERROR;
|
|
return false;
|
|
} elseif ($response[PAYLOAD_STATUS] and $response[PAYLOAD_STATE] == STATE_SUCCESS) {
|
|
// todo -- check for the number of records and check for the status!
|
|
if (!$this->objMigrate->addData($response[PAYLOAD_RESULTS])) {
|
|
if (is_array($this->objMigrate->eventMessages)) {
|
|
$this->errorStack = array_merge($this->objMigrate->eventMessages, $this->errorStack);
|
|
}
|
|
$this->state = STATE_DATA_TYPE_ERROR;
|
|
return false;
|
|
}
|
|
if ($this->objMigrate->count > 1) {
|
|
$this->state = STATE_DATA_ERROR;
|
|
$this->errorStack[] = sprintf(ERROR_DATA_ARRAY_COUNT, 1, MONGO_STRING_COUNT, $this->objMigrate->count);
|
|
return false;
|
|
}
|
|
if ($this->objMigrate->getColumn(DB_STATUS) != STATUS_IN_PROGRESS) {
|
|
$this->errorStack[] = sprintf(ERROR_INVALID_STATUS, STATUS_IN_PROGRESS, $this->objMigrate->getColumn(DB_STATUS));
|
|
$this->state = STATE_STATUS;
|
|
return false;
|
|
}
|
|
// populate the skip member
|
|
$this->recordCount = $this->objMigrate->getColumn(MWH_NUM_RECS_MOVED) + $this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED);
|
|
// $this->skip = $this->objMigrate->getColumn(MIGRATIONS_NUM_RECS_MOVED) + $this->objMigrate->getColumn(MIGRATIONS_NUM_RECS_DROPPED);
|
|
if (is_null($this->recordCount) or $this->recordCount < 0 or $this->recordCount > $this->recordsInSourceTable) {
|
|
// if (is_null($this->skip) or $this->skip < 0 or $this->skip > $this->recordsInSourceTable) {
|
|
$this->errorStack[] = ERROR_DATA_INCONSISTENT_COUNT;
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
}
|
|
return true;
|
|
} elseif ($response[PAYLOAD_STATUS] and $response[PAYLOAD_STATE] == STATE_NOT_FOUND) {
|
|
$this->errorStack[] = INFO_QUERY_RETURNED_NO_DATA;
|
|
$this->objMigrate->state = STATE_NOT_FOUND;
|
|
$this->recordCount = 0;
|
|
$this->state = STATE_SUCCESS;
|
|
// $this->skip = 0;
|
|
return true;
|
|
}
|
|
// we should never execute past this point but, if we do, return an error...
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_UNKNOWN_EVENT;
|
|
$this->state = STATE_FAIL;
|
|
return false;
|
|
}
|
|
|
|
|
|
/**
|
|
* doDataMap() -- private method
|
|
*
|
|
* This method is a generic method meant to perform the mapping between the source and destination schemas.
|
|
*
|
|
* The method requires an input parameter which should be an array of records fetched from the source. For every
|
|
* record in source input array, do the following:
|
|
*
|
|
* 1. check to see if we're filtering out soft-deleted records and, if the current record's status field (as
|
|
* defined in the template migration member is set to the flagged value, then we're skip this record.
|
|
* 2. loop through each key->value pair in the migration map (field list) and, for Namaste reserved fields,
|
|
* inject the requisite data or, in the case of the time fields, force a conversion from MYSQL date stamp.
|
|
*
|
|
* At the end of processing the migration map, where each migrated field (value) was copied to a new key, append
|
|
* that record to the end of the result set.
|
|
*
|
|
* At the end of processing of the input records, return the result set to the calling client.
|
|
*
|
|
* Note that this method does not evaluate or validate any of the data returned from the source other than the
|
|
* conversion of date strings to epoch times and these values are copied into the new record as NEW values.
|
|
* (e.g.: the old values are preserved.)
|
|
*
|
|
* todo -- case conversion? e.g.: for incoming guids, should these be converted to UC?
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @param array $_sourceData
|
|
* @return array|null
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-02-18 mks _INF-139: original coding
|
|
*
|
|
*/
|
|
private function doDataMap(array $_sourceData): ?array
|
|
{
|
|
$resultSet = null;
|
|
/** @var gacMongoDB $widget */
|
|
$widget = $this->widget;
|
|
$ext = $widget->ext;
|
|
$meta = $widget->getMetaDataPayload();
|
|
|
|
$validStatusValues = [
|
|
STATUS_PENDING, STATUS_ACTIVE, STATUS_DELETED, STATUS_SUSPENDED, STATUS_EXPIRED, STATUS_REVOKED,
|
|
STATUS_LOCKED, STATUS_COMPLETE, STATUS_IN_PROGRESS, STATUS_ABANDONED, STATUS_INVALID, STATUS_VALID,
|
|
STATUS_REJECTED, STATUS_APPROVED, STATUS_FAILED, STATUS_CLOSED, STATUS_SENT, STATUS_REQUEST_SENT,
|
|
STATUS_INACTIVE, STATUS_NEW, STATUS_FAKE
|
|
];
|
|
|
|
|
|
foreach ($_sourceData as $record) {
|
|
$newRecord = null;
|
|
$skipRecord = false;
|
|
|
|
// skip this record if we're not importing soft-deleted records and current record is soft-deleted
|
|
if ($this->filterSoftDeletes and $record[key($this->migrationStatusKV)] == $this->migrationStatusKV[key($this->migrationStatusKV)]) {
|
|
$this->droppedRecords++;
|
|
continue;
|
|
}
|
|
// otherwise spin through the migration map fields and transfer/assign data
|
|
foreach ($this->migrationMap as $key => $value) {
|
|
switch ($key) {
|
|
case DB_CREATED :
|
|
case DB_ACCESSED :
|
|
$newRecord[$key . $ext] = (is_integer($record[$value])) ? $record[$value] : date('Y-m-d H:i:s', strtotime($record[$value]));
|
|
break;
|
|
case DB_EVENT_GUID :
|
|
if (!is_null($meta) and isset($meta[DB_EVENT_GUID])) {
|
|
$newRecord[$key . $ext] = $meta[DB_EVENT_GUID];
|
|
}
|
|
break;
|
|
case DB_TOKEN :
|
|
// do nothing -- prematurely inserting a guid will cause _createRecord() to fail
|
|
break;
|
|
case DB_STATUS :
|
|
if (!is_null($value)) {
|
|
if (in_array(strtoupper($record[$value]), $validStatusValues)) {
|
|
$newRecord[$key . $ext] = strtoupper($record[$value]);
|
|
} else {
|
|
$newRecord[$key . $ext] = $value;
|
|
}
|
|
} else {
|
|
$newRecord[DB_STATUS . $ext] = STATUS_ACTIVE;
|
|
}
|
|
break;
|
|
default :
|
|
if (!is_null($value)) {
|
|
if (!is_null($record[$value])) {
|
|
$newRecord[$key . $ext] = $record[$value];
|
|
} elseif (is_null($record[$value])) {
|
|
$newRecord[$key . $ext] = null;
|
|
} elseif (1 == $this->brokerData[MIGRATION_FILTER_PARTIALS]) {
|
|
$skipRecord = true;
|
|
$this->droppedRecords++;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
if (!is_null($newRecord) and !$skipRecord) {
|
|
$resultSet[] = $newRecord;
|
|
}
|
|
}
|
|
return $resultSet;
|
|
}
|
|
|
|
|
|
/**
|
|
* calculateCoverage() -- private method
|
|
*
|
|
* This method examines the source fields (schema) and compares it to the migration map in the destination template
|
|
* and, from that, calculates coverage as a percentage of each record (by column name) for the migration.
|
|
*
|
|
* Any fields that are dropped are done so solely by the fact that the field(s) do not appear in the target schema's
|
|
* migration map. Fields that are dropped are stored in the local member list: $omittedFields. Fields that are
|
|
* not dropped are stored in the local member list: $migratedFields.
|
|
*
|
|
* The coverage (percent) along with the field lists are used in the migration report that will be generated and
|
|
* returned to the requesting client.
|
|
*
|
|
* The method has no input parameters -- all non-derived values are culled from class member storage.
|
|
* The method returns a Boolean value indicating that the coverage was successfully calculated (returns a
|
|
* calculated percentage greater-than zero and less-than or equal-to 100.
|
|
*
|
|
* If the method returns a Boolean(false), you can assume that the method failed to calculate coverage and that
|
|
* further investigation as to why is warranted.
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 01-31-18 mks _INF-139: original coding
|
|
*
|
|
*/
|
|
private function calculateCoverage(): bool
|
|
{
|
|
$droppedCount = 0;
|
|
// loop through the migration map and look and each SOURCE field name
|
|
if ($this->schema == STRING_MYSQL) {
|
|
$totalCount = count($this->mysqlFieldList);
|
|
foreach ($this->mysqlFieldList as $fieldName) {
|
|
if (in_array($fieldName, $this->migrationMap)) {
|
|
$this->migratedFields[] = $fieldName;
|
|
} elseif (!in_array($fieldName,$this->omittedFields)) {
|
|
$this->omittedFields[] = $fieldName;
|
|
$droppedCount++;
|
|
}
|
|
}
|
|
} elseif ($this->schema == STRING_MONGO) {
|
|
// todo -- code this
|
|
$totalCount = count($this->mongoSourceSchema);
|
|
} else {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_SCHEMA_NOT_SUPPORTED, $this->schema);
|
|
return false;
|
|
}
|
|
// calculate the migration (record-fields) coverage
|
|
if (!$droppedCount and is_array($this->migratedFields) and count($this->migratedFields) == $totalCount) {
|
|
$this->coverage = 100;
|
|
} elseif ($droppedCount and count($this->migratedFields) < $totalCount) {
|
|
$this->coverage = intval(count($this->migratedFields) / $totalCount * 100);
|
|
} else {
|
|
$this->coverage = 0;
|
|
}
|
|
return boolval($this->coverage);
|
|
}
|
|
|
|
|
|
/**
|
|
* getNamasteObjects() -- private method
|
|
*
|
|
* This method, like the others, is invoked from the constructor method and requires the meta-data payload as
|
|
* a stored member variable.
|
|
*
|
|
* The method will instantiate two namaste class objects and assign both to different migrations-class member
|
|
* variables on success.
|
|
*
|
|
* The destination class is instantiated - whether or not this is a mongo or mysql class is handled by the
|
|
* template during instantiation.
|
|
*
|
|
* If the destination class fails to instantiate, then we return a false value immediately. Since the migration
|
|
* class error stack was passed to the gacFactory constructor, error messages are preserved.
|
|
*
|
|
* 2. The migration class is instantiated. This is a system-class that will keep track of the migration. As each
|
|
* "batch" of records is updated, the migration data is updated and saved providing us with a vector for recovery
|
|
* or roll-back of the migration request.
|
|
*
|
|
* If the migration class fails to instantiate, then we return a false value immediately. Otherwise, assuming that
|
|
* this is the first-iteration of this particular migration, we populate the migration class with initial data.
|
|
*
|
|
* If (to-do) we detect that this is resumption of a migration process, we'll need the migration guid in order
|
|
* to instantiate the migration classes $data member with the already-existing data.
|
|
*
|
|
* On successful instantiations of both classes, we'll return a Boolean(true) value to the calling client.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 01-30-18 mks _INF-139: original coding
|
|
* 03-22-18 mks CORE-852: removed a logic-dud
|
|
*
|
|
*/
|
|
private function getNamasteObjects(): bool
|
|
{
|
|
$retVal = false;
|
|
|
|
try {
|
|
// instantiate the destination object first
|
|
$tmpObj = new gacFactory($this->meta, FACTORY_EVENT_NEW_CLASS, '', $this->errorStack);
|
|
} catch (Throwable $t) {
|
|
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
}
|
|
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
|
|
return false;
|
|
}
|
|
// copy new Namaste data class if instantiation was successful
|
|
if ($tmpObj->status) {
|
|
// todo: validate that required fields are present in the template:
|
|
$this->widget = $tmpObj->widget;
|
|
$this->migrationMap = $tmpObj->widget->template->migrationMap;
|
|
$this->migrationSortKey = $tmpObj->widget->template->migrationSortKey;
|
|
$this->migrationStatusKV = $tmpObj->widget->template->migrationStatusKV;
|
|
$retVal = true;
|
|
}
|
|
if (is_object($tmpObj)) $tmpObj->__destruct();
|
|
unset($tmpObj);
|
|
|
|
// todo -- handle MIGRATION_RESUMPTION processing! (3/22/18: really? thought I tested this...)
|
|
|
|
if ($retVal) {
|
|
$retVal = false;
|
|
// build a fake meta for the migrations class importing the broker-event guid
|
|
$tmpMeta[META_TEMPLATE] = TEMPLATE_CLASS_MIGRATIONS;
|
|
// if the event guid is present, validate and copy
|
|
if (isset($_meta[META_EVENT_GUID]) and validateGUID($this->meta[META_EVENT_GUID])) {
|
|
$tmpMeta[META_EVENT_GUID] = $this->meta[META_EVENT_GUID];
|
|
}
|
|
|
|
try {
|
|
// instantiate the migration object
|
|
$tmpObj = new gacFactory($tmpMeta, FACTORY_EVENT_NEW_CLASS, '', $this->errorStack);
|
|
} catch (Throwable $t) {
|
|
$msg = basename(__METHOD__) . AT . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
}
|
|
$this->state = STATE_CLASS_INSTANTIATION_ERROR;
|
|
return false;
|
|
}
|
|
// for a new migrate-class object, pre-populate the data prior to the first migration batch
|
|
if ($tmpObj->status) {
|
|
// validate that the template has the fields required for migration
|
|
if (!$this->validationTemplateMigrationData()) {
|
|
if (is_object($tmpObj)) $tmpObj->__destruct();
|
|
unset($tmpObj);
|
|
return false;
|
|
}
|
|
|
|
// extract the widget (class object) and pull data from the broker payload
|
|
/** @var gacMongoDB $widget */
|
|
$widget = $tmpObj->widget;
|
|
$data[MWH_SOURCE_TABLE] = $this->brokerData[MWH_SOURCE_TABLE];
|
|
$data[MWH_SOURCE_SCHEMA] = $this->brokerData[MWH_SOURCE_SCHEMA];
|
|
$data[MIGRATION_DEST_SCHEMA] = $this->brokerData[MIGRATION_DEST_SCHEMA];
|
|
$data[MWH_DEST_TABLE] = $this->brokerData[MWH_DEST_TABLE];
|
|
$data[MWH_NUM_RECS_SOURCE] = $this->recordsInSourceTable;
|
|
if (isset($this->meta[META_EVENT_GUID])) $data[META_EVENT_GUID] = $this->meta[META_EVENT_GUID];
|
|
$data[DB_STATUS] = STATUS_IN_PROGRESS;
|
|
$data[MWH_NUM_RECS_MOVED] = 0;
|
|
$data[MWH_NUM_RECS_DROPPED] = 0;
|
|
$data[MWH_DATE_STARTED] = time();
|
|
|
|
// let's see if a migrations-record for this move already exists in the db: (hint: migrations table
|
|
// is *always* stored in mongo...on the admin server!)
|
|
|
|
// validate and add the data to the widget
|
|
if ($widget->addData([$data])) {
|
|
// override the Namaste record limit with the migration record limit
|
|
try {
|
|
if ($widget->setRecordLimit($this->config[CONFIG_MIGRATION_RECORD_LIMIT])) {
|
|
// todo -- system event notification?
|
|
$msg = sprintf(INFO_RECORD_LIMIT_OVERRIDE, gasConfig::$settings[CONFIG_DATABASE][CONFIG_DATABASE_QUERY_RECORD_LIMIT], $this->config[CONFIG_MIGRATION_RECORD_LIMIT]);
|
|
consoleLog($this->res, CON_SUCCESS, $msg);
|
|
}
|
|
$this->recordLimit = $this->config[CONFIG_MIGRATION_RECORD_LIMIT];
|
|
} catch (Throwable $t) {
|
|
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_THROWABLE_EXCEPTION;
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
}
|
|
return false;
|
|
}
|
|
$retVal = true;
|
|
$this->objMigrate = $widget;
|
|
}
|
|
if (is_object($widget)) $widget->__destruct();
|
|
unset($widget);
|
|
}
|
|
if (is_object($tmpObj)) $tmpObj->__destruct();
|
|
unset($tmpObj);
|
|
}
|
|
return $retVal;
|
|
}
|
|
|
|
|
|
/**
|
|
* getTableBonafides() -- private method
|
|
*
|
|
* This method requires no input parameters because all necessary data has been assigned to class members prior
|
|
* to the invocation of this method. And the method returns a Boolean value to indicate if the request
|
|
* completed successfully or not.
|
|
*
|
|
* Based on the current request schema, we create a query for the source table, named in the broker-event request,
|
|
* and we pass that generic query to the source db schema/engine.
|
|
*
|
|
* If the target schema is mySQL, then we're going to issue two queries:
|
|
* 1. Get the table schema using "desc table-name" syntax
|
|
* 2. Get the record count using "select count(*)" syntax
|
|
*
|
|
* If the target schema is mongo, the best we can do is get the record count.
|
|
*
|
|
* The return results are stored in a class member variable.
|
|
*
|
|
* All db queries are exception trapped and, if raised, will add error messages to the member's error stack and
|
|
* output copies of the error to the console log.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.00
|
|
*
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 01-25-18 mks _INF-139: original coding
|
|
*
|
|
*/
|
|
private function getTableBonafides(): bool
|
|
{
|
|
$status = false;
|
|
$this->state = STATE_DB_ERROR;
|
|
$this->recordsInSourceTable = intval(0);
|
|
$table = $this->brokerData[MIGRATION_SOURCE_TABLE];
|
|
|
|
if ($this->schema == STRING_MYSQL) {
|
|
/** @var PDO $dbLink */
|
|
$dbLink = $this->resRemote;
|
|
|
|
// first, get the schema...
|
|
$query = 'DESC /* ' . basename(__FILE__) . COLON_NS . __LINE__ . ' */ ' . $table;
|
|
try {
|
|
foreach ($dbLink->query($query) as $row) {
|
|
$this->mysqlSourceSchema[] = $row;
|
|
$this->mysqlFieldList[] = $row[STRING_FIELD];
|
|
}
|
|
} catch (PDOException | Throwable $p) {
|
|
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_PDO_EXCEPTION . $query);
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $p->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($p->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $p->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false; // return control to calling client if the query failed
|
|
}
|
|
|
|
// next, get the record count
|
|
$query = 'SELECT /* ' . basename(__FILE__) . COLON_NS . __LINE__ . ' */ count(*) AS rowCount ';
|
|
$query .= 'FROM ' . $table;
|
|
|
|
try {
|
|
// fetchColumn returns the value of a single column of a single row
|
|
$this->recordsInSourceTable = $dbLink->query($query)->fetchColumn();
|
|
$this->state = STATE_SUCCESS;
|
|
$status = true;
|
|
} catch (PDOException | Throwable $p) {
|
|
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_PDO_EXCEPTION, $query);
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $p->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($p->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $p->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
}
|
|
} elseif ($this->schema == STRING_MONGO) {
|
|
// get the mongo table count
|
|
/** @var MongoDB\Driver\Manager $dbLink */
|
|
$dbLink = $this->resRemote;
|
|
$db = $this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE];
|
|
try {
|
|
$command = new MongoDB\Driver\Command(["count" => $table, "query" => []]);
|
|
$this->recordsInSourceTable = (int) $dbLink->executeCommand($db, $command);
|
|
$this->state = STATE_SUCCESS;
|
|
$status = true;
|
|
} catch (MongoDB\Driver\Exception\InvalidArgumentException | MongoDB\Driver\Exception\Exception | Throwable $e) {
|
|
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MONGO_EXCEPTION . $e->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
}
|
|
}
|
|
return $status;
|
|
}
|
|
|
|
|
|
/**
|
|
* remoteConnect() -- private method
|
|
*
|
|
* This method takes no input parameters and returns a boolean value indicating success or failure of being able
|
|
* to connect to the remote source database.
|
|
*
|
|
* This method handles all supported schemas which currently are:
|
|
*
|
|
* -- mysql
|
|
* -- mongo
|
|
*
|
|
* The class members have pre-loaded the XML configuration which identifies the resource location and, from this
|
|
* configuration we attempt to build the correct resource connector and then attempt to make a connection to the
|
|
* remote service.
|
|
*
|
|
* All connection requests are exception wrapped and trapped and errors will be logged to both the class error
|
|
* stack and the console log.
|
|
*
|
|
* Remote connection information, regardless of schema-type, are stored in class member variables on success.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 01-25-18 mks _INF-139: original coding
|
|
*
|
|
*/
|
|
private function remoteConnect(): bool
|
|
{
|
|
$this->remoteAvailable = false;
|
|
|
|
// build the resource DSN
|
|
if ($this->brokerData[MIGRATION_SOURCE_SCHEMA] == STRING_MYSQL or $this->schema == STRING_MYSQL) {
|
|
$db = 'dbname=' . $this->config[CONFIG_SCHEMA_MYSQL][CONFIG_DATABASE];
|
|
$pdoAttributes = [
|
|
PDO::ATTR_EMULATE_PREPARES => false,
|
|
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
|
|
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC
|
|
];
|
|
$host = $this->config[CONFIG_SCHEMA_MYSQL][STRING_HOST];
|
|
$port = $this->config[CONFIG_SCHEMA_MYSQL][STRING_PORT];
|
|
$user = $this->config[CONFIG_SCHEMA_MYSQL][STRING_USER];
|
|
$pass = $this->config[CONFIG_SCHEMA_MYSQL][STRING_PASS];
|
|
|
|
// connect to remote mysql installation via PDO
|
|
$PDOConnectString = 'mysql:host=' . $host . COLON_NS . $port . SEMI . $db . SEMI . $this->config[CONFIG_SCHEMA_MYSQL][CONFIG_DATABASE_PDO_CHARSET];
|
|
if (gasConfig::$settings[CONFIG_DEBUG]) {
|
|
consoleLog($this->res, CON_DEBUG, 'PDO-Connection: ' . $PDOConnectString);
|
|
}
|
|
|
|
try {
|
|
$res = new PDO($PDOConnectString, $user, $pass, $pdoAttributes);
|
|
$this->resRemote =& $res;
|
|
$this->remoteAvailable = true;
|
|
$this->whSourceURI = $PDOConnectString;
|
|
} catch (PDOException | Throwable $e) {
|
|
$msg = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_PDO_CONNECT;
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $e->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($msg);
|
|
$this->logger->error($e->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
consoleLog($this->res, CON_ERROR, $e->getMessage());
|
|
}
|
|
}
|
|
} elseif ($this->brokerData[MIGRATION_SOURCE_SCHEMA] == STRING_MONGO or $this->schema == STRING_MONGO) {
|
|
$mongoDSN = MONGO_DSN;
|
|
if (1 === $this->config[CONFIG_SCHEMA_MONGO][CONFIG_USER_SECURITY]) {
|
|
// todo -- evaluate the connectivity of the DSN with COLON_NS as opposed to COLON
|
|
$mongoDSN .= $this->config[CONFIG_SCHEMA_MONGO][STRING_USER] . COLON_NS;
|
|
$mongoDSN .= $this->config[CONFIG_SCHEMA_MONGO][STRING_PASS] . AT;
|
|
$authSource = $this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_AUTH_SOURCE];
|
|
$mongoOptions[CONFIG_DATABASE_MONGODB_AUTH_SOURCE] = $authSource;
|
|
}
|
|
|
|
// get the heartbeat and readPreferences configuration
|
|
$mongoOptions[CONFIG_DATABASE_MONGODB_HB] = intval($this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_HB]);
|
|
$mongoOptions[CONFIG_DATABASE_MONGODB_RP] = $this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_RP];
|
|
|
|
// get the remote configuration: standAlone, replSet or shard?
|
|
switch ($this->config[CONFIG_SCHEMA_MONGO][CONFIG_INSTANCE_TYPE]) {
|
|
case CONFIG_INSTANCE_TYPE_INST :
|
|
// stand-alone instance
|
|
$mongoDSN .= $this->config[CONFIG_SCHEMA_MONGO][STRING_HOST] . COLON_NS;
|
|
$mongoDSN .= $this->config[CONFIG_SCHEMA_MONGO][STRING_PORT];
|
|
break;
|
|
|
|
case CONFIG_INSTANCE_TYPE_REPL :
|
|
// replication set
|
|
$mongoOptions[MONGO_REPL_SET] = $this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_REPLSET_NAME];
|
|
foreach ($this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_REPLSET_DSN][CONFIG_DATABASE_MONGODB_ADMIN_REPLSET_SET] as $node) {
|
|
$mongoDSN .= $node . COMMA_NS;
|
|
}
|
|
$mongoDSN = rtrim($mongoDSN, COMMA_NS);
|
|
break;
|
|
|
|
case CONFIG_INSTANCE_TYPE_SHARD :
|
|
foreach ($this->config[CONFIG_SCHEMA_MONGO][CONFIG_DATABASE_MONGODB_SHARDING_MONGOS_NODES] as $node) {
|
|
$mongoDSN .= $node . COMMA_NS;
|
|
}
|
|
$mongoDSN = rtrim($mongoDSN, COMMA_NS);
|
|
break;
|
|
}
|
|
|
|
// connect to the remote mongo service
|
|
try {
|
|
$res = new MongoDB\Driver\Manager($mongoDSN, $mongoOptions);
|
|
if (!is_object($res)) {
|
|
consoleLog($this->res, CON_ERROR, ERROR_MONGO_CONNECT);
|
|
consoleLog($this->res, CON_ERROR, basename(__FILE__) . COLON_NS . __LINE__ . COLON . $mongoDSN);
|
|
} else {
|
|
$this->resRemote =& $res;
|
|
$this->remoteAvailable = true;
|
|
$this->whSourceURI = $mongoDSN;
|
|
}
|
|
} catch (MongoDB\Driver\Exception\ConnectionException |
|
|
MongoDB\Driver\Exception\InvalidArgumentException |
|
|
MongoDB\Driver\Exception\RuntimeException |
|
|
MongoDB\Driver\Exception\Exception |
|
|
Throwable $e) {
|
|
$msg = ERROR_MONGO_EXCEPTION . $e->getMessage();
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
} else {
|
|
// todo -- error: unknown schema
|
|
$msg = sprintf(ERROR_MIGRATION_SCHEMA_UNKNOWN, $this->brokerData[MIGRATION_SOURCE_SCHEMA]);
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . $msg;
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
return false;
|
|
}
|
|
return $this->remoteAvailable;
|
|
}
|
|
|
|
|
|
/**
|
|
* validateMigrationData -- private method
|
|
*
|
|
* This method is called from the class constructor and required one input parameter:
|
|
*
|
|
* The method validates the broker-data payload which was stored in a class variable in the constructor. If
|
|
* any errors are found, the class error-stack container is updated with the error message.
|
|
*
|
|
* Once all the validation checks have run (with the exception of the check of there being NO data which returns
|
|
* a Boolean(false) immediately), then we check the current error stack and return a false if it has any events.
|
|
*
|
|
* The broker-data container is not modified in this method.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @param array $_data
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 01-24-18 mks _INF-139: original coding
|
|
*
|
|
*/
|
|
private function validateMigrationData(array $_data): bool
|
|
{
|
|
if (!is_array($_data) or empty($_data)) {
|
|
$this->errorStack[] = ERROR_MIGRATION_DATA;
|
|
return false;
|
|
}
|
|
// validate the broker data - ensure the minimum data is present and that optional data is the correct type
|
|
if (!array_key_exists(MIGRATION_SOURCE_SCHEMA, $_data)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_SOURCE_SCHEMA;
|
|
}
|
|
if (!array_key_exists(MIGRATION_SOURCE_TABLE, $_data)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_DEST_TABLE;
|
|
}
|
|
if (!array_key_exists(MIGRATION_DEST_SCHEMA, $_data)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_DEST_SCHEMA;
|
|
}
|
|
if (!array_key_exists(MIGRATION_DEST_TABLE, $_data)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_DEST_TABLE;
|
|
}
|
|
if (array_key_exists(MIGRATION_FILTER_SOFT_DELETES, $_data) and !is_bool(boolval($_data[MIGRATION_FILTER_SOFT_DELETES]))) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_MIGRATION_DATA_FIELD_TYPE, DATA_TYPE_BOOL, MIGRATION_FILTER_SOFT_DELETES, $_data[MIGRATION_FILTER_SOFT_DELETES]);
|
|
} elseif (array_key_exists(MIGRATION_FILTER_SOFT_DELETES, $_data) and true === boolval($_data[MIGRATION_FILTER_SOFT_DELETES])) {
|
|
$this->filterSoftDeletes = true;
|
|
}
|
|
|
|
// validate the source and destination schemas
|
|
if (!in_array($_data[MIGRATION_SOURCE_SCHEMA], $this->allowedSchemas)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_MIGRATION_SCHEMA_UNKNOWN, MIGRATION_SOURCE_SCHEMA);
|
|
}
|
|
if (!in_array($_data[MIGRATION_DEST_SCHEMA], $this->allowedSchemas)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_MIGRATION_SCHEMA_UNKNOWN, MIGRATION_DEST_SCHEMA);
|
|
}
|
|
|
|
// check for testMode
|
|
if (isset($_data[MIGRATION_TEST_MODE])) {
|
|
switch ($_data[MIGRATION_TEST_MODE]) {
|
|
case 1 :
|
|
case true :
|
|
$this->testMode = true;
|
|
// if we're missing the test record count in the XML, set the record limit to 10
|
|
if (!isset($this->config[CONFIG_MIGRATION_NUM_TEST_RECS]) or empty($this->config[CONFIG_MIGRATION_NUM_TEST_RECS])) {
|
|
$this->config[CONFIG_MIGRATION_NUM_TEST_RECS] = 10;
|
|
}
|
|
break;
|
|
case 0 :
|
|
case false :
|
|
$this->testMode = false;
|
|
break;
|
|
default :
|
|
$msg = sprintf(ERROR_MIGRATION_DATA_FIELD_TYPE, 'boolean', gettype($_data[MIGRATION_TEST_MODE]));
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . $msg;
|
|
break;
|
|
}
|
|
}
|
|
// return a boolean value indicating data validation success or failure
|
|
return (empty($this->errorStack));
|
|
}
|
|
|
|
|
|
/**
|
|
* validateTemplateMigrationData() -- private method
|
|
*
|
|
* This method is called from getNamasteObjects() right after the Namaste class template is successfully
|
|
* instantiated. The required template data has already been read-in and assigned to members within the
|
|
* current class.
|
|
*
|
|
* The method validates the three migration parameters from the destination template:
|
|
*
|
|
* 1. If the current request is filtering migrations based on soft-deletes, then we need to validate that
|
|
* the SOURCE status column is present in the current schema.
|
|
* 2. Validation on the mandatory SOURCE sort key (column) is done by ensuring that the column name also
|
|
* appears in the current schema.
|
|
* 3. That the migration map exists as a member element and is in array format and that every non-null value
|
|
* in the associative array exists in the current schema.
|
|
*
|
|
* Item 1 (above) is optional and will only proc if the MIGRATION_DELETED flag was set in the broker request
|
|
* data to Boolean(false).
|
|
*
|
|
* If Item 1 procs, then both the status and sort keys must be present in the current schema, as well as all
|
|
* value declared in the associative-array migration map, for the method to return a Boolean(true) value.
|
|
*
|
|
* Otherwise, Items 2 and 3 must be present in order to return a Boolean(true) value.
|
|
*
|
|
* Anything else will generate an error - an error event message will be recorded to the class error stack and
|
|
* a Boolean(false) value will be returned.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-01-16 mks _INF-139: original coding
|
|
*
|
|
*/
|
|
private function validationTemplateMigrationData(): bool
|
|
{
|
|
$rc = true; // return code (bool)
|
|
|
|
// basic validation - requiring all the stuffs to be present
|
|
if (!isset($this->migrationMap) or !is_array($this->migrationMap) or empty($this->migrationMap)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . STRING_MIGRATION_MAP;
|
|
$rc = false;
|
|
}
|
|
if (!isset($this->migrationSortKey) or empty($this->migrationSortKey)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_SOURCE_SORT_KEY;
|
|
$rc = false;
|
|
}
|
|
// status (array) required only if we're filtering soft-deleted records
|
|
if (array_key_exists(MIGRATION_FILTER_SOFT_DELETES, $this->brokerData) and true === boolval($this->brokerData[MIGRATION_FILTER_SOFT_DELETES])) {
|
|
if (!isset($this->migrationStatusKV) or empty($this->migrationStatusKV) or !is_array($this->migrationStatusKV)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_SOURCE_STATUS_KV;
|
|
$rc = false;
|
|
}
|
|
}
|
|
// validate that the named fields in the template exist in the current schema dump from the SOURCE
|
|
if ($rc and $this->schema == STRING_MYSQL) {
|
|
// if we're supporting filtering of soft-deletes, validate the status field
|
|
if (array_key_exists(MIGRATION_FILTER_SOFT_DELETES, $this->brokerData) and true === boolval($this->brokerData[MIGRATION_FILTER_SOFT_DELETES])) {
|
|
$rc = (false === array_search(key($this->migrationStatusKV), $this->mysqlFieldList)) ? false : true;
|
|
if (!$rc) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_DATA_INVALID_KEY, key($this->migrationStatusKV));
|
|
$this->filterSoftDeletes = false; // reset this value to default
|
|
}
|
|
}
|
|
// validate that the required sort field is in the schema
|
|
if ($rc) {
|
|
if (!in_array($this->migrationSortKey, $this->mysqlFieldList)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . ERROR_MIGRATION_DATA_FIELD . MIGRATION_SOURCE_SORT_KEY;
|
|
$rc = false;
|
|
}
|
|
}
|
|
// validate the non-null fields in the migration map as being present in the source mysql;
|
|
if ($rc) {
|
|
$goodFields = true;
|
|
foreach ($this->migrationMap as $key => $value) {
|
|
if ($value != null and !in_array($value, $this->mysqlFieldList)) {
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_MIGRATION_DATA_FIELD_UNK, $value);
|
|
$goodFields = false;
|
|
}
|
|
}
|
|
$rc = $goodFields;
|
|
}
|
|
} elseif ($this->schema == STRING_MONGO) {
|
|
$rc = true;
|
|
// todo -- write this code once we know what fetched mongo-schema looks like...if we can ever know...
|
|
} else {
|
|
// error: unsupported schema
|
|
$this->errorStack[] = basename(__FILE__) . COLON_NS . __LINE__ . COLON . sprintf(ERROR_SCHEMA_NOT_SUPPORTED, $this->mysqlSourceSchema);
|
|
$rc = false;
|
|
}
|
|
return $rc;
|
|
}
|
|
|
|
|
|
/**
|
|
* updateWHMetaData() -- private method
|
|
*
|
|
* This method has an optional input parameter:
|
|
*
|
|
* $_data -- used to update the destination record, an assoc array containing the k->v pairs of new columns
|
|
*
|
|
* The general assumption is that this method will be invoked from several places/times:
|
|
*
|
|
* -- when we're looping through the WH data -- each iteration will prompt a call to either create or update
|
|
* -- when we're done processing WH data and we want to close the record
|
|
*
|
|
* We instantiate a brokerClass object to the admin-out queue and publish the request.
|
|
*
|
|
* The method returns a boolean indicating the success/fail of the remote update request.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @param array|null $_data
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 05-23-18 mks _INF-188: original coding completed
|
|
*
|
|
*/
|
|
private function updateWHMeta(array $_data = null): bool
|
|
{
|
|
// if we passed data (update) use it - o/w, grab all the data for a new-record create
|
|
$data = (is_null($_data)) ? $this->objWarehouseMeta->getData() : $_data;
|
|
if (is_null($data)) {
|
|
$msg = ERROR_DATA_MISSING_ARRAY . STRING_WH_META_DATA;
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
}
|
|
try {
|
|
/** @var gacBrokerClient $bc */
|
|
$bc = $this->getRabbitClient();
|
|
} catch (TypeError $t) {
|
|
$msg = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_AO;
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_RESOURCE_ERROR_RABBIT;
|
|
if (is_object($bc)) {
|
|
$bc->__destruct();
|
|
unset($bc);
|
|
}
|
|
return false;
|
|
}
|
|
if (!$bc->status) {
|
|
$msg = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_AO;
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
if (is_object($bc)) {
|
|
$bc->__destruct();
|
|
unset($bc);
|
|
}
|
|
return false;
|
|
}
|
|
try {
|
|
// we have a broker client - create the request payload, submit the request, and parse the result
|
|
if (!isset($this->adminWHMetaData[META_EVENT_GUID]))
|
|
$this->adminWHMetaData[META_EVENT_GUID] = $this->objWarehouseMeta->getColumn(DB_EVENT_GUID);
|
|
|
|
if (!is_null($_data)) {
|
|
$payload = [
|
|
STRING_QUERY_DATA => [ DB_TOKEN => [ OPERAND_NULL => [ OPERATOR_EQ => [$this->objWarehouseMeta->getColumn(DB_TOKEN)]]]],
|
|
STRING_UPDATE_DATA => $data
|
|
];
|
|
$be = BROKER_REQUEST_ADMIN_MWH_EVENT_UPDATE;
|
|
} else {
|
|
$payload = $data;
|
|
$be = BROKER_REQUEST_ADMIN_MWH_EVENT_CREATE;
|
|
}
|
|
$request = [
|
|
BROKER_REQUEST => $be,
|
|
BROKER_DATA => $payload,
|
|
BROKER_META_DATA => $this->adminWHMetaData
|
|
];
|
|
$payload = gzcompress(json_encode($request));
|
|
$response = $bc->call($payload);
|
|
$response = json_decode(gzuncompress($response), true);
|
|
if (empty($response) or !is_array($response) or !$response[PAYLOAD_STATUS]) {
|
|
$msg = sprintf(ERROR_BROKER_INTERNAL_CALL, $request[BROKER_REQUEST], BROKER_QUEUE_AO);
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
if (is_object($bc)) {
|
|
$bc->__destruct();
|
|
unset($bc);
|
|
}
|
|
return false;
|
|
}
|
|
$this->objWarehouseMeta->addData($response[PAYLOAD_RESULTS]);
|
|
} catch (Throwable $t) {
|
|
$msg = sprintf(ERROR_THROWABLE_EXCEPTION, $t->getMessage());
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available)
|
|
$this->logger->warn($msg);
|
|
else
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
$this->state = STATE_DATA_ERROR;
|
|
if (is_object($bc)) {
|
|
$bc->__destruct();
|
|
unset($bc);
|
|
}
|
|
return false;
|
|
}
|
|
if (is_object($bc)) {
|
|
$bc->__destruct();
|
|
unset($bc);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* getRabbitClient() -- private method
|
|
*
|
|
* This method is used by the class methods that need to publish migration-meta-data updates to the admin service.
|
|
*
|
|
* The method accepts an optional parameter - the queue designation which defaults to the AdminOut queue.
|
|
* Validation of the input-queue is handled during instantiation.
|
|
*
|
|
* All this method does is instantiate a new gacBrokerClient class object -- if the instantiation is successful,
|
|
* then the object is returned to the calling client. If not, a null is returned and error messages are generated
|
|
* and assigned/output.
|
|
*
|
|
* Function was created to reduce overall code footprint.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @param string $_queue
|
|
* @return gacBrokerClient|null
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-12-18 mks _INF-139: original coding
|
|
*
|
|
*/
|
|
private function getRabbitClient(string $_queue = BROKER_QUEUE_AO): ? gacBrokerClient
|
|
{
|
|
try {
|
|
// create a broker-client class for the query
|
|
$bc = new gacBrokerClient($_queue);
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return null;
|
|
}
|
|
if (!is_object($bc) or !$bc->status) {
|
|
$msg = ERROR_BROKER_CLIENT_DECLARE . $_queue;
|
|
consoleLog($this->res, CON_SYSTEM, $msg);
|
|
$this->errorStack[] = $msg;
|
|
if (is_object($bc)) $bc->__destruct();
|
|
unset($bc);
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return null;
|
|
}
|
|
return $bc;
|
|
}
|
|
|
|
|
|
/**
|
|
* publishMigrationEvent() -- private method
|
|
*
|
|
* This method requires two input parameters:
|
|
*
|
|
* $_request: the associative array we're going to morph into an AMQP message.
|
|
* $_response: a call-by-reference parameter that will return the broker payload received from the admin service
|
|
*
|
|
* There's no validation for the input data as any errors will be handled and recorded in the broker event.
|
|
*
|
|
* The method returns a boolean value indicating whether or not the request was successfully published and
|
|
* returned back a payload. (Note: The payload itself may contain errors but processing those errors is
|
|
* beyond the scope of this method.)
|
|
*
|
|
* We also instantiate a gacBrokerClient to handle the AMQP request submission to the Admin service.
|
|
*
|
|
* This method was written to reduce the code footprint.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @param array $_request
|
|
* @param array $_response
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-12-18 mks _INF-139: original coding
|
|
*
|
|
*/
|
|
private function publishMigrationEvent(array $_request, array &$_response = []): bool
|
|
{
|
|
$bc = null;
|
|
try {
|
|
// get the broker client we'll use to publish the update/save request to the remote admin service
|
|
if (is_null($bc = $this->getRabbitClient())) return false;
|
|
|
|
$payload = gzcompress(json_encode($_request));
|
|
$_response = json_decode(gzuncompress($bc->call($payload)), true);
|
|
if (!is_array($_response) or !$_response[PAYLOAD_STATUS]) {
|
|
$msg = ERROR_BROKER_RESPONSE_BAD;
|
|
$this->errorStack[] = $msg;
|
|
if (is_array($_response[PAYLOAD_DIAGNOSTICS])) {
|
|
$this->errorStack = array_merge($this->errorStack, $_response[PAYLOAD_DIAGNOSTICS]);
|
|
}
|
|
$this->state = STATE_AMQP_ERROR;
|
|
if (is_object($bc)) $bc->__destruct();
|
|
unset($bc);
|
|
return false;
|
|
}
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
if (is_object($bc)) $bc->__destruct();
|
|
unset($bc);
|
|
return false;
|
|
}
|
|
if (is_object($bc)) $bc->__destruct();
|
|
unset($bc);
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* processMongoResults() -- private method
|
|
*
|
|
* This method is a single-use/purpose method meant to calculate the results from a mongoDB bulk-write insert
|
|
* invoked as part of the create-record process during migration. It's only called from the migrateData()
|
|
* function and was broken out to reduce the calling function's code footprint.
|
|
*
|
|
* There are two required inputs to the method:
|
|
*
|
|
* $_widget - this is a copy of the class $widget that was created in the parent function
|
|
* $_mappedResults -- an array of the records submitted for insertion to mongoDB
|
|
*
|
|
* The algorithm is to fetch the bulk-write results from the widget (a gacMongoDB object) post _createRecord()
|
|
* and then extract data about the bulk-write operation (records created), and from data grab the last record
|
|
* written (in case we have have to resume processing), and check for dropped records. Data about the write
|
|
* event is posted to the migration (tracking) object.
|
|
*
|
|
* The method will return a Boolean(false) value if the bulk-write object returned null from the create method,
|
|
* or if we generated additional errors when parsing the return data via other method calls.
|
|
*
|
|
* Otherwise, the method returns a Boolean(true) value.
|
|
*
|
|
*
|
|
* @param gacMongoDB $_widget
|
|
* @param array $_mappedResults
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-18-18 mks _INF-139: original coding
|
|
* 03-27-18 mks CORE-852: corrected errors in method header comment block, fixed last-written-record fetch
|
|
*
|
|
*/
|
|
private function processMongoResults(gacMongoDB $_widget, array $_mappedResults): bool
|
|
{
|
|
// fetch number of records inserted in this pass and add it to the migration meta data
|
|
$rc = count($_mappedResults);
|
|
$esc = count($this->errorStack); // error stack count
|
|
|
|
try {
|
|
/** @var MongoDB\Driver\WriteResult $bwr */
|
|
$bwr = $_widget->getBWResults();
|
|
if (!is_null($bwr)) {
|
|
$rw = $bwr->getInsertedCount(); // count of Records Written
|
|
// if the record count is greater-than the count of records written....
|
|
if ($rc > $rw) {
|
|
// grab what we think is the last record written and encode it
|
|
$this->lastRecordWritten = json_encode($_mappedResults[($rc - 1)]);
|
|
// store it in the migration object
|
|
@$this->objMigrate->insertField(MWH_LAST_REC_WRITTEN, $this->lastRecordWritten, 0, $this->errorStack, $this->res);
|
|
// update the dropped record count
|
|
$dr = $this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED);
|
|
$tdr = $dr + ($rc - $rw); // total dropped record count
|
|
@$this->objMigrate->insertField(MWH_NUM_RECS_DROPPED, $tdr, 0, $this->errorStack, $this->res);
|
|
}
|
|
$cc = $this->objMigrate->getColumn(MWH_NUM_RECS_MOVED) + $rw;
|
|
@$this->objMigrate->insertField(MWH_NUM_RECS_MOVED, $cc, 0, $this->errorStack, $this->res);
|
|
} else {
|
|
$msg = sprintf(ERROR_DATA_OBJECT_EMPTY, STRING_BULK_WRITE);
|
|
$this->errorStack[] = $msg;
|
|
consoleLog($this->res, CON_SYSTEM, $msg);
|
|
return false;
|
|
}
|
|
unset($bwr);
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
}
|
|
// return if we got errors setting data in the migrate object
|
|
if ($esc < count($this->errorStack)) return false;
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* processPDOResults() -- private method
|
|
*
|
|
* This method is called at the end of a write-cycle during migration when mySQL is the destination schema. The
|
|
* method is meant to update the migration record, keeping track of the migration progress and metrics.
|
|
*
|
|
* There are two input parameters to this method:
|
|
*
|
|
* $_widget -- a copy of the destination class widget (Namaste data object)
|
|
* $_mappedResults -- an array containing the list of records inserted into mySQL
|
|
*
|
|
* If we wrote less records than what's contained in the data array, the assumption is that some records were
|
|
* dropped during processing. If that's the case, then we want to capture the number of records written, dropped,
|
|
* and the last written record and post all this information to the migration record.
|
|
*
|
|
* If no records were dropped, just update the current record count of total number of inserted objects in the
|
|
* current (ongoing) migration effort.
|
|
*
|
|
* The method returns a Boolean value that's false if we generated any errors during this collection process,
|
|
* otherwise a Boolean(true) is returned.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @param gacPDO $_widget
|
|
* @param array $_mappedResults
|
|
* @return bool
|
|
*/
|
|
private function processPDOResults(gacPDO $_widget, array $_mappedResults): bool
|
|
{
|
|
$rc = count($_mappedResults); // total record count
|
|
$esc = count($this->errorStack); // error stack count
|
|
$rw = $_widget->recordsInserted; // records written
|
|
|
|
try {
|
|
if ($rc > $rw) {
|
|
// a record was dropped - grab the last record in the array as potentially the last record written
|
|
$this->lastRecordWritten = json_encode($_mappedResults[($rc - 1)]);
|
|
// store it in the migration object
|
|
@$this->objMigrate->insertField(MWH_LAST_REC_WRITTEN, $this->lastRecordWritten, 0, $this->errorStack, $this->res);
|
|
// update the dropped record counter
|
|
$dr = $this->objMigrate->getColumn(MWH_NUM_RECS_DROPPED);
|
|
$tdr = $dr + $_widget->recordsDropped; // total dropped records
|
|
@$this->objMigrate->insertField(MWH_NUM_RECS_DROPPED, $tdr, 0, $this->errorStack, $this->res);
|
|
}
|
|
$twr = $this->objMigrate->getColumn(MWH_NUM_RECS_MOVED) + $rw;
|
|
@$this->objMigrate->insertField(MWH_NUM_RECS_MOVED, $twr, 0, $this->errorStack, $this->res);
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
}
|
|
// return false if any new errors were generated
|
|
return ($esc < count($this->errorStack)) ? false : true;
|
|
}
|
|
|
|
|
|
/**
|
|
* doMigration() -- private method
|
|
*
|
|
* This was the old constructor code before we introduced warehousing to the class. Once we introduced warehousing
|
|
* we determine the event type in the constructor and invoke the controller function specific to the event.
|
|
*
|
|
* There is one input parameter to this method and that's an array containing the broker data as received by
|
|
* the migration broker.
|
|
*
|
|
* The method returns is embedded in the class member variables to the return type is void.
|
|
*
|
|
* If any of the migration processing steps, represented by the invoked method, fail, then we'll return immediately
|
|
* passing the results straight back to the broker event code.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 04-10-18 mks _INF-188: code refactoring completed
|
|
*
|
|
*/
|
|
private function doMigration(): void
|
|
{
|
|
if (!isset($this->brokerData[MIGRATION_TEST_MODE])) $this->brokerData[MIGRATION_TEST_MODE] = 0;
|
|
$this->schema = $this->brokerData[MIGRATION_SOURCE_SCHEMA];
|
|
$this->destinationSchema = $this->brokerData[MIGRATION_DEST_SCHEMA];
|
|
$this->lastRecordWritten = null;
|
|
|
|
// reset state/status for the query
|
|
$this->state = STATE_VALIDATION_ERROR;
|
|
$this->status = false;
|
|
|
|
try {
|
|
// connect to the remote (source) schema service
|
|
if (!$this->remoteConnect()) return;
|
|
|
|
// get the count of the number of records to migrate (incidentally testing source params)
|
|
if (!$this->getTableBonafides()) return;
|
|
|
|
// instantiate the Namaste destination widget and the system migrations widget
|
|
if (!$this->getNamasteObjects()) return;
|
|
|
|
// make a call to the admin/migrations table and see if we're already done this migration or
|
|
// if it's a migration in-progress...
|
|
$this->fetchMigrationMetaData();
|
|
if ($this->state != STATE_SUCCESS) return;
|
|
$this->state = STATE_VALIDATION_ERROR;
|
|
|
|
// calculate the coverage of the migration
|
|
if (!$this->calculateCoverage()) return;
|
|
|
|
// call the method to do the data migration
|
|
if (!$this->migrateData()) return;
|
|
$this->state = STATE_SUCCESS;
|
|
$this->status = true;
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* startWHRequest() -- private method
|
|
*
|
|
* This method has no input parameters and returns no value -- status and values are stored within the class member
|
|
* and, as such, should be evaluated by the calling client on return.
|
|
*
|
|
* The method validates the request basics:
|
|
*
|
|
* 1. do we have a data payload?
|
|
* 2. do we have meta data?
|
|
* 3. can we load the data template?
|
|
* 4. does the template have WH settings
|
|
*
|
|
* At this stage, once the above has been validated, then we call the method to validate the request details.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 04-24-18 mks _INF-188: original coding
|
|
*
|
|
*/
|
|
private function startWHRequest(): void
|
|
{
|
|
$this->status = false;
|
|
$debug = gasConfig::$settings[CONFIG_DEBUG];
|
|
|
|
// first thing to do is validate that we have a data payload
|
|
if (empty($this->brokerData) or !isset($this->brokerData)) {
|
|
$msg = ERROR_DATA_ARRAY_EMPTY . COLON . BROKER_DATA;
|
|
$this->errorStack[] = $msg;
|
|
$this->state = STATE_DATA_ERROR;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
return;
|
|
}
|
|
// next - validate the payload as a valid warehousing request
|
|
if (!isset($this->meta[META_TEMPLATE]) or empty($this->meta[META_TEMPLATE])) {
|
|
$msg = ERROR_META_CLIENT_404;
|
|
$this->errorStack[] = $msg;
|
|
$this->state = STATE_META_ERROR;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
return;
|
|
}
|
|
// load the template
|
|
try {
|
|
/** @var gatProductRegistrations $template */
|
|
$tc = STRING_CLASS_GAT . $this->meta[META_TEMPLATE];
|
|
$template = new $tc();
|
|
} catch (Throwable $t) {
|
|
$this->errorStack[] = ERROR_EXCEPTION;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error(ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return;
|
|
}
|
|
$this->whSettings = $template->wareHouse;
|
|
if (!is_array($this->whSettings)) {
|
|
$msg = ERROR_DATA_ARRAY_EMPTY . COLON . STRING_WH_SETTINGS;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
$this->errorStack[] = $msg;
|
|
$this->state = STATE_TEMPLATE_ERROR;
|
|
return;
|
|
}
|
|
|
|
try {
|
|
// data validation is complete -- let's validate the query against the class template settings
|
|
if (!$this->validateWHRequest($debug)) return;
|
|
} catch (TypeError $t) {
|
|
$msg = ERROR_TYPE_EXCEPTION . COLON . $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* validateWHRequest() -- private method
|
|
*
|
|
* This method is called by the constructor on a new warehousing request. The method takes a single, optional,
|
|
* input parameter which is the current debug state. (The class does not normally access the framework's debug
|
|
* setting which is why the default is set to false.)
|
|
*
|
|
* The method is responsible for first-pass validation of the WH request -- and, importantly, if this source
|
|
* data for the request is remote to Namaste, we confirm that we can create a resource connection to the remote
|
|
* service.
|
|
*
|
|
* The validation stubs just ensure that the WH request doesn't violate individual rules set in the destination
|
|
* class template.
|
|
*
|
|
* We instantiate two other Namaste classes in this method and assign them to the following members:
|
|
*
|
|
* $widget -- this is the local data class instantiation specified in the meta data META_TEMPLATE value
|
|
* $objWarehouse -- this is the admin data class that records the wh operation progress
|
|
*
|
|
* The ultimate objective of this method is to generate these two objects so that, when control is returned to
|
|
* the calling client, we can pass control back to the admin WH broker, which returns the task GUID back to
|
|
* the requesting client -- then the broker will invoke the processing method in this class to perform the
|
|
* actual warehousing operation without blocking the original client.
|
|
*
|
|
* The method returns a boolean value - if the request passes initial validation, then we return a boolean(true),
|
|
* otherwise, a boolean(false) is returned.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @param bool $_debug
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 04-24-16 mks _INF-188: original coding completed
|
|
* 01-08-20 mks DB-150: fixed bug in assignment for $mysqlConfig and $mongoConfig
|
|
*/
|
|
private function validateWHRequest(bool $_debug = false): bool
|
|
{
|
|
$skipCheck = false;
|
|
$mysqlConfig = false;
|
|
$mongoConfig = false;
|
|
|
|
// check 1: Does the destination data class support warehousing?
|
|
if (!$this->whSettings[WH_SUPPORTED]) {
|
|
$error = sprintf(ERROR_WH_CLASS_NOT_SUPPORTED, $this->meta[META_TEMPLATE]);
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_DATA_ERROR;
|
|
if ($_debug) $this->logger->debug($error);
|
|
return false;
|
|
}
|
|
// check 2: Check if the source is remote (to Namaste) and, if so, is the class auth to fetch remote data
|
|
if (isset($this->brokerData[WH_SOURCE_IS_REMOTE]) and $this->brokerData[WH_SOURCE_IS_REMOTE]) {
|
|
if (!$this->whSettings[WH_REMOTE_SUPPORT]) {
|
|
$error = sprintf(ERROR_WH_REMOTE_SOURCE_NOT_AUTH, $this->meta[META_TEMPLATE]);
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_DATA_ERROR;
|
|
if ($_debug) $this->logger->debug($error);
|
|
return false;
|
|
}
|
|
// check 2.1: if this is a remote request, ensure we have a remote table name in the payload
|
|
if (!isset($this->brokerData[WH_REMOTE_TABLE]) or empty($this->brokerData[WH_REMOTE_TABLE])) {
|
|
$this->errorStack[] = ERROR_WH_REMOTE_SOURCE_404;
|
|
if ($_debug) $this->logger->debug(ERROR_WH_REMOTE_SOURCE_404);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
}
|
|
$this->remoteSource = true; // set the remote-source flag which we'll check later
|
|
}
|
|
// check 3: if this is an automated (cron-launched) request, validate that automation is supported
|
|
if (isset($this->brokerData[WH_AUTOMATED]) and $this->brokerData[WH_AUTOMATED]) {
|
|
if (false == $this->whSettings[WH_AUTOMATED]) {
|
|
$error = sprintf(ERROR_WH_CRON_NOT_SUPPORTED, $this->meta[META_TEMPLATE]);
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_DATA_ERROR;
|
|
if ($_debug) $this->logger->debug($error);
|
|
return false;
|
|
}
|
|
$skipCheck = true;
|
|
}
|
|
// check 4: if this is an ad-hoc request, validate that the class supports such requests
|
|
if (!$this->whSettings[WH_DYNAMIC] and !$skipCheck) {
|
|
$error = sprintf(ERROR_WH_DYNAMIC_NOT_SUPPORTED, $this->meta[META_TEMPLATE]);
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_DATA_ERROR;
|
|
if ($_debug) $this->logger->debug($error);
|
|
return false;
|
|
}
|
|
// check 5: WH_FILTER_VAL is required, should be an indexed array with two valid short-dates
|
|
if (!isset($this->brokerData[WH_FILTER_VALUES])) {
|
|
$error = ERROR_DATA_KEY_404 . WH_FILTER_VALUES;
|
|
$this->errorStack[] = $error;
|
|
if ($this->widget->debug) $this->logger->debug($error);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
} elseif (!is_array($this->brokerData[WH_FILTER_VALUES])) {
|
|
$error = ERROR_DATA_ARRAY_NOT_ARRAY . WH_FILTER_VALUES;
|
|
$this->errorStack[] = $error;
|
|
if ($this->widget->debug) $this->logger->debug($error);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
} elseif (count($this->brokerData[WH_FILTER_VALUES]) != 2) {
|
|
$error = sprintf(ERROR_DATA_RECORD_COUNT, 2, count($this->brokerData[WH_FILTER_VALUES]));
|
|
$this->errorStack[] = $error;
|
|
if ($this->widget->debug) $this->logger->debug($error);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
} elseif (!validateMigrationDate($this->brokerData[WH_FILTER_VALUES][0])) {
|
|
$error = sprintf(ERROR_DATE_INVALID, $this->brokerData[WH_FILTER_VALUES][0]);
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_DATA_ERROR;
|
|
if ($_debug) $this->logger->debug($error);
|
|
return false;
|
|
} elseif (!validateMigrationDate($this->brokerData[WH_FILTER_VALUES][1])) {
|
|
$error = sprintf(ERROR_DATE_INVALID, $this->brokerData[WH_FILTER_VALUES][1]);
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_DATA_ERROR;
|
|
if ($_debug) $this->logger->debug($error);
|
|
return false;
|
|
}
|
|
|
|
// check 6: if this is a remote source, then check for the migration URI configuration to be present
|
|
if ($this->remoteSource) {
|
|
if (!isset(gasConfig::$settings[CONFIG_MIGRATION]) or empty(gasConfig::$settings[CONFIG_MIGRATION]) or !is_array(gasConfig::$settings[CONFIG_MIGRATION])) {
|
|
$error = ERROR_WH_REMOTE_MIG_CFG_404;
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_DATA_ERROR;
|
|
if ($_debug) $this->logger->debug($error);
|
|
return false;
|
|
}
|
|
$mysqlConfig = (isset(gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MYSQL]) and is_array(gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MYSQL])) ? gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MYSQL] : new stdClass();
|
|
$mongoConfig = (isset(gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MONGO]) and is_array(gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MONGO])) ? gasConfig::$settings[CONFIG_MIGRATION][CONFIG_SCHEMA_MONGO] : new stdClass();
|
|
}
|
|
// check 7: if we have test-mode set in the meta, or we're in debug mode, override the warehouse default setting
|
|
if ((isset($this->brokerData[MWH_TEST_MODE]) and $this->brokerData[MWH_TEST_MODE]) or $_debug) {
|
|
$this->whSettings[WH_DELETE] = 'T';
|
|
}
|
|
|
|
// if we get to this point, the request is valid in terms of permissions/authorizations/scope,
|
|
// next, if an override query was submitted, we have to validate the query: (will it build?).
|
|
$widget = new gacFactory($this->meta, FACTORY_EVENT_NEW_CLASS, '', $this->errorStack);
|
|
if (!$widget->status) {
|
|
$error = ERROR_TEMPLATE_INSTANTIATE . $this->meta[META_TEMPLATE];
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_TEMPLATE_ERROR;
|
|
if ($_debug) $this->logger->debug($error);
|
|
return false;
|
|
}
|
|
$this->widget = $widget->widget;
|
|
if (is_object($widget)) $widget->__destruct();
|
|
unset($widget);
|
|
|
|
// assign the source schema
|
|
switch ($this->widget->template->schema) {
|
|
case CONFIG_DATABASE_PDO :
|
|
case STRING_MYSQL:
|
|
$this->schema = STRING_MYSQL;
|
|
$lTime = $this->brokerData[WH_FILTER_VALUES][0] . TIME_ZEROS;
|
|
$rTime = $this->brokerData[WH_FILTER_VALUES][1] . TIME_ZEROS;
|
|
if ($this->remoteSource) {
|
|
$this->brokerData[MIGRATION_SOURCE_SCHEMA] = STRING_MYSQL;
|
|
$this->remoteSourceConfig = $mysqlConfig;
|
|
if (!$mysqlConfig) {
|
|
$this->errorStack[] = ERROR_WH_REMOTE_MYSQL_CFG_404;
|
|
$this->state = STATE_CONFIG_ERROR;
|
|
if ($_debug) $this->logger->debug(ERROR_WH_REMOTE_MYSQL_CFG_404);
|
|
return false;
|
|
}
|
|
}
|
|
break;
|
|
|
|
case CONFIG_DATABASE_MONGODB :
|
|
case STRING_MONGO :
|
|
case TEMPLATE_DB_MONGO :
|
|
$this->schema = STRING_MONGO;
|
|
// convert to epoch time if the source schema is mongo
|
|
$lTime = unPrettyTime($this->brokerData[WH_FILTER_VALUES][0] . TIME_ZEROS);
|
|
$rTime = unPrettyTime($this->brokerData[WH_FILTER_VALUES][1] . TIME_ZEROS);
|
|
if ($this->remoteSource) {
|
|
$this->brokerData[MIGRATION_SOURCE_SCHEMA] = STRING_MONGO;
|
|
$this->remoteSourceConfig = $mongoConfig;
|
|
if (!$mongoConfig) {
|
|
$this->errorStack[] = ERROR_WH_REMOTE_MONGO_CFG_404;
|
|
$this->state = STATE_CONFIG_ERROR;
|
|
if ($_debug) $this->logger->debug(ERROR_WH_REMOTE_MONGO_CFG_404);
|
|
return false;
|
|
}
|
|
}
|
|
break;
|
|
|
|
default :
|
|
$error = sprintf(ERROR_SCHEMA_NOT_SUPPORTED, $this->widget->template->schema);
|
|
$this->errorStack[] = $error;
|
|
if ($_debug) $this->logger->debug($error);
|
|
$this->state = STATE_DATA_ERROR;
|
|
return false;
|
|
break;
|
|
}
|
|
|
|
// using the payload query-filter value, create the source query
|
|
$field = ($this->remoteSource) ? $this->brokerData[WH_REMOTE_CDATE_FIELD] : DB_CREATED;
|
|
$query[$field][OPERAND_AND] = [ OPERATOR_GT => [$lTime], OPERATOR_LT => [$rTime]];
|
|
$this->nQuery = $query;
|
|
|
|
try {
|
|
// check that we can build the namaste (local) query
|
|
if (!$this->remoteSource and !$this->widget->_checkQuery($query)) {
|
|
$error = ERROR_DATA_QUERY_BUILD;
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_TEMPLATE_ERROR;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($error);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
}
|
|
return false;
|
|
} elseif ($this->remoteSource and $this->schema == STRING_MYSQL) {
|
|
$query = $field . " > '" . $lTime . "' AND " . $field . " < '" . $rTime . "' ";
|
|
}
|
|
$this->whereClause = $query; // stash the built where-clause
|
|
$this->status = false; // reset from _checkQuery()
|
|
} catch (Throwable $t) {
|
|
$error = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $error;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($error);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
// set the query limit for the source-fetch, overriding the framework defaults using undocumented feature
|
|
$this->meta[META_LIMIT_OVERRIDE] = gasConfig::$settings[CONFIG_BROKER_SERVICES][CONFIG_WH_RECS_PER_XFER];
|
|
|
|
// load the migration map into the widget class member
|
|
if (!isset($this->widget->template->migrationMap)) {
|
|
$error = ERROR_MIGRATION_MAP_404 . COLON . $this->meta[META_TEMPLATE];
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_TEMPLATE_ERROR;
|
|
if ($_debug) $this->logger->debug($error);
|
|
return false;
|
|
}
|
|
|
|
try {
|
|
// if source is remote, check the connection
|
|
if ($this->remoteSource and !$this->remoteConnect()) return false; // check val of remoteAvailable
|
|
|
|
// we need to return a GUID back to the event requester so that they're not blocked the entire time we're
|
|
// doing the actual warehousing operation.
|
|
// instantiate and populate the warehousing object that will contain the progress/final reports of the request.
|
|
$widget = new gacFactory([META_TEMPLATE => TEMPLATE_CLASS_WAREHOUSE], FACTORY_EVENT_NEW_CLASS, '', $this->errorStack);
|
|
if (!$widget->status) {
|
|
$error = ERROR_TEMPLATE_INSTANTIATE . TEMPLATE_CLASS_WAREHOUSE;
|
|
$this->errorStack[] = $error;
|
|
if ($_debug) $this->logger->debug($error);
|
|
$this->state = STATE_TEMPLATE_ERROR;
|
|
return false;
|
|
}
|
|
$this->objWarehouseMeta = $widget->widget;
|
|
if (is_object($widget)) $widget->__destruct();
|
|
unset($widget);
|
|
|
|
// build the wh meta data and add to the wh obj
|
|
$whData = [
|
|
MWH_SOURCE_SCHEMA => $this->schema,
|
|
MWH_SOURCE_TABLE => (($this->remoteSource) ? $this->brokerData[WH_REMOTE_TABLE] : $this->widget->collectionName),
|
|
MWH_DEST_SCHEMA => $this->schema,
|
|
MWH_DEST_TABLE => ($this->widget->template->whTemplate . $this->widget->template->extension),
|
|
MWH_QUERY => $this->widget->strQuery,
|
|
MWH_QUERY_DATA => json_encode([$lTime, $rTime]),
|
|
DB_EVENT_GUID => $this->meta[META_EVENT_GUID]
|
|
];
|
|
if (!$this->objWarehouseMeta->addData([$whData])) {
|
|
$error = ERROR_DATA_ARRAY_ADD . $this->objWarehouseMeta->class;
|
|
$this->errorStack[] = $error;
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($error);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
}
|
|
return false;
|
|
}
|
|
// save the warehouse meta record so we can return the guid back to the appserver requestor
|
|
$this->updateWHMeta();
|
|
} catch (Throwable $t) {
|
|
$error = ERROR_THROWABLE_EXCEPTION . basename(__FILE__) . AT . __LINE__ . COLON;
|
|
$this->errorStack[] = $error;
|
|
$this->errorStack[] = $t->getMessage();
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->error($error);
|
|
$this->logger->error($t->getMessage());
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $error);
|
|
consoleLog($this->res, CON_ERROR, $t->getMessage());
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
$this->status = true;
|
|
$this->state = STATE_SUCCESS;
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* issueRemoteSQLQuery() -- private method
|
|
*
|
|
* This method is used to issue a remote, non-prepared, query to a remote mySQL/mariaDB resource.
|
|
*
|
|
* The method has three input parameters, one of which is required...
|
|
*
|
|
* $_query - the fully-qualified query string to be passed to the remote db for execution
|
|
* $_isCount - boolean value, defaulting to false, that indicates if we're exec'ing a remote count(*) query
|
|
* $_retVal - call-by-reference param, if isCount == true, the query results will be loaded here
|
|
*
|
|
* The method returns a Boolean value to indicate if the query successfully executed or not. It is not an
|
|
* indication that data was returned.
|
|
*
|
|
* If data is not returned (assume that the remote query exec'd successfully), then return a Boolean(true)
|
|
* value but set the member variable $noData == true to indicate the lack of data.
|
|
*
|
|
* If data is returned, set the $recordCount equal to the number of rows returned if this was a non-count query.
|
|
* If the query is a count(*) fetch, then populate $recordCount with that count value. Otherwise, the returned
|
|
* data is stored in the $queryData member variable.
|
|
*
|
|
* The client calls to this method should be wrapped in a TypeException handler.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @param string $_query
|
|
* @param bool $_isCount
|
|
* @param int $_retVal
|
|
* @return bool
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 05-03-18 mks _INF-188: original coding
|
|
* 05-23-18 mks _INF-188: added $_retVal param
|
|
*
|
|
*/
|
|
private function issueRemoteSQLQuery(string $_query, bool $_isCount = false, int &$_retVal = 0): bool
|
|
{
|
|
$this->noData = false;
|
|
$startTime = floatval(0);
|
|
|
|
try {
|
|
// start the query timer if supported
|
|
if ($this->widget->useTimers) $startTime = gasStatic::doingTime();
|
|
foreach ($this->resRemote->query($_query) as $row) {
|
|
$queryResults[] = $row;
|
|
}
|
|
// log the query time if supported
|
|
if ($this->widget->useTimers) $this->logger->metrics($_query, gasStatic::doingTime($startTime));
|
|
if (!empty($queryResults)) {
|
|
if ($_isCount) {
|
|
$_retVal = (!empty($queryResults)) ? $queryResults[0][STRING_NUM_RECS] : 0;
|
|
} else {
|
|
$this->recordCount = count($queryResults);
|
|
$this->queryData = $queryResults;
|
|
}
|
|
} else {
|
|
$this->noData = true;
|
|
$this->errorStack[] = ERROR_UT_QUERY_RETURNED_ZERO;
|
|
return true;
|
|
}
|
|
return true;
|
|
} catch (PDOException $p) {
|
|
$msg = sprintf(ERROR_PDO_EXCEPTION, $_query);
|
|
$this->errorStack[] = $msg;
|
|
$this->errorStack[] = $p->getMessage();
|
|
$this->logger->warn($msg);
|
|
$this->logger->warn($p->getMessage());
|
|
$this->state = STATE_DB_ERROR;
|
|
return false;
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
$this->state = STATE_FRAMEWORK_WARNING;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* deleteWHSourceData() -- private method
|
|
*
|
|
* This method is called as part of the wareHousing process -- it is not called during data migration.
|
|
*
|
|
* The method should already have all internal members populated prior to invocation. As such, no input parameters
|
|
* are required. The method returns a Boolean value to indicate success or failure of the request and, as always,
|
|
* it is the responsibility of the calling client to check the return value.
|
|
*
|
|
* The method looks at the current schema of the source data ($this->widget) and if the source is remote, (as in:
|
|
* external to namaste), then builds the query to execute directly via the remote resource API. If the source data
|
|
* is local then we'll build a broker delete-event request and publish this request to the namaste write broker.
|
|
*
|
|
* the namaste-delete operation will take care of the delete request (in terms of soft/hard deletes) and will
|
|
* return a standard payload. On a successful remote event, we simple return -- otherwise the remote-request's
|
|
* diagnostics are integrated into the class error stack.
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
* @return bool
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 05-21-18 mks _INF-188: original coding completed
|
|
*
|
|
*/
|
|
private function deleteWHSourceData(): bool
|
|
{
|
|
switch ($this->widget->schema) {
|
|
case CONFIG_SCHEMA_MYSQL :
|
|
case CONFIG_DATABASE_PDO :
|
|
if ($this->remoteSource) {
|
|
// call exec-remote-sql method
|
|
$query = preg_replace('/\?/', "'" . $this->widget->queryVariables[0] . "'", $this->widget->strQuery, 1);
|
|
$query = preg_replace('/\?/', "'" . $this->widget->queryVariables[1] . "'", $query, 1);
|
|
$query = 'DELETE /* ' . basename(__FILE__) . AT . __LINE__ . ' */ FROM ' . $this->widget->collectionName . ' WHERE ' . $query;
|
|
$msg = INFO_SHOULD_NOT_SEE_THIS . COLON . $query;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->info($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
} else {
|
|
try {
|
|
// build a query to submit to namaste write broker
|
|
$bc = new gacBrokerClient(BROKER_QUEUE_W, __METHOD__ . AT . __LINE__);
|
|
if (!$bc->status) {
|
|
$error = ERROR_BROKER_CLIENT_DECLARE . BROKER_QUEUE_W;
|
|
$this->errorStack[] = $error;
|
|
if ($this->widget->debug) $this->logger->debug($error);
|
|
$this->errorStack[] = ERROR_WH_DEL_REMOTE_RECS;
|
|
consoleLog($this->res, CON_SYSTEM, ERROR_WH_DEL_REMOTE_RECS);
|
|
$this->widget->logger->error(ERROR_WH_DEL_REMOTE_RECS);
|
|
if (is_object($bc)) $bc->__destruct();
|
|
unset($bc);
|
|
return false;
|
|
}
|
|
$payload = [
|
|
BROKER_REQUEST => BROKER_REQUEST_DELETE,
|
|
BROKER_DATA => [STRING_QUERY_DATA => $this->whereClause],
|
|
BROKER_META_DATA => $this->meta
|
|
];
|
|
$request = gzcompress(json_encode($payload));
|
|
$response = json_decode(gzuncompress($bc->call($request)), true);
|
|
if (!$response[PAYLOAD_STATUS]) {
|
|
$error = sprintf(ERROR_UT_BROKER_EVENT_FAIL, BROKER_REQUEST_DELETE, $response[PAYLOAD_STATE]);
|
|
if (is_array($response[PAYLOAD_DIAGNOSTICS])) $this->errorStack = array_merge($this->errorStack, $response[PAYLOAD_DIAGNOSTICS]);
|
|
$this->errorStack[] = $error;
|
|
if ($this->widget->debug) $this->logger->debug($error);
|
|
if (is_object($bc)) $bc->__destruct();
|
|
unset($bc);
|
|
return false;
|
|
}
|
|
if (is_object($bc)) $bc->__destruct();
|
|
unset($bc);
|
|
return true;
|
|
} catch (Throwable $t) {
|
|
$msg = ERROR_THROWABLE_EXCEPTION . COLON . $t->getMessage();
|
|
$this->errorStack[] = $msg;
|
|
if (isset($this->logger) and $this->logger->available) {
|
|
$this->logger->warn($msg);
|
|
} else {
|
|
consoleLog($this->res, CON_ERROR, $msg);
|
|
}
|
|
return false;
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
|
|
/**
|
|
* __destruct() -- public function
|
|
*
|
|
* class destructor
|
|
*
|
|
*
|
|
* @author mike@givingassistant.org
|
|
* @version 1.0
|
|
*
|
|
*
|
|
* HISTORY:
|
|
* ========
|
|
* 02-21-18 mks _INF-139: original coding
|
|
*
|
|
*/
|
|
public function __destruct()
|
|
{
|
|
// As of PHP 5.3.10 destructors are not run on shutdown caused by fatal errors.
|
|
//
|
|
// destructor is registered shut-down function in constructor -- so any recovery
|
|
// efforts should go in this method.
|
|
}
|
|
}
|