Source code for co2mpas.core.model.physical.gear_box.at_gear.cmv

# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions and `dsp` model to model the CMV Approach.
"""
import itertools
import collections
import numpy as np
import schedula as sh
from co2mpas.defaults import dfl
from .core import prediction_gears_gsm, define_gear_filter

dsp = sh.BlueDispatcher(name='Corrected Matrix Velocity Approach')

dsp.add_data('stop_velocity', dfl.values.stop_velocity)


def _correct_gsv(gsv, stop_velocity):
    """
    Corrects gear shifting velocity matrix from unreliable limits.

    :param gsv:
        Gear shifting velocity matrix.
    :type gsv: dict

    :param stop_velocity:
        Maximum velocity to consider the vehicle stopped [km/h].
    :type stop_velocity: float

    :return:
        Gear shifting velocity matrix corrected from unreliable limits.
    :rtype: dict
    """

    gsv[0] = [0, (stop_velocity, (dfl.INF, 0))]

    # noinspection PyMissingOrEmptyDocstring
    def func(x):
        return not x and float('inf') or 1 / x

    for v0, v1 in sh.pairwise(gsv.values()):
        up0, s0, down1, s1 = v0[1][0], v0[1][1][1], v1[0][0], v1[0][1][1]

        if down1 + s1 <= v0[0]:
            v0[1], v1[0] = up0 + s0, up0 - s0
        elif up0 >= down1:
            v0[1], v1[0] = up0 + s0, down1 - s1
            continue
        elif (v0[1][1][0], func(s0)) >= (v1[0][1][0], func(s1)):
            v0[1], v1[0] = up0 + s0, up0 - s0
        else:
            v0[1], v1[0] = down1 + s1, down1 - s1

        v0[1] += stop_velocity

    gsv[max(gsv)][1] = dfl.INF

    return gsv


def _filter_gear_shifting_velocity(limits, stop_velocity):
    def _rjt_out(x, default):
        if x:
            x = np.asarray(x)

            # noinspection PyTypeChecker
            m, (n, s) = np.median(x), (len(x), np.std(x))

            y = s and 2 > (abs(x - m) / s)

            if s and y.any():
                y = x[y]

                # noinspection PyTypeChecker
                m, (n, s) = np.median(y), (len(y), np.std(y))

            return m, (n, s)
        else:
            return default

    max_gear = max(limits)
    gsv = collections.OrderedDict()
    for k in range(max_gear + 1):
        v0, v1 = limits.get(k, [[], []])
        gsv[k] = [_rjt_out(v0, (-1, (0, 0))), _rjt_out(v1, (dfl.INF, (0, 0)))]

    return _correct_gsv(gsv, stop_velocity)


def _identify_gear_shifting_velocity_limits(gears, velocities, stop_velocity):
    """
    Identifies gear shifting velocity matrix.

    :param gears:
        Gear vector [-].
    :type gears: numpy.array

    :param velocities:
        Vehicle velocity [km/h].
    :type velocities: numpy.array

    :param stop_velocity:
        Maximum velocity to consider the vehicle stopped [km/h].
    :type stop_velocity: float

    :return:
        Gear shifting velocity matrix.
    :rtype: dict
    """

    limits = {}

    for v, (g0, g1) in zip(velocities, sh.pairwise(gears)):
        if v >= stop_velocity and g0 != g1:
            limits[g0] = limits.get(g0, [[], []])
            limits[g0][int(g0 < g1)].append(v)
    return _filter_gear_shifting_velocity(limits, stop_velocity)


def _interp(r, y, nr):
    from scipy.interpolate import InterpolatedUnivariateSpline as Spline
    n = len(r)
    if n == 0:
        return np.nan * nr
    elif n == 1:
        return (y / r) * nr
    else:
        return Spline(r, y / r, k=1)(nr) * nr


# noinspection PyPep8Naming
def _convert_limits(lu, vsr, n_vsr, stop_velocity=dfl.values.stop_velocity):
    _r, _l, _u = np.array(
        sorted([vsr.get(k, 0)] + list(v) for k, v in lu.items())
    ).T
    nk, nr = np.array(sorted(sh.combine_dicts(n_vsr, base={0: 0}).items())).T
    klu = sorted(zip(
        nk.astype(int),
        _interp(_r[2:], _l[2:], nr),
        _interp(_r[1:-1], _u[1:-1], nr)
    ))
    res, n0, n1 = {}, min(k for k in n_vsr if k > 0), max(n_vsr)
    for k, l, u in klu:
        if k == 0:
            l, u = list(lu.get(k, [0, stop_velocity + 1]))
        else:
            if k == n0:
                l = lu.get(k, (stop_velocity,))[0]
            if k == n1:
                u = lu.get(k, (None, dfl.INF))[1]
        res[k] = [l, u]
    return res


def _grouper(iterable, n):
    """
    Collect data into fixed-length chunks or blocks.

    :param iterable:
        Iterable object.
    :param iterable: iter

    :param n:
        Length chunks or blocks.
    :type n: int
    """
    args = [iter(iterable)] * n
    return zip(*args)


# noinspection PyMissingOrEmptyDocstring,PyUnusedLocal
[docs]class CMV(collections.OrderedDict):
[docs] def __init__(self, *args, velocity_speed_ratios=None): super(CMV, self).__init__(*args) if args and isinstance(args[0], CMV): if velocity_speed_ratios: self.convert(velocity_speed_ratios) else: velocity_speed_ratios = args[0].velocity_speed_ratios self.velocity_speed_ratios = velocity_speed_ratios or {}
def __repr__(self): from pprint import pformat name = self.__class__.__name__ items = [(k, v) for k, v in self.items()] vsr = pformat(self.velocity_speed_ratios) s = '{}({}, velocity_speed_ratios={})'.format(name, items, vsr) return s.replace('inf', "float('inf')")
[docs] def fit(self, correct_gear, gears, engine_speeds_out, times, velocities, accelerations, motive_powers, velocity_speed_ratios, stop_velocity): from ..mechanical import calculate_gear_box_speeds_in self.clear() self.velocity_speed_ratios = velocity_speed_ratios self.update(_identify_gear_shifting_velocity_limits( gears, velocities, stop_velocity )) if dfl.functions.CMV.ENABLE_OPT_LOOP: from co2mpas.utils import mae gear_id, velocity_limits = zip(*sorted(self.items())[1:]) max_gear, _inf = gear_id[-1], float('inf') update, predict = self.update, self.predict def _update_gvs(vel_limits): self[0] = (0, vel_limits[0]) self[max_gear] = (vel_limits[-1], _inf) update(dict(zip(gear_id, _grouper(vel_limits[1:-1], 2)))) def _error_fun(vel_limits): _update_gvs(vel_limits) g_pre = predict( times, velocities, accelerations, motive_powers, correct_gear=correct_gear ) speed_pred = calculate_gear_box_speeds_in( g_pre, velocities, velocity_speed_ratios, stop_velocity ) return np.float32(mae(speed_pred, engine_speeds_out)) x0 = [self[0][1]].__add__( list(itertools.chain(*velocity_limits))[:-1] ) from scipy.optimize import fmin _update_gvs(fmin(_error_fun, x0, disp=False)) return self
[docs] def correct_constant_velocity( self, up_cns_vel=(), up_window=0.0, up_delta=0.0, dn_cns_vel=(), dn_window=0.0, dn_delta=0.0): """ Corrects the gear shifting matrix velocity for constant velocities. :param up_cns_vel: Constant velocities to correct the upper limits [km/h]. :type up_cns_vel: tuple[float] :param up_window: Window to identify if the shifting matrix has limits close to `up_cns_vel` [km/h]. :type up_window: float :param up_delta: Delta to add to the limit if this is close to `up_cns_vel` [km/h]. :type up_delta: float :param dn_cns_vel: Constant velocities to correct the bottom limits [km/h]. :type dn_cns_vel: tuple[float] :param dn_window: Window to identify if the shifting matrix has limits close to `dn_cns_vel` [km/h]. :type dn_window: float :param dn_delta: Delta to add to the limit if this is close to `dn_cns_vel` [km/h]. :type dn_delta: float :return: A gear shifting velocity matrix corrected from NEDC velocities. :rtype: dict """ def _set_velocity(velocity, const_steps, window, delta): for s in const_steps: if s < velocity < s + window: return s + delta return velocity for k, v in sorted(self.items()): v = [ _set_velocity(v[0], dn_cns_vel, dn_window, dn_delta), _set_velocity(v[1], up_cns_vel, up_window, up_delta) ] if v[0] >= v[1]: v[0] = v[1] + dn_delta try: if self[k - 1][1] <= v[0]: v[0] = self[k - 1][1] + up_delta except KeyError: pass self[k] = tuple(v) return self
[docs] def plot(self): import matplotlib.pylab as plt for k, v in self.items(): kv = {} for (s, l), x in zip((('down', '--'), ('up', '-')), v): if x < dfl.INF: kv['label'] = 'Gear %d:%s-shift' % (k, s) kv['linestyle'] = l # noinspection PyProtectedMember kv['color'] = plt.plot([x] * 2, [0, 1], **kv)[0]._color plt.legend(loc='best') plt.xlabel('Velocity [km/h]')
def _init_gear(self, times, velocities, accelerations, motive_powers, engine_coolant_temperatures): keys = sorted(self.keys()) matrix, c = {}, len(keys) - 1 from co2mpas.utils import List if isinstance(velocities, List): for i, k in enumerate(keys): matrix[k] = self[k], (keys[max(0, i - 1)], keys[min(i + 1, c)]) def _next(gear, index): v = velocities[index] (_down, _up), (g0, g1) = matrix[gear] if v >= _up: return g1 if v < _down: return g0 return gear else: r = velocities.shape[0] for i, g in enumerate(keys): down, up = self[g] matrix[g] = p = np.tile(g, r) p[velocities < down] = keys[max(0, i - 1)] p[velocities >= up] = keys[min(i + 1, c)] def _next(gear, index): return matrix[gear][index] return _next
[docs] def predict(self, times, velocities, accelerations, motive_powers, engine_coolant_temperatures=None, correct_gear=lambda g, *args: g, gear_filter=define_gear_filter()): gears = np.zeros_like(times, int) gen = self.init_gear( gears, times, velocities, accelerations, motive_powers, engine_coolant_temperatures, correct_gear ) for i in range(times.shape[0]): gears[i] = gen(i) # if gear_filter is not None: # gears[index:times.shape[0]] = gear_filter(times, gears) return gears
[docs] def init_gear(self, gears, times, velocities, accelerations, motive_powers, engine_coolant_temperatures=None, correct_gear=lambda g, *args: g): next_g = self._init_gear( times, velocities, accelerations, motive_powers, engine_coolant_temperatures ) valid_gears = np.array(list(getattr(self, 'gears', self))) def get_valid_gear(g): if g in valid_gears: return g return valid_gears[np.abs(np.subtract(valid_gears, g)).argmin()] args = ( gears, times, velocities, accelerations, motive_powers, engine_coolant_temperatures, next_g ) def _next(i): if i != 0: g = gears[i - 1] return get_valid_gear(correct_gear(next_g(g, i), i, *args)) g = valid_gears.min() for _ in range(len(valid_gears) + 1): g0 = g g = get_valid_gear(correct_gear(next_g(g, i), i, *args)) if g == g0: break return g return _next
[docs] def init_speed(self, stop_velocity, gears, velocities, *args, **kwargs): vsr = self.velocity_speed_ratios.get def _next(i): v = velocities[i] r = v > stop_velocity and vsr(gears[i], 0) return v / r if r else 0 return _next
# noinspection PyPep8Naming
[docs] def convert(self, velocity_speed_ratios): if velocity_speed_ratios != self.velocity_speed_ratios: vsr, n_vsr = self.velocity_speed_ratios, velocity_speed_ratios new_self = _convert_limits(self, vsr, n_vsr) reverse = np.diff(list(self))[0] < 0 self.clear() for k, lu in sorted(new_self.items(), reverse=reverse): self[k] = lu self.velocity_speed_ratios = n_vsr return self
[docs]@sh.add_function(dsp, outputs=['CMV']) def calibrate_gear_shifting_cmv( correct_gear, gears, engine_speeds_out, times, velocities, accelerations, motive_powers, velocity_speed_ratios, stop_velocity): """ Calibrates a corrected matrix velocity to predict gears. :param correct_gear: A function to correct the predicted gear. :type correct_gear: callable :param gears: Gear vector [-]. :type gears: numpy.array :param engine_speeds_out: Engine speed vector [RPM]. :type engine_speeds_out: numpy.array :param times: Time vector [s]. :type times: numpy.array :param velocities: Vehicle velocity [km/h]. :type velocities: numpy.array :param accelerations: Vehicle acceleration [m/s2]. :type accelerations: numpy.array :param motive_powers: Motive power [kW]. :type motive_powers: numpy.array :param velocity_speed_ratios: Constant velocity speed ratios of the gear box [km/(h*RPM)]. :type velocity_speed_ratios: dict[int | float] :param stop_velocity: Maximum velocity to consider the vehicle stopped [km/h]. :type stop_velocity: float :returns: A corrected matrix velocity to predict gears. :rtype: dict """ cmv = CMV().fit( correct_gear, gears, engine_speeds_out, times, velocities, accelerations, motive_powers, velocity_speed_ratios, stop_velocity ) return cmv
# predict gears with corrected matrix velocity dsp.add_function( function=prediction_gears_gsm, inputs=['correct_gear', 'gear_filter', 'CMV', 'times', 'velocities', 'accelerations', 'motive_powers', 'cycle_type', 'velocity_speed_ratios'], outputs=['gears'] )