# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE', which is part of this source code package.
#
#
import os
import numpy as np
from scipy import sparse
from Mordicus.Containers import Solution
[docs]
class ProblemData(object):
"""
Class containing a problemData
Attributes
----------
problemName : str
name of the ProblemData object
dataFolder : str
name of folder containing the data of the problemData, relative to the mordicus client script
solutions : dict
dictionary with solutionNames (str) as keys and solution (Solution) as values
initialCondition : InitialConditionBase
initial condition of the problem
loadings : list
dictionary with identifier (str) as keys and loading (LoadingBase) as values
constitutiveLaws : dict
dictionary with identifier (str) as keys and constitutive law (ConstitutiveLawBase) as values
parameters : dict
dictionary with time indices as keys and a np.ndarray of size (parameterDimension,) containing the parameters
onlineData : dict(str: custom_data_structure)
dictionary with solutionNames (str) as keys and data structure used in the online stage as values.
In simple methods, onlineData can directly be an operatorCompressionData generated during the offline stage
and need in the online stage to compute the reduced problem
"""
def __init__(self, problemName):
self.problemName = problemName
self.dataFolder = None
self.solutions = {}
self.initialCondition = None
self.loadings = {}
self.constitutiveLaws = {}
self.parameters = {}
self.onlineData = {}
[docs]
def SetDataFolder(self, dataFolder):
"""
Sets the dataFolder parameter
Parameters
----------
dataFolder : str
name of folder containing the data of the problemData, relative to the mordicus client script
"""
self.dataFolder = dataFolder
[docs]
def GetDataFolder(self):
"""
Returns the dataFolder parameter
Returns
-------
str
dataFolder
"""
return self.dataFolder
[docs]
def AddSolution(self, solution):
"""
Adds a solution to solutions
Parameters
----------
solution : Solution
the solution of the problem for a given field
"""
assert isinstance(
solution, Solution.Solution
), "solution must be an instance of Containers.Solution"
if solution.GetSolutionName() in self.solutions:
print(
"Solution "
+ solution.solutionName
+ " already in problemData.solutions. Replacing it anyway."
)
self.solutions[solution.GetSolutionName()] = solution
[docs]
def DeleteSolutions(self):
"""
Reinitializes the solutions parameter
"""
self.solutions = {}
[docs]
def AddParameter(self, parameter, time = 0.):
"""
Adds a parameter at time "time"
Parameters
----------
parameter : np.ndarray
of size (parameterDimension,)
time : float
(optional) time of the snapshot, default: 0.
"""
assert type(parameter) == np.ndarray and len(parameter.shape) == 1
time = float(time)
if time in self.parameters:
print(
"Parameter at time "
+ str(time)
+ " already in parameters. Replacing it anyways."
)
else:
self.parameters[time] = parameter
[docs]
def AddConstitutiveLaw(self, constitutiveLaw):
"""
Adds a constitutive law or a list of constitutive laws to constitutiveLaws
Parameters
----------
constitutiveLaw : ConstitutiveLawBase
the constitutive law of the problem for a given set and type
"""
try:
iter(constitutiveLaw)
except TypeError:
constitutiveLaw = [constitutiveLaw]
for law in constitutiveLaw:
if law.GetIdentifier() in self.constitutiveLaws:
print(
"ConstitutiveLaw "
+ str(law.GetIdentifier())
+ " already in problemData.constitutiveLaws. Replacing it anyway."
) # pragma: no cover
self.constitutiveLaws[law.GetIdentifier()] = law
[docs]
def AddOnlineData(self, onlineData):
"""
Adds an onlineData to onlineData
Parameters
----------
onlineData : OnlineDataBase
data structure used in the online stage
"""
self.onlineData[onlineData.GetSolutionName()] = onlineData
[docs]
def GetOnlineData(self, solutionName):
"""
Returns the onlineData of name solutionName
Parameters
----------
solutionName : str
name of the onlineData to retrieve
Returns
-------
OnlineDataBase
"""
assert solutionName in self.onlineData, "onlineData for solutionName "+solutionName+" has not been initialized"
return self.onlineData[solutionName]
[docs]
def SetInitialCondition(self, initialCondition):
"""
Sets the initial condition
Parameters
----------
initialCondition : InitialCondition
initial condition of the problem
"""
self.initialCondition = initialCondition
[docs]
def GetInitialCondition(self):
"""
Returns the initial condition
Returns
-------
initialCondition : InitialCondition
initial condition of the problem
"""
return self.initialCondition
[docs]
def AddLoading(self, loading):
"""
Adds a loading or a list of loadings to loadings
Parameters
----------
loading : LoadingBase
the loading of the problem for a given set and type
"""
try:
iter(loading)
except TypeError:
loading = [loading]
for load in loading:
if load.GetIdentifier() in self.loadings:
print(
"Loading "
+ str(load.GetIdentifier())
+ " already in problemData.loadings. Replacing it anyway."
)
self.loadings[load.GetIdentifier()] = load
[docs]
def UpdateLoading(self, loading):
"""
Update a loading or a list of loadings to loadings
Parameters
----------
loading : LoadingBase
the loading of the problem for a given set and type
"""
try:
iter(loading)
except TypeError:
loading = [loading]
for load in loading:
if load.GetIdentifier() not in self.loadings: # pragma: no cover
print(
"Loading "
+ str(load.GetIdentifier())
+ " not present in problemData.loadings. Cannot update."
)
self.loadings[load.GetIdentifier()].UpdateLoading(load)
[docs]
def GetParameters(self):
"""
Returns the parameters
Returns
-------
dict
parameters
"""
if self.parameters == False:
raise AttributeError(
"Please initialize parameters before trying to retrieve them."
) # pragma: no cover
return self.parameters
[docs]
def GetParametersTimeSequence(self):
"""
Returns the time indices of the parameters
Returns
-------
list
list containing the time indices of the parameters
"""
return list(self.parameters.keys())
[docs]
def GetParametersList(self):
"""
Returns the parameter values in the form of a list
Returns
-------
list
list containing the parameters
"""
return list(self.parameters.values())
[docs]
def GetParameterDimension(self):
"""
Assert that the parameters have the same parameterDimension and return this size
Returns
-------
int or None
common parameterDimension
"""
listParameterDimension = [
parameter.shape[0] for _, parameter in self.GetParameters().items()
]
if len(listParameterDimension)>0:
assert listParameterDimension.count(listParameterDimension[0]) == len(
listParameterDimension
)
return listParameterDimension[0]
else:# pragma: no cover
return None
[docs]
def GetParameterAtTime(self, time):
"""
Returns the parameter value at a specitiy time (with time interpolation if needed)
Parameters
----------
time : float
time at which the parameter is retrieved
Returns
-------
np.ndarray
parameter
"""
from Mordicus.BasicAlgorithms import Interpolation as TI
return TI.PieceWiseLinearInterpolation(
time, list(self.parameters.keys()), list(self.parameters.values())
)
#return self.parameters[time]
[docs]
def GetLoadings(self):
"""
Returns the complete loadings dictionary
Returns
-------
dict
loadings of the problem
"""
return self.loadings
[docs]
def GetLoading(self, solutionName, type, set):
"""
Returns a specific loading for the identifiers elements
Returns
-------
loading
"""
for loading in self.GetLoadings().values():
if loading.GetIdentifier() == (solutionName,type,set):
return loading
else:# pragma: no cover
raise("loading "+str((solutionName, type, set))+" not available")
[docs]
def GetLoadingsOfType(self, type):
"""
Returns all loadings of a specific type, in a list
Returns
-------
list
list of loadings of type type
"""
li = []
for loading in self.GetLoadings().values():
if loading.GetType() == type:
li.append(loading)# pragma: no cover
return li
[docs]
def GetLoadingsForSolution(self, solutionName):
"""
Returns all loadings for a specific solution name, in a list
Returns
-------
list
list of loadings of type type
"""
li = []
for loading in self.GetLoadings().values():
if loading.GetSolutionName() == solutionName:
li.append(loading)# pragma: no cover
return li
[docs]
def GetConstitutiveLaws(self):
"""
Returns the complete constitutiveLaws dictionary
Returns
-------
dict
constitutive laws of the problem
"""
return self.constitutiveLaws
[docs]
def GetConstitutiveLawsOfType(self, type):
"""
Returns all constitutive laws of a specific type, in a list
Returns
-------
list
list of constitutive laws of type type
"""
li = []
for law in self.GetConstitutiveLaws().values():
if law.GetType() == type:
li.append(law)# pragma: no cover
return li
[docs]
def GetSetsOfConstitutiveOfType(self, type):
"""
Returns the sets of all constitutive laws of a specific type, in a set
Returns
-------
set
set of strings of elementSets
"""
se = []
for law in self.GetConstitutiveLawsOfType(type):
se.append(law.GetSet())
return set(se)
[docs]
def GetSolution(self, solutionName):
"""
Returns the solution of name solutionName
Parameters
----------
solutionName : str
name of the solution to retrieve
Returns
-------
Solution
"""
return self.solutions[solutionName]
[docs]
def GetSolutions(self):
"""
Returns the solutions of problemData
Returns
-------
dict
solutions
"""
return self.solutions
[docs]
def CompressSolution(self, solutionName, reducedOrderBasis, snapshotCorrelationOperator = None):
"""
Compress solutions of name solutionName ; does nothing if no solution of name solutionName exists
Parameters
----------
solutionName : str
name of the solutions to compress
snapshotCorrelationOperator : scipy.sparse.csr
correlation operator between the snapshots
reducedOrderBasis : np.ndarray
of size (numberOfModes, numberOfDOFs)
"""
assert isinstance(solutionName, str)
if snapshotCorrelationOperator is None:
snapshotCorrelationOperator = sparse.eye(self.GetSolution(solutionName).GetNumberOfDofs())
try:
solution = self.GetSolution(solutionName)
solution.CompressSnapshots(snapshotCorrelationOperator, reducedOrderBasis)
except KeyError:#pragma: no cover
return
[docs]
def ConvertReducedCoordinatesReducedOrderBasis(self, solutionName, projectedReducedOrderBasis):
"""
Converts the reducedSnapshot from the current reducedOrderBasis to a newReducedOrderBasis using a projectedReducedOrderBasis between the current one and a new one
Parameters
----------
solutionName : str
name of the solution whose reducedCoordinates is to convert
projectedReducedOrderBasis : np.ndarray
of size (newNumberOfModes, numberOfModes)
"""
solution = self.GetSolution(solutionName)
solution.ConvertReducedCoordinatesReducedOrderBasis(projectedReducedOrderBasis)
[docs]
def UncompressSolution(self, solutionName, reducedOrderBasis):
"""
Uncompress solutions of name solutionName
Parameters
----------
solutionName : str
name of the solutions to uncompress
reducedOrderBasis : np.ndarray
of size (numberOfModes, numberOfDOFs)
"""
assert isinstance(solutionName, str)
if solutionName not in self.solutions:
raise AttributeError(
"You must provide solutions "
+ solutionName
+ " before trying to uncompress them"
) # pragma: no cover
solution = self.GetSolution(solutionName)
solution.UncompressSnapshots(reducedOrderBasis)
def __str__(self):
res = "ProblemData of name: "+self.problemName
return res
if __name__ == "__main__":# pragma: no cover
from Mordicus import RunTestFile
RunTestFile(__file__)