Source code for co2mpas.core.model.physical.engine.fc

# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions and `dsp` model to predict the fuel consumptions.
"""
import copy
import functools
import itertools
import numpy as np
import schedula as sh
from co2mpas.defaults import dfl
import co2mpas.utils as co2_utl

dsp = sh.BlueDispatcher(
    name='Engine fuel consumption sub model',
    description='Calculates fuel consumptions.'
)


[docs]@sh.add_function(dsp, outputs=['has_exhausted_gas_recirculation']) def default_engine_has_exhausted_gas_recirculation(fuel_type): """ Returns the default engine has exhaust gas recirculation value [-]. :param fuel_type: Fuel type (diesel, gasoline, LPG, NG, ethanol, methanol, biodiesel, propane). :type fuel_type: str :return: Does the engine have exhaust gas recirculation technology? :rtype: bool """ return fuel_type in ('diesel', 'biodiesel')
[docs]@sh.add_function(dsp, outputs=['brake_mean_effective_pressures']) def calculate_brake_mean_effective_pressures( engine_speeds_out, engine_powers_out, engine_capacity, min_engine_on_speed): """ Calculates engine brake mean effective pressure [bar]. :param engine_speeds_out: Engine speed vector [RPM]. :type engine_speeds_out: numpy.array :param engine_powers_out: Engine power vector [kW]. :type engine_powers_out: numpy.array :param engine_capacity: Engine capacity [cm3]. :type engine_capacity: float :param min_engine_on_speed: Minimum engine speed to consider the engine to be on [RPM]. :type min_engine_on_speed: float :return: Engine brake mean effective pressure vector [bar]. :rtype: numpy.array """ b = engine_speeds_out > min_engine_on_speed p = np.zeros_like(engine_powers_out) p[b] = engine_powers_out[b] / engine_speeds_out[b] p[b] *= 1200000.0 / engine_capacity return np.nan_to_num(p)
[docs]@sh.add_function(dsp, outputs=['after_treatment_temperature_threshold']) def calculate_after_treatment_temperature_threshold( engine_thermostat_temperature, initial_engine_temperature): """ Calculates the engine coolant temperature when the after treatment system is warm [°C]. :param engine_thermostat_temperature: Engine thermostat temperature [°C]. :type engine_thermostat_temperature: float :param initial_engine_temperature: Initial engine temperature [°C]. :type initial_engine_temperature: float :return: Engine coolant temperature threshold when the after treatment system is warm [°C]. :rtype: (float, float) """ ti = 273 + initial_engine_temperature t = (273 + engine_thermostat_temperature) / ti - 1 temp_mean = 40 * t + initial_engine_temperature temp_end = 40 * t ** 2 + temp_mean return temp_mean, temp_end
[docs]@sh.add_function(dsp, outputs=['tau_function']) def define_tau_function(after_treatment_temperature_threshold): """ Defines tau-function of the extended Willans curve. :param after_treatment_temperature_threshold: Engine coolant temperature threshold when the after treatment system is warm [°C]. :type after_treatment_temperature_threshold: (float, float) :return: Tau-function of the extended Willans curve. :rtype: callable """ import scipy.stats as sci_sta temp_mean, temp_end = np.array(after_treatment_temperature_threshold) + 273 s = np.log(temp_end / temp_mean) / sci_sta.norm.ppf(0.95) f = sci_sta.lognorm(max(s, dfl.EPS), 0, temp_mean).cdf def _tau_function(t0, t1, temp): return t0 + (t1 - t0) * f(temp + 273) return _tau_function
[docs]@sh.add_function(dsp, outputs=['extended_integration_times']) def calculate_extended_integration_times( times, velocities, on_engine, phases_integration_times, engine_coolant_temperatures, after_treatment_temperature_threshold, stop_velocity): """ Calculates the extended integration times [-]. :param times: Time vector [s]. :type times: numpy.array :param velocities: Velocity vector [km/h]. :type velocities: numpy.array :param on_engine: If the engine is on [-]. :type on_engine: numpy.array :param phases_integration_times: Cycle phases integration times [s]. :type phases_integration_times: tuple :param engine_coolant_temperatures: Engine coolant temperature vector [°C]. :type engine_coolant_temperatures: numpy.array :param after_treatment_temperature_threshold: Engine coolant temperature threshold when the after treatment system is warm [°C]. :type after_treatment_temperature_threshold: (float, float) :param stop_velocity: Maximum velocity to consider the vehicle stopped [km/h]. :type stop_velocity: float :return: Extended cycle phases integration times [s]. :rtype: tuple """ lv, pit = np.zeros(velocities.size + 2), np.unique(phases_integration_times) lv[1:-1] = np.asarray(velocities <= stop_velocity, int) indices = np.where(np.diff(lv) != 0)[0].reshape(-1, 2) split_points = [] for i, j in indices: t0, t1 = times[i], times[j - 1] if t1 - t0 < 20 or any(t0 <= x <= t1 for x in pit): continue b = ~on_engine[i:j] if b.any() and not b.all(): t = np.median(times[i:j][b]) else: t = (t0 + t1) / 2 split_points.append(t) try: i = np.searchsorted( engine_coolant_temperatures, (after_treatment_temperature_threshold[1],) )[0] if not lv[i + 1]: split_points.append(times[i]) except IndexError: pass return sorted(split_points)
[docs]@sh.add_function(dsp, outputs=[ 'extended_cumulative_co2_emissions', 'extended_phases_integration_times' ]) def calculate_extended_cumulative_co2_emissions( times, velocities, on_engine, extended_integration_times, co2_normalization_references, phases_integration_times, phases_co2_emissions, phases_distances, stop_velocity): """ Calculates the extended cumulative CO2 of cycle phases [CO2g]. :param times: Time vector [s]. :type times: numpy.array :param velocities: Velocity vector [km/h]. :type velocities: numpy.array :param on_engine: If the engine is on [-]. :type on_engine: numpy.array :param extended_integration_times: Extended cycle phases integration times [s]. :type extended_integration_times: tuple :param co2_normalization_references: CO2 normalization references (e.g., engine loads) [-]. :type co2_normalization_references: numpy.array :param phases_integration_times: Cycle phases integration times [s]. :type phases_integration_times: tuple :param phases_co2_emissions: CO2 emission of cycle phases [CO2g/km]. :type phases_co2_emissions: numpy.array :param phases_distances: Cycle phases distances [km]. :type phases_distances: numpy.array :param stop_velocity: Maximum velocity to consider the vehicle stopped [km/h]. :type stop_velocity: float :return: Extended cumulative CO2 of cycle phases [CO2g]. :rtype: numpy.array """ r = co2_normalization_references.copy() r[~on_engine] = 0 lv = np.asarray(velocities <= stop_velocity, int) _cco2, phases = [], [] cco2 = phases_co2_emissions * phases_distances from scipy.integrate import trapz def _stops(n, m): return trapz(lv[n:m], times[n:m]) / (times[m] - times[n]) for cco2, (t0, t1) in zip(cco2, phases_integration_times): i, j = np.searchsorted(times, (t0, t1)) if i == j: continue v = trapz(r[i:j], times[i:j]) c, k0 = [0.0], i p = [t for t in extended_integration_times if t0 < t < t1] for k, t in zip(np.searchsorted(times, p), p): if k < j and _stops(k0, k) < 0.5 and _stops(k, j) < 0.5: phases.append((t0, t)) t0, k0 = t, k c.append(trapz(r[i:k], times[i:k]) / v) phases.append((t0, t1)) c.append(1.0) _cco2.extend(np.diff(c) * cco2) return np.array(_cco2), phases
# noinspection PyMissingOrEmptyDocstring
[docs]class IdleFuelConsumptionModel:
[docs] def __init__(self, fc=None): self.fc = fc self.n_s = None self.c = None self.fmep_model = None
[docs] def fit(self, idle_engine_speed, engine_capacity, engine_stroke, lhv, fmep_model): idle = idle_engine_speed[0] from . import calculate_mean_piston_speeds self.n_s = calculate_mean_piston_speeds(idle, engine_stroke) self.c = idle * (engine_capacity / (lhv * 1200)) self.fmep_model = fmep_model return self
[docs] def consumption(self, params=None, ac_phases=None): import lmfit if isinstance(params, lmfit.Parameters): params = params.valuesdict() if self.fc is not None: ac = params.get('acr', self.fmep_model.base_acr) avv = params.get('avv', 0) lb = params.get('lb', 0) egr = params.get('egr', 0) fc = self.fc else: fc, _, ac, avv, lb, egr = self.fmep_model(params, self.n_s, 0, 1) if not (ac_phases is None or ac_phases.all() or 'acr' in params): p = params.copy() p['acr'] = self.fmep_model.base_acr _fc, _, _ac, _avv, _lb, _egr = self.fmep_model(p, self.n_s, 0, 1) fc = np.where(ac_phases, fc, _fc) ac = np.where(ac_phases, ac, _ac) avv = np.where(ac_phases, avv, _avv) lb = np.where(ac_phases, lb, _lb) egr = np.where(ac_phases, egr, _egr) fc *= self.c # [g/sec] return fc, ac, avv, lb, egr
dsp.add_data('stop_velocity', dfl.values.stop_velocity) dsp.add_data('min_engine_on_speed', dfl.values.min_engine_on_speed)
[docs]@sh.add_function( dsp, inputs_kwargs=True, inputs_defaults=True, outputs=['idle_fuel_consumption_model'] ) def define_idle_fuel_consumption_model( idle_engine_speed, engine_capacity, engine_stroke, engine_fuel_lower_heating_value, fmep_model, idle_fuel_consumption_initial_guess=None): """ Defines the idle fuel consumption model. :param idle_engine_speed: Engine speed idle median and std [RPM]. :type idle_engine_speed: (float, float) :param engine_capacity: Engine capacity [cm3]. :type engine_capacity: float :param engine_stroke: Engine stroke [mm]. :type engine_stroke: float :param engine_fuel_lower_heating_value: Fuel lower heating value [kJ/kg]. :type engine_fuel_lower_heating_value: float :param fmep_model: Engine FMEP model. :type fmep_model: FMEP :param idle_fuel_consumption_initial_guess: Initial guess of fuel consumption at hot idle engine speed [g/s]. :type idle_fuel_consumption_initial_guess: float, optional :return: Idle fuel consumption model. :rtype: IdleFuelConsumptionModel """ d = dfl.functions if idle_fuel_consumption_initial_guess is not None or \ d.ENABLE_ALL_FUNCTIONS or \ d.define_idle_fuel_consumption_model.ENABLE: model = IdleFuelConsumptionModel( idle_fuel_consumption_initial_guess).fit( idle_engine_speed, engine_capacity, engine_stroke, engine_fuel_lower_heating_value, fmep_model ) return model return sh.NONE
[docs]@sh.add_function( dsp, inputs_kwargs=True, outputs=['engine_idle_fuel_consumption'] ) def calculate_engine_idle_fuel_consumption( idle_fuel_consumption_model, co2_params_calibrated=None): """ Calculates fuel consumption at hot idle engine speed [g/s]. :param idle_fuel_consumption_model: Idle fuel consumption model. :type idle_fuel_consumption_model: IdleFuelConsumptionModel :param co2_params_calibrated: CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg). The missing parameters are set equal to zero. :type co2_params_calibrated: lmfit.Parameters :return: Fuel consumption at hot idle engine speed [g/s]. :rtype: float """ return idle_fuel_consumption_model.consumption(co2_params_calibrated)[0]
def _yield_factors(param_id, factor): try: for k, v in factor.get(param_id, {}).items(): yield k, v, 1 except TypeError: p = {} def _defaults(): return (np.zeros_like(param_id, dtype=float), np.zeros_like(param_id, dtype=int)) for m in np.unique(param_id): if not isinstance(m, np.ma.core.MaskedConstant): b = m == param_id for k, v in factor.get(m, {}).items(): j, i = sh.get_nested_dicts(p, k, default=_defaults) j[b], i[b] = v, 1 for k, (j, n) in p.items(): b = n == 0 j[b], n[b] = 1, 1 yield k, j, n def _tech_mult_factors(**params): p = {} # noinspection PyProtectedMember factors = dfl.functions._tech_mult_factors.factors for k, v in factors.items(): for i, j, n in _yield_factors(params.get(k, 0), v): s = sh.get_nested_dicts(p, i, default=lambda: [0, 0]) s[0] += j s[1] += n for k, (n, d) in p.items(): m = n / d params[k] = m * params[k] return params # noinspection PyUnusedLocal,PyPep8Naming def _ABC( n_speeds, n_powers=0, n_temperatures=1, a2=0, b2=0, a=0, b=0, c=0, t=0, l=0, l2=0, acr=1, **kw): acr2 = (acr ** 2) A = a2 / acr2 + (b2 / acr2) * n_speeds B = a / acr + (b / acr + (c / acr) * n_speeds) * n_speeds C = l + l2 * n_speeds ** 2 if n_temperatures is not 1: C *= np.power(n_temperatures, -t) C -= n_powers / acr return A, B, C # noinspection PyPep8Naming def _fuel_ABC(n_speeds, **kw): return _ABC(n_speeds, **_tech_mult_factors(**kw)) # noinspection PyPep8Naming def _calculate_fc(A, B, C): b = np.array(A, dtype=bool) if b.all(): v = np.sqrt(np.abs(B ** 2 - 4.0 * A * C)) return (-B + v) / (2 * A), v elif ~b.all(): return -C / B, B else: fc, v = np.zeros_like(C), np.zeros_like(C) fc[b], v[b] = _calculate_fc(A[b], B[b], C[b]) b = ~b fc[b], v[b] = _calculate_fc(A[b], B[b], C[b]) return fc, v # noinspection PyMissingOrEmptyDocstring
[docs]class FMEP:
[docs] def __init__(self, full_bmep_curve, active_cylinder_ratios=(1.0,), has_cylinder_deactivation=False, acr_full_bmep_curve_percentage=0.5, acr_max_mean_piston_speeds=12.0, acr_min_mean_piston_speeds=3.0, acr_after_treatment_temp=-273.0, has_variable_valve_actuation=False, has_lean_burn=False, lb_max_mean_piston_speeds=12.0, lb_full_bmep_curve_percentage=0.4, has_exhausted_gas_recirculation=False, has_selective_catalytic_reduction=False, egr_max_mean_piston_speeds=12.0, egr_full_bmep_curve_percentage=0.7, engine_type=None): if active_cylinder_ratios: self.base_acr = max(active_cylinder_ratios) else: self.base_acr = 1.0 self.active_cylinder_ratios = set(active_cylinder_ratios) self.active_cylinder_ratios -= {self.base_acr} self.fbc = full_bmep_curve self.has_cylinder_deactivation = has_cylinder_deactivation self.acr_after_treatment_temp = float(acr_after_treatment_temp) self.acr_max_mean_piston_speeds = float(acr_max_mean_piston_speeds) self.acr_min_mean_piston_speeds = float(acr_min_mean_piston_speeds) self.acr_fbc_percentage = acr_full_bmep_curve_percentage self.has_variable_valve_actuation = has_variable_valve_actuation self.has_lean_burn = has_lean_burn self.lb_max_mean_piston_speeds = float(lb_max_mean_piston_speeds) self.lb_fbc_percentage = lb_full_bmep_curve_percentage self.lb_n_temp_min = 0.5 self.has_exhausted_gas_recirculation = has_exhausted_gas_recirculation self.has_selective_catalytic_reduction = \ has_selective_catalytic_reduction self.egr_max_mean_piston_speeds = float(egr_max_mean_piston_speeds) self.egr_fbc_percentage = egr_full_bmep_curve_percentage self.engine_type = engine_type
[docs] def vva(self, params, n_powers, a=None): a = a or {} if self.has_variable_valve_actuation and 'vva' not in params: a['vva'] = ((0, True), (1, n_powers >= 0)) return a
[docs] def lb(self, params, n_speeds, n_powers, n_temp, a=None): a = a or {} if self.has_lean_burn and 'lb' not in params: b = n_temp >= self.lb_n_temp_min b &= n_speeds < self.lb_max_mean_piston_speeds b &= n_powers <= (self.fbc(n_speeds) * self.lb_fbc_percentage) a['lb'] = ((0, True), (1, b)) return a
[docs] def egr(self, params, n_speeds, n_powers, n_temp, a=None): a = a or {} if self.has_exhausted_gas_recirculation and 'egr' not in params: # b = n_speeds < self.egr_max_mean_piston_speeds # b &= n_powers <= (self.fbc(n_speeds) * self.egr_fbc_percentage) k = self.engine_type, self.has_selective_catalytic_reduction egr = dfl.functions.FMEP_egr.egr_fact_map[k] if k[0] == 'compression': if k[1]: b = n_temp < 1 else: b = n_speeds < self.egr_max_mean_piston_speeds b &= n_powers <= ( self.fbc(n_speeds) * self.egr_fbc_percentage ) if b is True: a['egr'] = (egr, True), elif b is False: a['egr'] = (0, True), else: a['egr'] = (np.where(b, egr, 0), True), else: a['egr'] = ((0, True), (egr, True)) return a
[docs] def acr(self, params, n_speeds, n_powers, n_temp, a=None, acr_valid=None): a = a or {'acr': [(self.base_acr, True)]} if self.has_cylinder_deactivation and self.active_cylinder_ratios and \ 'acr' not in params: l = a['acr'] at_n_temp = _normalized_engine_coolant_temperatures( self.acr_after_treatment_temp, params['trg'] ) b = (n_temp > at_n_temp) & (n_powers > 0) b &= (self.acr_min_mean_piston_speeds < n_speeds) b &= (n_speeds < self.acr_max_mean_piston_speeds) if acr_valid is not None: b &= acr_valid ac = self.fbc(n_speeds) * self.acr_fbc_percentage ac = n_powers / np.maximum(dfl.EPS, ac) for acr in sorted(self.active_cylinder_ratios): l.append((acr, b & (ac < acr))) return a
@staticmethod def _check_combinations(a): out = {} for k, v in a.items(): for i in v: try: if i[1] is True or i[1].any(): sh.get_nested_dicts(out, k, default=list).append(i) except AttributeError: pass return out
[docs] def combination(self, params, n_speeds, n_powers, n_temp, acr_valid=None): a = self.acr(params, n_speeds, n_powers, n_temp, acr_valid=acr_valid) a = self.lb(params, n_speeds, n_powers, n_temp, a=a) a = self.vva(params, n_powers, a=a) a = self.egr(params, n_speeds, n_powers, n_temp, a=a) a = self._check_combinations(a) keys, c = zip(*sorted(a.items())) p = params.copy() p.update({ 'n_speeds': n_speeds, 'n_powers': n_powers, 'n_temperatures': n_temp }) for s in itertools.product(*c): b, d = True, {} for k, (v, n) in zip(keys, s): b &= n d[k] = v try: if b is False or not b.any(): continue if b.all(): b = True except AttributeError: pass p.update(d) yield {k: self.g(v, b) for k, v in p.items()}, d, b
[docs] @staticmethod def g(data, b): if b is not True: try: return np.ma.masked_where(~b, data, copy=False) except (IndexError, TypeError): pass return data
def __call__(self, params, n_speeds, n_powers, n_temp, acr_valid=None): it = self.combination(params, n_speeds, n_powers, n_temp, acr_valid) s = None for p, d, n in it: d['fmep'], d['v'] = _calculate_fc(*_fuel_ABC(**p)) if s is None: s = d else: b = d['fmep'] < s['fmep'] if n is True and b is True: s = d elif b is not False: n &= b if n.all(): s = d elif n.any(): for k, v in d.items(): s[k] = np.where(n, v, s[k]) acr = s.get('acr', params.get('acr', self.base_acr)) vva = s.get('vva', params.get('vva', 0)) lb = s.get('lb', params.get('lb', 0)) egr = s.get('egr', params.get('egr', 0)) return s['fmep'], s['v'], acr, vva, lb, egr
[docs]@sh.add_function(dsp, outputs=['cylinder_deactivation_valid_phases']) def calculate_cylinder_deactivation_valid_phases(engine_inertia_powers_losses): """ Calculates valid activation phases for cylinder deactivation. :param engine_inertia_powers_losses: Engine power losses due to inertia [kW]. :type engine_inertia_powers_losses: numpy.array :return: Valid activation phases for cylinder deactivation. :rtype: numpy.array """ p = dfl.functions.calculate_cylinder_deactivation_valid_phases.LIMIT return engine_inertia_powers_losses <= p
dsp.add_data('active_cylinder_ratios', dfl.values.active_cylinder_ratios) dsp.add_data( 'engine_has_cylinder_deactivation', dfl.values.engine_has_cylinder_deactivation ) dsp.add_data( 'engine_has_variable_valve_actuation', dfl.values.engine_has_variable_valve_actuation ) dsp.add_data('has_lean_burn', dfl.values.has_lean_burn) dsp.add_data( 'has_selective_catalytic_reduction', dfl.values.has_selective_catalytic_reduction )
[docs]@sh.add_function(dsp, outputs=['fmep_model']) def define_fmep_model( full_bmep_curve, engine_max_speed, engine_stroke, active_cylinder_ratios, engine_has_cylinder_deactivation, engine_has_variable_valve_actuation, has_lean_burn, has_exhausted_gas_recirculation, has_selective_catalytic_reduction, engine_type, idle_engine_speed, after_treatment_temperature_threshold): """ Defines the vehicle FMEP model. :param full_bmep_curve: Vehicle full bmep curve. :type full_bmep_curve: function :param engine_max_speed: Maximum allowed engine speed [RPM]. :type engine_max_speed: float :param engine_stroke: Engine stroke [mm]. :type engine_stroke: float :param active_cylinder_ratios: Possible active cylinder ratios [-]. :type active_cylinder_ratios: tuple[float] :param engine_has_cylinder_deactivation: Does the engine have cylinder deactivation technology? :type engine_has_cylinder_deactivation: bool :param engine_has_variable_valve_actuation: Does the engine feature variable valve actuation? [-]. :type engine_has_variable_valve_actuation: bool :param has_lean_burn: Does the engine have lean burn technology? :type has_lean_burn: bool :param has_exhausted_gas_recirculation: Does the engine have exhaust gas recirculation technology? :type has_exhausted_gas_recirculation: bool :param has_selective_catalytic_reduction: Does the engine have selective catalytic reduction technology? :type has_selective_catalytic_reduction: bool :param engine_type: Engine type (positive turbo, positive natural aspiration, compression). :type engine_type: str :param idle_engine_speed: Engine speed idle median and std [RPM]. :type idle_engine_speed: (float, float) :param after_treatment_temperature_threshold: Engine coolant temperature threshold when the after treatment system is warm [°C]. :type after_treatment_temperature_threshold: (float, float) :return: Vehicle FMEP model. :rtype: FMEP """ d = dfl.functions.define_fmep_model acr_fbcp = d.acr_full_bmep_curve_percentage lb_fbcp = d.lb_full_bmep_curve_percentage egr_fbcp = d.egr_full_bmep_curve_percentage acr_maps = d.acr_max_mean_piston_speeds_percentage * engine_max_speed acr_mips = d.acr_min_mean_piston_speeds_percentage * idle_engine_speed[0] lb_mps = d.lb_max_mean_piston_speeds_percentage * engine_max_speed egr_mps = d.egr_max_mean_piston_speeds_percentage * engine_max_speed from . import calculate_mean_piston_speeds bmep = calculate_mean_piston_speeds model = FMEP( full_bmep_curve, active_cylinder_ratios=active_cylinder_ratios, has_cylinder_deactivation=engine_has_cylinder_deactivation, acr_full_bmep_curve_percentage=acr_fbcp, acr_max_mean_piston_speeds=bmep(acr_maps, engine_stroke), acr_min_mean_piston_speeds=bmep(acr_mips, engine_stroke), acr_after_treatment_temp=after_treatment_temperature_threshold[0], has_variable_valve_actuation=engine_has_variable_valve_actuation, has_lean_burn=has_lean_burn, lb_max_mean_piston_speeds=bmep(lb_mps, engine_stroke), lb_full_bmep_curve_percentage=lb_fbcp, has_exhausted_gas_recirculation=has_exhausted_gas_recirculation, has_selective_catalytic_reduction=has_selective_catalytic_reduction, egr_max_mean_piston_speeds=bmep(egr_mps, engine_stroke), egr_full_bmep_curve_percentage=egr_fbcp, engine_type=engine_type ) return model
# noinspection PyUnresolvedReferences
[docs]@sh.add_function(dsp, outputs=['fuel_map']) def define_fuel_map( idle_engine_speed, engine_capacity, co2_params_calibrated, fmep_model, engine_fuel_lower_heating_value, engine_stroke, full_load_speeds, full_load_powers): """ Define fuel consumption map [RPM, kW, g/s]. :param idle_engine_speed: Idle engine speed and its standard deviation [RPM]. :type idle_engine_speed: (float, float) :param full_load_speeds: T1 map speed vector [RPM]. :type full_load_speeds: numpy.array :param full_load_powers: T1 map power vector [kW]. :type full_load_powers: numpy.array :param co2_params_calibrated: CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg). The missing parameters are set equal to zero. :type co2_params_calibrated: lmfit.Parameters :param fmep_model: Engine FMEP model. :type fmep_model: FMEP :param engine_fuel_lower_heating_value: Fuel lower heating value [kJ/kg]. :type engine_fuel_lower_heating_value: float :param engine_stroke: Engine stroke [mm]. :type engine_stroke: float :param engine_capacity: Engine capacity [cm3]. :type engine_capacity: float :return: Fuel consumption map [RPM, kW, g/s]. :rtype: dict """ from . import calculate_mean_piston_speeds speed = np.linspace(full_load_speeds[0], full_load_speeds[-1], 100).tolist() speed = np.unique(speed + list(full_load_speeds)) p = co2_params_calibrated.valuesdict() lhv = engine_fuel_lower_heating_value par = dfl.functions.calculate_co2_emissions idle_cutoff = idle_engine_speed[0] * par.cutoff_idle_ratio ec_p0 = _calculate_p0( fmep_model, p, engine_capacity, engine_stroke, idle_cutoff, lhv ) flp = np.interp(speed, full_load_speeds, full_load_powers) power = np.linspace(ec_p0, np.max(full_load_powers), 100).tolist() power = np.unique(power + list(full_load_powers) + list(flp) + [0]) e_s, e_p = np.meshgrid(speed, power, indexing='ij') n_s = calculate_mean_piston_speeds(e_s, engine_stroke) n_p = calculate_brake_mean_effective_pressures(e_s, e_p, engine_capacity, 0) fc = np.maximum(0, fmep_model(p, n_s, n_p, 1)[0]) fc *= e_s * (engine_capacity / (lhv * 1200)) # [g/sec] fc[np.argmax(fc, axis=1)[:, None] < np.arange(fc.shape[1])] = np.nan b = ~np.isnan(fc).all(0) return dict( speed=speed.tolist(), power=power[b].tolist(), fuel=fc[:, b].tolist() )
def _calculate_p0( fmep_model, params, engine_capacity, engine_stroke, idle_engine_speed_median, engine_fuel_lower_heating_value): """ Calculates the engine power threshold limit [kW]. :param params: CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg). The missing parameters are set equal to zero. :type params: dict :param engine_capacity: Engine capacity [cm3]. :type engine_capacity: float :param engine_stroke: Engine stroke [mm]. :type engine_stroke: float :param idle_engine_speed_median: Engine speed idle median [RPM]. :type idle_engine_speed_median: float :param engine_fuel_lower_heating_value: Fuel lower heating value [kJ/kg]. :type engine_fuel_lower_heating_value: float :param fmep_model: Engine FMEP model. :type fmep_model: FMEP :return: Engine power threshold limit [kW]. :rtype: float """ engine_cm_idle = idle_engine_speed_median * engine_stroke / 30000.0 lhv = engine_fuel_lower_heating_value wfb_idle, wfa_idle = fmep_model(params, engine_cm_idle, 0, 1)[:2] wfa_idle = (3600000.0 / lhv) / wfa_idle wfb_idle *= (3.0 * engine_capacity / lhv * idle_engine_speed_median) return -wfb_idle / wfa_idle def _apply_ac_phases(func, fmep_model, params, *args, ac_phases=None): res_with_ac = func(fmep_model, params, *args) if ac_phases is None or ac_phases.all() or not ( fmep_model.has_cylinder_deactivation and fmep_model.active_cylinder_ratios): return res_with_ac else: p = params.copy() p['acr'] = fmep_model.base_acr return np.where(ac_phases, res_with_ac, func(fmep_model, p, *args)) def _normalized_engine_coolant_temperatures( engine_coolant_temperatures, temperature_target): """ Calculates the normalized engine coolant temperatures [-]. ..note:: Engine coolant temperatures are first converted in kelvin and then normalized. The results is between ``[0, 1]``. :param engine_coolant_temperatures: Engine coolant temperature vector [°C]. :type engine_coolant_temperatures: numpy.array :param temperature_target: Normalization temperature [°C]. :type temperature_target: float :return: Normalized engine coolant temperature [-]. :rtype: numpy.array """ temp = (engine_coolant_temperatures + 273.0) / (temperature_target + 273.0) if isinstance(temp, np.ndarray): temp[np.searchsorted(temp, (1,))[0]:] = 1 return np.minimum(1, temp) def _calculate_co2_emissions( time_series, engine_fuel_lower_heating_value, idle_engine_speed, engine_stroke, engine_capacity, idle_fuel_consumption_model, fuel_carbon_content, min_engine_on_speed, tau_function, fmep_model, params, sub_values=None): """ Calculates CO2 emissions [CO2g/s]. :param time_series: Engine speed vector [RPM], Engine power vector [kW], Engine coolant temperature vector [°C], Mean piston speed vector [m/s], and Engine brake mean effective pressure vector [bar]. :type time_series: numpy.array :param engine_fuel_lower_heating_value: Fuel lower heating value [kJ/kg]. :type engine_fuel_lower_heating_value: float :param idle_engine_speed: Engine speed idle median and std [RPM]. :type idle_engine_speed: (float, float) :param engine_stroke: Engine stroke [mm]. :type engine_stroke: float :param engine_capacity: Engine capacity [cm3]. :type engine_capacity: float :param idle_fuel_consumption_model: Model of fuel consumption at hot idle engine speed. :type idle_fuel_consumption_model: IdleFuelConsumptionModel :param fuel_carbon_content: Fuel carbon content [CO2g/g]. :type fuel_carbon_content: float :param min_engine_on_speed: Minimum engine speed to consider the engine to be on [RPM]. :type min_engine_on_speed: float :param tau_function: Tau-function of the extended Willans curve. :type tau_function: callable :param fmep_model: Engine FMEP model. :type fmep_model: FMEP :param params: CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg). The missing parameters are set equal to zero. :type params: lmfit.Parameters :param sub_values: Boolean vector. :type sub_values: numpy.array, optional :return: CO2 emissions vector [CO2g/s]. :rtype: numpy.array """ p = params.valuesdict() # namespace shortcuts if sub_values is not None: e_s, e_p, e_t, n_s, n_p, acr_v = time_series[:, sub_values] else: e_s, e_p, e_t, n_s, n_p, acr_v = time_series lhv, acr_v = engine_fuel_lower_heating_value, acr_v.astype(bool) idle_fc_model = idle_fuel_consumption_model.consumption fc, ac, vva, lb, egr = np.zeros((5, len(e_p)), dtype=float) ac[:] = 1 # Idle fc correction for temperature n = (e_s < idle_engine_speed[0] + min_engine_on_speed) _b = (e_s >= min_engine_on_speed) par = dfl.functions.calculate_co2_emissions idle_cutoff = idle_engine_speed[0] * par.cutoff_idle_ratio if p['t0'] == 0 and p['t1'] == 0: ac_phases, n_t = np.ones_like(e_p, dtype=bool), 1 ec_p0 = _calculate_p0( fmep_model, p, engine_capacity, engine_stroke, idle_cutoff, lhv ) _b &= ~((e_p <= ec_p0) & (e_s > idle_cutoff)) b = n & _b fc[b], ac[b], vva[b], lb[b], egr[b] = idle_fc_model(p) b = ~n & _b else: p['t'] = tau_function(p['t0'], p['t1'], e_t) n_t = _normalized_engine_coolant_temperatures(e_t, p['trg']) at_n_temp = _normalized_engine_coolant_temperatures( fmep_model.acr_after_treatment_temp, p['trg'] ) ac_phases = (n_t > at_n_temp) & acr_v ec_p0 = _apply_ac_phases( _calculate_p0, fmep_model, p, engine_capacity, engine_stroke, idle_cutoff, lhv, ac_phases=ac_phases ) _b &= ~((e_p <= ec_p0) & (e_s > idle_cutoff)) b = n & _b # noinspection PyUnresolvedReferences idle_fc, ac[b], vva[b], lb[b], egr[b] = idle_fc_model(p, ac_phases[b]) fc[b] = idle_fc * np.power(n_t[b], -p['t'][b]) b = ~n & _b p['t'] = p['t'][b] n_t = n_t[b] fc[b], _, ac[b], vva[b], lb[b], egr[b] = fmep_model( p, n_s[b], n_p[b], n_t, acr_valid=acr_v[b] ) fc[b] *= e_s[b] * (engine_capacity / (lhv * 1200)) # [g/sec] fc[fc < 0] = 0 co2 = fc * fuel_carbon_content return np.nan_to_num(co2), ac, vva, lb, egr # noinspection PyUnusedLocal
[docs]@sh.add_function(dsp, outputs=['co2_emissions_model']) def define_co2_emissions_model( engine_speeds_out, engine_powers_out, mean_piston_speeds, brake_mean_effective_pressures, engine_coolant_temperatures, on_engine, cylinder_deactivation_valid_phases, engine_fuel_lower_heating_value, idle_engine_speed, engine_stroke, engine_capacity, idle_fuel_consumption_model, fuel_carbon_content, min_engine_on_speed, tau_function, fmep_model): """ Returns CO2 emissions model (see :func:`calculate_co2_emissions`). :param engine_speeds_out: Engine speed vector [RPM]. :type engine_speeds_out: numpy.array :param engine_powers_out: Engine power vector [kW]. :type engine_powers_out: numpy.array :param mean_piston_speeds: Mean piston speed vector [m/s]. :type mean_piston_speeds: numpy.array :param brake_mean_effective_pressures: Engine brake mean effective pressure vector [bar]. :type brake_mean_effective_pressures: numpy.array :param engine_coolant_temperatures: Engine coolant temperature vector [°C]. :type engine_coolant_temperatures: numpy.array :param on_engine: If the engine is on [-]. :type on_engine: numpy.array :param cylinder_deactivation_valid_phases: Valid activation phases for cylinder deactivation. :type cylinder_deactivation_valid_phases: numpy.array :param engine_fuel_lower_heating_value: Fuel lower heating value [kJ/kg]. :type engine_fuel_lower_heating_value: float :param idle_engine_speed: Engine speed idle median and std [RPM]. :type idle_engine_speed: (float, float) :param engine_stroke: Engine stroke [mm]. :type engine_stroke: float :param engine_capacity: Engine capacity [cm3]. :type engine_capacity: float :param idle_fuel_consumption_model: Idle fuel consumption model. :type idle_fuel_consumption_model: IdleFuelConsumptionModel :param fuel_carbon_content: Fuel carbon content [CO2g/g]. :type fuel_carbon_content: float :param min_engine_on_speed: Minimum engine speed to consider the engine to be on [RPM]. :type min_engine_on_speed: float :param tau_function: Tau-function of the extended Willans curve. :type tau_function: callable :param fmep_model: Engine FMEP model. :type fmep_model: FMEP :return: CO2 emissions model (co2_emissions = models(params)). :rtype: callable """ ts = ( engine_speeds_out, engine_powers_out, engine_coolant_temperatures, mean_piston_speeds, brake_mean_effective_pressures, cylinder_deactivation_valid_phases ) model = functools.partial( _calculate_co2_emissions, np.array(ts, copy=False), engine_fuel_lower_heating_value, idle_engine_speed, engine_stroke, engine_capacity, idle_fuel_consumption_model, fuel_carbon_content, min_engine_on_speed, tau_function, fmep_model ) return model
def _select_initial_friction_params(co2_params_initial_guess): """ Selects initial guess of friction params l & l2 for the calculation of the motoring curve. :param co2_params_initial_guess: Initial guess of CO2 emission model params. :type co2_params_initial_guess: lmfit.Parameters :return: Initial guess of friction params l & l2. :rtype: float, float """ params = co2_params_initial_guess.valuesdict() return sh.selector(('l', 'l2'), params, output_type='list') # noinspection PyUnusedLocal def _missing_co2_params(params, *args, _not=False): """ Checks if all co2_params are not defined. :param params: CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg). :type params: dict | lmfit.Parameters :param _not: If True the function checks if not all co2_params are defined. :type _not: bool :return: If is missing some parameter. :rtype: bool """ s = {'a', 'b', 'c', 'a2', 'b2', 'l', 'l2', 't0', 'dt', 'trg'} if _not: return set(params).issuperset(s) return not set(params).issuperset(s)
[docs]@sh.add_function( dsp, inputs_kwargs=True, inputs_defaults=True, outputs=['co2_params_initial_guess', 'initial_friction_params'] ) def define_initial_co2_emission_model_params_guess( co2_params, engine_type, engine_thermostat_temperature, engine_thermostat_temperature_window, engine_n_cylinders=dfl.values.engine_n_cylinders, is_cycle_hot=dfl.values.is_cycle_hot): """ Selects initial guess and bounds of co2 emission model params. :param co2_params: CO2 emission model params (a2, b2, a, b, c, l, l2, t, trg). :type co2_params: dict :param engine_type: Engine type (positive turbo, positive natural aspiration, compression). :type engine_type: str :param engine_thermostat_temperature: Engine normalization temperature [°C]. :type engine_thermostat_temperature: float :param engine_thermostat_temperature_window: Thermostat engine temperature limits [°C]. :type engine_thermostat_temperature_window: (float, float) :param engine_n_cylinders: Number of engine cylinders [-]. :type engine_n_cylinders: int :param is_cycle_hot: Is an hot cycle? :type is_cycle_hot: bool, optional :return: Initial guess of co2 emission model params and of friction params. :rtype: lmfit.Parameters, list[float] """ import lmfit import collections bounds = {} # Parameters bounds. par = dfl.functions.define_initial_co2_emission_model_params_guess default = collections.OrderedDict( copy.deepcopy(par.CO2_PARAMS[engine_type]) ) default['trg'] = { 'value': engine_thermostat_temperature, 'min': engine_thermostat_temperature_window[0], 'max': engine_thermostat_temperature_window[1], 'vary': False } keys, n = ('l', 'l2'), engine_n_cylinders / default.pop('n_cylinders', 4) for d in sh.selector(keys, default, allow_miss=True).values(): for k in {'value', 'min', 'max'}.intersection(d): d[k] *= n if is_cycle_hot: default['t1'].update({'value': 0.0, 'vary': False}) default['dt'].update({'value': 0.0, 'vary': False}) p = lmfit.Parameters() eps = dfl.EPS for k, kw in default.items(): kw['name'] = k kw['value'] = co2_params.get(k, kw.get('value', None)) if k in bounds: b = bounds[k] kw['min'] = b.get('min', kw.get('min', None)) kw['max'] = b.get('max', kw.get('max', None)) kw['vary'] = b.get('vary', kw.get('vary', True)) elif 'vary' not in kw: kw['vary'] = k not in co2_params if kw['value'] is not None: if 'min' in kw and kw['value'] < kw['min']: kw['min'] = kw['value'] - eps if 'max' in kw and kw['value'] > kw['max']: kw['max'] = kw['value'] + eps if 'min' in kw and 'max' in kw and kw['min'] == kw['max']: kw['vary'] = False # noinspection PyTypeChecker kw['max'] = kw['min'] = None kw['min'] = kw.get('min', None) kw['max'] = kw.get('max', None) p.add(**kw) friction_params = _select_initial_friction_params(p) if not _missing_co2_params(co2_params): p = sh.NONE return p, friction_params
[docs]@sh.add_function(dsp, outputs=['extended_phases_distances']) def calculate_extended_phases_distances( times, extended_phases_integration_times, velocities): """ Calculates extended cycle phases distances [km]. :param times: Time vector [s]. :type times: numpy.array :param extended_phases_integration_times: Extended cycle phases integration times [s]. :type extended_phases_integration_times: tuple :param velocities: Velocity vector [km/h]. :type velocities: numpy.array :return: Extended cycle phases distances [km]. :rtype: numpy.array """ from ..co2 import calculate_phases_distances, identify_phases_indices indices = identify_phases_indices(times, extended_phases_integration_times) return calculate_phases_distances(times, indices, velocities)
[docs]@sh.add_function(dsp, outputs=['extended_phases_co2_emissions']) def calculate_extended_phases_co2_emissions( extended_cumulative_co2_emissions, extended_phases_distances): """ Calculates the extended CO2 emission of cycle phases [CO2g/km]. :param extended_cumulative_co2_emissions: Extended cumulative CO2 of cycle phases [CO2g]. :type extended_cumulative_co2_emissions: numpy.array :param extended_phases_distances: Extended cycle phases distances [km]. :type extended_phases_distances: numpy.array :return: Extended CO2 emission of cycle phases [CO2g/km]. :rtype: numpy.array """ with np.errstate(divide='ignore', invalid='ignore'): x = extended_cumulative_co2_emissions return np.nan_to_num(x / extended_phases_distances)
dsp.add_function(function=sh.bypass, weight=300, inputs=[ 'phases_integration_times', 'cumulative_co2_emissions', 'phases_distances' ], outputs=[ 'extended_phases_integration_times', 'extended_cumulative_co2_emissions', 'extended_phases_distances' ])
[docs]@sh.add_function(dsp, outputs=['cumulative_co2_emissions']) def calculate_cumulative_co2_v1(phases_co2_emissions, phases_distances): """ Calculates cumulative CO2 of cycle phases [CO2g]. :param phases_co2_emissions: CO2 emission of cycle phases [CO2g/km]. :type phases_co2_emissions: numpy.array :param phases_distances: Cycle phases distances [km]. :type phases_distances: numpy.array :return: Cumulative CO2 of cycle phases [CO2g]. :rtype: numpy.array """ return phases_co2_emissions * phases_distances
def _define_rescaling_function( co2_emissions_model, cumulative_co2_emissions, phases_integration_times, times, rescaling_matrix): dx, it = np.append(np.diff(times), [0]), [] for ii, jj in np.searchsorted(times, phases_integration_times): d = dx[ii:jj].copy() d[1:-1] = d[1:-1] + d[:-2] it.append((ii, jj, d[:, None] * rescaling_matrix[ii:jj, :] / 2)) def _rescaling_function(params_initial_guess): co2_emissions = co2_emissions_model(params_initial_guess)[0] mtx = [np.sum(co2_emissions[i:j, None] * m, 0) for i, j, m in it] k_factors = np.linalg.lstsq(mtx, cumulative_co2_emissions, rcond=-1)[0] co2_emissions *= np.dot(rescaling_matrix, k_factors) return co2_emissions, k_factors return _rescaling_function def _rescaling_matrix( phases_integration_times, times, velocities, stop_velocity): from scipy.interpolate import interp1d # noinspection PyProtectedMember d, eps = dfl.functions._rescaling_matrix, dfl.EPS a, b = np.array([-1, 1]) * d.a / 2, d.b pit = np.array(phases_integration_times) mean = np.mean(pit, 1) points = np.zeros((len(phases_integration_times), 4), float) points[0, 0], points[-1, 3] = -np.inf, np.inf points[1:, 0] = (pit[1:, 0] - mean[:-1]) * (1 - b) + mean[:-1] - eps points[:, 1:3] = np.column_stack((mean,) * 2) + np.diff(pit, axis=1) * a points[:-1, 3] = (mean[1:] - pit[:-1, 1]) * b + pit[:-1, 1] r, y = [], (0, 1, 1, 0) for x in points: r.append(interp1d(x, y, bounds_error=False, fill_value=0)(times)) r = np.column_stack(r) r[np.isnan(r)] = 1 b = np.asarray(velocities <= stop_velocity, int) # noinspection PyTypeChecker it = np.split(range(b.size), np.where(np.diff(b) != 0)[0] + 1)[1 - b[0]::2] for x in it: i = list(itertools.chain(*(np.where(v > 0)[0] for v in r[x]))) r[x] = 0 r[x, int(np.median(i))] = 1 return r / np.sum(r, 1)[:, None] def _rescaling_score(times, rescaling_matrix, k): x = np.dot(rescaling_matrix, k) m = np.trapz(x, times) / (times[-1] - times[0]) std = np.sqrt(np.trapz((x - m) ** 2, times) / (times[-1] - times[0])) return m, std
[docs]@sh.add_function(dsp, weight=5, outputs=[ 'identified_co2_emissions', 'co2_rescaling_scores', 'co2_params_identified' ]) def identify_co2_emissions( co2_emissions_model, co2_params_initial_guess, times, extended_phases_integration_times, extended_cumulative_co2_emissions, engine_coolant_temperatures, is_cycle_hot, velocities, stop_velocity): """ Identifies instantaneous CO2 emission vector [CO2g/s]. :param co2_emissions_model: CO2 emissions model (co2_emissions = models(params)). :type co2_emissions_model: callable :param co2_params_initial_guess: Initial guess of co2 emission model params. :type co2_params_initial_guess: dict :param times: Time vector [s]. :type times: numpy.array :param extended_phases_integration_times: Extended cycle phases integration times [s]. :type extended_phases_integration_times: tuple :param extended_cumulative_co2_emissions: Extended cumulative CO2 of cycle phases [CO2g]. :type extended_cumulative_co2_emissions: numpy.array :param engine_coolant_temperatures: Engine coolant temperature vector [°C]. :type engine_coolant_temperatures: numpy.array :param is_cycle_hot: Is an hot cycle? :type is_cycle_hot: bool :param velocities: Velocity vector [km/h]. :type velocities: numpy.array :param stop_velocity: Maximum velocity to consider the vehicle stopped [km/h]. :type stop_velocity: float :return: The instantaneous CO2 emission vector [CO2g/s], rescaling scores (i.e., mean, std, and number of perturbations) [-], and the identified initial guess of co2 emission model params. :rtype: numpy.array, tuple[float], dict """ p = co2_params_initial_guess rescaling_matrix = _rescaling_matrix( extended_phases_integration_times, times, velocities, stop_velocity ) rescale = _define_rescaling_function( co2_emissions_model, extended_cumulative_co2_emissions, extended_phases_integration_times, times, rescaling_matrix ) d = dfl.functions.identify_co2_emissions n, (co2, k0) = 0, rescale(p) if d.enable_first_step or d.enable_second_step or d.enable_third_step: calibrate = functools.partial( calibrate_co2_params, is_cycle_hot, engine_coolant_temperatures, co2_emissions_model, _1st_step=d.enable_first_step, _2nd_step=d.enable_second_step, _3rd_step=d.enable_third_step, ) xatol = d.xatol for n in range(d.n_perturbations): p = calibrate(co2, p)[0] co2, k1 = rescale(p) if np.max(np.abs(k1 - k0)) <= xatol: k0 = k1 break k0 = k1 return co2, _rescaling_score(times, rescaling_matrix, k0) + (n,), p
[docs]@sh.add_function(dsp, outputs=[ 'identified_co2_emissions', 'co2_rescaling_scores', 'co2_params_identified' ]) def fake_identification_co2_emissions(co2_emissions, co2_params_initial_guess): """ Identifies instantaneous CO2 emission vector [CO2g/s]. :param co2_emissions: CO2 instantaneous emissions vector [CO2g/s]. :type co2_emissions: numpy.array :param co2_params_initial_guess: Initial guess of co2 emission model params. :type co2_params_initial_guess: dict :return: The instantaneous CO2 emission vector [CO2g/s], rescaling scores (i.e., mean, std, and number of perturbations) [-], and the identified initial guess of co2 emission model params. :rtype: numpy.array, tuple[float], dict """ return co2_emissions, (1.0, 0, 0), co2_params_initial_guess
def _define_co2_error(co2_emissions_model, co2_emissions): def _error_func(params, sub_values=None): x = co2_emissions if sub_values is None else co2_emissions[sub_values] y = co2_emissions_model(params, sub_values=sub_values)[0] return co2_utl.mae(x, y) return _error_func def _set_attr(params, data, default=False, attr='vary'): """ Set attribute to CO2 emission model parameters. :param params: CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg). :type params: lmfit.Parameters :param data: Parameter ids to be set or key/value to set. :type data: iterable | dict :param default: Default value to set if a list of parameters ids is provided. :type default: bool | float :param attr: Parameter attribute to set. :type attr: str :return: CO2 emission model parameters. :rtype: lmfit.Parameters """ import lmfit if not isinstance(data, dict): data = dict.fromkeys(data, default) d = {'min', 'max', 'value', 'vary', 'expr'} - {attr} for k, v in data.items(): p = params[k] s = {i: getattr(p, i) for i in d} s[attr] = v p.set(**s) if lmfit.parameter.isclose(p.min, p.max, atol=1e-13, rtol=1e-13): p.set(value=np.mean((p.max, p.min)), min=None, max=None, vary=False) return params def _identify_cold_phase(p, is_cycle_hot, engine_coolant_temperatures): cold = np.zeros_like(engine_coolant_temperatures, dtype=bool) if not is_cycle_hot: i = co2_utl.argmax(engine_coolant_temperatures >= p['trg'].value) cold[:i] = True return cold def _calibrate_model_params( error_function, params, *args, method='nelder', **kws): """ Calibrates the model params minimising the error_function. :param error_function: Model error function. :type error_function: callable :param params: Initial guess of model params. If not specified a brute force is used to identify the best initial guess with in the bounds. :type params: dict, optional :param method: Name of the fitting method to use. :type method: str, optional :return: Calibrated model params. :rtype: dict """ if not any(p.vary for p in params.values()): return params, True import lmfit if callable(error_function): _error_f = error_function else: def _error_f(p, *a, **k): return sum(f(p, *a, **k) for f in error_function) min_e_and_p = [np.inf, copy.deepcopy(params)] def _err_func(p, *a, **kwargs): r = np.float32(_error_f(p, *a, **kwargs)) if r < min_e_and_p[0]: min_e_and_p[0], min_e_and_p[1] = (r, copy.deepcopy(p)) return r # See #7: Neither BFGS nor SLSQP fix "solution families". # leastsq: Improper input: N=6 must not exceed M=1. # nelder is stable (297 runs, 14 vehicles) [average time 181s/14 vehicles]. # lbfgsb is unstable (2 runs, 4 vehicles) [average time 23s/4 vehicles]. # cg is stable (20 runs, 4 vehicles) [average time 37s/4 vehicles]. # newton: Jacobian is required for Newton-CG method # cobyla is unstable (8 runs, 4 vehicles) [average time 16s/4 vehicles]. # tnc is unstable (6 runs, 4 vehicles) [average time 23s/4 vehicles]. # dogleg: Jacobian is required for dogleg minimization. # slsqp is unstable (4 runs, 4 vehicles) [average time 18s/4 vehicles]. # differential_evolution is unstable (1 runs, 4 vehicles) # [average time 270s/4 vehicles]. res = lmfit.minimize( _err_func, params, args=args, kws=kws, method=method, nan_policy='omit' ) # noinspection PyUnresolvedReferences return (res.params if res.success else min_e_and_p[1]), res.success # noinspection PyUnresolvedReferences
[docs]@sh.add_function(dsp, outputs=['co2_params_calibrated', 'calibration_status']) def calibrate_co2_params( is_cycle_hot, engine_coolant_temperatures, co2_emissions_model, identified_co2_emissions, co2_params_identified, _1st_step=dfl.functions.calibrate_co2_params.enable_first_step, _2nd_step=dfl.functions.calibrate_co2_params.enable_second_step, _3rd_step=dfl.functions.calibrate_co2_params.enable_third_step): """ Calibrates the CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg ). :param engine_coolant_temperatures: Engine coolant temperature vector [°C]. :type engine_coolant_temperatures: numpy.array :param co2_emissions_model: CO2 emissions model (co2_emissions = models(params)). :type co2_emissions_model: callable :param identified_co2_emissions: CO2 instantaneous emissions vector [CO2g/s]. :type identified_co2_emissions: numpy.array :param co2_params_identified: Identified initial guess of co2 emission model params. :type co2_params_identified: Parameters :param is_cycle_hot: Is an hot cycle? :type is_cycle_hot: bool :param _1st_step: Enable first step in the co2_params calibration? [-] :type _1st_step: bool :param _2nd_step: Enable second step in the co2_params calibration? [-] :type _2nd_step: bool :param _3rd_step: Enable third step in the co2_params calibration? [-] :type _3rd_step: bool :return: Calibrated CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg) and their calibration statuses. :rtype: (lmfit.Parameters, list) """ err = _define_co2_error(co2_emissions_model, identified_co2_emissions) # Safety measure to not modify the initial guess. p = copy.deepcopy(co2_params_identified) # Identify cold and hot phases. cold = _identify_cold_phase(p, is_cycle_hot, engine_coolant_temperatures) hot = ~cold # Definition of thermal and willans parameters. thermal_p = {'t0', 't1', 'dt', 'trg'} willans_p = {'a2', 'b2', 'a', 'b', 'c', 'l', 'l2'} # Identification of all parameters that can vary. pvary = {k for k, v in p.items() if v.vary} # Zero step: Initialization of the statuses. statuses = [(True, copy.deepcopy(p))] # Definition of the optimization function. def _op(par, params2optimize, **kws): fixp = pvary - params2optimize _set_attr(par, fixp) if pvary - fixp: par, s = _calibrate_model_params(err, par, **kws) else: s = True statuses.append((s, copy.deepcopy(par))) _set_attr(par, pvary, True) return par # First step: Calibration of willans parameters using the hot phase. p = _op(p, _1st_step and hot.any() and willans_p or set(), sub_values=hot) # Second step: Calibration of thermal parameters using the cold phase. if not cold.any(): # When the cycle has not cold phases, thermal parameters have no effect. # The third step will modify arbitrarily this parameters. # Hence, to avoid erroneous results, thermal parameters are fixed to # zero because they cannot be identified. _set_attr(p, thermal_p) _set_attr(p, pvary.intersection(('t1', 'dt')), 0, 'value') pvary -= thermal_p p = _op(p, _2nd_step and cold.any() and thermal_p or set(), sub_values=cold) # Third step: Calibration of all parameters. p = _op(p, _3rd_step and pvary or set()) return p, statuses
[docs]@sh.add_function( dsp, outputs=['co2_params_calibrated', 'calibration_status'], input_domain=functools.partial(_missing_co2_params, _not=True) ) def define_co2_params_calibrated(co2_params): """ Defines the calibrated co2_params if all co2_params are given. :param co2_params: CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg). :type co2_params: dict | lmfit.Parameters :return: Calibrated CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg) and their calibration statuses. :rtype: (lmfit.Parameters, list) """ import lmfit if isinstance(co2_params, lmfit.Parameters): p = co2_params else: p = lmfit.Parameters() for k, v in co2_params.items(): p.add(k, value=v, vary=False) success = [(None, copy.deepcopy(p))] * 4 return p, success
[docs]@sh.add_function(dsp, outputs=[ 'co2_emissions', 'active_cylinders', 'active_variable_valves', 'active_lean_burns', 'active_exhausted_gas_recirculations' ]) def predict_co2_emissions(co2_emissions_model, co2_params_calibrated): """ Predicts CO2 instantaneous emissions vector [CO2g/s]. :param co2_emissions_model: CO2 emissions model (co2_emissions = models(params)). :type co2_emissions_model: callable :param co2_params_calibrated: CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg). The missing parameters are set equal to zero. :type co2_params_calibrated: lmfit.Parameters :return: CO2 instantaneous emissions vector [CO2g/s]. :rtype: numpy.array """ return co2_emissions_model(co2_params_calibrated)
dsp.add_data('co2_params', dfl.values.co2_params.copy())
[docs]@sh.add_function(dsp, outputs=['fuel_consumptions']) def calculate_fuel_consumptions(co2_emissions, fuel_carbon_content): """ Calculates the instantaneous fuel consumption vector [g/s]. :param co2_emissions: CO2 instantaneous emissions vector [CO2g/s]. :type co2_emissions: numpy.array :param fuel_carbon_content: Fuel carbon content [CO2g/g]. :type fuel_carbon_content: float :return: The instantaneous fuel consumption vector [g/s]. :rtype: numpy.array """ return co2_emissions / fuel_carbon_content
[docs]@sh.add_function(dsp, outputs=['co2_emissions']) def calculate_co2_emissions(fuel_consumptions, fuel_carbon_content): """ Calculates instantaneous CO2 emission vector [CO2g/s]. :param fuel_consumptions: The instantaneous fuel consumption vector [g/s]. :type fuel_consumptions: numpy.array :param fuel_carbon_content: Fuel carbon content [CO2g/g]. :type fuel_carbon_content: float :return: CO2 instantaneous emissions vector [CO2g/s]. :rtype: numpy.array """ return fuel_consumptions * fuel_carbon_content