import functools
import os as _os
import warnings
from isopy import core
import csv as csv
import datetime as dt
import numpy as np
import chardet
import openpyxl
import pyperclip
from openpyxl import load_workbook
import itertools
import io
import numpy as np
__all__ = ['read_exp',
'read_csv', 'write_csv',
'read_xlsx', 'write_xlsx',
'read_clipboard', 'write_clipboard']
import isopy.checks
NAN_STRINGS = 'nan #NA #N/A N/A NA =NA() =na()'.split()
def rows_to_data(data, has_keys, keys_in_first):
if len(data) == 0 or len(data[0])==0:
if has_keys is not True and keys_in_first is None:
return [[]]
else:
return dict()
# Everything needs to be converted to nan before it gets here
if has_keys is not False and keys_in_first is None:
r = [type(v) is str for v in data[0]]
c = [type(d[0]) is str for d in data]
if False not in c and False not in r:
# Both are strings
try:
float(data[0][0])
except ValueError:
f = True
else:
f = False
try:
[float(v) for v in data[0][1:]]
except ValueError:
r = True
else:
r = False
try:
[float(v[0]) for v in data[1:]]
except ValueError:
c = True
else:
c = False
if has_keys is None and not c and not r:
if f and (len(data)) == 1:
keys_in_first = 'c'
elif f and len(data[0]) == 1:
keys_in_first = 'r'
elif f:
# Both the first row and the first
warnings.warn('Both the first row and the first column contain strings. Using first row as keys')
keys_in_first = 'r'
#raise ValueError('Unable to determine whether the first rows or columns contains the keys')
else:
# All values in the first row and the first column can be converted to float
has_keys = False
elif c and not r:
keys_in_first = 'c'
elif r and not c:
keys_in_first = 'r'
else:
# Both the first row and the first column contains at least one value that cant be converted to float
warnings.warn('Both the first row and the first column contain strings. Using first row as keys')
keys_in_first = 'r'
#raise ValueError('Unable to determine whether the first rows or columns contains the keys')
elif False not in c:
# Only c contains string
keys_in_first = 'c'
elif False not in r:
# Only r contains strings
keys_in_first = 'r'
else:
# Neither contain only strings
has_keys=False
if has_keys is False:
return data
elif keys_in_first == 'c':
return {v[0]: v[1:] for v in data}
elif keys_in_first == 'r':
return {v[0]: v[1:] for v in zip(*data)}
else:
raise ValueError(f'Unknown value for "keys_in_first" {keys_in_first}')
def data_to_rows(data, keys_in_first, keyfmt = None):
data = isopy.asanyarray(data)
if isinstance(data, core.IsopyArray):
if data.ndim == 0:
data = data.reshape(-1)
if keys_in_first == 'r':
rows = [[k.str(keyfmt) for k in data.keys()]]
rows += data.to_list()
elif keys_in_first == 'c':
rows = [[k.str(keyfmt)] + v.tolist() for k, v in data.items()]
else:
raise ValueError(f'Unknown value for "keys_in_first" {keys_in_first}')
else:
if data.ndim == 0:
data = data.reshape((1, 1))
elif data.ndim == 1:
data = data.reshape((1, -1))
elif data.ndim > 2:
raise ValueError('data cannot have more than 2 dimensions')
rows = data.tolist()
return rows
################
### read exp ###
################
class NeptuneData:
"""
Container for the data returned by ``read_exp``.
"""
def __init__(self, info, cycle, time, measurements):
self.info = info
self.cycle = cycle
self.time = time
self.measurements = measurements
[docs]def read_exp(filename, rename = None) -> NeptuneData:
"""
Load data from a Neptune/Triton export file.
Parameters
----------
filename : str, bytes, StringIO, BytesIO
Path for file to be opened. Alternatively a file like byte string or a file like object can be supplied.
rename : dict, Callable, Optional
For renaming keys in the analysed data. Useful for cases when the key is the mass rather
than the isotope measured. If a dictionary is passed then every key present in the
dictionary will be replaced by the associated value. A callable can also be passed that
takes the key in the file and returns the new key.
Returns
-------
neptune_data : NeptuneData
An object containing the following attributes:
* info - Dictionary containing the metadata included at the beginning of the file.
* cycle - A list containing the cycle number for each measurement.
* time - A list containing datetime objects for each measurement.
* measurements - An dictionary containing an an isopy array with the values in for each line measured. Static measurements are always given as line ``1``.
To extract e.g only the isotope data from a measurement use ``neptune_data.measurement[line].copy(flavour_eq='isotope')``.
"""
# If filename is a string load the files.
if type(filename) is str:
with open(filename, 'rb') as fileio:
file = fileio.read()
elif type(filename) is bytes:
file = filename
elif type(filename) is io.BytesIO:
filename.seek(0)
file = filename.read()
elif type(filename) is io.StringIO:
filename.seek(0)
file = filename.read()
else:
raise TypeError('filename is of unknown type')
if type(file) is bytes:
# find the files encoding
encoding = chardet.detect(file).get('encoding')
# Decode the bytes into string.
file = file.decode(encoding)
csv_reader = csv.reader(io.StringIO(file), dialect='excel-tab')
information = {}
for row in csv_reader:
if ':' in row[0]: # meta
name, value = row[0].split(':', 1)
information[name.strip()] = value.strip()
if row[0] == 'Cycle':
# We can reuse the function from before to read the integration data
data = _read_csv_data(row, csv_reader, termination_symbol='***')
data = rows_to_data(data, True, 'r')
if rename is None:
renamer = lambda name: name
elif isinstance(rename, dict):
renamer = lambda name: rename.get(name, name)
elif callable(rename):
renamer = rename
else:
raise TypeError('rename must be a dict or a callable function')
data.pop("", None)
cycle = [int(i) for i in data.pop('Cycle')]
time = data.pop('Time')
try:
time = [dt.datetime.strptime(time[i], '%H:%M:%S:%f') for i in range(len(time))]
except:
# Dont think the time will ever be given as a float
# time = [dt.datetime.fromtimestamp(time[i]).strftime('%H:%M:%S:%f') for i in range(len(time))]
warnings.warn('Unable to parse the time column')
time = [dt.datetime.fromtimestamp(0).strftime('%H:%M:%S:%f') for i in range(len(time))]
measurements = {}
for key in data:
# Check if the data has more than one line
if ":" in key:
line, keystring = key.split(":", 1)
line = int(line)
else:
line = 1
keystring = key
# Rename columns if needed
keystring = renamer(keystring)
measurements.setdefault(line, {})[isopy.keystring(keystring)] = data[key]
# Convert to isopy arrays
measurements = {line: isopy.asarray(measurements[line]) for line in measurements}
return NeptuneData(information, cycle, time, measurements)
######################
### read/write CSV ###
######################
[docs]def read_csv(filename, has_keys= None, keys_in_first = None, comment_symbol ='#', encoding = None, dialect = 'excel'):
"""
Load data from a csv file.
Parameters
----------
filename : str, bytes, StringIO, BytesIO
Path for file to be opened. Alternatively a file like byte string can be supplied.
Also accepts file like objects.
has_keys : bool, None
If True or *keys_in_first* is not None a dictionary will always be returned. If False an nexted list is
always returned. If None it will return a nested list of all values in the first row and column
can be converted to/is a float.
keys_in_first : {'c', 'r', None}
Where the keys are found. Give 'r' if the keys are found in the first row and 'c' if the
keys are in first column. If None it will analyse the data to determine where the keys are. If *has_keys*
is not False an exception will be raised if it cant determine where the keys are.
comment_symbol : str, Default = '#'
Rows starting with this string will be ignored.
encoding : str
Encoding of the file. If None the encoding will be guessed from the file.
dialect
Dialect of the csv file. If None the dialect will be guessed from the file.
Returns
-------
data : dict or list
Returns a dictionary for data with keys otherwise a list.
"""
#If filename is a string load the files.
if type(filename) is str:
with open(filename, 'rb') as fileio:
text = fileio.read()
elif type(filename) is bytes:
text = filename
elif type(filename) is io.BytesIO:
filename.seek(0)
text = filename.read()
elif type(filename) is io.StringIO:
filename.seek(0)
text = filename.read()
else:
raise TypeError(f'filename is of unknown type {type(filename)}')
if type(text) is bytes:
#find the files encoding
if encoding is None:
encoding = chardet.detect(text).get('encoding')
#Decode the bytes into string.
text = text.decode(encoding)
#find the csv dialect
if dialect is None:
dialect = csv.Sniffer().sniff(text)
# Create a reader object by converting the file string to a file-like object
csv_reader = csv.reader(io.StringIO(text), dialect=dialect)
for row in csv_reader:
row_data = [r.strip() for r in row] # Remove any training whitespaces from the data in this row
if len(row_data) == 0:
return rows_to_data([[]], has_keys, keys_in_first)
if comment_symbol is not None and row[0][:len(comment_symbol)] == comment_symbol:
pass # This is a comment so ignore it.
else:
data = _read_csv_data(row_data, csv_reader, comment_symbol)
return rows_to_data(data, has_keys, keys_in_first)
return rows_to_data([[]], has_keys, keys_in_first)
def _read_csv_data(first_row, reader, comment_symbol=None, termination_symbol=None):
data = []
dlen = None
# Loop over the remaining rows
for i, row in enumerate(itertools.chain([first_row], reader)):
row = [r.strip() for r in row]
if len(row) > 0:
if termination_symbol is not None and row[0][:len(termination_symbol)] == termination_symbol:
# Stop reading data if we find this string at the beginning of a row
break
if comment_symbol is not None and row[0][:len(comment_symbol)] == comment_symbol and row[0] not in NAN_STRINGS:
# Row is a comment, ignore
continue
data.append([])
if dlen is None:
dlen = len(row)
elif dlen != len(row):
raise ValueError('Rows have different number of values')
for j in range(len(row)):
value = row[j]
if value in NAN_STRINGS:
value = float('nan')
data[-1].append(value)
return data
[docs]def write_csv(filename, data, comments=None, keys_in_first='r', comment_symbol = '#', keyfmt = None,
dialect = 'excel') -> None:
"""
Save data to a csv file.
Parameters
----------
filename : str, StringIO, BytesIO
Path/name of the csv file to be created. Any existing file with the same path/name will be
over written. Also accepts file like objects.
data : isopy_array_like, numpy_array_like
Data to be saved in the array.
comments : str, Sequence[str], Optional
Comments to be included at the top of the file
keys_in_first : {'c', 'r'}
Only used if the input has keys. Give 'r' if the keys should be in the first row and 'c' if the
keys should be in the first column.
comment_symbol : str, Default = '#'
This string will precede any comments at the beginning of the file.
keyfmt
Specify the format used for the key string. See the ``str()`` method of each key string for options.
dialect
The CSV dialect used to save the file. Default to 'excel' which is a ', ' seperated file.
"""
rows = data_to_rows(data, keys_in_first, keyfmt)
if comments is not None:
if type(comments) is not list:
comments = [comments]
crows = []
for comment in comments:
crows.append([f'{comment_symbol}{comment}'] + ['' for i in range(len(rows[0][1:]))])
if len(rows[0]) == 0:
rows = crows
else:
rows = crows + rows
if type(filename) is str:
with open(filename, mode='w', newline='') as file:
csv_writer = csv.writer(file, dialect=dialect)
for row in rows:
csv_writer.writerow(row)
elif type(filename) is io.StringIO:
filename.truncate(0)
filename.seek(0)
csv_writer = csv.writer(filename, dialect=dialect)
for row in rows:
csv_writer.writerow(row)
elif type(filename) is io.BytesIO:
filename.truncate(0)
filename.seek(0)
file = io.StringIO()
csv_writer = csv.writer(file, dialect=dialect)
for row in rows:
csv_writer.writerow(row)
file.seek(0)
text = file.read()
filename.write(text.encode('UTF-8'))
else:
raise TypeError(f'filename is of unknown type {type(filename)}')
[docs]def read_clipboard(has_keys= None, keys_in_first = None, comment_symbol ='#', dialect = None):
"""
Load data from values in the clipboard.
Parameters
----------
comment_symbol : str, Default = '#'
Rows starting with this string will be ignored.
has_keys : bool, None
If True or *keys_in_first* is not None a dictionary will always be returned. If False an nexted list is
always returned. If None it will return a nested list of all values in the first row and column
can be converted to/is a float.
keys_in_first : {'c', 'r', None}
Where the keys are found. Give 'r' if the keys are found in the first row and 'c' if the
keys are in first column. If None it will analyse the data to determine where the keys are. If *has_keys*
is not False an exception will be raised if it cant determine where the keys are.
dialect
Dialect of the values in the clipboard. If None the dialect will be guessed from the values.
"""
data = pyperclip.paste()
return read_csv(io.StringIO(data), comment_symbol=comment_symbol, has_keys=has_keys,
keys_in_first=keys_in_first, dialect=dialect)
[docs]def write_clipboard(data, comments=None, keys_in_first='r',
comment_symbol = '#', keyfmt = None, dialect = 'excel'):
"""
Copies data to the clipboard
Parameters
----------
data : isopy_array_like, numpy_array_like
Data to be copied.
comments : str, Sequence[str], Optional
Comments to be included
keys_in_first : {'c', 'r'}
Only used if the input has keys. Give 'r' if the keys should be in the first row and 'c' if the
keys should be in the first column.
comment_symbol : str, Default = '#'
This string will precede any comments.
keyfmt
Specify the format used for the key string. See the ``str()`` method of each key string for options.
dialect
The CSV dialect used to copy the data to the clipboard. Default to 'excel' which is gives ', ' seperated data.
"""
text = io.StringIO()
write_csv(text, data, comments=comments, keys_in_first=keys_in_first, dialect=dialect,
comment_symbol=comment_symbol, keyfmt=keyfmt)
text.seek(0)
pyperclip.copy(text.read())
#######################
### Read/write xlsx ###
#######################
[docs]def read_xlsx(filename, sheetname=None, has_keys = None, keys_in_first=None, comment_symbol='#', start_at='A1'):
"""
Load data from an excel file.
Parameters
----------
filename : str, bytes, BytesIO
Path for file to be opened. Also accepts file like objects.
sheetname : str, int, Optional
To load data from a single sheet in the workbook pass either the name of the sheet or
the position of the sheet. If nothing is specified all the data for all sheets in the
workbook is returned.
has_keys : bool, None
If True or *keys_in_first* is not None a dictionary will always be returned. If False an nexted list is
always returned. If None it will return a nested list of all values in the first row and column
can be converted to/is a float.
keys_in_first : {'c', 'r', None}
Where the keys are found. Give 'r' if the keys are found in the first row and 'c' if the
keys are in first column. If None it will analyse the data to determine where the keys are. If *has_keys*
is not False an exception will be raised if it cant determine where the keys are.
comment_symbol : str, Default = '#'
Rows starting with this string will be ignored.
start_at : str, (int, int)
Start scanning at this cell. Can either be a excel style cell reference or a (row, column) tuple of integers.
Returns
-------
data : dict
A dictionary containing the data for a single sheet or a dictionary containing the data
for multiple sheets in the workbook indexed by sheet name.
"""
if type(filename) is bytes:
filename = io.BytesIO(filename)
if type(filename) is openpyxl.Workbook:
workbook = filename
else:
workbook = openpyxl.load_workbook(filename, data_only=True, read_only=True)
sheetnames = workbook.sheetnames
if sheetname is None:
sheet_data = {}
for sheetname in sheetnames:
sheet_data[sheetname] = _read_xlsx_sheet(workbook[sheetname], has_keys, keys_in_first, comment_symbol, start_at)
return sheet_data
elif type(sheetname) is int and sheetname < len(sheetnames):
return _read_xlsx_sheet(workbook[sheetnames[sheetname]], has_keys, keys_in_first, comment_symbol, start_at)
elif sheetname in sheetnames:
return _read_xlsx_sheet(workbook[sheetname], has_keys, keys_in_first, comment_symbol, start_at)
else:
raise ValueError(f'sheetname "{sheetname}" not found in workbook"')
def _read_xlsx_sheet(worksheet, has_keys, keys_in_first, comment_symbol, start_at):
if type(start_at) is tuple:
start_r, start_c = start_at
else:
start_r, start_c = openpyxl.utils.cell.coordinate_to_tuple(start_at)
#Remove inital comments
for ri in range(start_r, worksheet.max_row + 1):
value = worksheet.cell(ri, start_c).value
if type(value) is str and value[:len(comment_symbol)] == comment_symbol and value not in NAN_STRINGS:
continue
else:
start_r = ri
break
# Select area containing data
stop_c = worksheet.max_column + 1
stop_r = worksheet.max_row + 1
for ri in range(start_r + 1, stop_r+1):
values_r = []
for ci in range(start_c, stop_c):
if ri != stop_r:
# All rows should be included
cell = worksheet.cell(ri, start_c)
value = cell.value
if value == '':
values_r.append(None)
else:
values_r.append(value)
if values_r.count(None) == len(values_r):
values_c = []
if ci != (stop_c-1):
for rj in range(start_r, ri + 1):
cell = worksheet.cell(rj, ci + 1)
value = cell.value
if value == '':
values_c.append(None)
else:
values_c.append(value)
if values_c.count(None) == len(values_c):
stop_r = ri
stop_c = ci + 1
if ci == stop_c: break
if ri == stop_r: break
if (stop_r-start_r) == 1 and (stop_c-start_c) == 1 and ((cell:=worksheet.cell(start_r,start_c)).value is None or cell.value==''):
return rows_to_data([[]], has_keys, keys_in_first)
data = []
# Iterate over each column
for ri in range(start_r, stop_r):
value = worksheet.cell(ri, start_c).value
if type(value) is str and value[:len(comment_symbol)] == comment_symbol and value not in NAN_STRINGS:
continue
data.append([])
for ci in range(start_c, stop_c):
cell = worksheet.cell(ri, ci)
value = cell.value
if type(value) is str:
value = value.strip()
if cell.data_type == 'e' or (type(value) is str and value.upper() == '=NA()'):
data[-1].append(float('nan'))
elif value == '' or value in NAN_STRINGS or value is None:
data[-1].append(float('nan'))
else:
data[-1].append(value)
return rows_to_data(data, has_keys, keys_in_first)
[docs]def write_xlsx(filename, *sheets, comments = None,
keys_in_first= 'r', comment_symbol= '#', keyfmt = None,
start_at ="A1", append = False, clear = True, **sheetnames):
"""
Save data to an excel file.
Parameters
----------
filename : str, BytesIO
Path/name of the excel file to be created. Any existing file with the same path/name
will be overwritten. Also accepts file like objects.
sheets : isopy_array_like, numpy_array_like
Data given here will be saved as sheet1, sheet2 etc.
comments : str, Sequence[str], Optional
Comments to be included at the top of the file
keys_in_first : {'c', 'r'}
Only used if the input has keys. Give 'r' if the keys should be in the first row and 'c' if the
keys should be in the first column.
comment_symbol : str, Default = '#'
This string will precede any comments at the beginning of the file
keyfmt
Specify the format used for the key string. See the ``str()`` method of each key string for options.
start_at: str, (int, int)
The first cell where the data is written. Can either be a excel style cell reference or a (row, column)
tuple of integers.
append : bool, Default = False
If ``True`` and *filename* exists it will append the data to this workbook. An exception
is raised if *filename* is not a valid excel workbook.
clear : bool, Default = True
If ``True`` any preexisting sheets are cleared before any new data is written to it.
sheetnames : isopy_array, numpy_array
Data given here will be saved in a sheet with *sheetname* name.
"""
save = True
if type(filename) is openpyxl.Workbook:
workbook = filename
save = False
elif type(filename) is io.BytesIO:
if append and filename.seek(0,2) > 0:
workbook = openpyxl.load_workbook(filename=filename)
else:
# openpyxl truncates BytesIO objects automatically via the use of ZipFile('w')
workbook = openpyxl.Workbook()
workbook.remove(workbook.active)
elif type(filename) is str:
if append and _os.path.exists(filename):
workbook = openpyxl.load_workbook(filename=filename)
else:
workbook = openpyxl.Workbook()
workbook.remove(workbook.active)
sheetname_data = {f'sheet{i+1}': data for i, data in enumerate(sheets)}
sheetname_data.update(sheetnames)
try:
for sheetname, data in sheetname_data.items():
#If appending then delete any preexisting workbooks
if sheetname not in workbook.sheetnames:
worksheet = workbook.create_sheet(sheetname)
elif clear:
workbook.remove(workbook[sheetname])
worksheet = workbook.create_sheet(sheetname)
else:
worksheet = workbook[sheetname]
_write_xlsx(worksheet, data, comments, comment_symbol, keys_in_first, start_at, keyfmt)
#Workbooks must have at least one sheet
if len(workbook.sheetnames) == 0:
workbook.create_sheet('sheet1')
if save: workbook.save(filename)
finally:
if save: workbook.close()
def _write_xlsx(worksheet, data, comments, comment_symbol, keys_in_first, start_at, keyfmt):
rows = data_to_rows(data, keys_in_first, keyfmt)
if type(start_at) is tuple:
start_r, start_c = start_at
else:
start_r, start_c = openpyxl.utils.cell.coordinate_to_tuple(start_at)
if comments is not None:
if not isinstance(comments, (list, tuple)):
comments = [comments]
for comment in comments:
worksheet.cell(start_r, start_c).value = f'{comment_symbol}{comment}'
start_r += 1
for ri, row in enumerate(rows):
for ci, value in enumerate(row):
if str(value) == 'nan':
value = '#N/A'
if value is not None and value != '':
worksheet.cell(start_r + ri, start_c + ci).value = value