Exciting times around the SSP office as we engage in more and more implementations of the Utility Network! As the industry spins up the technology, we have been providing solutions for familiar workflows, now adapted to the new system.
Routine workflows that used to be performed in Geometric Network databases now must be built in Utility Network environments. Recently we were asked to solve one such workflow: migrating data between two Utility Network environments.
Because the technology is still so new, many people in the industry are unfamiliar with the steps involved. In Geometric Network land we had to perform steps in a specific order, like dropping and rebuilding the geometric network. Similarly, in Utility Network land we must do things like disabling and enabling the network topology, updating subnetworks, and importing associations.
To help shed light on the new processes involved, this post details the specific data migration we recently performed. The bullets below lay out the assumptions behind the process and code presented here:
- We had two separate electric transmission Utility Network environments, A and B.
- They lived on different machines in different locations with different Portals but had identical database schemas.
- Editors were making edits in A. These edits needed to be moved to B. This meant A was the source and B was the target to which we wanted to move data.
- B was undergoing periodic data loads of its own which prevented the use of a SQL database restore or something similar.
The edits in A involved the insertion of substation data as well as updates to conductors and other features associated with the substation data. We used Python to script the process as we worked out an approach. The code below has been broken into logical segments and genericized, and each block is accompanied by a description to help you adapt the process to your own situation.
All of the Python libraries used are as follows:
import os
import arcpy
import pyodbc
import urllib.parse
import requests
import json
import csv
import uuid
from datetime import datetime
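A few module-level objects also show up throughout the blocks below (a logger, a pyodbc cursor against the source SQL Server geodatabase, and some accumulator lists) whose declarations we've left out. How you wire these up depends on your environment; a minimal sketch, with placeholder names, paths, and connection details of our own choosing, might look like this:

# Sketch only: placeholder definitions for the module-level objects the snippets below assume.
import logging

logging.basicConfig(filename='un_migration.log', level=logging.INFO)
logger = logging.getLogger('un_migration')

def LogMessage(message):
    # Assumed thin wrapper over arcpy messaging, used alongside the logger.
    arcpy.AddMessage(message)

# pyodbc cursor against the source SQL Server geodatabase, used by the
# delete-tracking queries in step 1f. Connection string is a placeholder.
connection = pyodbc.connect(
    'DRIVER={ODBC Driver 17 for SQL Server};'
    'SERVER=<source-server>;DATABASE=<source-db>;Trusted_Connection=yes'
)
cursor = connection.cursor()

# Accumulator lists the export/import functions populate as they run.
fcs = []            # paths of exported feature classes
guids = []          # GlobalIDs of exported features
removed_guids = []  # GlobalIDs removed from the target before re-adding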
1. Export Data
The first grouping of steps produces files on disk that contain the information needed to import the data we want on the other side. That is, we extract information from A to an FGDB and a couple CSV files to be used later. This step is run while connected to the A environment.
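For orientation, the export functions defined in the sub-steps below end up being called roughly in this order. Treat this as a sketch only: the paths, dataset names, and where clause are hypothetical stand-ins for your own environment.

# Hypothetical export driver; every path, layer name, and where clause below is a placeholder.
export_folder = r'C:\temp\un_migration'
source_sde = r'C:\connections\EnvironmentA.sde'
source_utility_network = os.path.join(source_sde, 'Electric.ElectricDataset', 'Electric.Electric Utility Network')

create_export_fgdb(export_folder)                                                     # 1a
export_sub_features(os.path.join(source_sde, 'Electric.ElectricTransmissionDevice'), out_fgdb,
                    'ElectricTransmissionDevice', "SUBSTATIONNAME = 'New Substation'")
export_sub_features(os.path.join(source_sde, 'Electric.ElectricTransmissionLine'), out_fgdb,
                    'ElectricTransmissionLine', "SUBSTATIONNAME = 'New Substation'")
sign_into_portal('https://portal-a.example.com/portal', 'portaladmin', '********')
export_associations(export_folder, source_utility_network, 'ALL')                     # 1b
snc_csv_path = export_subnetwork_controllers(export_folder, source_utility_network)   # 1c
get_feature_guids(fcs)                                                                # 1d
clean_csv(os.path.join(export_folder, 'clean_associations.csv'))
clean_sncs(os.path.join(export_folder, 'clean_subnetwork_controllers.csv'))           # 1e
# 1f (export of deletes) runs separately against the source SQL Server geodatabase.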
a. Export Inserts and Updates From A Into FGDB
def create_export_fgdb(fgdb_folder):
    now = datetime.now()
    global now_string
    now_string = now.strftime("%Y%m%d-%H%M%S")
    fgdb_name = 'sub_features.gdb'
    global out_fgdb
    out_fgdb = os.path.join(fgdb_folder, fgdb_name)
    if not os.path.exists(out_fgdb):
        arcpy.AddMessage("Creating export FGDB {0}.".format(fgdb_name))
        arcpy.CreateFileGDB_management(fgdb_folder, fgdb_name)

def export_sub_features(in_layer, out_fgdb, out_name, where_clause):
    arcpy.env.preserveGlobalIds = True
    arcpy.AddMessage("Copying layer {0}.".format(in_layer))
    arcpy.FeatureClassToFeatureClass_conversion(in_layer, out_fgdb, out_name, where_clause)
    fcs.append(os.path.join(out_fgdb, out_name))
b. Export Associations From A Into CSV
def sign_into_portal(url, user, pw):
    arcpy.AddMessage("Signing into Portal.")
    arcpy.SignInToPortal(url, user, pw)

def export_associations(out_folder, network, type):
    now = datetime.now()
    csv_name = 'raw_associations.csv'
    global raw_csv
    raw_csv = os.path.join(out_folder, csv_name)
    arcpy.AddMessage("Exporting associations")
    arcpy.ExportAssociations_un(network, type, raw_csv)
c. Export Subnetwork Controllers From A Into CSV
def export_subnetwork_controllers(out_folder, network):
    sc_csv_name = 'subnetwork_controllers.csv'
    sc_csv = os.path.join(out_folder, sc_csv_name)
    arcpy.AddMessage("Exporting subnetwork controllers to {0}".format(sc_csv_name))
    arcpy.ExportSubnetworkControllers_un(network, sc_csv)
    return sc_csv
d. Clean Associations CSV
Note: “Cleaning” in this context means taking the associations CSV generated by the GP tool and stripping out any associations that do not reference anything we exported.
def get_feature_guids(fcs):
    arcpy.AddMessage("Building feature GUID list.")
    for fc in fcs:
        with arcpy.da.SearchCursor(fc, "GLOBALID") as search_cursor:
            for row in search_cursor:
                guids.append(row[0])

def clean_csv(clean_csv):
    arcpy.AddMessage("Cleaning up associations CSV.")
    with open(raw_csv, 'rt', newline='') as inp, open(clean_csv, 'w', newline='') as out:
        writer = csv.writer(out)
        writer.writerow(['ASSOCIATIONTYPE', 'FROMFEATURECLASS', 'FROMASSETGROUP', 'FROMASSETTYPE', 'FROMGLOBALID', 'FROMTERMINAL', 'TOFEATURECLASS', 'TOASSETGROUP', 'TOASSETTYPE', 'TOGLOBALID', 'TOTERMINAL', 'ISCONTENTVISIBLE'])
        for row in csv.reader(inp):
            row_guids = []
            row_guids.append(row[4])
            row_guids.append(row[9])
            if any(guid in row_guids for guid in guids):
                writer.writerow(row)
e. Clean Subnetwork Controllers CSV
Note: As in the note above, we strip the Terminator rows out of the subnetwork controllers CSV, since we are replacing those with proper feature definitions.
def clean_sncs(target_csv):
    arcpy.AddMessage("Cleaning up non-terminator SNCs from CSV.")
    with open(snc_csv_path, 'rt', newline='') as inp, open(target_csv, 'w', newline='') as out:
        writer = csv.writer(out)
        writer.writerow(['SUBNETWORKCONTROLLERNAME', 'FEATUREGLOBALID', 'FEATURECLASSNAME', 'FEATUREASSETGROUP', 'FEATUREASSETTYPE', 'FEATURETERMINAL', 'TIERNAME', 'SUBNETWORKNAME', 'DESCRIPTION', 'NOTES'])
        for row in csv.reader(inp):
            if row[4] != "Terminator":
                writer.writerow(row)
f. Export Deletes From A to CSV
def fetch_deletes(key, table):
    cursor.execute(
        f'SELECT [{key}],[objectid],[GDB_DELETED_AT],[GDB_DELETED_BY],[GLOBALID] FROM [electric].[{table}] WHERE [GDB_IS_DELETE] = 1')
    deletedGuids = []
    for row in cursor:
        deletedGuids.append(row[0])
    return deletedGuids

def compare_guids(tar_fs, f_out_dir, name, match_field, table):
    arcpy.AddMessage(f"Comparing {name} GUIDS...")
    arcpy.AddMessage("Processing... Please wait.")
    deletedFeatures = []
    with open(f_out_dir, 'w', newline="") as fout:
        writer = csv.writer(fout)
        fieldList = [field.name for field in arcpy.ListFields(tar_fs)]
        writer.writerow(fieldList[0:])
        fieldCheck = []
        fieldCheck.extend([match_field])
        fieldCheck.extend(fieldList)
        matchList = fetch_deletes(match_field, table)
        with arcpy.da.UpdateCursor(tar_fs, fieldCheck) as crs:
            for row in crs:
                if row[0] is not None and str(row[0][1:len(row[0]) - 1]) in matchList:
                    deletedFeatures.append(row[0])
                    writer.writerow(row[1:])
                    # crs.deleteRow()
    arcpy.AddMessage(f"Found {len(deletedFeatures)} deleted \"{name}\" features.")

etj_columns = []
cursor.execute('SELECT [column_name] FROM [dbo].[SDE_column_registry] WHERE [table_name] = \'ELECTRICTRANSMISSIONJUNCTION\'')
for row in cursor:
    etj_columns.extend(row)
query = 'SELECT '
for col in etj_columns:
    query += f'[{col}],'
query = query[0:len(query)-1]
query += ' FROM [electric].[ELECTRICTRANSMISSIONJUNCTION] WHERE [GDB_IS_DELETE] = 1'
cursor.execute(query)
features = []
for row in cursor:
    features.append(row)
with open(os.path.join(output, 'ETJ_Deletes.csv'), 'w', newline='') as fout:
    writer = csv.writer(fout)
    writer.writerow(etj_columns)
    for row in features:
        writer.writerow(row)
2. Import Data
The second grouping of steps consumes the exported files to import the data into B. These steps are run in the B environment, so the files produced by the export steps above must, naturally, be copied over to B to be accessible.
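Several of the import steps talk to the target Utility Network feature service over REST rather than through geoprocessing tools, so they assume a Portal token, a requests session, and an edit session GUID. A minimal sketch of that plumbing follows; the URLs and names are placeholders, and the exact endpoint layout will depend on how your service is published.

# Sketch of the REST plumbing the import snippets assume; URLs and names are placeholders.
# The token itself comes from the sign_into_portal helper shown in step 2b.
reqSess = requests.Session()
sessGUID = '{' + str(uuid.uuid4()).upper() + '}'   # edit session ID sent with each applyEdits call

target_service_base = 'https://portal-b.example.com/server/rest/services/ElectricUN'
target_service_feature_server = 'FeatureServer'
target_un_endpt_root = target_service_base + '/UtilityNetworkServer'   # hosts disableSubnetworkController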
a. Delete Features in B Using Deletes CSV from A
def Delete_Feature(path, key, fc):
    logger.info(f"Reading from {path}")
    rows = []
    with open(path, 'r') as fin:
        reader = csv.reader(fin)
        for line in reader:
            rows.append(line)
    logger.info(f"Features found for {path} : {len(rows) - 1}")
    keyInd = rows[0].index(key)
    keys = [row[keyInd] for row in rows[1::]]
    matches = []
    # Fetch match global IDs
    with arcpy.da.SearchCursor(fc, [key, "GLOBALID", "OBJECTID"]) as crs:
        for row in crs:
            if row[0] in keys and row[0] is not None:
                matches.append(row[1])
    # Delete associations for feature.
    associationCount = 0
    associationObjIDs = []
    with arcpy.da.SearchCursor(target_sde_association_endpt, ('FROMGLOBALID', 'TOGLOBALID', 'OBJECTID')) as crs:
        for row in crs:
            if row[0] in matches or row[1] in matches:
                try:
                    associationCount += 1
                    logger.info(f"Deleting association ({associationCount}): \n\t F: {row[0]} -> T: {row[1]}")
                    associationObjIDs.append(row[2])
                except Exception as ex:
                    logger.error(f"Exception occurred while deleting association:\n\t F: {row[0]} -> T: {row[1]}\n\t{ex}")
    if len(associationObjIDs) > 0:
        with arcpy.da.UpdateCursor(target_sde_association_endpt, ('OBJECTID', 'GLOBALID')) as curs:
            for row in curs:
                if row[0] in associationObjIDs:
                    try:
                        logger.info(f"Deleting Association: {row[1]}")
                        curs.deleteRow()
                    except Exception as ex:
                        logger.error(ex)
    # Delete matched features
    with arcpy.da.UpdateCursor(fc, ["GLOBALID", "OBJECTID"]) as crs:
        for row in crs:
            if row[0] in matches:
                try:
                    crs.deleteRow()
                    logger.info(f"Deleted ({key}): \n\tOID: {row[1]}, GUID: {row[0]}")
                except Exception as ex:
                    logger.error(f"Exception while deleting ({key}):\n\tOID: {row[1]}, GUID: {row[0]}\n\t{ex}")
    logger.info(f"Found {len(matches)} matched features for deletion in target layer.")
    logger.info("--------------------------------------------------------------------")
b. Disable Topology in B
def sign_into_portal(portal_url, user, pw):
    res = arcpy.SignInToPortal(portal_url, user, pw)
    LogMessage("Signed into Portal as {0}.".format(user))
    return res['token']

def disable_topology(in_utility_network):
    LogMessage("Disabling topology.")
    arcpy.DisableNetworkTopology_un(in_utility_network)
    LogMessage("Topology disabled.")
c. Determine Terminators for Deletion in B
sncCount = 0
subnetwork_controller_guids = []
subnetwork_controller_names = []
with open(source_subnetwork_controllers, 'r') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader)
    for lines in csv_reader:
        subnetwork_controller_guids.append(lines[1])
        subnetwork_controller_names.append(f"{lines[5]}/{lines[0]}")
        sncCount += 1
LogMessage(f"Found {sncCount} features in CSV")
sncCount = 0
disabledSNCNames = {}
duplicateSNCGUIDS = []
terminatorSNCGUIDS = []
with arcpy.da.SearchCursor(target_sde_device, ["GLOBALID", "ASSETGROUP", "SubnetworkControllerName"]) as searchCursor:
    delete_terminator_guids = []
    for row in searchCursor:
        if (row[0] in subnetwork_controller_guids) or (row[0] not in subnetwork_controller_guids and row[1] == 111) or (row[2] in subnetwork_controller_names):
            # disableSubData is the request payload dict for disableSubnetworkController; its declaration is omitted here.
            disableSubData['featureGlobalId'] = row[0]
            disabledSNCNames[row[0]] = row[2]
            response = reqSess.post(target_un_endpt_root + '/disableSubnetworkController', disableSubData)
            logger.info(response.json())
            sncCount += 1
            if row[0] in subnetwork_controller_guids:
                duplicateSNCGUIDS.append([row[0], row[1], row[2], response.json()['success']])
            if row[0] not in subnetwork_controller_guids and row[1] == 111:
                terminatorSNCGUIDS.append([row[0], row[1], row[2], response.json()['success']])
LogMessage(f"Processed requests for {sncCount} objects.")
LogMessage(f"Found {len(duplicateSNCGUIDS)} duplicate Subnetwork controller GUIDS in target DB.")
LogMessage(f"Found {len(terminatorSNCGUIDS)} terminator Subnetwork controllers in target DB not in CSV.")
duplicateJSONPath = os.path.join(source_folder, "JSON_Output")
if not os.path.exists(duplicateJSONPath):
    os.mkdir(duplicateJSONPath)
duplicateJSONDump = os.path.join(source_folder, "JSON_Output", "duplicateSNCGuids.json")
terminatorJSONDump = os.path.join(source_folder, "JSON_Output", "terminatorSNCGuids.json")
with open(duplicateJSONDump, 'w') as fout:
    writer = csv.writer(fout)
    for entry in duplicateSNCGUIDS:
        writer.writerow(entry)
with open(terminatorJSONDump, 'w') as fout:
    writer = csv.writer(fout)
    for entry in terminatorSNCGUIDS:
        writer.writerow(entry)
d. Remove Updated Features in B Using FGDB From A (Pre-Existing Features)
def remove_target_features(sde, source_sde_feature_class, target_sde_layer, match_field='legacyguid'):
    if source_sde_feature_class != source_fgdb_terminator:
        arcpy.env.workspace = sde
        remove_guids = []
        LogMessage("Found feature {0}.".format(source_sde_feature_class))
        with arcpy.da.SearchCursor(source_sde_feature_class, match_field) as search_cursor:
            for row in search_cursor:
                remove_guids.append(row[0])
        LogMessage("Found {0} features in '{1}' from source export.".format(len(remove_guids), os.path.basename(os.path.normpath(source_sde_feature_class))))
        with arcpy.da.SearchCursor(target_sde_layer, [match_field, "objectid", 'globalid']) as search_cursor:
            delete_guids = []
            for row in search_cursor:
                if row[0] in remove_guids:
                    delete_guids.append(row[1])
                    removed_guids.append(row[2])
        delete_data = {
            'f': 'json',
            'deletes': json.dumps(delete_guids),
            'token': token,
            'sessionID': sessGUID
        }
        delete_endpt = target_sde_layer + "/applyEdits"
        response = reqSess.post(delete_endpt, delete_data)
        LogMessage(f"Delete Response: {response.json()}")
        LogMessage("Deleted {0} features in target layer.".format(len(response.json()['deleteResults'])))
e. Delete Terminator Features in B
Note: This process was batched into small chunks to prevent it from erroring out partway through. Batching like this seemed to help in several geoprocessing tasks where we otherwise ran into unhandled errors.
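The batching itself boils down to slicing the GlobalID list into fixed-size pieces before each applyEdits request. A minimal sketch of that pattern follows; the helper name and chunk size are our own and not part of the original script.

def chunks(items, size=20):
    # Yield successive fixed-size slices so each applyEdits request stays small.
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Usage: post one applyEdits request per batch, e.g. for batch in chunks(delete_terminator_guids): ...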
def remove_terminator_features(sde, target_sde_layer, snc_dict):
    subnetwork_controller_guids = []
    subnetwork_controller_names = []
    with open(source_subnetwork_controllers, 'r') as csv_file:
        csv_reader = csv.reader(csv_file, delimiter=',')
        next(csv_reader)
        for lines in csv_reader:
            if lines[3] == 'Terminator':
                subnetwork_controller_guids.append(lines[1])
                subnetwork_controller_names.append(lines[5] + "/" + lines[0])
    LogMessage("Found {0} Names in subnetwork controller CSV file.".format(len(subnetwork_controller_names)))
    arcpy.env.workspace = sde
    LogMessage("Comparing target terminators with GUID list from subnetwork controller CSV file.")
    delete_terminator_guids = []
    with arcpy.da.SearchCursor(target_sde_layer, ["GLOBALID", "ASSETGROUP", "SUBNETWORKCONTROLLERNAME", "OBJECTID"]) as search_cursor:
        for row in search_cursor:
            if row[1] == 111:
                # if row[0] not in subnetwork_controller_guids:
                #     delete_terminator_guids.append(row[3])
                if snc_dict[row[0]] not in subnetwork_controller_names:
                    LogMessage(f"Delete terminator found: {row[0]}: {snc_dict[row[0]]}")
                    delete_terminator_guids.append(row[0])
    # Post the deletes in chunks of 20 so each applyEdits request stays small.
    chunk_size = 20
    lowerLim = 0
    while lowerLim < len(delete_terminator_guids):
        upperLim = min(lowerLim + chunk_size, len(delete_terminator_guids))
        LogMessage(f"Found {len(delete_terminator_guids[lowerLim:upperLim])} in target DB.")
        delete_data = {
            'f': 'json',
            'deletes': json.dumps(delete_terminator_guids[lowerLim:upperLim]),
            'token': token,
            'sessionID': sessGUID,
            'useGlobalIds': True
        }
        delete_endpt = target_sde_layer + "/applyEdits"
        response = reqSess.post(delete_endpt, delete_data)
        LogMessage(f"Delete Response: {response.json()}")
        LogMessage("Deleted {0} features in target layer.".format(len(response.json()['deleteResults'])))
        lowerLim = upperLim
f. Append Features to B Using FGDB from A (Replacements)
def append_features(sde, source_sde_feature_class, target_sde_layer, subtype, group):
    arcpy.env.workspace = sde
    arcpy.env.preserveGlobalIds = True
    LogMessage("Appending features from source feature class '{0}'.".format(os.path.basename(os.path.normpath(source_sde_feature_class))))
    destFolder = os.path.join(source_folder, "JSON_Output")
    if not os.path.isdir(destFolder):
        LogMessage("Directory not found, creating...")
        os.mkdir(destFolder)
    featureJsonDir = os.path.join(destFolder, group + ".json")
    if os.path.exists(featureJsonDir):
        os.remove(featureJsonDir)
    arcpy.FeaturesToJSON_conversion(source_sde_feature_class, featureJsonDir)
    with open(featureJsonDir) as jsonFile:
        importResults = json.load(jsonFile)
    newFeatures = []
    addFeatureCount = len(importResults['features'])
    target_globalIDS = [row[0] for row in arcpy.da.SearchCursor(target_sde_layer, 'globalid')]
    for feature in importResults['features']:
        if feature['attributes']['GLOBALID'] not in target_globalIDS \
                or feature['attributes']['GLOBALID'] in removed_guids:
            newFeatures.append(feature)
    if len(newFeatures) != addFeatureCount:
        LogMessage(f"Deleted {addFeatureCount - len(newFeatures)} features from import for {group}.")
    addData = {
        'f': 'json',
        'token': token,
        'useGlobalIds': True,
        'sessionID': sessGUID,
        'adds': json.dumps(newFeatures),
    }
    addUrl = target_sde_layer + "/applyEdits"
    response = reqSess.post(addUrl, addData)
    LogMessage(f"Appending features completed. {len(response.json()['addResults'])} features added.")
    LogMessage("-----------------------------------------------------------------------------------")
g. Remove Invalid Subnetwork Controllers in CSV from A
def remove_invalid_subnets(in_csv, invalid_guid_list):
    LogMessage("Stripping invalid GUIDS from SNC list.")
    with open(in_csv, 'rt', newline='') as inp, open(source_clean_subnetwork_controllers, 'w', newline='') as out:
        writer = csv.writer(out)
        for row in csv.reader(inp):
            if row[1] not in invalid_guid_list:
                writer.writerow(row)
h. Remove Duplicate Associations in B Using CSV from A
Note: This step removes associations from the import CSV that already exist in the target; the Import Associations geoprocessing tool crashes if it encounters a duplicate.
def remove_duplicate_associations(in_csv):
    LogMessage("Filtering duplicate associations from associations CSV.")
    count = 0
    associationPairs = []
    with open(in_csv, 'rt', newline='') as fIn:
        for row in csv.reader(fIn):
            associationPairs.append(row)
    LogMessage(f'Pairs found in CSV: {len(associationPairs)}')
    queryRequestData = {
        'token': token,
        'f': 'json',
        'WHERE': "1 = 1",
    }
    with open(source_unique_associations, 'w', newline='') as fOut:
        writer = csv.writer(fOut)
        writer.writerow(associationPairs[0])
        LogMessage("Scanning for duplicate associations.")
        for row in associationPairs[1::]:
            queryRequestData['WHERE'] = f"FromGlobalId = '{row[4]}' AND ToGlobalID = '{row[9]}'"
            response = reqSess.post(target_service_base + '/' + target_service_feature_server + "/500001/query", queryRequestData)
            if len(response.json()['features']) > 0:
                count += 1
                LogMessage(f"Duplicate Association found: {count}")
            else:
                writer.writerow(row)
    LogMessage(f"Filtering complete. {count} duplicate rows found.")
i. Import Subnetwork Controllers to B Using CSV from A
def import_subnetwork_controllers(network, sc_source):
    LogMessage("Importing subnetwork controllers from file {0}.".format(sc_source))
    arcpy.ImportSubnetworkControllers_un(network, sc_source)
    LogMessage("Importing subnetwork controllers complete.")
j. Remove Invalid Associations in CSV from A
Note: Identify any association whose FROMGLOBALID or TOGLOBALID refers to a feature that exists neither in the imported data nor in the target database, and drop it.
def remove_invalid_associations(in_csv, local_guids):
    LogMessage("Removing Invalid Associations")
    associationPairs = []
    with open(in_csv, 'rt') as fIn:
        for row in csv.reader(fIn):
            associationPairs.append(row)
    LogMessage(f'Unique associations found in CSV: {len(associationPairs)}')
    target_guids = [row[0] for row in arcpy.da.SearchCursor(target_sde_device, "GlobalID")]
    target_guids.extend([row[0] for row in arcpy.da.SearchCursor(target_sde_junction, "GlobalID")])
    target_guids.extend([row[0] for row in arcpy.da.SearchCursor(target_sde_line, "GlobalID")])
    full_guids = []
    full_guids.extend(local_guids)
    full_guids.extend(target_guids)
    full_guids = list(set(full_guids))
    count = 0
    with open(source_clean_associations, 'w', newline='') as fOut:
        writer = csv.writer(fOut)
        writer.writerow(associationPairs[0])
        for row in associationPairs[1::]:
            if row[4] in full_guids and row[9] in full_guids:
                if row[4] in local_guids:
                    row[3] = row[3].replace("AC Disconnecting Switch", "AC Substation Disconnecting Switch")
                if row[9] in local_guids:
                    row[8] = row[8].replace("AC Disconnecting Switch", "AC Substation Disconnecting Switch")
                writer.writerow(row)
            else:
                count += 1
                # arcpy.AddMessage(f"Invalid GUID Found: {count}")
    LogMessage(f"Removing {count} Invalid Associations Complete")

# Chunk the subnetwork controllers CSV and import each subset separately.
with open(ref_dir, 'r', newline='') as fin:
    reader = csv.reader(fin, delimiter=',')
    lines = []
    for line in reader:
        lines.append(line)
csv_definition = lines[0]
count = 1
fileCount = 1
wroteHeader = False
fTemp = open(f"{chunk_template}1.csv", "w+")
fTemp.close()
for line in lines[1::]:
    with open(f"{chunk_template}{fileCount}.csv", 'a', newline='') as fout:
        writer = csv.writer(fout)
        if not wroteHeader:
            writer.writerow(csv_definition)
            wroteHeader = True
        if line[1] in localGuids:
            line[4] = line[4].replace("AC Disconnecting Switch", "AC Substation Disconnecting Switch")
        writer.writerow(line)
    count += 1
    if count > chunkLim:
        count = 1
        wroteHeader = False
        fileCount += 1
        fTemp = open(f"{chunk_template}{fileCount}.csv", "w+")
        fTemp.close()
for num in range(1, fileCount + 1):
    logger.info(f"Importing subset {num}...")
    try:
        arcpy.ImportSubnetworkControllers_un(target_sde_utility_network_data_owner, f"{chunk_template}{num}.csv")
        # logger.info(f"{chunk_template}{num}.csv")
    except Exception as ex:
        logger.error(f"Importing subset {num} failed: {ex}")
k. Import Associations to B Using CSV from A
def import_associations(network, type, a_source, token):
    LogMessage("Importing associations from file {0}.".format(a_source))
    arcpy.ImportAssociations_un(network, type, a_source)
    LogMessage("Finished importing associations.")
l. Enable Topology in B
def enable_topology(in_utility_network):
    LogMessage("Enabling topology.")
    arcpy.EnableNetworkTopology_un(in_utility_network, 10000000)
    LogMessage("Topology enabled.")
m. Update Subnetworks in B
def update_subnetworks(in_utility_network):
    LogMessage("Updating Subnetworks")
    try:
        enable_topology(in_utility_network)
    except:
        LogMessage("Topology enabled, continuing...")
    try:
        LogMessage("Updating AC High Voltage Subnetworks.")
        arcpy.UpdateSubnetwork_un(in_utility_network, domain_network="ElectricTransmission", tier="AC High Voltage", continue_on_failure='true')
        LogMessage("Updating AC High Voltage Subnetworks Completed.")
    except Exception as ex:
        logger.error("Encountered errors while updating AC High Voltage Subnetworks.")
        try:
            logger.error(ex)
        except:
            pass
    try:
        LogMessage("Updating DC High Voltage Subnetworks.")
        arcpy.UpdateSubnetwork_un(in_utility_network, domain_network="ElectricTransmission", tier="DC High Voltage", continue_on_failure='true')
        LogMessage("Updating DC High Voltage Subnetworks Completed.")
    except Exception as ex:
        logger.error("Encountered errors while updating DC High Voltage Subnetworks.")
        try:
            logger.error(ex)
        except:
            pass
    LogMessage("Subnetworks Updated.")
There you have it. The edits have gone from A to B. Obviously this situation has its challenges, and the process is tailored to the narrow problem we needed to solve, but the order of operations and the steps here should be helpful to anyone needing to migrate data between UN-enabled environments.
Like any migration, it is important to know the context in which the data lives so you can make sure the data is comfy. For UN data to be comfy, it needs its topology enabled and updated, its associations in place, valid subnetwork controllers, and the like.
We’ve intentionally left out most of the function calls and variable declarations because we want these code blocks to serve as a guide rather than something you can drop a few parameters into and turn loose. A generic data migration tool may come in the future, but your schemas and requirements could differ enough that this guide-style approach seemed more useful. The sketch below shows only the rough order in which the import-side pieces get called.
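To make that order of operations concrete, here is a hypothetical import-side driver. Every variable below is a placeholder you would define for your own environment, and the script blocks in steps 2c and 2j run as standalone snippets, so they appear only as comments.

# Hypothetical import driver; shows call order only, and all arguments are placeholders.
token = sign_into_portal(target_portal_url, portal_user, portal_pw)
Delete_Feature(os.path.join(staging_folder, 'ETJ_Deletes.csv'), 'legacyguid', target_sde_junction)  # 2a
disable_topology(target_utility_network)                                                           # 2b
# 2c: disable duplicate/terminator subnetwork controllers (script block above)
remove_target_features(target_sde, source_fgdb_device, target_sde_device)                          # 2d
remove_terminator_features(target_sde, target_sde_device, disabledSNCNames)                        # 2e
append_features(target_sde, source_fgdb_device, target_sde_device, None, 'ElectricDevice')         # 2f
remove_invalid_subnets(exported_subnetwork_controllers_csv, invalid_snc_guids)                     # 2g
remove_duplicate_associations(exported_associations_csv)                                           # 2h, writes source_unique_associations
import_subnetwork_controllers(target_utility_network, source_clean_subnetwork_controllers)         # 2i
# 2j: remove invalid associations (script block above), writes source_clean_associations
import_associations(target_utility_network, association_type, source_clean_associations, token)    # 2k
enable_topology(target_utility_network)                                                            # 2l
update_subnetworks(target_utility_network)                                                         # 2m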
What do you think?