# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions and `dsp` model to model the GSPV Approach.
"""
import copy
import numpy as np
import schedula as sh
from .cmv import CMV
from co2mpas.defaults import dfl
from .core import prediction_gears_gsm
dsp = sh.BlueDispatcher(name='Gear Shifting Power Velocity Approach')
dsp.add_data('stop_velocity', dfl.values.stop_velocity)
def _gspv_interpolate_cloud(powers, velocities):
from sklearn.isotonic import IsotonicRegression
from scipy.interpolate import InterpolatedUnivariateSpline
regressor = IsotonicRegression()
regressor.fit(powers, velocities)
x = np.linspace(min(powers), max(powers))
y = regressor.predict(x)
return InterpolatedUnivariateSpline(x, y, k=1, ext=3)
# noinspection PyMissingOrEmptyDocstring,PyPep8Naming
[docs]class GSPV(CMV):
[docs] def __init__(self, *args, cloud=None, velocity_speed_ratios=None):
super(GSPV, self).__init__(*args)
if args and isinstance(args[0], GSPV):
if not cloud:
self.cloud = args[0].cloud
if velocity_speed_ratios:
self.convert(velocity_speed_ratios)
else:
velocity_speed_ratios = args[0].velocity_speed_ratios
else:
self.cloud = cloud or {}
self.velocity_speed_ratios = velocity_speed_ratios or {}
if cloud:
self._fit_cloud()
def __repr__(self):
from pprint import pformat
s = 'GSPV(cloud={}, velocity_speed_ratios={})'
vsr = pformat(self.velocity_speed_ratios)
s = s.format(pformat(self.cloud), vsr)
return s.replace('inf', "float('inf')")
# noinspection PyMethodOverriding
[docs] def fit(self, gears, velocities, motive_powers, velocity_speed_ratios,
stop_velocity):
self.clear()
self.velocity_speed_ratios = velocity_speed_ratios
it = zip(velocities, motive_powers, sh.pairwise(gears))
for v, p, (g0, g1) in it:
if v > stop_velocity and g0 != g1:
x = self.get(g0, [[], [[], []]])
if g0 < g1 and p >= 0:
x[1][0].append(p)
x[1][1].append(v)
elif g0 > g1 and p <= 0:
x[0].append(v)
else:
continue
self[g0] = x
self[0] = [[0.0], [[0.0], [stop_velocity]]]
self[max(self)][1] = [[0, 1], [dfl.INF] * 2]
self.cloud = {k: copy.deepcopy(v) for k, v in self.items()}
self._fit_cloud()
return self
def _fit_cloud(self):
from scipy.interpolate import InterpolatedUnivariateSpline as spl
def _line(n, m, i):
x = np.mean(m[i]) if m[i] else None
k_p = n - 1
while k_p > 0 and k_p not in self:
k_p -= 1
x_up = self[k_p][not i](0) if k_p >= 0 else x
if x is None or x > x_up:
x = x_up
return spl([0, 1], [x] * 2, k=1)
self.clear()
self.update(copy.deepcopy(self.cloud))
for k, v in sorted(self.items()):
v[0] = _line(k, v, 0)
if len(v[1][0]) > 2:
v[1] = _gspv_interpolate_cloud(*v[1])
elif v[1][1]:
v[1] = spl([0, 1], [np.mean(v[1][1])] * 2, k=1)
else:
v[1] = self[k - 1][0]
@property
def limits(self):
limits = {}
X = [dfl.INF, 0]
for v in self.cloud.values():
X[0] = min(min(v[1][0]), X[0])
X[1] = max(max(v[1][0]), X[1])
X = list(np.linspace(*X))
X = [0] + X + [X[-1] * 1.1]
for k, func in self.items():
limits[k] = [(f(X), X) for f, x in zip(func, X)]
return limits
[docs] def plot(self):
import matplotlib.pylab as plt
for k, v in self.limits.items():
kv = {}
for (s, l), (x, y) in zip((('down', '--'), ('up', '-')), v):
if x[0] < dfl.INF:
kv['label'] = 'Gear %d:%s-shift' % (k, s)
kv['linestyle'] = l
# noinspection PyProtectedMember
kv['color'] = plt.plot(x, y, **kv)[0]._color
cy, cx = self.cloud[k][1]
if cx[0] < dfl.INF:
kv.pop('label')
kv['linestyle'] = ''
kv['marker'] = 'o'
plt.plot(cx, cy, **kv)
plt.legend(loc='best')
plt.xlabel('Velocity [km/h]')
plt.ylabel('Power [kW]')
def _init_gear(self, times, velocities, accelerations, motive_powers,
engine_coolant_temperatures):
keys = sorted(self.keys())
matrix, c = {}, len(keys) - 1
from co2mpas.utils import List
if isinstance(velocities, List) or isinstance(motive_powers, List):
for i, k in enumerate(keys):
matrix[k] = self[k], (keys[max(0, i - 1)], keys[min(i + 1, c)])
def _next(gear, index):
# noinspection PyShadowingNames
v, p = velocities[index], motive_powers[index]
(_down, _up), (g0, g1) = matrix[gear]
if v >= _up(p):
return g1
if v < _down(p):
return g0
return gear
else:
r = velocities.shape[0]
for i, g in enumerate(keys):
down, up = [func(motive_powers) for func in self[g]]
matrix[g] = p = np.tile(g, r)
p[velocities < down] = keys[max(0, i - 1)]
p[velocities >= up] = keys[min(i + 1, c)]
def _next(gear, index):
return matrix[gear][index]
return _next
# noinspection PyUnresolvedReferences,PyTypeChecker
[docs] def convert(self, velocity_speed_ratios):
if velocity_speed_ratios != self.velocity_speed_ratios:
# noinspection PyProtectedMember
from .cmv import _convert_limits
vsr, n_vsr = self.velocity_speed_ratios, velocity_speed_ratios
limits = [dfl.INF, 0]
for v in self.cloud.values():
if v[1][0]:
limits[0] = min(min(v[1][0]), limits[0])
limits[1] = max(max(v[1][0]), limits[1])
cloud = self.cloud = {}
for p in np.linspace(*limits):
cmv = _convert_limits(
{k: [f(p) for f in v] for k, v in self.items()}, vsr, n_vsr
)
for k, (l, u) in sorted(cmv.items()):
c = cloud[k] = cloud.get(k, [[], [[], []]])
c[0].append(l)
c[1][0].append(p)
c[1][1].append(u)
cloud[0] = [[0.0], [[0.0], [self[0][1](0.0)]]]
cloud[max(cloud)][1] = [[0, 1], [dfl.INF] * 2]
self._fit_cloud()
self.velocity_speed_ratios = n_vsr
return self
[docs]@sh.add_function(dsp, outputs=['GSPV'])
def calibrate_gspv(
gears, velocities, motive_powers, velocity_speed_ratios, stop_velocity):
"""
Identifies gear shifting power velocity matrix.
:param gears:
Gear vector [-].
:type gears: numpy.array
:param velocities:
Vehicle velocity [km/h].
:type velocities: numpy.array
:param motive_powers:
Motive power [kW].
:type motive_powers: numpy.array
:param velocity_speed_ratios:
Constant velocity speed ratios of the gear box [km/(h*RPM)].
:type velocity_speed_ratios: dict[int | float]
:param stop_velocity:
Maximum velocity to consider the vehicle stopped [km/h].
:type stop_velocity: float
:return:
Gear shifting power velocity matrix.
:rtype: dict
"""
gspv = GSPV().fit(
gears, velocities, motive_powers, velocity_speed_ratios, stop_velocity
)
return gspv
# predict gears with corrected matrix velocity
dsp.add_function(
function=prediction_gears_gsm,
inputs=[
'correct_gear', 'gear_filter', 'GSPV', 'times', 'velocities',
'accelerations', 'motive_powers', 'cycle_type', 'velocity_speed_ratios'
],
outputs=['gears']
)