Merge "Migrate value column of efficacy indicator on load"

This commit is contained in:
Zuul
2025-05-16 18:16:23 +00:00
committed by Gerrit Code Review
2 changed files with 60 additions and 17 deletions

View File

@@ -921,6 +921,9 @@ class Connection(api.BaseConnection):
# database created before the 15f7375ca737 revision, use the
# value column
indicator.data = indicator.value
# store the new column in the database
self._update(models.EfficacyIndicator,
indicator.id, indicator)
return eff_ind_models
@@ -956,6 +959,9 @@ class Connection(api.BaseConnection):
# database created before the 15f7375ca737 revision, use the
# value column
efficacy_indicator.data = efficacy_indicator.value
# store the new column in the database
self._update(models.EfficacyIndicator, efficacy_indicator.id,
efficacy_indicator)
return efficacy_indicator
except exception.ResourceNotFound:
raise exception.EfficacyIndicatorNotFound(efficacy_indicator=value)

View File

@@ -119,8 +119,20 @@ class MySQLDbDataMigrationsTestCase(MySQLDbMigrationsTestCase):
metadata = sqlalchemy.MetaData()
metadata.reflect(bind=self.engine)
eff_ind_table = sqlalchemy.Table('efficacy_indicators', metadata)
connection.execute(eff_ind_table.insert(), eff_ind_values)
connection.commit()
with connection.begin():
connection.execute(eff_ind_table.insert(), eff_ind_values)
def _read_efficacy_indicator(self, connection, id):
metadata = sqlalchemy.MetaData()
metadata.reflect(bind=self.engine)
eff_ind_table = sqlalchemy.Table('efficacy_indicators', metadata)
with connection.begin():
return connection.execute(
sqlalchemy.select(eff_ind_table.c.data).where(
eff_ind_table.c.id == id
)
).one()
def _pre_upgrade_15f7375ca737(self, connection):
"""Add data to the database before applying the 15f7375ca737 revision.
@@ -148,30 +160,28 @@ class MySQLDbDataMigrationsTestCase(MySQLDbMigrationsTestCase):
action_plan_id=self.action_plan.id, id=1, uuid=None,
name="efficacy_indicator1", description="Test Indicator 1",
value=1.01234567912345678)
self._create_manual_efficacy_indicator(
connection,
action_plan_id=self.action_plan.id, id=2, uuid=None,
name="efficacy_indicator2", description="Test Indicator 2",
value=2.01234567912345678)
def _check_15f7375ca737(self, connection):
"""Check data integrity after the database migration."""
# check that creating a new efficacy_indicator after the migration
# works
utils.create_test_efficacy_indicator(
action_plan_id=self.action_plan.id, id=2, uuid=None,
name="efficacy_indicator1", description="Test Indicator 2",
action_plan_id=self.action_plan.id, id=3, uuid=None,
name="efficacy_indicator3", description="Test Indicator 3",
value=0.01234567912345678)
db_efficacy_indicator = self.dbapi.get_efficacy_indicator_by_id(
self.context, 2)
self.context, 3)
self.assertAlmostEqual(db_efficacy_indicator.value, 0.012, places=3)
# check that getting the efficacy_indicator using the
# get_efficacy_indicator_list method reports the correct values
efficacy_indicators = self.dbapi.get_efficacy_indicator_list(
self.context)
self.assertEqual(len(efficacy_indicators), 2)
self.assertEqual(efficacy_indicators[0].id, 1)
self.assertAlmostEqual(efficacy_indicators[0].value,
1.00, places=3)
# check that the 'data' column of the efficacy_indicator created before
# applying the revision is null for both efficacy_indicators
self.assertIsNone(self._read_efficacy_indicator(connection, 1)[0])
self.assertIsNone(self._read_efficacy_indicator(connection, 2)[0])
self.assertEqual(efficacy_indicators[1].id, 2)
self.assertAlmostEqual(efficacy_indicators[1].value,
0.012, places=3)
# check that the existing data is there after the migration
db_goal = self.dbapi.get_goal_by_id(self.context, 1)
self.assertEqual(db_goal.name, "GOAL_1")
@@ -189,9 +199,36 @@ class MySQLDbDataMigrationsTestCase(MySQLDbMigrationsTestCase):
self.assertAlmostEqual(db_efficacy_indicator_1.value,
1.00, places=2)
self.assertEqual(db_efficacy_indicator_1.name, "efficacy_indicator1")
# check that the 'data' column of the efficacy_indicator1 is now the
# same as the 'value' column, i.e the data migration on load worked
eff_ind_1_data = self._read_efficacy_indicator(connection, 1)[0]
self.assertAlmostEqual(eff_ind_1_data,
1.00, places=2)
# check that getting the efficacy_indicator using the
# get_efficacy_indicator_list method reports the correct values
efficacy_indicators = self.dbapi.get_efficacy_indicator_list(
self.context)
self.assertEqual(len(efficacy_indicators), 3)
self.assertEqual(efficacy_indicators[0].id, 1)
self.assertAlmostEqual(efficacy_indicators[0].value,
1.00, places=3)
self.assertEqual(efficacy_indicators[1].id, 2)
self.assertAlmostEqual(efficacy_indicators[1].value,
2.00, places=3)
self.assertEqual(efficacy_indicators[2].id, 3)
self.assertAlmostEqual(efficacy_indicators[2].value,
0.012, places=3)
# check that the 'data' column of the efficacy_indicator1 is now the
# same as the 'value' column, i.e the data migration on load worked
# after the call to get_efficacy_indicator_list
eff_ind_2_data = self._read_efficacy_indicator(connection, 2)[0]
self.assertAlmostEqual(eff_ind_2_data,
2.00, places=2)
def test_migration_revisions(self):
with self.engine.begin() as connection:
with self.engine.connect() as connection:
self.alembic_config.attributes["connection"] = connection
script = ScriptDirectory.from_config(self.alembic_config)
revisions = [x.revision for x in script.walk_revisions()]