# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions and `dsp` model to model the idle engine speed.
"""
import numpy as np
import schedula as sh
from co2mpas.defaults import dfl
dsp = sh.BlueDispatcher(
name='calculate_idle_engine_speed',
description='Identify idle engine speed median and std.'
)
dsp.add_data('idle_engine_speed_std', dfl.values.idle_engine_speed_std, 20)
[docs]@sh.add_function(dsp, outputs=['idle_model_detector'], weight=100)
def define_idle_model_detector(
velocities, engine_speeds_out, stop_velocity, min_engine_on_speed):
"""
Defines idle engine speed model detector.
:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array
:param engine_speeds_out:
Engine speed vector [RPM].
:type engine_speeds_out: numpy.array
:param stop_velocity:
Maximum velocity to consider the vehicle stopped [km/h].
:type stop_velocity: float
:param min_engine_on_speed:
Minimum engine speed to consider the engine to be on [RPM].
:type min_engine_on_speed: float
:return:
Idle engine speed model detector.
:rtype: sklearn.cluster.DBSCAN
"""
b = (velocities < stop_velocity) & (engine_speeds_out > min_engine_on_speed)
if not b.any():
return sh.NONE
x = engine_speeds_out[b, None]
from ._idle import _IdleDetector
model = _IdleDetector(eps=dfl.functions.define_idle_model_detector.EPS)
model.fit(x)
return model
[docs]@sh.add_function(dsp, outputs=['idle_engine_speed_std'])
def identify_idle_engine_speed_std(
idle_model_detector, engine_speeds_out, idle_engine_speed_median,
min_engine_on_speed):
"""
Identifies standard deviation of idle engine speed [RPM].
:param idle_model_detector:
Idle engine speed model detector.
:type idle_model_detector: _IdleDetector
:param engine_speeds_out:
Engine speed vector [RPM].
:type engine_speeds_out: numpy.array
:param idle_engine_speed_median:
Idle engine speed [RPM].
:type idle_engine_speed_median: float
:param min_engine_on_speed:
Minimum engine speed to consider the engine to be on [RPM].
:type min_engine_on_speed: float
:return:
Standard deviation of idle engine speed [RPM].
:rtype: float
"""
b = idle_model_detector.predict([(idle_engine_speed_median,)],
set_outliers=False)
b = idle_model_detector.predict(engine_speeds_out[:, None]) == b
b &= (engine_speeds_out > min_engine_on_speed)
idle_std = dfl.functions.identify_idle_engine_speed_std.MIN_STD
# noinspection PyUnresolvedReferences
if not b.any():
return idle_std
s = np.sqrt(np.mean((engine_speeds_out[b] - idle_engine_speed_median) ** 2))
p = dfl.functions.identify_idle_engine_speed_std.MAX_STD_PERC
return min(max(s, idle_std), idle_engine_speed_median * p)
dsp.add_function(
function=sh.bypass,
inputs=['idle_engine_speed_median', 'idle_engine_speed_std'],
outputs=['idle_engine_speed']
)
dsp.add_function(
function=sh.bypass,
inputs=['idle_engine_speed'],
outputs=['idle_engine_speed_median', 'idle_engine_speed_std']
)