# -*- coding: utf-8 -*-
#
# Copyright 2015-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Functions to write outputs on an excel file.
"""
import regex
import logging
import itertools
import collections
import schedula as sh
log = logging.getLogger(__name__)
def _clone_excel(file_name):
from urllib.error import URLError
import openpyxl
try:
from urllib.request import urlopen
book = openpyxl.load_workbook(urlopen(file_name))
except (ValueError, URLError):
with open(file_name, 'rb') as file:
book = openpyxl.load_workbook(file)
import io
import pandas as pd
fd = io.BytesIO()
# noinspection PyTypeChecker
writer = pd.ExcelWriter(
fd, engine='openpyxl', optimized_write=True, write_only=True
)
writer.book = book
writer.sheets.update(dict((ws.title, ws) for ws in book.worksheets))
return writer, fd
def _sort_sheets(x):
x = x[0]
imp = ['summary', 'dice', 'graphs', 'plan', 'nedc_h', 'nedc_l', 'wltp_h',
'wltp_l', 'wltp_p', 'prediction', 'calibration', 'input', 'pa', 'ts']
w = ()
for i, k in enumerate(imp):
if k in x:
w = (i,) + _sort_sheets((x.replace(k, ''),))[0]
break
return w or (100,), x
def _multi_index_df2excel(writer, shname, df, index=True, **kw):
try:
df.to_excel(writer, shname, index=index, **kw)
except NotImplementedError as ex:
import pandas as pd
if not index and isinstance(df.columns, pd.MultiIndex):
kw = kw.copy()
if kw.pop('header', True):
header = pd.DataFrame([c for c in df.columns]).T
header.to_excel(writer, shname, index=False, header=False, **kw)
kw['startrow'] = kw.get('startrow', 0) + header.shape[0]
values = pd.DataFrame(df.values)
values.to_excel(writer, shname, index=False, header=False, **kw)
else:
raise ex
def _get_defaults(func):
import inspect
a = inspect.getfullargspec(func)
defaults = {}
if a.defaults:
defaults.update(zip(a.args[::-1], a.defaults[::-1]))
if a.kwonlydefaults:
defaults.update(a.kwonlydefaults)
return defaults
def _index_levels(index):
# noinspection PyBroadException
try:
return len(index.levels)
except Exception:
return 1
# noinspection PyUnusedLocal
def _get_corner(df, startcol=0, startrow=0, index=False, header=True, **kw):
import json
import xlrd
ref = {}
if header:
i = _index_levels(df.columns)
ref['header'] = list(range(i))
startrow += i
import pandas as pd
if index and isinstance(df.columns, pd.MultiIndex):
ref['skiprows'] = [i + 1]
startrow += 1
if index:
i = _index_levels(df.index)
ref['index_col'] = list(range(i))
startcol += i
landing = xlrd.cellname(startrow, startcol)
ref = json.dumps(ref, sort_keys=True)
ref = '{}(L):..(DR):LURD:["df", {}]'.format(landing, ref)
return startrow, startcol, ref
def _convert_index(k):
if not isinstance(k, collections.Iterable):
k = (str(k),)
elif isinstance(k, str):
k = (k,)
return k
def _rangename2d(rlo, clo, rhi, chi):
import xlrd
return "%s:%s" % (xlrd.cellnameabs(rlo, clo), xlrd.cellnameabs(rhi, chi))
def _ranges_by_col_row(df, startrow, startcol):
for row, i in enumerate(df.index, start=startrow):
i = _convert_index(i)
for col, c in enumerate(df.columns, start=startcol):
yield i + _convert_index(c), _rangename2d(row, col, row, col)
def _ranges_by_col(df, startrow, startcol):
for col, (k, v) in enumerate(df.items(), start=startcol):
yield k, _rangename2d(startrow, col, startrow + len(v) - 1, col)
def _ranges_by_row(df, startrow, startcol):
for row, (k, v) in enumerate(df.iterrows(), start=startrow):
yield k, _rangename2d(row, startcol, row, startcol + len(v) - 1)
def _add_named_ranges(df, writer, shname, startrow, startcol, named_ranges, k0):
ref = '!'.join([shname, '%s'])
# noinspection PyBroadException
try:
define_name = writer.book.define_name
def _create_named_range(ref_n, ref_r):
define_name(ref % ref_n, ref % ref_r)
except Exception: # Use other pkg.
define_name = writer.book.create_named_range
scope = writer.book.index(writer.sheets[shname])
def _create_named_range(ref_n, ref_r):
define_name(ref_n, value=ref % ref_r, scope=scope)
tag = ()
if hasattr(df, 'name'):
tag += (df.name,)
it = ()
if 'rows' in named_ranges and 'columns' in named_ranges:
it += (_ranges_by_col_row(df, startrow, startcol),)
elif 'columns' in named_ranges:
it += (_ranges_by_col(df, startrow, startcol),)
elif 'rows' in named_ranges:
it += (_ranges_by_row(df, startrow, startcol),)
for k, range_ref in itertools.chain(*it):
k = _convert_index(k)
if k:
try:
k = tag + k[k0:]
_create_named_range(_ref_name(*k), range_ref)
except TypeError:
pass
def _df2excel(writer, shname, df, k0=0, named_ranges=('columns', 'rows'), **kw):
import pandas as pd
if isinstance(df, pd.DataFrame) and not df.empty:
_multi_index_df2excel(writer, shname, df, **kw)
defaults = _get_defaults(df.to_excel)
defaults.update(kw)
kw = defaults
startrow, startcol, ref = _get_corner(df, **kw)
ref_name = (shname, df.name) if hasattr(df, 'name') else (shname,)
ref = {'.'.join(ref_name): '#%s!%s' % (shname, ref)}
if named_ranges:
_add_named_ranges(df, writer, shname, startrow, startcol,
named_ranges, k0)
return (startrow, startcol), ref
def _write_sheets(writer, sheet_name, data, down=True, **kw):
import pandas as pd
if isinstance(data, pd.DataFrame):
return [_df2excel(writer, sheet_name, data, **kw)]
else:
refs = []
for d in data:
ref = _write_sheets(writer, sheet_name, d, down=not down, **kw)
refs.extend(ref)
if ref[-1]:
corner = ref[-1][0]
if down:
kw['startrow'] = d.shape[0] + corner[0] + 2
else:
kw['startcol'] = d.shape[1] + corner[1] + 2
return refs
def _sheet_name(tags):
return '.'.join(tags)
def _ref_name(*names):
return '_{}'.format(regex.sub(r"[\W]", "_", '.'.join(names)))
def _data_ref(ref):
return '%s!%s' % (_sheet_name(ref[:-1]), _ref_name(ref[-1]))
def _chart2excel(writer, sheet, charts):
import xlrd
from openpyxl.chart import ScatterChart, Series
sn = writer.book.sheetnames
named_ranges = {'%s!%s' % (sn[d.localSheetId], d.name): d.value
for d in writer.book.defined_names.definedName}
m, h, w = 3, 7.94, 13.55
for i, (k, v) in enumerate(sorted(charts.items())):
chart = ScatterChart()
chart.height = h
chart.width = w
_map = {
('title', 'name'): ('title',),
('y_axis', 'name'): ('y_axis', 'title'),
('x_axis', 'name'): ('x_axis', 'title'),
}
_filter = {
('legend', 'position'): lambda x: x[0],
}
it = {s: _filter[s](o) if s in _filter else o
for s, o in sh.stack_nested_keys(v['set'])}
for s, o in sh.map_dict(_map, it).items():
c = chart
for j in s[:-1]:
c = getattr(c, j)
setattr(c, s[-1], o)
for s in v['series']:
xvalues = named_ranges[_data_ref(s['x'])]
values = named_ranges[_data_ref(s['y'])]
series = Series(values, xvalues, title=s['label'])
chart.series.append(series)
n = int(i / m)
j = i - n * m
sheet.add_chart(chart, xlrd.cellname(15 * j, 8 * n))
[docs]def write_to_excel(dfs, output_template):
"""
Writes DataFrames to excel.
:param dfs:
DataFrames of vehicle output report.
:type dfs: dict[str, pandas.DataFrame]
:param output_template:
Template output.
:type output_template: str
:return:
Excel output file.
:rtype: io.BytesIO
"""
import pandas as pd
log.debug(
'Writing into xl-file based on template(%s)...', output_template
)
writer, fd = _clone_excel(output_template)
xlref, calculate_sheets, charts = [], sorted(writer.sheets), []
for k, v in sorted(dfs.items(), key=_sort_sheets):
if not k.startswith('graphs.'):
down = True
if k.endswith('pa'):
kw = {'named_ranges': ('rows',), 'index': True, 'k0': 1}
elif k.endswith('ts'):
kw = {'named_ranges': ('columns',), 'index': False, 'k0': 1}
elif k.endswith('proc_info'):
down = False
kw = {'named_ranges': ()}
else:
kw = {}
xlref.extend(_write_sheets(writer, k, v, down=down, **kw))
else:
try:
sheet = writer.book.add_worksheet(k)
except AttributeError:
sheet = writer.book.create_sheet(title=k)
charts.append((sheet, v))
for sheet, v in charts:
_chart2excel(writer, sheet, v)
if xlref:
xlref = sorted(sh.combine_dicts(*[x[1] for x in xlref]).items())
xlref = pd.DataFrame(xlref)
xlref.set_index([0], inplace=True)
_df2excel(
writer, 'xlref', xlref, named_ranges=(), index=True, header=False
)
writer.save()
return fd