@sh.add_function(dsp, outputs=['gears'], weight=10)
def identify_gears_v1(
times, velocities, accelerations, motive_powers, on_engine,
engine_speeds_out, velocity_speed_ratios, stop_velocity,
plateau_acceleration, change_gear_window_width, idle_engine_speed,
correct_gear):
"""
Identifies gear time series [-].
:param times:
Time vector [s].
:type times: numpy.array
:param velocities:
Velocity vector [km/h].
:type velocities: numpy.array
:param accelerations:
Acceleration vector [m/s2].
:type accelerations: numpy.array
:param motive_powers:
Motive power [kW].
:type motive_powers: numpy.array
:param on_engine:
If the engine is on [-].
:type on_engine: numpy.array
:param engine_speeds_out:
Engine speed [RPM].
:type engine_speeds_out: numpy.array
:param velocity_speed_ratios:
Constant velocity speed ratios of the gear box [km/(h*RPM)].
:type velocity_speed_ratios: dict[int | float]
:param stop_velocity:
Maximum velocity to consider the vehicle stopped [km/h].
:type stop_velocity: float
:param plateau_acceleration:
Maximum acceleration to be at constant velocity [m/s2].
:type plateau_acceleration: float
:param change_gear_window_width:
Time window used to apply gear change filters [s].
:type change_gear_window_width: float
:param idle_engine_speed:
Engine speed idle median and std [RPM].
:type idle_engine_speed: (float, float)
:param correct_gear:
A function to correct the gear predicted.
:type correct_gear: callable
:return:
Gear vector identified [-].
:rtype: numpy.array
"""
n = [k for k in velocity_speed_ratios if k != 0]
if len(n) == 1:
gears = np.ones_like(times, int) * n[0]
gears[velocities <= stop_velocity] = 0
return gears
with np.errstate(divide='ignore', invalid='ignore'):
r = velocities / engine_speeds_out
idle = (idle_engine_speed[0] - idle_engine_speed[1],
idle_engine_speed[0] + idle_engine_speed[1])
r[engine_speeds_out <= idle[0]] = 0
_vsr = sh.combine_dicts(velocity_speed_ratios, base={0: 0})
g, vsr = np.array([(k, v) for k, v in sorted(_vsr.items())]).T
dr = np.abs(vsr[:, None] - r)
i, j = np.argmin(dr, 0), np.arange(times.shape[0])
b = velocities <= vsr[i] * idle[0]
if idle[1]:
b |= np.abs(velocities / idle[1] - r) < dr[i, j]
b = (velocities <= stop_velocity) | (b & (accelerations < 0))
gears = np.where(b, 0, g[i]).astype(int)
gears = co2_utl.median_filter(times, gears, change_gear_window_width)
gears = _correct_gear_shifts(times, r, gears, velocity_speed_ratios)
gears = co2_utl.clear_fluctuations(times, gears, change_gear_window_width)
anomalies = velocities > stop_velocity
anomalies &= (accelerations > 0) | ~on_engine
anomalies |= accelerations > plateau_acceleration
anomalies &= gears == 0
from ..control.conventional import calculate_engine_speeds_out_hot
ds = calculate_engine_speeds_out_hot(calculate_gear_box_speeds_in(
gears, velocities, velocity_speed_ratios, stop_velocity
), on_engine, idle_engine_speed) - engine_speeds_out
i = np.where(on_engine & ~anomalies)[0]
med = np.nanmedian(ds[i])
std = 3 * max(50, co2_utl.mad(ds[i], med=med))
anomalies[i] |= np.abs(ds[i] - med) >= std
b = (gears == 0) & (velocities <= stop_velocity)
anomalies[b] = False
anomalies = co2_utl.clear_fluctuations(
times, anomalies.astype(int), change_gear_window_width
)
for i, j in sh.pairwise(_shift(gears)):
anomalies[i:j] = anomalies[i:j].mean() > .3
gsm = _calibrate_gsm(
velocity_speed_ratios, on_engine, anomalies, gears, velocities,
stop_velocity, idle_engine_speed
).init_gear(
gears, times, velocities, accelerations, motive_powers,
correct_gear=correct_gear
)
for i, v in enumerate(anomalies):
if v:
gears[i] = gsm(i)
gears = co2_utl.median_filter(times, gears, change_gear_window_width)
gears = co2_utl.clear_fluctuations(times, gears, change_gear_window_width)
return gears.astype(int)