@sh.add_function(
dsp, inputs_kwargs=True, outputs=['after_treatment_warm_up_phases']
)
def identify_after_treatment_warm_up_phases(
times, engine_powers_out, engine_coolant_temperatures, on_engine,
engine_thermostat_temperature, ems_data, hybrid_modes,
is_cycle_hot=False):
"""
Identifies after treatment warm up phase.
:param times:
Time vector [s].
:type times: numpy.array
:param engine_powers_out:
Engine power vector [kW].
:type engine_powers_out: numpy.array
:param engine_coolant_temperatures:
Engine coolant temperature vector [°C].
:type engine_coolant_temperatures: numpy.array
:param on_engine:
If the engine is on [-].
:type on_engine: numpy.array
:param ems_data:
EMS decision data.
:type ems_data: dict
:param hybrid_modes:
Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
:type hybrid_modes: numpy.array
:param engine_thermostat_temperature:
Engine thermostat temperature [°C].
:type engine_thermostat_temperature: float
:param is_cycle_hot:
Is an hot cycle?
:type is_cycle_hot: bool
:return:
After treatment warm up phase.
:rtype: numpy.array
"""
warm_up = np.zeros_like(times)
if not on_engine.any():
return warm_up.astype(bool)
i = np.where(on_engine)[0]
p = np.choose(ems_data['hybrid_modes'].ravel() - 1, [
ems_data[k]['power_ice'].ravel() for k in ('parallel', 'serial')
])[i]
with np.errstate(divide='ignore', invalid='ignore'):
# noinspection PyUnresolvedReferences
warm_up[i[~co2_utl.get_inliers(
engine_powers_out[i] / p, 2, np.nanmedian, co2_utl.mad
)[0]]] = 1
warm_up = co2_utl.median_filter(times, warm_up, 5)
warm_up = co2_utl.clear_fluctuations(times, warm_up, 5).astype(bool)
if not warm_up.any():
return warm_up
indices, temp = co2_utl.index_phases(warm_up), engine_coolant_temperatures
b = np.diff(temp[indices], axis=1) > 4
b &= temp[indices[:, 0], None] < (engine_thermostat_temperature - 10)
b &= np.diff(times[indices], axis=1) > 5
b &= np.apply_along_axis(
lambda a: np.in1d(2, hybrid_modes[slice(*a)]), 1, indices
)
t_cool = dfl.functions.identify_after_treatment_warm_up_phases.cooling_time
t0 = is_cycle_hot and times[0] + t_cool or 0
warming_phases = np.zeros_like(on_engine, bool)
for i, j in co2_utl.index_phases(on_engine):
warming_phases[i:j + 1] = times[i] >= t0
t0 = times[j] + t_cool
warm_up[:] = False
for i, j in indices[b.ravel()]:
if warming_phases[i]:
while i and warming_phases.take(i - 1, mode='clip'):
i -= 1
while hybrid_modes[j] == 1 and j > i:
j -= 1
warm_up[i:j + 1] = True
return warm_up