# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions and `dsp` model to model the legislation corrections.
"""
import functools
import numpy as np
import schedula as sh
import co2mpas.utils as co2_utl
from co2mpas.defaults import dfl
dsp = sh.BlueDispatcher(
name='Legislation', description='Models the legislation corrections.'
)
[docs]@sh.add_function(dsp, outputs=['phases_indices'])
def identify_phases_indices(times, phases_integration_times):
"""
Identifies the indices of the cycle phases [-].
If phases_distances is not given the result is the cumulative CO2 of cycle
phases [CO2g] otherwise it is CO2 emission of cycle phases [CO2g/km].
:param times:
Time vector [s].
:type times: numpy.array
:param phases_integration_times:
Cycle phases integration times [s].
:type phases_integration_times: tuple
:return:
Indices of the cycle phases [-].
:rtype: numpy.array
"""
return np.searchsorted(times, phases_integration_times, side='right')
[docs]@sh.add_function(dsp, inputs_kwargs=True, outputs=['phases_co2_emissions'])
def calculate_phases_co2_emissions(
times, phases_indices, co2_emissions, phases_distances=1.0):
"""
Calculates CO2 emission or cumulative CO2 of cycle phases [CO2g/km or CO2g].
If phases_distances is not given the result is the cumulative CO2 of cycle
phases [CO2g] otherwise it is CO2 emission of cycle phases [CO2g/km].
:param times:
Time vector [s].
:type times: numpy.array
:param phases_indices:
Indices of the cycle phases [-].
:type phases_indices: numpy.array
:param co2_emissions:
CO2 instantaneous emissions vector [CO2g/s].
:type co2_emissions: numpy.array
:param phases_distances:
Cycle phases distances [km].
:type phases_distances: numpy.array | float, optional
:return:
CO2 emission or cumulative CO2 of cycle phases [CO2g/km or CO2g].
:rtype: numpy.array
"""
from scipy.integrate import trapz
co2 = []
for i, j in phases_indices:
co2.append(trapz(co2_emissions[i:j], times[i:j]))
with np.errstate(divide='ignore', invalid='ignore'):
return np.nan_to_num(np.array(co2) / phases_distances)
[docs]@sh.add_function(dsp, outputs=['co2_emission_value'])
def calculate_co2_emission_value(phases_co2_emissions, phases_distances):
"""
Calculates the CO2 emission of the cycle [CO2g/km].
:param phases_co2_emissions:
CO2 emission of cycle phases [CO2g/km].
:type phases_co2_emissions: numpy.array
:param phases_distances:
Cycle phases distances [km].
:type phases_distances: numpy.array | float
:return:
CO2 emission value of the cycle [CO2g/km].
:rtype: float
"""
n = sum(phases_co2_emissions * phases_distances)
if isinstance(phases_distances, float):
d = phases_distances * len(phases_co2_emissions)
else:
d = sum(phases_distances)
return float(n / d)
dsp.add_data(
'has_periodically_regenerating_systems',
dfl.values.has_periodically_regenerating_systems
)
dsp.add_data('ki_additive', dfl.values.ki_additive)
[docs]@sh.add_function(dsp, outputs=['ki_multiplicative'])
def default_ki_multiplicative(
has_periodically_regenerating_systems, ki_additive):
"""
Returns the default ki multiplicative factor [-].
:param has_periodically_regenerating_systems:
Does the vehicle has periodically regenerating systems? [-].
:type has_periodically_regenerating_systems: bool
:param ki_additive:
Additive correction for vehicles with periodically regenerating
systems [CO2g/km].
:type ki_additive: float
:return:
Multiplicative correction for vehicles with periodically regenerating
systems [-].
:rtype: float
"""
if ki_additive:
return 1.0
par = dfl.functions.default_ki_multiplicative.ki_multiplicative
return par.get(has_periodically_regenerating_systems, 1.0)
@sh.add_function(dsp, outputs=['phases_co2_emissions'])
def merge_wltp_phases_co2_emission(
co2_emission_low, co2_emission_medium, co2_emission_high,
co2_emission_extra_high):
"""
Merges WLTP phases co2 emission.
:param co2_emission_low:
CO2 emission on low WLTP phase [CO2g/km].
:type co2_emission_low: float
:param co2_emission_medium:
CO2 emission on medium WLTP phase [CO2g/km].
:type co2_emission_medium: float
:param co2_emission_high:
CO2 emission on high WLTP phase [CO2g/km].
:type co2_emission_high: float
:param co2_emission_extra_high:
CO2 emission on extra high WLTP phase [CO2g/km].
:type co2_emission_extra_high: float
:return:
CO2 emission of cycle phases [CO2g/km].
:rtype: tuple[float]
"""
return (co2_emission_low, co2_emission_medium, co2_emission_high,
co2_emission_extra_high)
# noinspection PyPep8Naming
[docs]@sh.add_function(dsp, outputs=['phases_co2_emissions'])
def merge_wltp_phases_co2_emission(co2_emission_UDC, co2_emission_EUDC):
"""
Merges WLTP phases co2 emission.
:param co2_emission_UDC:
CO2 emission on UDC NEDC phase [CO2g/km].
:type co2_emission_UDC: float
:param co2_emission_EUDC:
CO2 emission on EUDC NEDC phase [CO2g/km].
:type co2_emission_EUDC: float
:return:
CO2 emission of cycle phases [CO2g/km].
:rtype: tuple[float]
"""
return co2_emission_UDC, co2_emission_EUDC
def _rcb_correction(
phases_delta_energy, phases_distances, fuel_type, engine_type,
alternator_efficiency):
# noinspection PyProtectedMember
par = dfl.functions._rcb_correction.WILLANS
dco2 = -phases_delta_energy / phases_distances
dco2 *= par[fuel_type][engine_type] * 0.0036 / alternator_efficiency
return dco2
[docs]@sh.add_function(dsp, outputs=['theoretical_phases_distances'])
def calculate_theoretical_phases_distances(
times, theoretical_velocities, phases_indices):
"""
Calculates theoretical cycle phases distances [km].
:param times:
Time vector.
:type times: numpy.array
:param theoretical_velocities:
Theoretical velocity vector [km/h].
:type theoretical_velocities: numpy.array
:param phases_indices:
Indices of the cycle phases [-].
:type phases_indices: numpy.array
:return:
Theoretical cycle phases distances [km].
:rtype: numpy.array
"""
from .vehicle import calculate_distances
return calculate_phases_distances(
phases_indices, calculate_distances(times, theoretical_velocities)
)
dsp.add_data('rcb_correction', dfl.values.rcb_correction)
[docs]@sh.add_function(dsp, outputs=['speed_distance_correction'])
def default_speed_distance_correction(is_hybrid):
"""
Returns if speed distance correction is to be applied.
:param is_hybrid:
Is the vehicle hybrid?
:type is_hybrid: bool
:return:
Apply speed distance correction?
:rtype: bool
"""
return not is_hybrid
[docs]@sh.add_function(
dsp, inputs_kwargs=True,
outputs=['speed_distance_corrected_co2_emission_value']
)
def calculate_speed_distance_corrected_co2_emission_value(
phases_co2_emissions, phases_times, batteries_phases_delta_energy,
theoretical_phases_distances, phases_distances, alternator_efficiency,
engine_type, fuel_type, motive_powers, theoretical_motive_powers, times,
phases_indices, engine_max_power, speed_distance_correction=True,
is_hybrid=False, cycle_type='WLTP'):
"""
Calculates the CO2 emission value corrected for speed & distance [CO2g/km].
:param is_hybrid:
Is the vehicle hybrid?
:type is_hybrid: bool
:param cycle_type:
Cycle type (WLTP or NEDC).
:type cycle_type: str
:param phases_co2_emissions:
CO2 emission of cycle phases [CO2g/km].
:type phases_co2_emissions: numpy.array
:param phases_times:
Cycle phases times [s].
:type phases_times: numpy.array
:param batteries_phases_delta_energy:
Phases delta energy of the batteries [Wh].
:type batteries_phases_delta_energy: numpy.array
:param theoretical_phases_distances:
Theoretical cycle phases distances [km].
:type theoretical_phases_distances: numpy.array
:param phases_distances:
Cycle phases distances [km].
:type phases_distances: numpy.array
:param alternator_efficiency:
Alternator efficiency [-].
:type alternator_efficiency: float
:param engine_type:
Engine type (positive turbo, positive natural aspiration, compression).
:type engine_type: str
:param fuel_type:
Fuel type (diesel, gasoline, LPG, NG, ethanol, methanol, biodiesel,
propane).
:type fuel_type: str
:param motive_powers:
Motive power [kW].
:type motive_powers: numpy.array
:param theoretical_motive_powers:
Theoretical motive power [kW].
:type theoretical_motive_powers: numpy.array
:param times:
Time vector.
:type times: numpy.array
:param phases_indices:
Indices of the cycle phases [-].
:type phases_indices: numpy.array
:param engine_max_power:
Engine nominal power [kW].
:type engine_max_power: float
:param speed_distance_correction:
Apply speed distance correction?
:type speed_distance_correction: bool
:return:
CO2 emission value corrected for speed & distance [CO2g/km].
:rtype: float
"""
if is_hybrid:
return sh.NONE
p_co2, p_dist = np.array(phases_co2_emissions), phases_distances
if cycle_type == 'WLTP' and speed_distance_correction:
p_co2 = np.column_stack((p_co2, _rcb_correction(
batteries_phases_delta_energy, phases_distances, fuel_type,
engine_type, alternator_efficiency
))) * np.nan_to_num(phases_distances / phases_times)[:, None]
from sklearn.linear_model import LinearRegression
mdl = LinearRegression()
cpmp = functools.partial(
calculate_phases_co2_emissions, times, phases_indices,
phases_distances=phases_times
)
y, p_co2 = p_co2.sum(axis=1).ravel(), p_co2[:, 0].ravel()
x = cpmp(np.maximum(-.02 * engine_max_power, motive_powers))
mdl.fit(x[:, None], y)
p2 = -mdl.intercept_ / mdl.coef_ if mdl.coef_ else -float('inf')
dp_mp = cpmp(np.maximum(p2, motive_powers))
mdl.fit(dp_mp[:, None], y)
dp_mp -= cpmp(np.maximum(p2, theoretical_motive_powers))
p_co2 -= mdl.coef_ * dp_mp
p_co2 *= np.nan_to_num(phases_times / theoretical_phases_distances)
p_dist = theoretical_phases_distances
return calculate_co2_emission_value(p_co2, p_dist)
[docs]@sh.add_function(
dsp, inputs_kwargs=True, outputs=['rcb_corrected_co2_emission_value']
)
def calculate_rcb_corrected_co2_emission_value(
speed_distance_corrected_co2_emission_value, engine_type, fuel_type,
alternator_efficiency, phases_distances, theoretical_phases_distances,
batteries_phases_delta_energy, speed_distance_correction=True,
rcb_correction=True, is_hybrid=False, cycle_type='WLTP'):
"""
Calculates the CO2 emission value corrected for RCB [CO2g/km].
:param is_hybrid:
Is the vehicle hybrid?
:type is_hybrid: bool
:param cycle_type:
Cycle type (WLTP or NEDC).
:type cycle_type: str
:param speed_distance_corrected_co2_emission_value:
CO2 emission value corrected for speed & distance [CO2g/km].
:type speed_distance_corrected_co2_emission_value: float
:param batteries_phases_delta_energy:
Phases delta energy of the batteries [Wh].
:type batteries_phases_delta_energy: numpy.array
:param theoretical_phases_distances:
Theoretical cycle phases distances [km].
:type theoretical_phases_distances: numpy.array
:param phases_distances:
Cycle phases distances [km].
:type phases_distances: numpy.array
:param alternator_efficiency:
Alternator efficiency [-].
:type alternator_efficiency: float
:param engine_type:
Engine type (positive turbo, positive natural aspiration, compression).
:type engine_type: str
:param fuel_type:
Fuel type (diesel, gasoline, LPG, NG, ethanol, methanol, biodiesel,
propane).
:type fuel_type: str
:param rcb_correction:
Apply RCB correction?
:type rcb_correction: bool
:param speed_distance_correction:
Apply speed distance correction?
:type speed_distance_correction: bool
:return:
CO2 emission value corrected for RCB [CO2g/km].
:rtype: float
"""
if is_hybrid:
return sh.NONE
if cycle_type == 'WLTP' and rcb_correction:
d = phases_distances
if speed_distance_correction:
d = theoretical_phases_distances
return speed_distance_corrected_co2_emission_value + _rcb_correction(
np.sum(batteries_phases_delta_energy), np.sum(d), fuel_type,
engine_type, alternator_efficiency
)
return speed_distance_corrected_co2_emission_value
[docs]@sh.add_function(dsp, outputs=['batteries_phases_delta_energy'])
def calculate_batteries_phases_delta_energy(
times, phases_indices, drive_battery_electric_powers,
service_battery_electric_powers):
"""
Calculates the phases delta energy of the batteries [Wh].
:param times:
Time vector.
:type times: numpy.array
:param phases_indices:
Indices of the cycle phases [-].
:type phases_indices: numpy.array
:param drive_battery_electric_powers:
Drive battery electric power [kW].
:type drive_battery_electric_powers: numpy.array
:param service_battery_electric_powers:
Service battery electric power [kW].
:type service_battery_electric_powers: numpy.array
:return:
Phases delta energy of the batteries [Wh].
:rtype: numpy.array
"""
from scipy.integrate import cumtrapz
p = service_battery_electric_powers + drive_battery_electric_powers
e = cumtrapz(p, times, initial=0)
i = phases_indices.copy()
i[:, 1] -= 1
return np.diff(e[i], axis=1).ravel() / 3.6
[docs]@sh.add_function(
dsp, inputs_kwargs=True, outputs=['kco2_wltp_correction_factor']
)
def identify_kco2_wltp_correction_factor(
drive_battery_electric_powers, service_battery_electric_powers,
co2_emissions, times, force_on_engine, after_treatment_warm_up_phases,
velocities, is_hybrid=True):
"""
Identifies the kco2 correction factor [g/Wh].
:param drive_battery_electric_powers:
Drive battery electric power [kW].
:type drive_battery_electric_powers: numpy.array
:param service_battery_electric_powers:
Service battery electric power [kW].
:type service_battery_electric_powers: numpy.array
:param force_on_engine:
Phases when engine is on because parallel mode is forced [-].
:type force_on_engine: numpy.array
:param after_treatment_warm_up_phases:
Phases when engine speed is affected by the after treatment warm up [-].
:type after_treatment_warm_up_phases: numpy.array
:param velocities:
Vehicle velocity [km/h].
:type velocities: numpy.array
:param co2_emissions:
CO2 instantaneous emissions vector [CO2g/s].
:type co2_emissions: numpy.array
:param times:
Time vector.
:type times: numpy.array
:param is_hybrid:
Is the vehicle hybrid?
:type is_hybrid: bool
:return:
kco2 correction factor [g/Wh].
:rtype: float
"""
if not is_hybrid:
return sh.NONE
from scipy.integrate import cumtrapz
from sklearn.linear_model import RANSACRegressor, Lasso
b = ~(force_on_engine | after_treatment_warm_up_phases)
e = np.where(
b, drive_battery_electric_powers + service_battery_electric_powers, 0
)
e = cumtrapz(e, times, initial=0) / 3.6
co2 = cumtrapz(np.where(b, co2_emissions, 0), times, initial=0)
km = cumtrapz(np.where(b, velocities / 3.6, 0), times, initial=0) / 1000
# noinspection PyTypeChecker
it = co2_utl.sliding_window(list(zip(km, zip(km, e, co2))), 5)
d = np.diff(np.array([(v[0][1], v[-1][1]) for v in it]), axis=1)[:, 0, :].T
e, co2 = d[1:] / d[0]
d0 = t0 = -float('inf')
b, dkm = [], .5
dt = dkm / np.mean(velocities) * 3600
for i, (d, t) in enumerate(zip(km, times)):
if d > d0 and t > t0:
d0, t0 = d + dkm, t + dt
b.append(i)
b = np.array(b)
m = RANSACRegressor(
random_state=0,
base_estimator=Lasso(random_state=0, positive=True)
).fit(e[b, None], co2[b])
return float(m.estimator_.coef_)
[docs]@sh.add_function(
dsp, inputs_kwargs=True, outputs=['rcb_corrected_co2_emission_value']
)
def calculate_rcb_corrected_co2_emission_value_v1(
co2_emission_value, batteries_phases_delta_energy,
kco2_wltp_correction_factor, phases_distances, rcb_correction=True,
is_hybrid=True, cycle_type='WLTP'):
"""
Calculates the CO2 emission value corrected for RCB [CO2g/km].
:param is_hybrid:
Is the vehicle hybrid?
:type is_hybrid: bool
:param cycle_type:
Cycle type (WLTP or NEDC).
:type cycle_type: str
:param co2_emission_value:
CO2 emission value of the cycle [CO2g/km].
:type co2_emission_value: float
:param batteries_phases_delta_energy:
Phases delta energy of the batteries [Wh].
:type batteries_phases_delta_energy: numpy.array
:param kco2_wltp_correction_factor:
kco2 WLTP correction factor [CO2g/Wh].
:type kco2_wltp_correction_factor: float
:param phases_distances:
Cycle phases distances [km].
:type phases_distances: numpy.array
:param rcb_correction:
Apply RCB correction?
:type rcb_correction: bool
:return:
CO2 emission value corrected for RCB [CO2g/km].
:rtype: float
"""
if not is_hybrid or cycle_type == 'NEDC':
return sh.NONE
if rcb_correction and kco2_wltp_correction_factor:
de = np.sum(batteries_phases_delta_energy) / np.sum(phases_distances)
return co2_emission_value - kco2_wltp_correction_factor * de
return co2_emission_value
[docs]@sh.add_function(dsp, outputs=['kco2_nedc_correction_factor'])
def default_kco2_nedc_correction_factor(
kco2_wltp_correction_factor, drive_battery_nominal_voltage, distances):
"""
Returns the kco2 NEDC correction factor [CO2g/km/Ah].
:param kco2_wltp_correction_factor:
kco2 WLTP correction factor [CO2g/Wh].
:type kco2_wltp_correction_factor: float
:param drive_battery_nominal_voltage:
Drive battery nominal voltage [V].
:type drive_battery_nominal_voltage: float
:param distances:
Cumulative distance vector [m].
:type distances: numpy.array
:return:
kco2 NEDC correction factor [CO2g/km/Ah].
:rtype: float
"""
d = distances[-1] / 1000
return kco2_wltp_correction_factor * drive_battery_nominal_voltage / d
[docs]@sh.add_function(
dsp, inputs_kwargs=True, outputs=['rcb_corrected_co2_emission_value']
)
def calculate_rcb_corrected_co2_emission_value_v2(
co2_emission_value, drive_battery_delta_state_of_charge,
drive_battery_capacity, kco2_nedc_correction_factor,
rcb_correction=True, is_hybrid=True, cycle_type='NEDC'):
"""
Calculates the CO2 emission value corrected for RCB [CO2g/km].
:param is_hybrid:
Is the vehicle hybrid?
:type is_hybrid: bool
:param cycle_type:
Cycle type (WLTP or NEDC).
:type cycle_type: str
:param co2_emission_value:
CO2 emission value of the cycle [CO2g/km].
:type co2_emission_value: float
:param drive_battery_delta_state_of_charge:
Overall delta state of charge of the drive battery [%].
:type drive_battery_delta_state_of_charge: numpy.array
:param kco2_nedc_correction_factor:
kco2 NEDC correction factor [CO2g/km/Ah].
:type kco2_nedc_correction_factor: float
:param drive_battery_capacity:
Drive battery capacity [Ah].
:type drive_battery_capacity: float
:param rcb_correction:
Apply RCB correction?
:type rcb_correction: bool
:return:
CO2 emission value corrected for RCB [CO2g/km].
:rtype: float
"""
if not is_hybrid or cycle_type != 'NEDC':
return sh.NONE
if rcb_correction and kco2_nedc_correction_factor:
di = drive_battery_delta_state_of_charge * drive_battery_capacity / 100
return co2_emission_value - kco2_nedc_correction_factor * di
return co2_emission_value
dsp.add_data(
'atct_family_correction_factor', dfl.values.atct_family_correction_factor
)
[docs]def calculate_corrected_co2_emission(
rcb_corrected_co2_emission_value, ki_multiplicative, ki_additive,
atct_family_correction_factor=1.0):
"""
Calculates the corrected CO2 emission of the cycle [CO2g/km].
:param rcb_corrected_co2_emission_value:
CO2 emission value corrected for RCB [CO2g/km].
:type rcb_corrected_co2_emission_value: float
:param ki_multiplicative:
Multiplicative correction for vehicles with periodically regenerating
systems [-].
:type ki_multiplicative: float
:param ki_additive:
Additive correction for vehicles with periodically regenerating
systems [CO2g/km].
:type ki_multiplicative: float
:param atct_family_correction_factor:
Family correction factor for representative regional temperatures [-].
:type atct_family_correction_factor: float
:return:
Corrected CO2 emission value of the cycle [CO2g/km].
:rtype: float
"""
v = rcb_corrected_co2_emission_value * ki_multiplicative + ki_additive
return v * atct_family_correction_factor
dsp.add_data('is_plugin', dfl.values.is_plugin)
dsp.add_function(
function=sh.add_args(calculate_corrected_co2_emission),
inputs=[
'is_plugin', 'rcb_corrected_co2_emission_value', 'ki_multiplicative',
'ki_additive', 'atct_family_correction_factor'
],
outputs=['corrected_co2_emission_value'],
input_domain=co2_utl.check_first_arg_false
)
dsp.add_function(
function=sh.add_args(calculate_corrected_co2_emission),
inputs=[
'is_plugin', 'rcb_corrected_co2_emission_value', 'ki_multiplicative',
'ki_additive', 'atct_family_correction_factor'
],
outputs=['corrected_sustaining_co2_emission_value'],
input_domain=co2_utl.check_first_arg
)
# noinspection PyUnusedLocal
def _domain_calculate_corrected_co2_emission_for_conventional_nedc(
cycle_type, is_hybrid, *args):
return not is_hybrid and cycle_type == 'NEDC'
dsp.add_function(
function_id='calculate_corrected_co2_emission_for_conventional_nedc',
function=sh.add_args(calculate_corrected_co2_emission, n=2),
inputs=['cycle_type', 'is_hybrid', 'co2_emission_value',
'ki_multiplicative', 'ki_additive',
'atct_family_correction_factor'],
outputs=['corrected_co2_emission_value'],
input_domain=_domain_calculate_corrected_co2_emission_for_conventional_nedc
)
dsp.add_function(
function=sh.bypass,
inputs=['corrected_co2_emission_value'],
outputs=['declared_co2_emission_value']
)
dsp.add_function(
function=sh.bypass,
inputs=['corrected_sustaining_co2_emission_value'],
outputs=['declared_sustaining_co2_emission_value']
)
[docs]def calculate_willans_factors(
co2_params_calibrated, engine_fuel_lower_heating_value, engine_stroke,
engine_capacity, min_engine_on_speed, fmep_model, engine_speeds_out,
engine_powers_out, times, velocities, accelerations, motive_powers,
engine_coolant_temperatures, missing_powers, angle_slopes):
"""
Calculates the Willans factors.
:param co2_params_calibrated:
CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg).
The missing parameters are set equal to zero.
:type co2_params_calibrated: lmfit.Parameters
:param engine_fuel_lower_heating_value:
Fuel lower heating value [kJ/kg].
:type engine_fuel_lower_heating_value: float
:param engine_stroke:
Engine stroke [mm].
:type engine_stroke: float
:param engine_capacity:
Engine capacity [cm3].
:type engine_capacity: float
:param min_engine_on_speed:
Minimum engine speed to consider the engine to be on [RPM].
:type min_engine_on_speed: float
:param fmep_model:
Engine FMEP model.
:type fmep_model: FMEP
:param engine_speeds_out:
Engine speed vector [RPM].
:type engine_speeds_out: numpy.array
:param engine_powers_out:
Engine power vector [kW].
:type engine_powers_out: numpy.array
:param times:
Time vector [s].
:type times: numpy.array
:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array
:param accelerations:
Acceleration vector [m/s2].
:type accelerations: numpy.array
:param motive_powers:
Motive power [kW].
:type motive_powers: numpy.array
:param engine_coolant_temperatures:
Engine coolant temperature vector [°C].
:type engine_coolant_temperatures: numpy.array
:param missing_powers:
Missing engine power [kW].
:type missing_powers: numpy.array
:param angle_slopes:
Angle slope vector [rad].
:type angle_slopes: numpy.array
:return:
Willans factors:
- av_velocities [km/h]
- av_slope [rad]
- distance [km]
- init_temp [°C]
- av_temp [°C]
- end_temp [°C]
- av_vel_pos_mov_pow [kw/h]
- av_pos_motive_powers [kW]
- sec_pos_mov_pow [s]
- av_neg_motive_powers [kW]
- sec_neg_mov_pow [s]
- av_pos_accelerations [m/s2]
- av_engine_speeds_out_pos_pow [RPM]
- av_pos_engine_powers_out [kW]
- engine_bmep_pos_pow [bar]
- mean_piston_speed_pos_pow [m/s]
- fuel_mep_pos_pow [bar]
- fuel_consumption_pos_pow [g/sec]
- willans_a [g/kWh]
- willans_b [g/h]
- specific_fuel_consumption [g/kWh]
- indicated_efficiency [-]
- willans_efficiency [-]
:rtype: dict
"""
from .engine import calculate_mean_piston_speeds
from .engine.fc import calculate_brake_mean_effective_pressures
av = np.average
w = np.zeros_like(times, dtype=float)
t = (times[:-1] + times[1:]) / 2
# noinspection PyUnresolvedReferences
w[0], w[1:-1], w[-1] = t[0] - times[0], np.diff(t), times[-1] - t[-1]
f = {
'av_velocities': av(velocities, weights=w), # [km/h]
'av_slope': av(angle_slopes, weights=w),
'has_sufficient_power': 1 - av(missing_powers != 0, weights=w),
'max_power_required': max(engine_powers_out + missing_powers)
}
f['distance'] = f['av_velocities'] * (times[-1] - times[0]) / 3600.0 # [km]
b = engine_powers_out >= 0
if b.any():
p = co2_params_calibrated.valuesdict()
_w = w[b]
av_s = av(engine_speeds_out[b], weights=_w)
av_p = av(engine_powers_out[b], weights=_w)
av_mp = av(missing_powers[b], weights=_w)
n_p = calculate_brake_mean_effective_pressures(
av_s, av_p, engine_capacity, min_engine_on_speed
)
n_s = calculate_mean_piston_speeds(av_s, engine_stroke)
f_mep, wfa = fmep_model(p, n_s, n_p, 1)[:2]
c = engine_capacity / engine_fuel_lower_heating_value * av_s
fc = f_mep * c / 1200.0
ieff = av_p / (fc * engine_fuel_lower_heating_value) * 1000.0
willans_a = 3600000.0 / engine_fuel_lower_heating_value / wfa
willans_b = fmep_model(p, n_s, 0, 1)[0] * c * 3.0
sfc = willans_a + willans_b / av_p
willans_eff = 3600000.0 / (sfc * engine_fuel_lower_heating_value)
f.update({
'av_engine_speeds_out_pos_pow': av_s, # [RPM]
'av_pos_engine_powers_out': av_p, # [kW]
'av_missing_powers_pos_pow': av_mp, # [kW]
'engine_bmep_pos_pow': n_p, # [bar]
'mean_piston_speed_pos_pow': n_s, # [m/s]
'fuel_mep_pos_pow': f_mep, # [bar]
'fuel_consumption_pos_pow': fc, # [g/sec]
'willans_a': willans_a, # [g/kW]
'willans_b': willans_b, # [g]
'specific_fuel_consumption': sfc, # [g/kWh]
'indicated_efficiency': ieff, # [-]
'willans_efficiency': willans_eff # [-]
})
b = motive_powers > 0
if b.any():
_w = w[b]
f['av_vel_pos_mov_pow'] = av(velocities[b], weights=_w) # [km/h]
f['av_pos_motive_powers'] = av(motive_powers[b], weights=_w) # [kW]
f['sec_pos_mov_pow'] = np.sum(_w) # [s]
b = accelerations > 0
if b.any():
_w = w[b]
f['av_pos_accelerations'] = av(accelerations[b], weights=_w) # [m/s2]
b = motive_powers < 0
if b.any():
_w = w[b]
f['av_neg_motive_powers'] = av(motive_powers[b], weights=_w) # [kW]
f['sec_neg_mov_pow'] = np.sum(_w) # [s]
f['init_temp'] = engine_coolant_temperatures[0] # [°C]
f['av_temp'] = av(engine_coolant_temperatures, weights=w) # [°C]
f['end_temp'] = engine_coolant_temperatures[-1] # [°C]
return f
dsp.add_data(
'enable_willans', dfl.values.enable_willans,
description='Enable the calculation of Willans coefficients for '
'the cycle?'
)
dsp.add_function(
function=sh.add_args(calculate_willans_factors),
inputs=[
'enable_willans', 'co2_params_calibrated',
'engine_fuel_lower_heating_value', 'engine_stroke', 'engine_capacity',
'min_engine_on_speed', 'fmep_model', 'engine_speeds_out',
'engine_powers_out', 'times', 'velocities', 'accelerations',
'motive_powers', 'engine_coolant_temperatures', 'missing_powers',
'angle_slopes'
],
outputs=['willans_factors'],
input_domain=co2_utl.check_first_arg
)
[docs]def calculate_phases_willans_factors(
params, engine_fuel_lower_heating_value, engine_stroke, engine_capacity,
min_engine_on_speed, fmep_model, times, phases_indices,
engine_speeds_out, engine_powers_out, velocities, accelerations,
motive_powers, engine_coolant_temperatures, missing_powers,
angle_slopes):
"""
Calculates the Willans factors for each phase.
:param params:
CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg).
The missing parameters are set equal to zero.
:type params: lmfit.Parameters
:param engine_fuel_lower_heating_value:
Fuel lower heating value [kJ/kg].
:type engine_fuel_lower_heating_value: float
:param engine_stroke:
Engine stroke [mm].
:type engine_stroke: float
:param engine_capacity:
Engine capacity [cm3].
:type engine_capacity: float
:param min_engine_on_speed:
Minimum engine speed to consider the engine to be on [RPM].
:type min_engine_on_speed: float
:param fmep_model:
Engine FMEP model.
:type fmep_model: FMEP
:param times:
Time vector [s].
:type times: numpy.array
:param phases_indices:
Indices of the cycle phases [-].
:type phases_indices: numpy.array
:param engine_speeds_out:
Engine speed vector [RPM].
:type engine_speeds_out: numpy.array
:param engine_powers_out:
Engine power vector [kW].
:type engine_powers_out: numpy.array
:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array
:param accelerations:
Acceleration vector [m/s2].
:type accelerations: numpy.array
:param motive_powers:
Motive power [kW].
:type motive_powers: numpy.array
:param engine_coolant_temperatures:
Engine coolant temperature vector [°C].
:type engine_coolant_temperatures: numpy.array
:param missing_powers:
Missing engine power [kW].
:type missing_powers: numpy.array
:param angle_slopes:
Angle slope vector [rad].
:type angle_slopes: numpy.array
:return:
Willans factors:
- av_velocities [km/h]
- av_slope [rad]
- distance [km]
- init_temp [°C]
- av_temp [°C]
- end_temp [°C]
- av_vel_pos_mov_pow [kw/h]
- av_pos_motive_powers [kW]
- sec_pos_mov_pow [s]
- av_neg_motive_powers [kW]
- sec_neg_mov_pow [s]
- av_pos_accelerations [m/s2]
- av_engine_speeds_out_pos_pow [RPM]
- av_pos_engine_powers_out [kW]
- engine_bmep_pos_pow [bar]
- mean_piston_speed_pos_pow [m/s]
- fuel_mep_pos_pow [bar]
- fuel_consumption_pos_pow [g/sec]
- willans_a [g/kWh]
- willans_b [g/h]
- specific_fuel_consumption [g/kWh]
- indicated_efficiency [-]
- willans_efficiency [-]
:rtype: dict
"""
factors = []
for i, j in phases_indices:
factors.append(calculate_willans_factors(
params, engine_fuel_lower_heating_value, engine_stroke,
engine_capacity, min_engine_on_speed, fmep_model,
engine_speeds_out[i:j], engine_powers_out[i:j], times[i:j],
velocities[i:j], accelerations[i:j], motive_powers[i:j],
engine_coolant_temperatures[i:j], missing_powers[i:j],
angle_slopes[i:j]
))
return factors
dsp.add_data(
'enable_phases_willans', dfl.values.enable_phases_willans,
description='Enable the calculation of Willans coefficients for '
'all phases?'
)
dsp.add_function(
function=sh.add_args(calculate_phases_willans_factors),
inputs=[
'enable_phases_willans', 'co2_params_calibrated',
'engine_fuel_lower_heating_value', 'engine_stroke', 'engine_capacity',
'min_engine_on_speed', 'fmep_model', 'times', 'phases_indices',
'engine_speeds_out', 'engine_powers_out', 'velocities', 'accelerations',
'motive_powers', 'engine_coolant_temperatures', 'missing_powers',
'angle_slopes'
],
outputs=['phases_willans_factors'],
input_domain=co2_utl.check_first_arg
)
# noinspection PyPep8Naming
[docs]@sh.add_function(dsp, outputs=['optimal_efficiency'])
def calculate_optimal_efficiency(co2_params_calibrated, mean_piston_speeds):
"""
Calculates the optimal efficiency [-] and t.
:param co2_params_calibrated:
CO2 emission model parameters (a2, b2, a, b, c, l, l2, t, trg).
The missing parameters are set equal to zero.
:type co2_params_calibrated: lmfit.Parameters
:param mean_piston_speeds:
Mean piston speed vector [m/s].
:type mean_piston_speeds: numpy.array
:return:
Optimal efficiency and the respective parameters:
- mean_piston_speeds [m/s],
- engine_bmep [bar],
- efficiency [-].
:rtype: dict[str | tuple]
"""
# noinspection PyProtectedMember
from .engine.fc import _fuel_ABC
n_s = np.linspace(mean_piston_speeds.min(), mean_piston_speeds.max(), 10)
A, B, C = _fuel_ABC(n_s, **co2_params_calibrated)
# noinspection PyTypeChecker
b = np.isclose(A, 0.0)
# noinspection PyTypeChecker
A = np.where(b, np.sign(C) * dfl.EPS, A)
ac4, B2 = 4 * A * C, B ** 2
sabc = np.sqrt(ac4 * B2)
n = sabc - ac4
bmep = np.where(b, np.nan, 2 * C - sabc / (2 * A))
eff = n / (B - np.sqrt(B2 - sabc - n))
return dict(mean_piston_speeds=n_s, engine_bmep=bmep, efficiency=eff)
[docs]@sh.add_function(dsp, outputs=['phases_fuel_consumptions'])
def calculate_phases_fuel_consumptions(
phases_co2_emissions, fuel_carbon_content, fuel_density):
"""
Calculates cycle phases fuel consumption [l/100km].
:param phases_co2_emissions:
CO2 emission of cycle phases [CO2g/km].
:type phases_co2_emissions: numpy.array
:param fuel_carbon_content:
Fuel carbon content [CO2g/g].
:type fuel_carbon_content: float
:param fuel_density:
Fuel density [g/l].
:type fuel_density: float
:return:
Fuel consumption of cycle phases [l/100km].
:rtype: tuple
"""
c = 100.0 / (fuel_density * fuel_carbon_content)
return tuple(np.asarray(phases_co2_emissions) * c)
[docs]@sh.add_function(dsp, outputs=['fuel_consumption_value'])
def calculate_fuel_consumption_value(
phases_fuel_consumptions, phases_distances):
"""
Calculates the fuel consumption of the cycle [l/100km].
:param phases_fuel_consumptions:
Fuel consumption of cycle phases [l/100km].
:type phases_fuel_consumptions: numpy.array
:param phases_distances:
Cycle phases distances [km].
:type phases_distances: numpy.array | float
:return:
Fuel consumption of the cycle [l/100km].
:rtype: float
"""
return calculate_co2_emission_value(
phases_fuel_consumptions, phases_distances
)
[docs]@sh.add_function(dsp, outputs=['phases_distances'])
def calculate_phases_distances(phases_indices, distances):
"""
Calculates cycle phases distances [km].
:param phases_indices:
Indices of the cycle phases [-].
:type phases_indices: numpy.array
:param distances:
Cumulative distance vector [m].
:type distances: numpy.array
:return:
Cycle phases distances [km].
:rtype: numpy.array
"""
i = phases_indices.copy()
i[:, 1] -= 1
return np.diff(distances[i], axis=1).ravel() / 1000.0
[docs]@sh.add_function(dsp, outputs=['phases_times'])
def calculate_phases_times(phases_indices, times):
"""
Calculates cycle phases times [s].
:param phases_indices:
Indices of the cycle phases [-].
:type phases_indices: numpy.array
:param times:
Time vector [s].
:type times: numpy.array
:return:
Cycle phases times [s].
:rtype: numpy.array
"""
i = phases_indices.copy()
i[:, 1] -= 1
return np.diff(times[i], axis=1).ravel()
[docs]@sh.add_function(dsp, outputs=['fuel_density'])
def default_fuel_density(fuel_type):
"""
Returns the default fuel density [g/l].
:param fuel_type:
Fuel type (diesel, gasoline, LPG, NG, ethanol, methanol, biodiesel,
propane).
:type fuel_type: str
:return:
Fuel density [g/l].
:rtype: float
"""
return dfl.functions.default_fuel_density.FUEL_DENSITY[fuel_type]
[docs]@sh.add_function(dsp, outputs=['engine_fuel_lower_heating_value'])
def default_engine_fuel_lower_heating_value(fuel_type):
"""
Returns the default fuel lower heating value [kJ/kg].
:param fuel_type:
Fuel type (diesel, gasoline, LPG, NG, ethanol, methanol, biodiesel,
propane).
:type fuel_type: str
:return:
Fuel lower heating value [kJ/kg].
:rtype: float
"""
d = dfl.functions
if d.ENABLE_ALL_FUNCTIONS or d.default_fuel_lower_heating_value.ENABLE:
return d.default_fuel_lower_heating_value.LHV[fuel_type]
return sh.NONE
[docs]@sh.add_function(dsp, outputs=['fuel_carbon_content'], weight=3)
def default_fuel_carbon_content(fuel_type):
"""
Returns the default fuel carbon content [CO2g/g].
:param fuel_type:
Fuel type (diesel, gasoline, LPG, NG, ethanol, methanol, biodiesel,
propane).
:type fuel_type: str
:return:
Fuel carbon content [CO2g/g].
:rtype: float
"""
d = dfl.functions
if d.ENABLE_ALL_FUNCTIONS or d.default_fuel_carbon_content.ENABLE:
return d.default_fuel_carbon_content.CARBON_CONTENT[fuel_type]
return sh.NONE
[docs]@sh.add_function(dsp, outputs=['fuel_carbon_content'])
def calculate_fuel_carbon_content(fuel_carbon_content_percentage):
"""
Calculates the fuel carbon content as CO2g/g.
:param fuel_carbon_content_percentage:
Fuel carbon content [%].
:type fuel_carbon_content_percentage: float
:return:
Fuel carbon content [CO2g/g].
:rtype: float
"""
return fuel_carbon_content_percentage / 100.0 * 44.0 / 12.0
[docs]@sh.add_function(dsp, outputs=['fuel_carbon_content_percentage'])
def calculate_fuel_carbon_content_percentage(fuel_carbon_content):
"""
Calculates the fuel carbon content as %.
:param fuel_carbon_content:
Fuel carbon content [CO2g/g].
:type fuel_carbon_content: float
:return:
Fuel carbon content [%].
:rtype: float
"""
return fuel_carbon_content / calculate_fuel_carbon_content(1.0)
[docs]@sh.add_function(dsp, outputs=['fuel_heating_value'])
def calculate_fuel_heating_value(engine_fuel_lower_heating_value, fuel_density):
"""
Calculates the fuel heating value as kWh/l.
:param engine_fuel_lower_heating_value:
Fuel lower heating value [kJ/kg].
:type engine_fuel_lower_heating_value: float
:param fuel_density:
Fuel density [g/l].
:type fuel_density: float
:return:
Fuel heating value [kWh/l].
:rtype: float
"""
return engine_fuel_lower_heating_value * fuel_density / 36e5