-
-
Notifications
You must be signed in to change notification settings - Fork 745
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add WIP change for migrating old executions field data which utilizes…
… the old and and very slow field type to the new field type. This is really an optional script since all the new objects will already utilize new field type and most users only care about new / recent executions (old executions are not retrieved that often so that taking a bit longer is not the end of the world).
- Loading branch information
Showing
1 changed file
with
135 additions
and
0 deletions.
There are no files selected for viewing
135 changes: 135 additions & 0 deletions
135
st2common/bin/migrations/v3.5/migrate-db-dict-field-values
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
#!/usr/bin/env python | ||
# Copyright 2021 The StackStorm Authors. | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
""" | ||
Migration which which migrates data for existing objects in the database which utilize | ||
EscapedDictField or EscapedDynamicField and have been updated to use new JsonDictField. | ||
Migration step is idempotent and can be retried on failures. | ||
Keep in mind that running this migration script is optional and it may take a long time of you have | ||
a lot of very large objects in the database (aka executions) - reading a lot of data from the | ||
database using the old field types is slow and CPU intensive. | ||
New field type is automatically used for all the new objects when upgrading to v3.5 so migration is | ||
optional because in most cases users are viewing recent / new executions and not old ones which may | ||
still utilize old field typo which is slow to read / write. | ||
Right now the script utilizes no concurrency and performs migration one object by one. That's done | ||
for simplicity reasons and also to avoid massive CPU usage spikes when running this script with | ||
large concurrency on large objects. | ||
Keep in mind that only "completed" objects are processes - this means Executions in "final" states | ||
(succeeded, failed, timeout, etc.). | ||
We determine if execution is utilizing old format if it doesn't contain "result_size" field which | ||
was added along with the new field type. | ||
Actual migrating simply involves reading + re-saving the whole object to the database - everything | ||
is handled by the mongoengine and new field abastraction. | ||
TODO: Also add support for migrating (trigger instances and workflow related objects - low | ||
priority and for those objects we don't have a "result_size" attribute so it's not totally trivial | ||
to determine if object utilizes old field type (we could simply use some date threshold and migrate | ||
everything before that or execute raw pymongo query which searches for specifial string in the | ||
field value.) | ||
""" | ||
|
||
import sys | ||
import traceback | ||
|
||
from mongoengine.queryset.visitor import Q | ||
|
||
from st2common import config | ||
from st2common.service_setup import db_setup | ||
from st2common.service_setup import db_teardown | ||
from st2common.models.db.execution import ActionExecutionDB | ||
from st2common.models.db.liveaction import LiveActionDB | ||
from st2common.persistence.execution import ActionExecution | ||
from st2common.persistence.liveaction import LiveAction | ||
from st2common.constants import action as action_constants | ||
|
||
|
||
def migrate_executions() -> None: | ||
""" | ||
Perform migrations for execution related objects (ActionExecutionDB, LiveActionDB). | ||
""" | ||
# 1. Migrate ActionExecutionDB objects | ||
execution_dbs = ActionExecution.query( | ||
Q(result_size__not__exists=True) | ||
& Q(status__in=action_constants.LIVEACTION_COMPLETED_STATES) | ||
) | ||
|
||
if not execution_dbs: | ||
print("Found no ActionExecutionDB objects to migrate.") | ||
return None | ||
|
||
print("Will migrate %s ActionExecutionDB objects" % (len(execution_dbs))) | ||
|
||
for execution_db in execution_dbs: | ||
# Migrate corresponding LiveAction object | ||
print("Migrating ActionExecutionDB with id %s" % (execution_db.id)) | ||
|
||
# This is a bit of a "hack", but it's the easiest way to tell mongoengine that a specific | ||
# field has been updated and should be saved. If we don't do, nothing will be re-saved on | ||
# .save() call due to mongoengine only trying to save what has changed to make it more | ||
# efficient instead of always re-saving the whole object. | ||
execution_db._mark_as_changed("result") | ||
execution_db._mark_as_changed("result_size") | ||
|
||
# print(getattr(execution_db, "_changed_fields", [])) | ||
execution_db.save() | ||
print("ActionExecutionDB with id %s has been migrated" % (execution_db.id)) | ||
|
||
try: | ||
liveaction_db = LiveAction.get_by_id(execution_db.liveaction["id"]) | ||
except Exception: | ||
# If liveaction for some reason doesn't exist (would likely represent corrupted data) we | ||
# simply ignore that error since it's not fatal. | ||
continue | ||
|
||
liveaction_db._mark_as_changed("result") | ||
|
||
# print(getattr(liveaction_db, "_changed_fields", [])) | ||
liveaction_db.save() | ||
print("Related LiveActionDB with id %s has been migrated" % (liveaction_db.id)) | ||
print("") | ||
|
||
|
||
def migrate_objects() -> None: | ||
print("Migrating affected database objects to utilize new field type") | ||
migrate_executions() | ||
|
||
|
||
def main(): | ||
config.parse_args() | ||
|
||
db_setup() | ||
|
||
try: | ||
migrate_objects() | ||
print("SUCCESS: All database objects migrated successfully.") | ||
exit_code = 0 | ||
except Exception as e: | ||
print("ABORTED: Objects migration aborted on first failure: %s" % (str(e))) | ||
traceback.print_exc() | ||
exit_code = 1 | ||
|
||
db_teardown() | ||
sys.exit(exit_code) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() |