Source code for co2mpas.core.model.physical.after_treat

# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions and `dsp` model to model the after treatment.
"""
import numpy as np
import schedula as sh
import co2mpas.utils as co2_utl
from co2mpas.defaults import dfl

dsp = sh.BlueDispatcher(
    name='After treatment', description='Models the after treatment.'
)


[docs]@sh.add_function(dsp, inputs=[ 'times', 'velocities', 'engine_speeds_base', 'gear_box_speeds_in', 'gears', 'stop_velocity', 'min_engine_on_speed', 'on_engine', 'idle_engine_speed' ], outputs=['on_idle']) @sh.add_function(dsp, outputs=['on_idle']) def identify_on_idle( times, velocities, engine_speeds_out_hot, gear_box_speeds_in, gears, stop_velocity, min_engine_on_speed, on_engine, idle_engine_speed): """ Identifies when the engine is on idle [-]. :param times: Time vector [s]. :type times: numpy.array :param velocities: Velocity vector [km/h]. :type velocities: numpy.array :param engine_speeds_out_hot: Engine speed at hot condition [RPM]. :type engine_speeds_out_hot: numpy.array :param gear_box_speeds_in: Gear box speed [RPM]. :type gear_box_speeds_in: numpy.array :param gears: Gear vector [-]. :type gears: numpy.array :param stop_velocity: Maximum velocity to consider the vehicle stopped [km/h]. :type stop_velocity: float :param min_engine_on_speed: Minimum engine speed to consider the engine to be on [RPM]. :type min_engine_on_speed: float :param on_engine: If the engine is on [-]. :type on_engine: numpy.array :param idle_engine_speed: Engine speed idle median and std [RPM]. :type idle_engine_speed: (float, float) :return: If the engine is on idle [-]. :rtype: numpy.array """ # noinspection PyProtectedMember from .gear_box.mechanical import _shift b = engine_speeds_out_hot > min_engine_on_speed b &= (gears == 0) | (velocities <= stop_velocity) on_idle = np.zeros_like(times, int) i = np.where(on_engine)[0] ds = np.abs(gear_box_speeds_in[i] - engine_speeds_out_hot[i]) on_idle[i[ds > idle_engine_speed[1]]] = 1 on_idle = co2_utl.median_filter(times, on_idle, 4) on_idle[b] = 1 for i, j in sh.pairwise(_shift(on_idle)): if not on_idle[i] and times[j - 1] - times[i] <= 2: on_idle[i:j] = 1 return co2_utl.clear_fluctuations(times, on_idle, 4).astype(bool)
[docs]@sh.add_function( dsp, inputs_kwargs=True, outputs=['after_treatment_warm_up_phases'] ) def identify_after_treatment_warm_up_phases( times, engine_speeds_out, engine_speeds_out_hot, on_idle, on_engine, idle_engine_speed, velocities, engine_starts, stop_velocity, is_hybrid=False): """ Identifies when engine speed is affected by the after treatment warm up [-]. :param times: Time vector [s]. :type times: numpy.array :param engine_speeds_out: Engine speed [RPM]. :type engine_speeds_out: numpy.array :param engine_speeds_out_hot: Engine speed at hot condition [RPM]. :type engine_speeds_out_hot: numpy.array :param on_idle: If the engine is on idle [-]. :type on_idle: numpy.array :param on_engine: If the engine is on [-]. :type on_engine: numpy.array :param idle_engine_speed: Engine speed idle median and std [RPM]. :type idle_engine_speed: (float, float) :param velocities: Velocity vector [km/h]. :type velocities: numpy.array :param engine_starts: When the engine starts [-]. :type engine_starts: numpy.array :param stop_velocity: Maximum velocity to consider the vehicle stopped [km/h]. :type stop_velocity: float :param is_hybrid: Is the vehicle hybrid? :type is_hybrid: bool :return: Phases when engine speed is affected by the after treatment warm up [-]. :rtype: numpy.array """ from .control import identify_engine_starts i, phases = np.where(on_idle)[0], np.zeros_like(times, int) start = engine_starts.copy() if is_hybrid: with np.errstate(divide='ignore', invalid='ignore'): r = engine_speeds_out[i] / engine_speeds_out_hot[i] b = ~co2_utl.get_inliers(r, 2, np.nanmedian, co2_utl.mad)[0] phases[i[b]] = 1 else: ds = np.abs(engine_speeds_out[i] - engine_speeds_out_hot[i]) phases[i[ds > idle_engine_speed[1]]] = 1 start |= identify_engine_starts(velocities > stop_velocity) for i, j in np.searchsorted(times, times[start, None] + [-2, 5 + dfl.EPS]): phases[i:j], start[i:j] = 0, True phases = co2_utl.median_filter(times, phases, 4) phases = co2_utl.clear_fluctuations(times, phases, 4).astype(bool) indices = co2_utl.index_phases(phases) if is_hybrid: indices = indices[(np.diff(times[indices], axis=1) > 10).ravel()][:1] else: b, f = identify_engine_starts(~on_engine), False for i, j in np.searchsorted(times, times[b, None] + [-5, 2 + dfl.EPS]): b[i:j] = f f = True b = (on_idle & ~start) | b | (times > np.min(times[on_engine]) + 200) indices = indices[:co2_utl.argmax(b[indices[:, 1]])] phases[:], n = False, len(times) for i, j in indices: while i and on_idle.take(i - 1, mode='clip'): i -= 1 phases[i:j + 1] = True return phases
[docs]@sh.add_function( dsp, inputs_kwargs=True, outputs=['after_treatment_warm_up_duration'] ) def identify_after_treatment_warm_up_duration( times, after_treatment_warm_up_phases, is_hybrid=False): """ Identify after treatment warm up duration [s]. :param times: Time vector [s]. :type times: numpy.array :param after_treatment_warm_up_phases: Phases when engine speed is affected by the after treatment warm up [-]. :type after_treatment_warm_up_phases: numpy.array :param is_hybrid: Is the vehicle hybrid? :type is_hybrid: bool :return: After treatment warm up duration [s]. :rtype: float """ i = co2_utl.index_phases(after_treatment_warm_up_phases) if i.shape[0]: if not is_hybrid: return float(np.diff(times[i.ravel()[[0, -1]]])) return float(np.mean(np.diff(times[i], axis=1))) return .0
[docs]@sh.add_function( dsp, inputs_kwargs=True, outputs=['after_treatment_cooling_duration'] ) def identify_after_treatment_cooling_duration( times, after_treatment_warm_up_phases, is_hybrid=False): """ Identify after treatment cooling duration [s]. :param times: Time vector [s]. :type times: numpy.array :param after_treatment_warm_up_phases: Phases when engine speed is affected by the after treatment warm up [-]. :type after_treatment_warm_up_phases: numpy.array :param is_hybrid: Is the vehicle hybrid? :type is_hybrid: bool :return: After treatment cooling duration [s]. :rtype: float """ i = co2_utl.index_phases(after_treatment_warm_up_phases) if is_hybrid and i.shape[0] > 2: return float(np.mean(np.diff(times[i].ravel())[1::2])) return float('inf')
[docs]@sh.add_function(dsp, outputs=['after_treatment_speeds_delta']) def identify_after_treatment_speeds_delta( after_treatment_warm_up_phases, engine_speeds_out, engine_speeds_out_hot): """ Identifies the Engine speed delta due to the after treatment warm up [RPM]. :param after_treatment_warm_up_phases: Phases when engine speed is affected by the after treatment warm up [-]. :type after_treatment_warm_up_phases: numpy.array :param engine_speeds_out: Engine speed [RPM]. :type engine_speeds_out: numpy.array :param engine_speeds_out_hot: Engine speed at hot condition [RPM]. :type engine_speeds_out_hot: numpy.array :return: Engine speed delta due to the after treatment warm up [RPM]. :rtype: numpy.array """ speeds = np.zeros_like(engine_speeds_out, dtype=float) b = after_treatment_warm_up_phases speeds[b] = np.maximum(0, engine_speeds_out[b] - engine_speeds_out_hot[b]) return speeds
[docs]@sh.add_function(dsp, outputs=['engine_speeds_base']) def calculate_engine_speeds_base( engine_speeds_out_hot, after_treatment_speeds_delta): """ Calculate base engine speed (i.e., without clutch/TC effect) [RPM]. :param engine_speeds_out_hot: Engine speed at hot condition [RPM]. :type engine_speeds_out_hot: numpy.array :param after_treatment_speeds_delta: Engine speed delta due to the after treatment warm up [RPM]. :type after_treatment_speeds_delta: numpy.array :return: Base engine speed (i.e., without clutch/TC effect) [RPM]. :rtype: numpy.array """ return engine_speeds_out_hot + after_treatment_speeds_delta
[docs]@sh.add_function( dsp, inputs_kwargs=True, outputs=['after_treatment_speed_model'] ) def calibrate_after_treatment_speed_model( times, after_treatment_warm_up_phases, after_treatment_speeds_delta, is_hybrid=False): """ Calibrates the engine after treatment speed model. :param times: Time vector [s]. :type times: numpy.array :param after_treatment_warm_up_phases: Phases when engine speed is affected by the after treatment warm up [-]. :type after_treatment_warm_up_phases: numpy.array :param after_treatment_speeds_delta: Engine speed delta due to the after treatment warm up [RPM]. :type after_treatment_speeds_delta: numpy.array :param is_hybrid: Is the vehicle hybrid? :type is_hybrid: bool :return: After treatment speed model. :rtype: function """ if after_treatment_warm_up_phases.any(): from sklearn.isotonic import IsotonicRegression x, y, model = [], [], IsotonicRegression(increasing=False) for i, j in co2_utl.index_phases(after_treatment_warm_up_phases): x.extend(times[i:j + 1] - (times[i] if is_hybrid else 0.0)) y.extend(after_treatment_speeds_delta[i:j + 1]) # noinspection PyUnresolvedReferences return model.fit(x, y).predict
[docs]@sh.add_function( dsp, inputs_kwargs=True, outputs=['after_treatment_power_model'] ) def calibrate_after_treatment_power_model( times, after_treatment_warm_up_phases, engine_powers_out, is_hybrid=False): """ Calibrates the engine after treatment speed model. :param times: Time vector [s]. :type times: numpy.array :param after_treatment_warm_up_phases: Phases when engine speed is affected by the after treatment warm up [-]. :type after_treatment_warm_up_phases: numpy.array :param engine_powers_out: Engine power vector [kW]. :type engine_powers_out: numpy.array :param is_hybrid: Is the vehicle hybrid? :type is_hybrid: bool :return: After treatment speed model. :rtype: function """ if after_treatment_warm_up_phases.any(): from sklearn.isotonic import IsotonicRegression x, y = [], [] for i, j in co2_utl.index_phases(after_treatment_warm_up_phases): t = times[i:j + 1] - (times[i] if is_hybrid else 0.0) x.extend(t) y.extend(co2_utl.median_filter(t, engine_powers_out[i:j + 1], 4)) # noinspection PyUnresolvedReferences return IsotonicRegression().fit(x, np.maximum(0, y)).predict
[docs]@sh.add_function(dsp, outputs=['after_treatment_warm_up_phases']) def predict_after_treatment_warm_up_phases( after_treatment_warm_up_duration, after_treatment_cooling_duration, times, on_engine, is_cycle_hot): """ Calculates the engine speed delta due to the after treatment warm up [RPM]. :param after_treatment_warm_up_duration: After treatment warm up duration [s]. :type after_treatment_warm_up_duration: float :param after_treatment_cooling_duration: After treatment cooling duration [s]. :type after_treatment_cooling_duration: float :param times: Time vector [s]. :type times: numpy.array :param on_engine: If the engine is on [-]. :type on_engine: numpy.array :param is_cycle_hot: Is an hot cycle? :type is_cycle_hot: bool :return: Phases when engine speed is affected by the after treatment warm up [-]. :rtype: numpy.array """ phases = np.zeros_like(times, bool) if after_treatment_warm_up_duration and on_engine.any(): indices = co2_utl.index_phases(on_engine) indices = indices[np.append(not is_cycle_hot, np.diff( times[indices].ravel() )[1::2] > after_treatment_cooling_duration)] for i, j in indices: t = times[i:j + 1] - times[i] phases[i:j + 1] = t < after_treatment_warm_up_duration return phases
[docs]@sh.add_function(dsp, outputs=['after_treatment_speeds_delta']) def predict_after_treatment_speeds_delta( after_treatment_speed_model, times, after_treatment_warm_up_phases, on_idle, is_hybrid): """ Predicts the engine speed delta due to the after treatment warm up [RPM]. :param after_treatment_speed_model: After treatment speed model. :type after_treatment_speed_model: function :param times: Time vector [s]. :type times: numpy.array :param after_treatment_warm_up_phases: Phases when engine speed is affected by the after treatment warm up [-]. :type after_treatment_warm_up_phases: numpy.array :param on_idle: If the engine is on idle [-]. :type on_idle: numpy.array :param is_hybrid: Is the vehicle hybrid? :type is_hybrid: bool :return: Engine speed delta due to the after treatment warm up [RPM]. :rtype: numpy.array """ ds = np.zeros_like(times, float) if after_treatment_speed_model: for i, j in co2_utl.index_phases(after_treatment_warm_up_phases): ds[i:j + 1] = after_treatment_speed_model(times[i:j + 1] - times[i]) if not is_hybrid: ds[~on_idle] = 0 return np.nan_to_num(ds)