Source code for co2mpas.core.model.physical.control.hybrid

# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions and `dsp` model to model the Energy Management System.
"""
import numpy as np
import schedula as sh
from co2mpas.defaults import dfl
import co2mpas.utils as co2_utl

# Dispatcher collecting every function of the Energy Management System (EMS)
# sub-model that is registered in this module.
dsp = sh.BlueDispatcher(
    name='ems',
    description='Models the Energy Management System.'
)

# Bundle the eight scalar motor efficiencies into the single
# `motors_efficiencies` data node (`sh.bypass` forwards its inputs unchanged).
# The input order is P4 front, P4 rear, P3 front, P3 rear, P2, P2 planetary,
# P1, P0: the same order as the columns stacked by
# `define_motors_maximums_powers`.
dsp.add_function(
    function_id='define_motors_efficiencies',
    function=sh.bypass,
    inputs=[
        'motor_p4_front_efficiency', 'motor_p4_rear_efficiency',
        'motor_p3_front_efficiency', 'motor_p3_rear_efficiency',
        'motor_p2_efficiency', 'motor_p2_planetary_efficiency',
        'motor_p1_efficiency', 'motor_p0_efficiency'
    ],
    outputs=['motors_efficiencies']
)

# Same bundling for the scalar maximum powers of the motors.
# NOTE: the output `motors_maximum_powers` is a different data node from
# `motors_maximums_powers` (extra "s"), which is produced by
# `define_motors_maximums_powers`.
dsp.add_function(
    function_id='define_motors_maximum_powers',
    function=sh.bypass,
    inputs=[
        'motor_p4_front_maximum_power', 'motor_p4_rear_maximum_power',
        'motor_p3_front_maximum_power', 'motor_p3_rear_maximum_power',
        'motor_p2_maximum_power', 'motor_p2_planetary_maximum_power',
        'motor_p1_maximum_power', 'motor_p0_maximum_power'
    ],
    outputs=['motors_maximum_powers']
)


@sh.add_function(dsp, outputs=['motors_maximums_powers'])
def define_motors_maximums_powers(
        motor_p4_front_maximum_powers, motor_p4_rear_maximum_powers,
        motor_p3_front_maximum_powers, motor_p3_rear_maximum_powers,
        motor_p2_maximum_powers, engine_speeds_parallel, final_drive_speeds_in,
        motor_p2_planetary_maximum_power_function, planetary_ratio,
        motor_p2_planetary_speed_ratio, motor_p1_maximum_power_function,
        motor_p1_speed_ratio, motor_p0_maximum_power_function,
        motor_p0_speed_ratio):
    """
    Stacks the maximum powers of all electric motors column by column [kW].

    The columns are ordered as P4 front, P4 rear, P3 front, P3 rear, P2,
    planetary P2, P1 and P0. The first five are given as vectors, the last
    three are evaluated from the motors' maximum power functions.

    :param motor_p4_front_maximum_powers:
        Maximum power vector of motor P4 front [kW].
    :type motor_p4_front_maximum_powers: numpy.array

    :param motor_p4_rear_maximum_powers:
        Maximum power vector of motor P4 rear [kW].
    :type motor_p4_rear_maximum_powers: numpy.array

    :param motor_p3_front_maximum_powers:
        Maximum power vector of motor P3 front [kW].
    :type motor_p3_front_maximum_powers: numpy.array

    :param motor_p3_rear_maximum_powers:
        Maximum power vector of motor P3 rear [kW].
    :type motor_p3_rear_maximum_powers: numpy.array

    :param motor_p2_maximum_powers:
        Maximum power vector of motor P2 [kW].
    :type motor_p2_maximum_powers: numpy.array

    :param engine_speeds_parallel:
        Hypothetical engine speed in parallel mode [RPM].
    :type engine_speeds_parallel: numpy.array

    :param final_drive_speeds_in:
        Final drive speed in [RPM].
    :type final_drive_speeds_in: numpy.array

    :param motor_p2_planetary_maximum_power_function:
        Maximum power function of planetary motor P2.
    :type motor_p2_planetary_maximum_power_function: function

    :param planetary_ratio:
        Fundamental planetary speed ratio [-].
    :type planetary_ratio: float

    :param motor_p2_planetary_speed_ratio:
        Ratio between planetary motor P2 speed and planetary speed [-].
    :type motor_p2_planetary_speed_ratio: float

    :param motor_p1_maximum_power_function:
        Maximum power function of motor P1.
    :type motor_p1_maximum_power_function: function

    :param motor_p1_speed_ratio:
        Ratio between motor P1 speed and engine speed [-].
    :type motor_p1_speed_ratio: float

    :param motor_p0_maximum_power_function:
        Maximum power function of motor P0.
    :type motor_p0_maximum_power_function: function

    :param motor_p0_speed_ratio:
        Ratio between motor P0 speed and engine speed [-].
    :type motor_p0_speed_ratio: float

    :return:
        Maximum powers of electric motors [kW].
    :rtype: numpy.array
    """
    from ..electrics.motors.planet import calculate_planetary_speeds_in

    planetary_speeds = calculate_planetary_speeds_in(
        engine_speeds_parallel, final_drive_speeds_in, planetary_ratio
    )
    # Motors on the engine side are rated at their own shaft speed.
    planetary_p2_powers = motor_p2_planetary_maximum_power_function(
        planetary_speeds * motor_p2_planetary_speed_ratio
    )
    p1_powers = motor_p1_maximum_power_function(
        engine_speeds_parallel * motor_p1_speed_ratio
    )
    p0_powers = motor_p0_maximum_power_function(
        engine_speeds_parallel * motor_p0_speed_ratio
    )
    columns = (
        motor_p4_front_maximum_powers, motor_p4_rear_maximum_powers,
        motor_p3_front_maximum_powers, motor_p3_rear_maximum_powers,
        motor_p2_maximum_powers, planetary_p2_powers, p1_powers, p0_powers
    )
    return np.column_stack(columns)
@sh.add_function(dsp, outputs=['drive_line_efficiencies'])
def define_drive_line_efficiencies(
        final_drive_mean_efficiency, gear_box_mean_efficiency,
        clutch_tc_mean_efficiency, planetary_mean_efficiency,
        belt_mean_efficiency):
    """
    Packs the mean drive line efficiencies into one vector.

    :param final_drive_mean_efficiency:
        Final drive mean efficiency [-].
    :type final_drive_mean_efficiency: float

    :param gear_box_mean_efficiency:
        Gear box mean efficiency [-].
    :type gear_box_mean_efficiency: float

    :param clutch_tc_mean_efficiency:
        Clutch or torque converter mean efficiency [-].
    :type clutch_tc_mean_efficiency: float

    :param planetary_mean_efficiency:
        Planetary mean efficiency [-].
    :type planetary_mean_efficiency: float

    :param belt_mean_efficiency:
        Belt mean efficiency [-].
    :type belt_mean_efficiency: float

    :return:
        Drive line efficiencies vector.
    :rtype: tuple[float]
    """
    # The position of each entry matters: `HEV` accumulates the vector
    # element by element. Note that the planetary efficiency comes before the
    # clutch one, unlike in the signature.
    efficiencies = (
        1.0,
        final_drive_mean_efficiency,
        gear_box_mean_efficiency,
        planetary_mean_efficiency,
        clutch_tc_mean_efficiency,
        belt_mean_efficiency,
    )
    return efficiencies
@sh.add_function(dsp, outputs=['engine_speeds_parallel'], weight=sh.inf(11, 0))
def calculate_engine_speeds_parallel(gear_box_speeds_in, idle_engine_speed):
    """
    Computes the hypothetical engine speed in parallel mode [RPM], i.e. the
    gear box speed bounded from below by the idle engine speed.

    :param gear_box_speeds_in:
        Gear box speed [RPM].
    :type gear_box_speeds_in: numpy.array | float

    :param idle_engine_speed:
        Idle engine speed and its standard deviation [RPM].
    :type idle_engine_speed: (float, float)

    :return:
        Hypothetical engine speed in parallel mode [RPM].
    :rtype: numpy.array | float
    """
    # Only the idle speed is used; its standard deviation is ignored here.
    idle_speed = idle_engine_speed[0]
    return np.maximum(gear_box_speeds_in, idle_speed)
@sh.add_function(dsp, outputs=['engine_speeds_parallel'])
def calculate_engine_speeds_parallel_v1(
        on_engine, engine_speeds_out, gear_box_speeds_in, idle_engine_speed):
    """
    Computes the hypothetical engine speed [RPM]: the actual engine speed
    where the engine is on, the parallel-mode estimate elsewhere.

    :param on_engine:
        If the engine is on [-].
    :type on_engine: numpy.array

    :param engine_speeds_out:
        Engine speed [RPM].
    :type engine_speeds_out: numpy.array

    :param gear_box_speeds_in:
        Gear box speed [RPM].
    :type gear_box_speeds_in: numpy.array | float

    :param idle_engine_speed:
        Idle engine speed and its standard deviation [RPM].
    :type idle_engine_speed: (float, float)

    :return:
        Hypothetical engine speed in parallel mode [RPM].
    :rtype: numpy.array | float
    """
    estimated_speeds = calculate_engine_speeds_parallel(
        gear_box_speeds_in, idle_engine_speed
    )
    return np.where(on_engine, engine_speeds_out, estimated_speeds)
def _invert(y, xp, fp):
    """
    Inverts the piecewise linear curve `(xp, fp)` at the ordinates `y`.

    Every segment of the curve is solved for the abscissa giving `y`; for
    each `y` the first segment whose own range (with a `dfl.EPS` tolerance)
    contains that abscissa is selected.

    :param y:
        Ordinates where the curve is inverted.
    :type y: numpy.array

    :param xp:
        Abscissas of the curve nodes.
    :type xp: numpy.array

    :param fp:
        Ordinates of the curve nodes.
    :type fp: numpy.array

    :return:
        Abscissas matching `y` and `y` itself.
    :rtype: tuple[numpy.array, numpy.array]
    """
    # Flat segments divide by zero: the resulting inf/nan candidates are
    # simply never selected.
    with np.errstate(divide='ignore', invalid='ignore'):
        inverse_slopes = np.diff(xp) / np.diff(fp)
        candidates = xp[:-1] + inverse_slopes * (y[:, None] - fp[:-1])
        lower, upper = xp[:-1] - dfl.EPS, xp[1:] + dfl.EPS
        inside = (lower <= candidates) & (candidates <= upper)
        rows = np.arange(inside.shape[0])
        # noinspection PyUnresolvedReferences
        return candidates[rows, np.nanargmax(inside, 1)], y


# noinspection PyMissingOrEmptyDocstring
class FuelMapModel:
    """
    Engine fuel consumption map together with its best-efficiency line.

    Instances are callable: `model(speed, power)` interpolates the fuel
    consumption map.
    """

    def __init__(self, fuel_map, full_load_curve):
        """
        Builds the map interpolator and the best-efficiency speed/power line.

        :param fuel_map:
            Fuel consumption map [RPM, kW, g/s], read through its 'speed',
            'power' and 'fuel' items.
        :type fuel_map: dict

        :param full_load_curve:
            Vehicle full load curve, called with engine speeds.
        :type full_load_curve: function
        """
        from scipy.interpolate import UnivariateSpline as Spl
        from scipy.interpolate import RegularGridInterpolator as Interpolator
        self.full_load_curve = full_load_curve
        # Fuel consumption over the (speed, power) grid; points outside the
        # grid evaluate to 0 instead of raising.
        self.fc = Interpolator(
            (fuel_map['speed'], fuel_map['power']), fuel_map['fuel'],
            bounds_error=False, fill_value=0
        )
        (s, p), fc = self.fc.grid, self.fc.values
        # Efficiency indicator: power delivered per unit of fuel, floored at
        # zero (divisions by a null fuel consumption are silenced).
        with np.errstate(divide='ignore', invalid='ignore'):
            e = np.maximum(0, p / fc)
        # Discard grid points above the full load curve or at negative power.
        e[(p > full_load_curve(s)[:, None]) | (p < 0)] = np.nan
        # Keep the speeds having at least one valid point, then deduplicate
        # them (`i` indexes the first occurrence of each unique speed).
        b = ~np.isnan(e).all(1)
        (s, i), e = np.unique(s[b], return_index=True), e[b]
        # Keep the powers having at least one valid point and, for each
        # speed, take the power with the highest efficiency.
        b = ~np.isnan(e).all(0)
        p = p[b][np.nanargmax(e[:, b], 1)][i]
        # Weighted smoothing spline of the best-efficiency line; the weights
        # are 1 / clip(1% of the power, dfl.EPS, 1).
        func = Spl(s, p, w=1 / np.clip(p * .01, dfl.EPS, 1))
        # Resample the line on the original speeds plus 1000 evenly spaced
        # ones.
        s = np.unique(np.append(s, np.linspace(s.min(), s.max(), 1000)))
        p = func(s)
        # Highest power along the best-efficiency line.
        self.max_power = p.max()
        # Interpolating splines (s=0): speed -> power and, through `_invert`,
        # power -> speed (ext=3: boundary value outside the fitted range, per
        # the SciPy documentation).
        self.speed_power = Spl(s, p, s=0)
        self.power_speed = Spl(*_invert(np.unique(p), s, p)[::-1], s=0, ext=3)
        # Fuel consumption at zero power, at the speed `power_speed` gives
        # for zero power.
        self.idle_fc = self.fc((self.power_speed(0), 0))

    def __call__(self, speed, power):
        """
        Interpolates the fuel consumption map.

        :param speed:
            Engine speed [RPM].

        :param power:
            Engine power [kW].

        :return:
            Fuel consumption [g/s]; 0 outside the map grid.
        """
        return self.fc((speed, power))
@sh.add_function(dsp, outputs=['fuel_map_model'])
def define_fuel_map_model(fuel_map, full_load_curve):
    """
    Builds the fuel map model.

    :param fuel_map:
        Fuel consumption map [RPM, kW, g/s].
    :type fuel_map: dict

    :param full_load_curve:
        Vehicle full load curve.
    :type full_load_curve: function

    :return:
        Fuel map model.
    :rtype: FuelMapModel
    """
    model = FuelMapModel(fuel_map, full_load_curve)
    return model
def _interp(x, xp, fp): x = np.asarray(x).clip(xp[:, 0, None], xp[:, -1, None]) j = np.maximum(1, np.argmax(x[:, :, None] <= xp[:, None, :], axis=-1)) i, k = j - 1, np.arange(xp.shape[0])[:, None] (x0, dx), (y0, dy) = xp[k, [i, j]], fp[k, [i, j]] dx -= x0 dy -= y0 with np.errstate(divide='ignore', invalid='ignore'): return (x - x0) * np.where(np.isclose(dx, 0), 0, dy / dx) + y0, x # noinspection PyMissingOrEmptyDocstring,PyArgumentEqualDefault
class HEV:
    """
    Power balance model of a Hybrid Electric Vehicle.

    Motors are handled as eight columns ordered as P4 front, P4 rear,
    P3 front, P3 rear, P2, planetary P2, P1 and P0.
    """

    def __init__(self, drive_line_efficiencies, motors_efficiencies):
        """
        Pre-computes the efficiency factors and the usage order of motors.

        :param drive_line_efficiencies:
            Drive line efficiencies vector.
        :type drive_line_efficiencies: tuple[float]

        :param motors_efficiencies:
            Electric motors efficiencies vector.
        :type motors_efficiencies: tuple[float]
        """
        self.m_eff = np.array(motors_efficiencies)
        # Cumulative efficiency along the drive line.
        m_dl_eff = np.multiply.accumulate(drive_line_efficiencies)
        # Efficiency seen by the engine: the second-to-last cumulative value.
        self.ice_eff = m_dl_eff[-2]
        # Duplicate the second and the first entries, [c0, c1, ...] becomes
        # [c0, c0, c1, c1, ...], so that front and rear motors share the same
        # drive line efficiency and there is one value per motor.
        m_dl_eff = np.insert(m_dl_eff, 1, m_dl_eff[1])
        m_dl_eff = np.insert(m_dl_eff, 0, m_dl_eff[0])
        # Electric assist (dp < 0): motors sorted by decreasing
        # `m_eff * m_ds`; `j_ds` is the inverse permutation of `i_ds`.
        self.m_ds = m_dl_eff / self.ice_eff
        self.i_ds = np.argsort(self.m_eff * self.m_ds)[::-1]
        self.j_ds = np.argsort(self.i_ds)
        # Charging (dp > 0): same factors with the last one inverted (the
        # two names alias one array); motors sorted by increasing
        # `m_ch / m_eff`; `j_ch` is the inverse permutation of `i_ch`.
        self.m_ch = m_ch_eff = self.m_ds.copy()
        m_ch_eff[-1] = 1 / m_ch_eff[-1]
        self.i_ch = np.argsort(m_ch_eff / self.m_eff)
        self.j_ch = np.argsort(self.i_ch)

    @staticmethod
    def _is_zero_scalar(value):
        """Tell if `value` is a scalar equal to zero, i.e. nothing to add."""
        return np.ndim(value) == 0 and bool(value == 0)

    def delta_ice_power(self, motors_maximums_powers):
        """
        Computes the engine power variations reachable with the motors.

        With `n` motors, both returned arrays have `2 * n + 1` rows: the
        first `n` rows hold the negated cumulative sums for electric assist
        (in reversed order), the middle row is zero and the last `n` rows
        hold the cumulative sums for charging.

        :param motors_maximums_powers:
            Maximum powers of electric motors [kW], motors on the last axis.
        :type motors_maximums_powers: numpy.array

        :return:
            Engine power variations, matching battery powers and a function
            that splits a battery power among the motors.
        :rtype: tuple[numpy.array, numpy.array, function]
        """
        mmp = np.asarray(motors_maximums_powers)
        n, acc = mmp.shape[-1], np.add.accumulate
        # A single vector of motors is treated as one sample.
        mmp = mmp.reshape((mmp.shape[:-1] or (1,)) + (n,))
        dp_ice, p_bat = np.zeros((2, 2 * n + 1) + mmp.shape[:-1])
        dp_ice[:n] = -acc((mmp * self.m_ds).take(self.i_ds, axis=-1).T)[::-1]
        dp_ice[-n:] = acc((mmp * self.m_ch).take(self.i_ch, axis=-1).T)
        p_bat[:n] = -acc((mmp / self.m_eff).take(self.i_ds, axis=-1).T)[::-1]
        p_bat[-n:] = acc((mmp * self.m_eff).take(self.i_ch, axis=-1).T)

        # noinspection PyUnusedLocal
        def battery_power_split(battery_powers, *args):
            # `p_bat` is read when this function is called, hence it sees
            # any in-place change made to the returned array.
            p = np.asarray(battery_powers)
            p = p.reshape((p.shape[:-1] or (1,)) + p.shape[-1:])
            b = p < 0
            # Cumulative power levels of the relevant regime, from zero up.
            pm = np.where(b, -p_bat[:n + 1][::-1], p_bat[-1 - n:])
            # Saturate the levels at the requested power and differentiate
            # to get the share of each motor (in usage order).
            pm = np.diff(pm + np.minimum(np.abs(p) - pm, 0), axis=0)
            # Back to the original motor order.
            pm = np.where(b, pm[self.j_ds], -pm[self.j_ch])
            return pm

        return dp_ice, p_bat, battery_power_split

    def ice_power(self, motive_power, motors_maximum_powers):
        """
        Computes the engine powers reachable for the given motive power.

        :param motive_power:
            Motive power [kW].
        :type motive_power: numpy.array | float

        :param motors_maximum_powers:
            Maximum powers of electric motors [kW].
        :type motors_maximum_powers: numpy.array

        :return:
            Engine powers, battery powers and battery power split function.
        :rtype: tuple[numpy.array, numpy.array, function]
        """
        dp_ice, p_bat, power_split = self.delta_ice_power(motors_maximum_powers)
        p_ice = dp_ice + motive_power / self.ice_eff
        return p_ice, p_bat, power_split

    def parallel(self, motive_powers, motors_maximums_powers,
                 engine_powers_out=None, ice_power_losses=0,
                 battery_power_losses=0):
        """
        Power balance with the engine and all motors driving the vehicle.

        :param motive_powers:
            Motive power [kW].
        :type motive_powers: numpy.array

        :param motors_maximums_powers:
            Maximum powers of electric motors [kW].
        :type motors_maximums_powers: numpy.array

        :param engine_powers_out:
            Engine powers where the battery power is interpolated. If None,
            the whole engine/battery power curves are returned.
        :type engine_powers_out: numpy.array | float, optional

        :param ice_power_losses:
            Engine power losses added to the engine powers [kW].
        :type ice_power_losses: numpy.array | float, optional

        :param battery_power_losses:
            Power losses added to the battery powers [kW].
        :type battery_power_losses: numpy.array | float, optional

        :return:
            Engine powers, battery powers and battery power split function.
        :rtype: tuple[numpy.array, numpy.array, function]
        """
        pi, pb, bps = self.ice_power(motive_powers, motors_maximums_powers)
        # A scalar zero means "no losses". This is an explicit value check
        # instead of the former `is not 0` identity test against a literal.
        if not self._is_zero_scalar(ice_power_losses):
            pi += np.asarray(ice_power_losses).ravel()[None, :]
        if not self._is_zero_scalar(battery_power_losses):
            # In place: `bps` reads this very array when it is called.
            pb += np.asarray(battery_power_losses).ravel()[None, :]
        if engine_powers_out is not None:
            # `_interp` returns the interpolated battery powers and the
            # engine powers clipped to the reachable range.
            pb, pi = _interp(engine_powers_out, pi.T, pb.T)
        return pi, pb, bps

    def ev(self, motive_powers, motors_maximums_powers, battery_power_losses=0):
        """
        Power balance at zero engine power, with the last three motors
        (planetary P2, P1 and P0) zeroed.

        :param motive_powers:
            Motive power [kW].
        :type motive_powers: numpy.array

        :param motors_maximums_powers:
            Maximum powers of electric motors [kW].
        :type motors_maximums_powers: numpy.array

        :param battery_power_losses:
            Power losses added to the battery powers [kW].
        :type battery_power_losses: numpy.array | float, optional

        :return:
            Engine powers, battery powers and battery power split function.
        :rtype: tuple[numpy.array, numpy.array, function]
        """
        return self.parallel(motive_powers, np.pad(
            motors_maximums_powers[:, :-3], ((0, 0), (0, 3)), 'constant'
        ), 0, battery_power_losses=battery_power_losses)

    def serial(self, motive_powers, motors_maximums_powers, engine_powers_out,
               ice_power_losses=0, battery_power_losses=0):
        """
        Power balance in serial mode: the `ev` balance moves the vehicle,
        while only the last three motors (planetary P2, P1 and P0) are
        balanced against the engine.

        :param motive_powers:
            Motive power [kW].
        :type motive_powers: numpy.array

        :param motors_maximums_powers:
            Maximum powers of electric motors [kW].
        :type motors_maximums_powers: numpy.array

        :param engine_powers_out:
            Engine powers where the battery power is interpolated.
        :type engine_powers_out: numpy.array | float

        :param ice_power_losses:
            Engine power losses added to the engine powers [kW].
        :type ice_power_losses: numpy.array | float, optional

        :param battery_power_losses:
            Power losses added to the battery powers [kW].
        :type battery_power_losses: numpy.array | float, optional

        :return:
            Engine powers, total battery powers and battery power split
            function.
        :rtype: tuple[numpy.array, numpy.array, function]
        """
        pi, pb_ev, bps_ev = self.ev(motive_powers, motors_maximums_powers)
        from ..electrics.motors.p4 import calculate_motor_p4_powers_v1 as func
        # NOTE(review): presumably converts the power left over by the
        # electric balance into the power seen on the engine side; confirm
        # against `calculate_motor_p4_powers_v1`.
        pw = func(pi.ravel(), self.ice_eff)
        pi, pb, bps = self.parallel(
            pw, np.pad(
                motors_maximums_powers[:, -3:], ((0, 0), (5, 0)), 'constant'
            ), engine_powers_out=engine_powers_out,
            ice_power_losses=ice_power_losses,
            battery_power_losses=battery_power_losses
        )

        def battery_power_split(battery_powers):
            # Traction share from the electric balance, the remainder from
            # the engine-side motors.
            return bps(battery_powers - pb_ev.T) + bps_ev(pb_ev.T)

        return pi, pb + pb_ev, battery_power_split
@sh.add_function(dsp, outputs=['hev_power_model'])
def define_hev_power_model(motors_efficiencies, drive_line_efficiencies):
    """
    Builds the Hybrid Electric Vehicle power balance model.

    :param motors_efficiencies:
        Electric motors efficiencies vector.
    :type motors_efficiencies: tuple[float]

    :param drive_line_efficiencies:
        Drive line efficiencies vector.
    :type drive_line_efficiencies: tuple[float]

    :return:
        Hybrid Electric Vehicle power balance model.
    :rtype: HEV
    """
    # `HEV` takes the drive line efficiencies first.
    model = HEV(drive_line_efficiencies, motors_efficiencies)
    return model
@sh.add_function(dsp, outputs=['hybrid_modes'])
def identify_hybrid_modes(
        times, gear_box_speeds_in, engine_speeds_out, idle_engine_speed,
        on_engine, is_serial, has_motor_p2_planetary):
    """
    Identifies the hybrid mode status (0: EV, 1: Parallel, 2: Serial).

    :param times:
        Time vector [s].
    :type times: numpy.array

    :param gear_box_speeds_in:
        Gear box speed [RPM].
    :type gear_box_speeds_in: numpy.array

    :param engine_speeds_out:
        Engine speed [RPM].
    :type engine_speeds_out: numpy.array

    :param idle_engine_speed:
        Idle engine speed and its standard deviation [RPM].
    :type idle_engine_speed: (float, float)

    :param on_engine:
        If the engine is on [-].
    :type on_engine: numpy.array

    :param is_serial:
        Is the vehicle serial hybrid?
    :type is_serial: bool

    :param has_motor_p2_planetary:
        Has the vehicle a motor in planetary P2?
    :type has_motor_p2_planetary: bool

    :return:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :rtype: numpy.array
    """
    # noinspection PyProtectedMember
    from ..gear_box.mechanical import _shift
    from ....report import _correlation_coefficient

    # Planetary vehicles are only EV or parallel, serial vehicles only EV or
    # serial: the engine status is enough.
    if has_motor_p2_planetary:
        return on_engine.astype(int)
    if is_serial:
        return np.where(on_engine, 2, 0)

    modes = on_engine.astype(int)
    engine_speeds, gb_speeds = engine_speeds_out, gear_box_speeds_in

    # Engine on but not following the gear box: gear box below idle, or
    # engine slower than the gear box by more than the idle deviation.
    decoupled = (idle_engine_speed[0] > gb_speeds) | (
        (gb_speeds - idle_engine_speed[1]) > engine_speeds
    )
    modes[on_engine & decoupled] = 2

    # Parallel samples whose speed ratio is an outlier become serial.
    parallel_ids = np.where(modes == 1)[0]
    speed_ratios = gb_speeds[parallel_ids] / engine_speeds[parallel_ids]
    inliers = co2_utl.get_inliers(speed_ratios, 3)[0]
    modes[parallel_ids[~inliers]] = 2

    modes = co2_utl.median_filter(times, modes, 4)
    modes = co2_utl.clear_fluctuations(times, modes, 4).astype(int)
    modes[~on_engine] = 0

    with np.errstate(divide='ignore', invalid='ignore'):
        for start, stop in sh.pairwise(_shift(modes)):
            # Engine-on phases shorter than 5 s whose engine and gear box
            # speeds are poorly correlated are swapped (1 <-> 2).
            if (modes[start] and times[stop - 1] - times[start] < 5
                    and _correlation_coefficient(
                        engine_speeds[start:stop], gb_speeds[start:stop]
                    ) < .6):
                modes[start:stop] = 3 - modes[start]
    return modes
@sh.add_function(dsp, outputs=['engine_speeds_base'])
def identify_engine_speeds_base(
        times, hybrid_modes, on_engine, engine_speeds_out, gear_box_speeds_in,
        idle_engine_speed):
    """
    Identifies the engine speed at hot condition [RPM].

    :param times:
        Time vector [s].
    :type times: numpy.array

    :param hybrid_modes:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :type hybrid_modes: numpy.array

    :param on_engine:
        If the engine is on [-].
    :type on_engine: numpy.array

    :param engine_speeds_out:
        Engine speed [RPM].
    :type engine_speeds_out: numpy.array

    :param gear_box_speeds_in:
        Gear box speed [RPM].
    :type gear_box_speeds_in: numpy.array

    :param idle_engine_speed:
        Engine speed idle median and std [RPM].
    :type idle_engine_speed: (float, float)

    :return:
        Engine speed at hot condition [RPM].
    :rtype: numpy.array, float
    """
    in_serial = hybrid_modes == 2
    from .conventional import calculate_engine_speeds_out_hot

    base_speeds = calculate_engine_speeds_out_hot(
        gear_box_speeds_in, on_engine, idle_engine_speed
    )
    # In serial mode the engine does not follow the gear box: take the
    # measured speed there, then smooth only those samples.
    base_speeds[in_serial] = engine_speeds_out[in_serial]
    smoothed = co2_utl.median_filter(times, base_speeds, 5)
    base_speeds[in_serial] = smoothed[in_serial]
    return base_speeds
@sh.add_function(dsp, outputs=['serial_motor_maximum_power_function'])
def define_serial_motor_maximum_power_function(
        motor_p2_planetary_maximum_power_function, planetary_ratio,
        motor_p1_maximum_power_function, motor_p0_maximum_power_function,
        motor_p2_planetary_speed_ratio, motor_p1_speed_ratio,
        motor_p0_speed_ratio):
    """
    Builds the function giving the maximum powers of the motors available in
    serial mode (planetary P2, P1 and P0).

    :param motor_p2_planetary_maximum_power_function:
        Maximum power function of planetary motor P2.
    :type motor_p2_planetary_maximum_power_function: function

    :param planetary_ratio:
        Fundamental planetary speed ratio [-].
    :type planetary_ratio: float

    :param motor_p2_planetary_speed_ratio:
        Ratio between planetary motor P2 speed and planetary speed [-].
    :type motor_p2_planetary_speed_ratio: float

    :param motor_p1_maximum_power_function:
        Maximum power function of motor P1.
    :type motor_p1_maximum_power_function: function

    :param motor_p1_speed_ratio:
        Ratio between motor P1 speed and engine speed [-].
    :type motor_p1_speed_ratio: float

    :param motor_p0_maximum_power_function:
        Maximum power function of motor P0.
    :type motor_p0_maximum_power_function: function

    :param motor_p0_speed_ratio:
        Ratio between motor P0 speed and engine speed [-].
    :type motor_p0_speed_ratio: float

    :return:
        Serial motor maximum power function.
    :rtype: function
    """
    from ..electrics.motors.planet import calculate_planetary_speeds_in

    # noinspection PyMissingOrEmptyDocstring
    def calculate_serial_motor_maximum_power(engine_speed, final_drive_speed):
        engine_speeds = np.atleast_1d(engine_speed)
        planetary_speeds = calculate_planetary_speeds_in(
            engine_speed, final_drive_speed, planetary_ratio
        )
        serial_powers = np.column_stack((
            motor_p2_planetary_maximum_power_function(
                planetary_speeds * motor_p2_planetary_speed_ratio
            ),
            motor_p1_maximum_power_function(
                engine_speeds * motor_p1_speed_ratio
            ),
            motor_p0_maximum_power_function(
                engine_speeds * motor_p0_speed_ratio
            )
        ))
        # Five leading zero columns stand for the motors that are not part
        # of the serial set, giving the usual eight-column layout.
        # noinspection PyArgumentEqualDefault
        return np.pad(serial_powers, ((0, 0), (5, 0)), 'constant')

    return calculate_serial_motor_maximum_power
@sh.add_function(dsp, outputs=['engine_power_losses_function'])
def define_engine_power_losses_function(
        engine_moment_inertia, auxiliaries_torque_loss, auxiliaries_power_loss):
    """
    Builds the function computing the engine power losses.

    :param engine_moment_inertia:
        Engine moment of inertia [kg*m2].
    :type engine_moment_inertia: float

    :param auxiliaries_torque_loss:
        Constant torque loss due to engine auxiliaries [N*m].
    :type auxiliaries_torque_loss: float

    :param auxiliaries_power_loss:
        Constant power loss due to engine auxiliaries [kW].
    :type auxiliaries_power_loss: float

    :return:
        Engine power losses function.
    :rtype: function
    """
    from ..engine import (
        calculate_auxiliaries_power_losses as auxiliaries_powers,
        calculate_auxiliaries_torque_losses as auxiliaries_torques,
        calculate_engine_inertia_powers_losses as inertia_powers
    )

    # noinspection PyMissingOrEmptyDocstring
    def engine_power_losses_function(times, engine_speeds, inertia=True):
        if inertia:
            inertia_losses = inertia_powers(
                times, engine_speeds, engine_moment_inertia
            )
        else:
            inertia_losses = 0
        torque_losses = auxiliaries_torques(times, auxiliaries_torque_loss)
        # Auxiliaries losses are evaluated as if the engine were always on.
        always_on = np.ones_like(engine_speeds, bool)
        auxiliaries_losses = auxiliaries_powers(
            torque_losses, engine_speeds, always_on, auxiliaries_power_loss
        )
        return inertia_losses + auxiliaries_losses

    return engine_power_losses_function
# noinspection PyMissingOrEmptyDocstring
[docs]class EMS:
[docs] def __init__(self, has_motor_p2_planetary, is_serial, battery_model, hev_power_model, fuel_map_model, serial_motor_maximum_power_function, engine_power_losses_function, s_ch=None, s_ds=None): self.battery_model = battery_model self.hev_power_model = hev_power_model self.fuel_map_model = fuel_map_model self.s_ch, self.s_ds = s_ch, s_ds self.battery_fuel_mode = 'current' self._battery_power = None self.serial_motor_maximum_power = serial_motor_maximum_power_function self.engine_power_losses = engine_power_losses_function self.has_motor_p2_planetary = has_motor_p2_planetary self.is_serial = is_serial
[docs] def set_virtual(self, motors_maximum_powers): from scipy.interpolate import UnivariateSpline as Spline p, pb = self.hev_power_model.delta_ice_power(motors_maximum_powers)[:-1] pb, i = np.unique(-pb, return_index=True) self._battery_power = Spline(pb, np.abs(p.ravel()[i]), k=1, s=0)
# noinspection PyUnresolvedReferences
[docs] def fit(self, hybrid_modes, times, motive_powers, motors_maximums_powers, engine_powers_out, engine_speeds_out, ice_power_losses=None): pl = ice_power_losses if pl is None: pl = self.engine_power_losses(times, engine_speeds_out) hev, fc = self.hev_power_model, self.fuel_map_model pi = engine_powers_out[:, None] + [-dfl.EPS, dfl.EPS] pb = np.where(hybrid_modes[:, None] == 1, *(func( motive_powers, motors_maximums_powers, pi, ice_power_losses=pl )[1] for func in (hev.parallel, hev.serial))) b = hybrid_modes.astype(bool) pi, bc = pi[b], self.battery_model.currents(pb[b]) s = -np.diff(fc(engine_speeds_out[b, None], pi), axis=1) with np.errstate(divide='ignore', invalid='ignore'): s /= np.diff(bc, axis=1) b = np.isfinite(s) b, s = bc[b.ravel()].mean(1) >= 0, s[b] self.s_ch, self.s_ds = np.median(s[b]), np.median(s[~b]) return self
[docs] def battery_fuel(self, battery_currents): bc = battery_currents if self.battery_fuel_mode == 'current': return np.where(bc >= 0, self.s_ch, self.s_ds) * bc pi = self._battery_power(self.battery_model.powers(bc)) fc = self.fuel_map_model(self.fuel_map_model.power_speed(pi), pi) fc -= self.fuel_map_model.idle_fc fc *= -np.sign(bc) return fc
[docs] def electric(self, motive_powers, motors_maximums_powers, final_drive_speeds_in, battery_power_losses=0): res, hev = {}, self.hev_power_model mmp = self.serial_motor_maximum_power( np.zeros_like(motive_powers), final_drive_speeds_in ) mmp[:, :-3] = motors_maximums_powers[:, :-3] pi, pb, bps = hev.ice_power(motive_powers, mmp) res['power_bat'], res['power_ice'] = pb, pi = _interp(0, pi.T, pb.T) pb += np.atleast_2d(battery_power_losses).T res['current_bat'] = bc = self.battery_model.currents(pb) res['fc_bat'] = res['fc_eq'] = self.battery_fuel(bc) res['fc_ice'] = res['speed_ice'] = np.zeros_like(bc) res['battery_power_split'] = bps return res
[docs] def parallel(self, times, motive_powers, motors_maximums_powers, engine_speeds_out, ice_power_losses=None, battery_power_losses=0, opt=True): hev, fc = self.hev_power_model, self.fuel_map_model pi, pb, bps = hev.ice_power(motive_powers, motors_maximums_powers) if ice_power_losses is None: ice_power_losses = self.engine_power_losses( times, engine_speeds_out ) pi += np.atleast_2d(ice_power_losses) ep = np.minimum(fc.full_load_curve(engine_speeds_out), pi[-1]) ep = np.linspace(0, 1, 200) * ep[:, None] pb, pi = _interp(ep, pi.T, pb.T) es = np.atleast_2d(engine_speeds_out).T fc_ice = fc(es, pi) pb += np.atleast_2d(battery_power_losses).T bc = self.battery_model.currents(pb) fc_bat = self.battery_fuel(bc) fc_eq = fc_ice + fc_bat res = dict( fc_eq=fc_eq, fc_ice=fc_ice, fc_bat=fc_bat, power_bat=pb, power_ice=pi, current_bat=bc, speed_ice=es, battery_power_split=bps ) if opt: i, j = np.arange(fc_eq.shape[0]), np.nanargmin(fc_eq, axis=1) res = self.min(res, i, j) return res
[docs] def serial(self, times, motive_powers, final_drive_speeds_in, motors_maximums_powers, engine_speeds_out=None, ice_power_losses=None, battery_power_losses=0, opt=True): hev, fc, n = self.hev_power_model, self.fuel_map_model, 200 # noinspection PyArgumentEqualDefault pi, pb, bps_ev = hev.ice_power(motive_powers, np.pad( motors_maximums_powers[:, :-3], ((0, 0), (0, 3)), 'constant' )) (pb_ev, pi), pl = _interp(0, pi.T, pb.T), ice_power_losses pb_ev, fds = pb_ev.ravel(), final_drive_speeds_in[:, None] from ..electrics.motors.p4 import calculate_motor_p4_powers_v1 as func pw = func(pi.ravel(), hev.ice_eff) del pi if engine_speeds_out is None: # noinspection PyProtectedMember es = self.fuel_map_model.speed_power._data[0] es = np.linspace(es[0], es[-1], n)[:, None] if not hasattr(pl, 'shape'): pl = np.ones_like(motive_powers) * (pl or 0) es, pl = np.meshgrid(es, pl, indexing='ij') if ice_power_losses is None: pl = self.engine_power_losses(times, es, inertia=False) es, pl = es.ravel()[:, None], pl.ravel() fds = np.tile(fds, n).ravel()[:, None] pw = np.tile(pw[:, None], n).ravel() else: es = engine_speeds_out[:, None] if pl is None: pl = self.engine_power_losses(times, engine_speeds_out) mmp = self.serial_motor_maximum_power(es, fds) pi, pb, bps = hev.ice_power(pw, mmp) pi += np.atleast_2d(pl) pb, pi = _interp(fc.speed_power(es), pi.T, pb.T) if engine_speeds_out is None: pb, pi = pb.reshape(n, -1).T, pi.reshape(n, -1).T es = es.reshape(n, -1).T pb += np.atleast_2d(pb_ev + battery_power_losses).T fc_ice = fc(es, pi) bc = self.battery_model.currents(pb) fc_bat = self.battery_fuel(bc) fc_eq = fc_ice + fc_bat def battery_power_split( battery_powers, engine_speeds, final_drive_speeds): return hev.ice_power( 0, self.serial_motor_maximum_power( engine_speeds, final_drive_speeds ) )[-1](battery_powers - battery_power_losses - pb_ev) + bps_ev(pb_ev) res = dict( fc_eq=fc_eq, fc_ice=fc_ice, fc_bat=fc_bat, power_bat=pb, power_ice=pi, current_bat=bc, speed_ice=es, 
battery_power_split=battery_power_split ) if engine_speeds_out is None and opt: with np.errstate(divide='ignore', invalid='ignore'): eff, b = pi / fc_ice, (bc < 0) | (pi < 0) b[np.all(np.isnan(eff) | b, 1)] = False eff[b] = np.nan i, j = np.arange(pi.shape[0]), np.nanargmax(eff, 1) res = self.min(res, i, j) res['speed_ice'] = res['speed_ice'][i, j, None] return res
[docs] @staticmethod def min(res, i, j): res, keys = res.copy(), set(res) - {'battery_power_split', 'speed_ice'} for k in keys: res[k] = res[k][i, j, None] return res
def __call__(self, times, motive_powers, final_drive_speeds_in,
             motors_maximums_powers, gear_box_speeds_in,
             engine_speeds_parallel, idle_engine_speed,
             battery_power_losses=0):
    """
    Evaluate the electric, parallel and serial modes and pick the hybrid
    mode (1: Parallel, 2: Serial) for every sample.

    :param times:
        Time vector [s].
    :param motive_powers:
        Motive power [kW].
    :param final_drive_speeds_in:
        Final drive speed in [RPM].
    :param motors_maximums_powers:
        Maximum powers of electric motors [kW].
    :param gear_box_speeds_in:
        Gear box speed [RPM].
    :param engine_speeds_parallel:
        Hypothetical engine speed in parallel mode [RPM].
    :param idle_engine_speed:
        Engine speed idle median and std [RPM].
    :param battery_power_losses:
        Power added to the battery power demand.
    :return:
        Dict with keys `hybrid_modes`, `serial`, `parallel`, `electric`
        and `force_on_engine`.
    :rtype: dict
    """
    ele = self.electric(
        motive_powers, motors_maximums_powers, final_drive_speeds_in
    )
    par = self.parallel(
        times, motive_powers, motors_maximums_powers,
        engine_speeds_parallel, battery_power_losses=battery_power_losses
    )
    ser = self.serial(
        times, motive_powers, final_drive_speeds_in,
        motors_maximums_powers, battery_power_losses=battery_power_losses
    )

    if self.is_serial:
        mode = np.zeros_like(ser['fc_eq'], int) + 2
        forced = np.zeros_like(times, bool)
    else:
        ser_curr, par_curr = ser['current_bat'], par['current_bat']
        # Start from the mode with the lower equivalent fuel consumption.
        # noinspection PyUnresolvedReferences
        mode = (ser['fc_eq'] < par['fc_eq']).astype(int) + 1
        # Battery current sign overrides.
        mode[(ser_curr < 0) & (par_curr >= 0)] = 1
        mode[(ser_curr >= 0) & (par_curr < 0)] = 2
        # Gear box speed below (idle median - idle std): serial.
        mode[gear_box_speeds_in < -np.diff(idle_engine_speed)] = 2
        # Electric mode still requiring engine power with positive demand.
        forced = ele['power_ice'].ravel() > dfl.EPS
        forced &= motive_powers > 0.01
        mode[forced] = 1

    # NOTE(review): nesting reconstructed from a flattened source; this
    # override is assumed to apply to both branches above - confirm.
    if self.has_motor_p2_planetary:
        mode[mode == 2] = 1

    return {
        'hybrid_modes': mode, 'serial': ser, 'parallel': par,
        'electric': ele, 'force_on_engine': forced
    }
@sh.add_function(dsp, outputs=['ecms_s'])
def calibrate_ems_model(
        drive_battery_model, hev_power_model, fuel_map_model, hybrid_modes,
        serial_motor_maximum_power_function, motive_powers,
        motors_maximums_powers, engine_powers_out, engine_speeds_out, times,
        engine_power_losses_function, is_serial, has_motor_p2_planetary):
    """
    Calibrate Energy Management Strategy model.

    :param drive_battery_model:
        Drive battery current model.
    :type drive_battery_model: DriveBatteryModel

    :param hev_power_model:
        Hybrid Electric Vehicle power balance model.
    :type hev_power_model: HEV

    :param fuel_map_model:
        Fuel map model.
    :type fuel_map_model: FuelMapModel

    :param hybrid_modes:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :type hybrid_modes: numpy.array

    :param serial_motor_maximum_power_function:
        Serial motor maximum power function.
    :type serial_motor_maximum_power_function: function

    :param motive_powers:
        Motive power [kW].
    :type motive_powers: numpy.array

    :param motors_maximums_powers:
        Maximum powers of electric motors [kW].
    :type motors_maximums_powers: numpy.array

    :param engine_powers_out:
        Engine power vector [kW].
    :type engine_powers_out: numpy.array

    :param engine_speeds_out:
        Engine speed [RPM].
    :type engine_speeds_out: numpy.array

    :param times:
        Time vector [s].
    :type times: numpy.array

    :param engine_power_losses_function:
        Engine power losses function.
    :type engine_power_losses_function: function

    :param is_serial:
        Is the vehicle serial hybrid?
    :type is_serial: bool

    :param has_motor_p2_planetary:
        Has the vehicle a motor in planetary P2?
    :type has_motor_p2_planetary: bool

    :return:
        Equivalent Consumption Minimization Strategy params.
    :rtype: tuple[float]
    """
    ems = EMS(
        has_motor_p2_planetary, is_serial, drive_battery_model,
        hev_power_model, fuel_map_model,
        serial_motor_maximum_power_function, engine_power_losses_function
    )
    fitted = ems.fit(
        hybrid_modes, times, motive_powers, motors_maximums_powers,
        engine_powers_out, engine_speeds_out
    )
    return fitted.s_ch, fitted.s_ds
@sh.add_function(dsp, outputs=['ems_model'])
def define_ems_model(
        has_motor_p2_planetary, is_serial, drive_battery_model,
        hev_power_model, fuel_map_model, serial_motor_maximum_power_function,
        engine_power_losses_function, ecms_s):
    """
    Define Energy Management Strategy model.

    :param has_motor_p2_planetary:
        Has the vehicle a motor in planetary P2?
    :type has_motor_p2_planetary: bool

    :param is_serial:
        Is the vehicle serial hybrid?
    :type is_serial: bool

    :param drive_battery_model:
        Drive battery current model.
    :type drive_battery_model: DriveBatteryModel

    :param hev_power_model:
        Hybrid Electric Vehicle power balance model.
    :type hev_power_model: HEV

    :param fuel_map_model:
        Fuel map model.
    :type fuel_map_model: FuelMapModel

    :param serial_motor_maximum_power_function:
        Serial motor maximum power function.
    :type serial_motor_maximum_power_function: function

    :param engine_power_losses_function:
        Engine power losses function.
    :type engine_power_losses_function: function

    :param ecms_s:
        Equivalent Consumption Minimization Strategy params.
    :type ecms_s: tuple[float]

    :return:
        Energy Management Strategy model.
    :rtype: EMS
    """
    # The first two items of `ecms_s` are passed as `s_ch` and `s_ds`.
    s_ch = ecms_s[0]
    s_ds = ecms_s[1]
    return EMS(
        has_motor_p2_planetary, is_serial, drive_battery_model,
        hev_power_model, fuel_map_model,
        serial_motor_maximum_power_function, engine_power_losses_function,
        s_ch=s_ch, s_ds=s_ds
    )
@sh.add_function(dsp, outputs=['ems_data'])
def calculate_ems_data(
        ems_model, times, motive_powers, final_drive_speeds_in,
        motors_maximums_powers, gear_box_speeds_in, engine_speeds_parallel,
        idle_engine_speed):
    """
    Calculate EMS decision data.

    :param ems_model:
        Energy Management Strategy model.
    :type ems_model: EMS

    :param times:
        Time vector [s].
    :type times: numpy.array

    :param motive_powers:
        Motive power [kW].
    :type motive_powers: numpy.array

    :param final_drive_speeds_in:
        Final drive speed in [RPM].
    :type final_drive_speeds_in: numpy.array

    :param motors_maximums_powers:
        Maximum powers of electric motors [kW].
    :type motors_maximums_powers: numpy.array

    :param gear_box_speeds_in:
        Gear box speed [RPM].
    :type gear_box_speeds_in: numpy.array

    :param engine_speeds_parallel:
        Hypothetical engine speed in parallel mode [RPM].
    :type engine_speeds_parallel: numpy.array

    :param idle_engine_speed:
        Engine speed idle median and std [RPM].
    :type idle_engine_speed: (float, float)

    :return:
        EMS decision data.
    :rtype: dict
    """
    # The model is simply called with the inputs in its positional order.
    model_inputs = (
        times, motive_powers, final_drive_speeds_in, motors_maximums_powers,
        gear_box_speeds_in, engine_speeds_parallel, idle_engine_speed
    )
    return ems_model(*model_inputs)
@sh.add_function(
    dsp, inputs_kwargs=True, outputs=['after_treatment_warm_up_phases']
)
def identify_after_treatment_warm_up_phases(
        times, engine_powers_out, engine_coolant_temperatures, on_engine,
        engine_thermostat_temperature, ems_data, hybrid_modes,
        is_cycle_hot=False):
    """
    Identifies after treatment warm up phase.

    :param times:
        Time vector [s].
    :type times: numpy.array

    :param engine_powers_out:
        Engine power vector [kW].
    :type engine_powers_out: numpy.array

    :param engine_coolant_temperatures:
        Engine coolant temperature vector [°C].
    :type engine_coolant_temperatures: numpy.array

    :param on_engine:
        If the engine is on [-].
    :type on_engine: numpy.array

    :param engine_thermostat_temperature:
        Engine thermostat temperature [°C].
    :type engine_thermostat_temperature: float

    :param ems_data:
        EMS decision data.
    :type ems_data: dict

    :param hybrid_modes:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :type hybrid_modes: numpy.array

    :param is_cycle_hot:
        Is an hot cycle?
    :type is_cycle_hot: bool

    :return:
        After treatment warm up phase.
    :rtype: numpy.array
    """
    warm_up = np.zeros_like(times)
    if not on_engine.any():
        return warm_up.astype(bool)

    # Engine power expected by the EMS in the selected mode, engine-on only.
    on_idx = np.where(on_engine)[0]
    expected = np.choose(ems_data['hybrid_modes'].ravel() - 1, [
        ems_data[k]['power_ice'].ravel() for k in ('parallel', 'serial')
    ])[on_idx]

    # Candidate samples: measured/expected power ratio is not an inlier.
    with np.errstate(divide='ignore', invalid='ignore'):
        # noinspection PyUnresolvedReferences
        warm_up[on_idx[~co2_utl.get_inliers(
            engine_powers_out[on_idx] / expected, 2, np.nanmedian,
            co2_utl.mad
        )[0]]] = 1
    warm_up = co2_utl.median_filter(times, warm_up, 5)
    warm_up = co2_utl.clear_fluctuations(times, warm_up, 5).astype(bool)
    if not warm_up.any():
        return warm_up

    # Keep only the phases that satisfy all the conditions below.
    indices, temp = co2_utl.index_phases(warm_up), engine_coolant_temperatures
    b = np.diff(temp[indices], axis=1) > 4
    b &= temp[indices[:, 0], None] < (engine_thermostat_temperature - 10)
    b &= np.diff(times[indices], axis=1) > 5
    # `np.isin([2], ...)` yields a length-1 array per phase, so the result
    # keeps the same (n, 1) shape as `b` (replaces deprecated `np.in1d`).
    b &= np.apply_along_axis(
        lambda a: np.isin([2], hybrid_modes[slice(*a)]), 1, indices
    )

    # Engine-on phases that start at least `t_cool` after the previous one
    # ended (or, for a hot cycle, after the cycle start).
    t_cool = dfl.functions.identify_after_treatment_warm_up_phases.cooling_time
    t0 = times[0] + t_cool if is_cycle_hot else 0
    warming_phases = np.zeros_like(on_engine, bool)
    for start, stop in co2_utl.index_phases(on_engine):
        warming_phases[start:stop + 1] = times[start] >= t0
        t0 = times[stop] + t_cool

    warm_up[:] = False
    for start, stop in indices[b.ravel()]:
        if warming_phases[start]:
            # Extend backwards to the beginning of the warming phase.
            while start and warming_phases[start - 1]:
                start -= 1
            # Trim trailing samples that are in parallel mode.
            while hybrid_modes[stop] == 1 and stop > start:
                stop -= 1
            warm_up[start:stop + 1] = True
    return warm_up
@sh.add_function(dsp, outputs=['force_on_engine'])
def get_force_on_engine(ems_data):
    """
    Returns the phases when engine is on because parallel mode is forced [-].

    :param ems_data:
        EMS decision data.
    :type ems_data: dict

    :return:
        Phases when engine is on because parallel mode is forced [-].
    :rtype: numpy.array
    """
    force_on_engine = ems_data['force_on_engine']
    return force_on_engine
# noinspection PyMissingOrEmptyDocstring
class StartStopHybrid:
    """
    Start stop model for hybrid electric vehicles.

    The model is a cubic function of the drive battery state of charge
    (see `_k`), whose coefficients are stored in `params`.
    """

    def __init__(self, ems_model, dcdc_converter_efficiency, starter_model,
                 params=None):
        """
        :param ems_model:
            Energy Management Strategy model.
        :param dcdc_converter_efficiency:
            DC/DC converter efficiency [-].
        :param starter_model:
            Starter model.
        :param params:
            Model params (keys read by this class: `starter_time`, `k0`,
            `soc0`, `alpha`, `beta`); None until calibrated.
        :type params: dict | None
        """
        self.params = params
        self.ems_model = ems_model
        self.dcdc_converter_efficiency = dcdc_converter_efficiency
        self.starter_model = starter_model

    def fit(self, ems_data, on_engine, drive_battery_state_of_charges,
            after_treatment_warm_up_phases):
        """
        Calibrate the model params with `lmfit` (method `ampgo`).

        :param ems_data:
            EMS decision data.
        :param on_engine:
            If the engine is on [-].
        :param drive_battery_state_of_charges:
            State of charge of the drive battery [%].
        :param after_treatment_warm_up_phases:
            Phases when engine is affected by the after treatment warm up.
        :return:
            The model itself, with `params` set.
        :rtype: StartStopHybrid
        """
        import lmfit
        k = np.where(~on_engine, *self.reference(ems_data)['k_reference'].T)
        # Filter data: drop warm up, forced engine-on and non finite samples.
        b = ~after_treatment_warm_up_phases & ~ems_data['force_on_engine']
        b &= np.isfinite(k)
        s = np.where(on_engine[b], 1, -1)
        soc = drive_battery_state_of_charges[b]

        def _(x):
            # Objective: sum of the positive parts of the signed difference
            # between the model output and the reference.
            x = x.valuesdict()
            time = x['starter_time']
            return np.float32(np.maximum(0, s * (self._k(x, soc) - np.where(
                ~on_engine, *self.reference(ems_data, time)['k_reference'].T
            )[b].T)).sum())

        p = lmfit.Parameters()
        starter_time = self.starter_model.time
        p.add('starter_time', 1, min=starter_time, max=starter_time * 10)
        p.add('k0', 0, min=0)
        p.add('soc0', 0, min=0, max=100)
        p.add('alpha', 0, min=0)
        p.add('beta', 0, min=0)
        # Fixed seed to make the calibration reproducible.
        with co2_utl.numpy_random_seed(0):
            # noinspection PyUnresolvedReferences
            self.params = lmfit.minimize(_, p, 'ampgo').params.valuesdict()
        return self

    def penalties(self, data, starter_time):
        """
        Add the engine start/stop battery powers and currents to `data`.

        :param data:
            Mode data; its `speed_ice` entry is passed to the starter model.
        :type data: dict
        :param starter_time:
            Starter time used to scale the starter model output.
        :return:
            Dict with the `power_stop`, `power_start`, `current_stop` and
            `current_start` entries combined with `data`.
        :rtype: dict
        """
        eff = self.dcdc_converter_efficiency
        # Row 0 scales the stop power, row 1 the start power.
        c = np.array([eff, -1 / eff]) / starter_time
        pb = self.starter_model(data['speed_ice'])[None, :] * c[:, None, None]
        bc = self.ems_model.battery_model.currents(pb)
        return sh.combine_dicts(data, base=dict(
            power_stop=pb[0], power_start=pb[1], current_stop=bc[0],
            current_start=bc[1]
        ))

    def reference(self, ems_data, starter_time=None):
        """
        Compute the reference `k` values from the EMS decision data.

        :param ems_data:
            EMS decision data.
        :type ems_data: dict
        :param starter_time:
            Starter time; if falsy, `params['starter_time']` is used when
            available, otherwise `starter_model.time`.
        :return:
            Shallow copy of `ems_data` where `parallel` and `serial` include
            the start/stop penalties, plus the `k_reference` entry.
        :rtype: dict
        """
        st = starter_time or self.params and self.params['starter_time']
        st = st or self.starter_model.time
        e, res = ems_data['electric'], ems_data.copy()
        p = res['parallel'] = self.penalties(ems_data['parallel'], st)
        s = res['serial'] = self.penalties(ems_data['serial'], st)
        with np.errstate(divide='ignore', invalid='ignore'):
            k = 'current_bat'
            c_ser = np.column_stack((s['current_start'], -s['current_stop']))
            k_ser = (s[k] - e[k] + c_ser) / s['fc_ice']
            c_par = np.column_stack((p['current_start'], -p['current_stop']))
            k_par = (p[k] - e[k] + c_par) / p['fc_ice']
        res['k_reference'] = np.choose(
            ems_data['hybrid_modes'] - 1, [k_par, k_ser]
        )
        return res

    @staticmethod
    def _k(p, soc):
        """
        Evaluate `dsoc * (alpha + beta * dsoc ** 2) + k0`, where
        `dsoc = soc - soc0`.

        :param p:
            Mapping with keys `soc0`, `alpha`, `beta` and `k0`.
        :param soc:
            State of charge (scalar or array).
        """
        dsoc = soc - p['soc0']
        return dsoc * (p['alpha'] + p['beta'] * dsoc ** 2) + p['k0']

    def __call__(self, drive_battery_state_of_charges):
        """
        Evaluate the model with the current `params`.

        :param drive_battery_state_of_charges:
            State of charge of the drive battery [%].
        """
        return self._k(self.params, drive_battery_state_of_charges)
@sh.add_function(dsp, outputs=['start_stop_hybrid_params'])
def calibrate_start_stop_hybrid_params(
        ems_model, dcdc_converter_efficiency, starter_model, ems_data,
        on_engine, drive_battery_state_of_charges,
        after_treatment_warm_up_phases):
    """
    Calibrate start stop model for hybrid electric vehicles.

    :param ems_model:
        Energy Management Strategy model.
    :type ems_model: EMS

    :param dcdc_converter_efficiency:
        DC/DC converter efficiency [-].
    :type dcdc_converter_efficiency: float

    :param starter_model:
        Starter model.
    :type starter_model: StarterModel

    :param ems_data:
        EMS decision data.
    :type ems_data: dict

    :param on_engine:
        If the engine is on [-].
    :type on_engine: numpy.array

    :param drive_battery_state_of_charges:
        State of charge of the drive battery [%].
    :type drive_battery_state_of_charges: numpy.array

    :param after_treatment_warm_up_phases:
        Phases when engine is affected by the after treatment warm up [-].
    :type after_treatment_warm_up_phases: numpy.array

    :return:
        Params of start stop model for hybrid electric vehicles.
    :rtype: dict
    """
    model = StartStopHybrid(
        ems_model, dcdc_converter_efficiency, starter_model
    )
    fitted = model.fit(
        ems_data, on_engine, drive_battery_state_of_charges,
        after_treatment_warm_up_phases
    )
    return fitted.params
@sh.add_function(dsp, outputs=['start_stop_hybrid'])
def define_start_stop_hybrid(
        ems_model, dcdc_converter_efficiency, starter_model,
        start_stop_hybrid_params):
    """
    Defines start stop model for hybrid electric vehicles.

    :param ems_model:
        Energy Management Strategy model.
    :type ems_model: EMS

    :param dcdc_converter_efficiency:
        DC/DC converter efficiency [-].
    :type dcdc_converter_efficiency: float

    :param starter_model:
        Starter model.
    :type starter_model: StarterModel

    :param start_stop_hybrid_params:
        Params of start stop model for hybrid electric vehicles.
    :type start_stop_hybrid_params: dict

    :return:
        Start stop model for hybrid electric vehicles.
    :rtype: StartStopHybrid
    """
    model = StartStopHybrid(
        ems_model, dcdc_converter_efficiency, starter_model,
        params=start_stop_hybrid_params
    )
    return model
@sh.add_function(dsp, outputs=['hybrid_modes'])
def calculate_hybrid_modes(ems_data, on_engine, after_treatment_warm_up_phases):
    """
    Calculate the hybrid mode status (0: EV, 1: Parallel, 2: Serial).

    :param ems_data:
        EMS decision data.
    :type ems_data: dict

    :param on_engine:
        If the engine is on [-].
    :type on_engine: numpy.array

    :param after_treatment_warm_up_phases:
        Phases when engine is affected by the after treatment warm up [-].
    :type after_treatment_warm_up_phases: numpy.array

    :return:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :rtype: numpy.array
    """
    # Work on a flat copy so the EMS decision data is left untouched.
    modes = ems_data['hybrid_modes'].flatten()
    # Warm up phases are marked as serial; engine off wins over everything.
    modes[after_treatment_warm_up_phases] = 2
    engine_off = ~on_engine
    modes[engine_off] = 0
    return modes
@sh.add_function(dsp, outputs=['hybrid_modes'])
def predict_hybrid_modes(
        start_stop_hybrid, ems_data, drive_battery_model, times, motive_powers,
        accelerations, after_treatment_warm_up_duration,
        after_treatment_cooling_duration, start_stop_activation_time,
        min_time_engine_on_after_start, is_cycle_hot):
    """
    Predicts the hybrid mode status (0: EV, 1: Parallel, 2: Serial).

    :param start_stop_hybrid:
        Start stop model for hybrid electric vehicles.
    :type start_stop_hybrid: StartStopHybrid

    :param ems_data:
        EMS decision data.
    :type ems_data: dict

    :param drive_battery_model:
        Drive battery current model.
    :type drive_battery_model: DriveBatteryModel

    :param times:
        Time vector [s].
    :type times: numpy.array

    :param motive_powers:
        Motive power [kW].
    :type motive_powers: numpy.array

    :param accelerations:
        Acceleration [m/s2].
    :type accelerations: numpy.array

    :param after_treatment_warm_up_duration:
        After treatment warm up duration [s].
    :type after_treatment_warm_up_duration: float

    :param after_treatment_cooling_duration:
        After treatment cooling duration [s].
    :type after_treatment_cooling_duration: float

    :param start_stop_activation_time:
        Start-stop activation time threshold [s].
    :type start_stop_activation_time: float

    :param min_time_engine_on_after_start:
        Minimum time of engine on after a start [s].
    :type min_time_engine_on_after_start: float

    :param is_cycle_hot:
        Is an hot cycle?
    :type is_cycle_hot: bool

    :return:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :rtype: numpy.array
    """
    ref = start_stop_hybrid.reference(ems_data)
    ele, par, ser = ref['electric'], ref['parallel'], ref['serial']

    # Battery currents per internal mode; the internal mode 3 (after
    # treatment warm up) uses the electric currents.
    current_bat = {}
    for key, data in enumerate((ele, par, ser, ele)):
        current_bat[key] = data['current_bat'].ravel()

    # Starter powers per internal mode: engine start for modes 1-3, engine
    # stop for mode 0.
    starter_bat = {}
    for key, data in enumerate((par, ser, ser), 1):
        starter_bat[key] = data['power_start'].ravel()
    # NOTE(review): if `hybrid_modes` is a column vector this `np.where`
    # may broadcast to 2-D before the ravel - confirm the shapes.
    starter_bat[0] = np.where(
        ref['hybrid_modes'] == 1, par['power_stop'].T, ser['power_stop'].T
    ).ravel()

    from ..electrics.motors.starter import calculate_starter_currents as func
    nom_volt = drive_battery_model.service.nominal_voltage
    starter_bat = {
        key: func(power, nom_volt) for key, power in starter_bat.items()
    }

    # Start from the forced engine-on phases (1) and fill in the rest.
    # noinspection PyUnresolvedReferences
    hybrid_modes = ref['force_on_engine'].astype(int)
    drive_battery_model.reset()
    soc = drive_battery_model.init_soc
    t0 = start_stop_activation_time
    samples = enumerate(zip(
        times, motive_powers, accelerations, hybrid_modes,
        ref['k_reference'], ref['hybrid_modes'].ravel()
    ))
    t_after_treatment = times[0]
    if is_cycle_hot:
        t_after_treatment += after_treatment_cooling_duration

    # NOTE(review): nesting reconstructed from a flattened source - confirm.
    for i, (t, motive_power, acc, mode, k_ref, mode_ref) in samples:
        pre_mode = hybrid_modes.take(i - 1, mode='clip')
        was_on = int(bool(pre_mode))
        if not mode:
            if t < t0:
                # Too early to change the engine status.
                mode = pre_mode
            elif k_ref[was_on] > start_stop_hybrid(soc):
                mode = mode_ref

        starter_curr = 0
        if bool(pre_mode) ^ bool(mode) and i:
            # The engine status changes at this sample.
            if mode:
                t0 = t + min_time_engine_on_after_start
                if t >= t_after_treatment:
                    if not hybrid_modes[i]:
                        mode = 3
                    end = np.searchsorted(
                        times, t + after_treatment_warm_up_duration
                    ) + 1
                    window = hybrid_modes[i:end]
                    window[window == 0] = 3
            else:
                t_after_treatment = t + after_treatment_cooling_duration
            starter_curr = starter_bat[mode][i]

        soc = drive_battery_model(
            current_bat[mode][i], t, motive_power, acc, bool(mode),
            starter_curr
        )
        hybrid_modes[i] = mode

    # The internal warm up mode (3) is reported as serial (2).
    return np.minimum(hybrid_modes, 2)
@sh.add_function(dsp, outputs=['on_engine'])
def identify_on_engine(hybrid_modes):
    """
    Identifies if the engine is on [-].

    :param hybrid_modes:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :type hybrid_modes: numpy.array

    :return:
        If the engine is on [-].
    :rtype: numpy.array
    """
    # Any non-zero mode maps to True.
    on_engine = hybrid_modes.astype(bool)
    return on_engine
@sh.add_function(dsp, outputs=['engine_speeds_out_hot'])
def identify_engine_speeds_out_hot(
        engine_speeds_base, after_treatment_warm_up_phases, idle_engine_speed):
    """
    Predicts the engine speed at hot condition [RPM].

    :param engine_speeds_base:
        Base engine speed (i.e., without clutch/TC effect) [RPM].
    :type engine_speeds_base: numpy.array

    :param after_treatment_warm_up_phases:
        Phases when engine is affected by the after treatment warm up [-].
    :type after_treatment_warm_up_phases: numpy.array

    :param idle_engine_speed:
        Engine speed idle median and std [RPM].
    :type idle_engine_speed: (float, float)

    :return:
        Engine speed at hot condition [RPM].
    :rtype: numpy.array
    """
    # During the warm up phases the first item of `idle_engine_speed`
    # replaces the base speed.
    idle_speed = idle_engine_speed[0]
    return np.where(
        after_treatment_warm_up_phases, idle_speed, engine_speeds_base
    )
@sh.add_function(dsp, outputs=['engine_speeds_out_hot'], weight=1)
def predict_engine_speeds_out_hot(
        ems_data, hybrid_modes, after_treatment_warm_up_phases,
        idle_engine_speed):
    """
    Predicts the engine speed at hot condition [RPM].

    :param ems_data:
        EMS decision data.
    :type ems_data: dict

    :param hybrid_modes:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :type hybrid_modes: numpy.array

    :param after_treatment_warm_up_phases:
        Phases when engine is affected by the after treatment warm up [-].
    :type after_treatment_warm_up_phases: numpy.array

    :param idle_engine_speed:
        Engine speed idle median and std [RPM].
    :type idle_engine_speed: (float, float)

    :return:
        Engine speed at hot condition [RPM].
    :rtype: numpy.array
    """
    # One candidate speed vector per hybrid mode, indexed by mode value.
    candidates = []
    for key in ('electric', 'parallel', 'serial'):
        candidates.append(ems_data[key]['speed_ice'].ravel())
    speeds = np.choose(hybrid_modes, candidates)

    # Serial samples inside a warm up phase take the first item of
    # `idle_engine_speed`.
    serial_warm_up = after_treatment_warm_up_phases & (hybrid_modes == 2)
    speeds[serial_warm_up] = idle_engine_speed[0]
    return speeds
@sh.add_function(dsp, outputs=[
    'motor_p4_front_electric_powers', 'motor_p4_rear_electric_powers',
    'motor_p3_front_electric_powers', 'motor_p3_rear_electric_powers',
    'motor_p2_electric_powers', 'motor_p2_planetary_electric_powers',
    'motor_p1_electric_powers', 'motor_p0_electric_powers'
])
def predict_motors_electric_powers(
        ems_data, after_treatment_warm_up_phases, hybrid_modes,
        engine_speeds_out_hot, final_drive_speeds_in):
    """
    Predicts motors electric power split [kW].

    :param ems_data:
        EMS decision data.
    :type ems_data: dict

    :param after_treatment_warm_up_phases:
        Phases when engine is affected by the after treatment warm up [-].
    :type after_treatment_warm_up_phases: numpy.array

    :param hybrid_modes:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :type hybrid_modes: numpy.array

    :param engine_speeds_out_hot:
        Engine speed at hot condition [RPM].
    :type engine_speeds_out_hot: numpy.array

    :param final_drive_speeds_in:
        Final drive speed in [RPM].
    :type final_drive_speeds_in: numpy.array

    :return:
        Motors electric powers [kW].
    :rtype: tuple[numpy.array]
    """
    # Serial samples inside a warm up phase use the electric split (0).
    serial_warm_up = after_treatment_warm_up_phases & (hybrid_modes == 2)
    mode = np.where(serial_warm_up, 0, hybrid_modes)

    # Power split of each mode, in mode-value order.
    splits = []
    for key in ('electric', 'parallel', 'serial'):
        data = ems_data[key]
        splits.append(data['battery_power_split'](
            data['power_bat'].ravel(), engine_speeds_out_hot,
            final_drive_speeds_in
        ))
    return np.choose(mode, splits)
# Register the default value of `is_serial` in the dispatcher.
dsp.add_data(data_id='is_serial', default_value=dfl.values.is_serial)
@sh.add_function(dsp, outputs=[
    'motor_p4_front_electric_powers', 'motor_p4_rear_electric_powers',
    'motor_p3_front_electric_powers', 'motor_p3_rear_electric_powers',
    'motor_p2_electric_powers', 'motor_p2_planetary_electric_powers',
    'motor_p1_electric_powers', 'motor_p0_electric_powers'
])
def identify_motors_electric_powers(
        hev_power_model, hybrid_modes, motive_powers, motors_maximums_powers,
        motors_electric_powers):
    """
    Identify motors electric power split [kW].

    :param hev_power_model:
        Hybrid Electric Vehicle power balance model.
    :type hev_power_model: HEV

    :param hybrid_modes:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :type hybrid_modes: numpy.array

    :param motive_powers:
        Motive power [kW].
    :type motive_powers: numpy.array

    :param motors_maximums_powers:
        Maximum powers of electric motors [kW].
    :type motors_maximums_powers: numpy.array

    :param motors_electric_powers:
        Cumulative motors electric power [kW].
    :type motors_electric_powers: numpy.array

    :return:
        Motors electric powers [kW].
    :rtype: tuple[numpy.array]
    """
    # The split functions are fed with the negated cumulative power.
    battery_powers = -motors_electric_powers[None, :]
    balances = (
        hev_power_model.ev, hev_power_model.parallel, hev_power_model.serial
    )

    # Power split of each mode, in mode-value order.
    splits = []
    for balance in balances:
        split = balance(motive_powers, motors_maximums_powers, 0)[-1]
        splits.append(split(battery_powers))
    return np.choose(hybrid_modes, splits)
@sh.add_function(dsp, outputs=[
    'motor_p4_front_electric_powers', 'motor_p4_rear_electric_powers',
    'motor_p3_front_electric_powers', 'motor_p3_rear_electric_powers',
    'motor_p2_electric_powers', 'motor_p2_planetary_electric_powers',
    'motor_p1_electric_powers', 'motor_p0_electric_powers'
], weight=sh.inf(1, 0))
def identify_motors_electric_powers_v1(
        final_drive_mean_efficiency, gear_box_mean_efficiency_guess,
        planetary_mean_efficiency, belt_mean_efficiency, motors_efficiencies,
        hybrid_modes, motive_powers, motors_maximums_powers,
        motors_electric_powers, has_motor_p2_planetary):
    """
    Identify motors electric power split [kW].

    :param final_drive_mean_efficiency:
        Final drive mean efficiency [-].
    :type final_drive_mean_efficiency: float

    :param gear_box_mean_efficiency_guess:
        Gear box mean efficiency guess [-].
    :type gear_box_mean_efficiency_guess: float

    :param planetary_mean_efficiency:
        Planetary mean efficiency [-].
    :type planetary_mean_efficiency: float

    :param belt_mean_efficiency:
        Belt mean efficiency [-].
    :type belt_mean_efficiency: float

    :param motors_efficiencies:
        Electric motors efficiencies vector.
    :type motors_efficiencies: tuple[float]

    :param hybrid_modes:
        Hybrid mode status (0: EV, 1: Parallel, 2: Serial).
    :type hybrid_modes: numpy.array

    :param motive_powers:
        Motive power [kW].
    :type motive_powers: numpy.array

    :param motors_maximums_powers:
        Maximum powers of electric motors [kW].
    :type motors_maximums_powers: numpy.array

    :param motors_electric_powers:
        Cumulative motors electric power [kW].
    :type motors_electric_powers: numpy.array

    :param has_motor_p2_planetary:
        Has the vehicle a motor in planetary P2?
    :type has_motor_p2_planetary: bool

    :return:
        Motors electric powers [kW].
    :rtype: tuple[numpy.array]
    """
    if has_motor_p2_planetary:
        fifth_efficiency = 1.0
    else:
        fifth_efficiency = .99
    drive_line_efficiencies = (
        1.0, final_drive_mean_efficiency, gear_box_mean_efficiency_guess,
        planetary_mean_efficiency, fifth_efficiency, belt_mean_efficiency
    )
    # Build a power balance model from the mean efficiencies and delegate.
    hev = HEV(drive_line_efficiencies, motors_efficiencies)
    return identify_motors_electric_powers(
        hev, hybrid_modes, motive_powers, motors_maximums_powers,
        motors_electric_powers
    )
@sh.add_function(dsp, outputs=['start_stop_activation_time'])
def default_start_stop_activation_time():
    """
    Returns the default start stop activation time threshold [s].

    :return:
        Start-stop activation time threshold [s].
    :rtype: float
    """
    start_stop_activation_time = 0
    return start_stop_activation_time