# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions to read inputs from excel.
"""
import math
import regex
import logging
import functools
import collections
import os.path as osp
import schedula as sh
log = logging.getLogger(__name__)
_base_params = r"""
^((?P<scope>base)(\.|\s+))?
((?P<usage>(target|input|output|data))s?(\.|\s+))?
((?P<stage>(precondition|calibration|prediction))s?(\.|\s+))?
((?P<cycle>WLTP([-_]{1}[HLP]{1})?|
NEDC([-_]{1}[HL]{1})?|
ALL([-_]{1}[HL]{1})?)(recon)?(\.|\s+))?
(?P<param>[^\s.]*)\s*$
|
^((?P<scope>base)(\.|\s+))?
((?P<usage>(target|input|output|data))s?(\.|\s+))?
((?P<stage>(precondition|calibration|prediction))s?(\.|\s+))?
((?P<param>[^\s.]*))?
((.|\s+)(?P<cycle>WLTP([-_]{1}[HLP]{1})?|
NEDC([-_]{1}[HL]{1})?|
ALL([-_]{1}[HL]{1})?)(recon)?)?\s*$
"""
_flag_params = r"""^(?P<scope>flag)(\.|\s+)(?P<flag>(input_version|vehicle_family_id))\s*$"""
_dice_params = r"""^(?P<scope>dice)(\.|\s+)(?P<dice>[^\s.]*)\s*$"""
_meta_params = r"""
^(?P<scope>meta)(\.|\s+)((?P<meta>.+)(\.|\s+))?((?P<param>[^\s.]+))\s*$
"""
_plan_params = r"""
^(?P<scope>plan)(\.|\s+)(
(?P<index>(id|base|run_base))\s*$
|
""" + _flag_params.replace('<scope>', '<v_scope>').replace('^(', '(') + r"""
|
""" + _dice_params.replace('<scope>', '<v_scope>').replace('^(', '(') + r"""
|
""" + _meta_params.replace('<scope>', '<v_scope>').replace('^(', '(') + r"""
|
""" + _base_params.replace('<scope>', '<v_scope>').replace('^(', '(') + r"""
)
"""
_re_params_name = regex.compile(
r"""
^(?P<param>((plan|base|flag|dice)|
(target|input|output|data|meta)|
((precondition|calibration|prediction)s?)|
(WLTP([-_]{1}[HLP]{1})?|
NEDC([-_]{1}[HL]{1})?|
ALL([-_]{1}[HL]{1})?)(recon)?))\s*$
|
""" + _flag_params + r"""
|
""" + _dice_params + r"""
|
""" + _meta_params + r"""
|
""" + _plan_params + r"""
|
""" + _base_params, regex.IGNORECASE | regex.X | regex.DOTALL)
_base_sheet = r"""
^((?P<scope>base)(\.|\s+)?)?
((?P<usage>(target|input|output|data))s?(\.|\s+)?)?
((?P<stage>(precondition|calibration|prediction))s?(\.|\s+)?)?
((?P<cycle>WLTP([-_]{1}[HLP]{1})?|
NEDC([-_]{1}[HL]{1})?|
ALL([-_]{1}[HL]{1})?)(recon)?(\.|\s+)?)?
(?P<type>(pa|ts|pl))?\s*$
"""
_flag_sheet = r"""^(?P<scope>flag)((\.|\s+)(?P<type>(pa|ts|pl)))?\s*$"""
_dice_sheet = r"""^(?P<scope>dice)((\.|\s+)(?P<type>(pa|ts|pl)))?\s*$"""
_meta_sheet = r"""
^(?P<scope>meta)((\.|\s+)(?P<meta>.+))?(\.|\s+)(?P<type>(pa|ts|pl))\s*$
"""
_plan_sheet = r"""
^(?P<scope>plan)((\.|\s+)(
""" + _flag_sheet.replace('<scope>', '<v_scope>').replace('^(', '(') + r"""
|
""" + _dice_sheet.replace('<scope>', '<v_scope>').replace('^(', '(') + r"""
|
""" + _meta_sheet.replace('<scope>', '<v_scope>').replace('^(', '(') + r"""
|
""" + _base_sheet.replace('<scope>', '<v_scope>').replace('^(', '(') + r"""
))?\s*$
"""
_re_input_sheet_name = regex.compile(r'|'.join(
(_flag_sheet, _dice_sheet, _meta_sheet, _plan_sheet, _base_sheet)
), regex.IGNORECASE | regex.X | regex.DOTALL)
_re_space_dot = regex.compile(r'(\s*\.\s*|\s+)')
_xl_ref = {
'pa': '#%s!B2:C_:["pipe", ["dict", "recurse"]]',
'ts': '#%s!A2(R):.3:RD:["df", {"header": 0}]',
'pl': '#%s!A1(R):._:R:"recurse"'
}
# noinspection PyShadowingBuiltins,PyUnusedLocal
def _get_sheet_type(
type=None, usage=None, cycle=None, scope='base', **kw):
if type:
pass
elif scope == 'plan':
type = 'pl'
elif scope in ('flag', 'dice') or not cycle:
type = 'pa'
else:
type = 'ts'
return type
def _check_none(v):
if v is None:
return True
elif isinstance(v, collections.Iterable) and not isinstance(v, str) \
and len(v) <= 1:
# noinspection PyTypeChecker
return _check_none(next(iter(v))) if len(v) == 1 else True
return False
def _isempty(val):
return isinstance(val, float) and math.isnan(val) or _check_none(val)
# noinspection PyUnusedLocal
def _get_cycle(cycle=None, usage=None, **kw):
if cycle is None or cycle == 'all':
cycle = 'nedc_h', 'nedc_l', 'wltp_h', 'wltp_l'
if cycle == 'all':
cycle += 'wltp_p',
elif cycle == 'wltp':
cycle = 'wltp_h', 'wltp_l'
elif cycle == 'nedc':
cycle = 'nedc_h', 'nedc_l'
elif cycle in ('all-h', 'all_h'):
cycle = 'nedc_h', 'wltp_h'
elif cycle in ('all-l', 'all_l'):
cycle = 'nedc_l', 'wltp_l'
elif isinstance(cycle, str):
cycle = cycle.replace('-', '_')
return cycle
# noinspection PyUnusedLocal
def _get_default_stage(stage=None, cycle=None, usage=None, **kw):
if stage is None:
if cycle == 'wltp_p':
stage = 'precondition'
elif 'nedc' in cycle or usage == 'target':
stage = 'prediction'
else:
stage = 'calibration'
return stage.replace(' ', '')
def _parse_key(scope='base', usage='input', **match):
if scope == 'flag':
if match['flag'] == 'vehicle_family_id':
scope = 'dice'
yield scope, match['flag']
elif scope == 'dice':
yield scope, match['dice']
elif scope == 'meta':
meta = _re_space_dot.sub(match.get('meta', ''), '.').replace('-', '_')
yield scope, meta, match['param']
elif scope == 'plan':
if 'param' in match:
m = _re_params_name.match('.'.join((scope, match['param'])))
if m:
m = {i: j for i, j in m.groupdict().items() if j}
if 'index' in m:
match = m
if 'index' in match:
yield scope, match['index']
else:
for k in _parse_key(match.get('v_scope', 'base'), usage, **match):
yield scope, '.'.join(k)
elif scope == 'base':
i = match['param']
if i.lower() == 'version':
yield 'flag', 'input_version'
else:
m = match.copy()
for c in sh.stlp(_get_cycle(usage=usage, **match)):
m['cycle'] = c
stage = _get_default_stage(usage=usage, **m)
yield scope, usage, stage, c, i
def _parse_values(data, default=None, where=''):
default = default or {}
for k, v in data.items():
k = k.strip(' ')
match = _re_params_name.match(k) if k is not None else None
if not match and default.get('scope') == 'meta':
# noinspection PyTypeChecker
match = _re_params_name.match(
'.'.join(filter(bool, ('meta', default.get('meta'), k)))
)
if not match:
log.warning("Parameter '%s' %s cannot be parsed!", k, where)
continue
elif _isempty(v):
continue
match = {i: j.lower() for i, j in match.groupdict().items() if j}
for key in _parse_key(**sh.combine_dicts(default, match)):
yield key, v
# noinspection PyProtectedMember
@functools.lru_cache(None)
def _lasso_filters():
from pandalone.xleash._filter import install_default_filters
from pandalone.xleash._pandas_filters import install_filters
filters = {}
install_default_filters(filters)
install_filters(filters)
return filters
def _parse_sheet(match, sheet, sheet_name, res=None):
if res is None:
res = {}
sh_type = _get_sheet_type(**match)
# noinspection PyProtectedMember
from pandalone.xleash._lasso import lasso
data = lasso(
_xl_ref[sh_type] % sheet_name, sheet=sheet,
available_filters=_lasso_filters()
)
if sh_type == 'pl':
try:
import pandas as pd
data = pd.DataFrame(data[1:], columns=data[0])
except IndexError:
return None
if 'id' not in data:
data['id'] = data.index + 1
else:
data['id'] = data['id'].apply(
lambda x: x.strip() if isinstance(x, str) else x
)
data.set_index(['id'], inplace=True)
data.dropna(how='all', inplace=True)
data.dropna(axis=1, how='all', inplace=True)
elif sh_type == 'ts':
data.dropna(how='all', inplace=True)
data.dropna(axis=1, how='all', inplace=True)
# noinspection PyProtectedMember
mask = data.count(0) == len(data._get_axis(0))
# noinspection PyUnresolvedReferences
drop = [k for k, v in mask.items() if not v]
if drop:
msg = 'Columns {} in {} sheet contains nan.\n ' \
'Please correct the inputs!'
raise ValueError(msg.format(drop, sheet_name))
data = data.to_dict('list')
else:
data = {k: v for k, v in data.items() if k}
for k, v in _parse_values(data, match, "in sheet '%s'" % sheet_name):
sh.get_nested_dicts(res, *k[:-1])[k[-1]] = v
return res
def _add_times_base(data, scope='base', usage='input', **match):
if scope != 'base':
return
sh_type = _get_sheet_type(scope=scope, usage=usage, **match)
n = (scope, 'target')
if sh_type == 'ts' and sh.are_in_nested_dicts(data, *n):
t = sh.get_nested_dicts(data, *n)
for k, v in sh.stack_nested_keys(t, key=n, depth=2):
if 'times' not in v:
n = list(k + ('times',))
n[1] = usage
if sh.are_in_nested_dicts(data, *n):
v['times'] = sh.get_nested_dicts(data, *n)
else:
for i, j in sh.stack_nested_keys(data, depth=4):
if 'times' in j:
v['times'] = j['times']
break
def _add_index_plan(plan, file_path):
if 'base' not in plan:
plan['base'] = file_path
else:
d = osp.dirname(file_path)
plan['base'].fillna(osp.basename(file_path), inplace=True)
plan['base'] = plan['base'].apply(
lambda x: osp.isabs(x) and x or osp.join(d, x)
)
plan['base'] = plan['base'].apply(osp.normpath)
if 'run_base' not in plan:
plan['run_base'] = True
else:
plan['run_base'].fillna(True)
plan['id'] = plan.index
return plan
def _finalize_plan(res, plans, file_path):
import pandas as pd
if not plans:
plans = (pd.DataFrame(),)
for k, v in sh.stack_nested_keys(res.get('plan', {}), depth=4):
n = '.'.join(k)
m = '.'.join(k[:-1])
for p in plans:
if any(c.startswith(m) for c in p.columns):
if n in p:
p[n].fillna(value=v, inplace=True)
else:
p[n] = v
plan = pd.concat(plans, axis=1, copy=False, verify_integrity=True)
# noinspection PyTypeChecker
return _add_index_plan(plan, file_path)
[docs]def parse_excel_file(input_file_name, input_file):
"""
Reads cycle's data and simulation plans.
:param input_file_name:
Input file name.
:type input_file_name: str
:param input_file:
Input file.
:type input_file: io.BytesIO
:return:
Raw input data.
:rtype: dict
"""
import pandas as pd
# noinspection PyProtectedMember
from pandalone.xleash.io._xlrd import _open_sheet_by_name_or_index
xl_file, res, plans = pd.ExcelFile(input_file), {'base': {}}, []
for sheet_name in xl_file.sheet_names:
match = _re_input_sheet_name.match(sheet_name.strip(' '))
if not match:
log.debug("Sheet name '%s' cannot be parsed!", sheet_name)
continue
match = {k: v.lower() for k, v in match.groupdict().items() if v}
# noinspection PyProtectedMember
sheet = _open_sheet_by_name_or_index(xl_file.book, 'book', sheet_name)
is_plan = match.get('scope', None) == 'plan'
if is_plan:
r = {'plan': pd.DataFrame()}
else:
r = {}
r = _parse_sheet(match, sheet, sheet_name, res=r)
if is_plan:
plans.append(r['plan'])
else:
_add_times_base(r, **match)
sh.combine_nested_dicts(r, depth=5, base=res)
for k, v in sh.stack_nested_keys(res['base'], depth=3):
if k[0] != 'target':
v['cycle_type'] = v.get('cycle_type', k[-1].split('_')[0]).upper()
v['cycle_name'] = v.get('cycle_name', k[-1]).upper()
res['plan'] = _finalize_plan(res, plans, input_file_name).to_dict('records')
return res