"""Tableread package to read a text file table into a Python object."""
import os
from collections import OrderedDict
from itertools import filterfalse
from operator import attrgetter
from typing import Any, Callable, List, Optional, Tuple, Union
import attr
# Type alias: a path to an RST file on disk, passed as a plain string.
FilePath = str
# Type alias: RST text that is already held in memory as a string.
FileContents = str
class InvalidFileException(Exception):
    """Exception for improperly formatted files."""
class FileParsingException(Exception):
    """Exception for parsing failures."""
def _safe_name(name: str):
return name.replace(" ", "_").replace(".", "_").lower()
def get_specific_attr_matcher(key: str, value: Any) -> Callable[[Any], bool]:
    """
    Check if a given attribute value matches the expected value.

    The comparison ignores case. Both the expected value and the attribute
    value are converted with ``str()`` first, so non-string values such as
    integers can be matched without raising ``AttributeError``.

    Args:
        key: the name of the attribute to check
        value: the expected value (compared by its string form)

    Returns:
        function: a checker that will accept an object
            and return True if the attribute value matches, or False if not.
    """
    # Normalise the expected value once rather than on every check.
    expected = str(value).lower()

    def matcher(obj: Any) -> bool:
        return str(getattr(obj, key)).lower() == expected

    return matcher
def safe_list_index(a_list: list, index_value: int, default: Any = None):
    """
    Return the value at the given index, or a default if index does not exist.

    Negative indexes are treated as "does not exist" and yield the default;
    they do not count from the end of the list.

    Args:
        a_list: the list to be indexed
        index_value: the desired index position from the list
        default: the default value to return if the given index does not exist

    Returns:
        any: the value at the list index position or the default
    """
    if index_value < 0:
        return default
    try:
        return a_list[index_value]
    except IndexError:
        return default
class BaseRSTDataObject(object):
    """
    Base Class for RST Table Data handling.

    Holds the parsing constants shared by subclasses and exposes the
    container stored in ``self.data`` through ``len()``, iteration and
    indexing. Subclasses set ``data_format`` to a zero-argument callable that
    builds that container.
    """

    column_divider_char = " "
    header_divider = "="
    # The full set of potential ReStructuredText section markers is sourced from
    # http://docutils.sourceforge.net/docs/ref/rst/restructuredtext.html#sections
    # Because the literal is a raw string, the ``\'`` sequence contributes both
    # a backslash and an apostrophe to the set.
    header_markers = set(r'!"#$%&\'()*+,-./:;<=>?@[]^_`{|}~')
    column_default_separator = "="
    comment_char = "#"
    # Called with no arguments in __init__ to create ``self.data``.
    data_format: Any = None

    def __init__(self):
        """Create the empty data container by calling ``data_format``."""
        self.data = self.data_format()

    def __len__(self):
        """Get the length of the underlying data."""
        return len(self.data)

    def __iter__(self):
        """Iterate over underlying data."""
        yield from self.data

    def __getitem__(self, key: Union[str, int]):
        """Get the value of the given key from the data."""
        return self.data[key]

    def _is_divider_row(self, row: str):
        """
        Tell whether a line is a table divider row.

        A divider row starts with the header divider character and contains
        nothing other than that character, the column divider character and
        spaces.
        """
        allowed_chars = {self.column_divider_char, self.header_divider, " "}
        return row.startswith(self.header_divider) and set(row) <= allowed_chars
class SimpleRSTTable(BaseRSTDataObject):
    """
    Represent a single table from a RST file.

    The table is parsed from its text lines on construction. Each data row
    becomes an instance of a generated ``Row`` class whose attributes are the
    normalised column names, and the rows are stored in ``self.data``.
    """

    data_format = list

    def __init__(self, divider_row: str, header: str, rows: List[str]):
        """
        Build a table from the text string parts.

        Args:
            divider_row: the row above or below the table headers,
                consisting solely of "=" and spaces,
                that delineates the column boundaries.
            header: the column header row
            rows: the rows within the table containing data
        """
        super().__init__()
        self._header = header
        self._rows = rows
        self._column_spans = self._build_column_spans(divider_row)
        # Width used to pad short rows; taken from the unstripped divider row.
        self._row_length = len(divider_row)
        self._build_data()

    @classmethod
    def from_data(cls, data: List[dict]):
        """
        Given data, build a SimpleRSTTable object.

        ``__init__`` is bypassed, so only ``data`` is set on the result;
        ``fields``, ``defaults`` and the parsing state are not populated.
        The input is copied with ``list()``, so any iterable is accepted.
        """
        table = cls.__new__(cls)
        table.data = list(data)
        return table

    def _build_column_spans(self, divider_row: str):
        """
        Work out the ``(start, stop)`` slice bounds of every column.

        Each column starts at a header divider character and stops at the
        next column divider character. The last column has ``stop=None`` so
        that it extends to the end of the row.
        """
        # remove any trailing whitespace from the end of the row
        divider_row = divider_row.rstrip()
        column_spans: List[Tuple[int, Optional[int]]] = []
        start = 0
        next_break = divider_row.find(self.column_divider_char, start)
        while next_break != -1:
            column_spans.append((start, next_break))
            # jump over the gap to the first character of the next column
            start = divider_row.find(self.header_divider, next_break)
            next_break = divider_row.find(self.column_divider_char, start)
        column_spans.append((start, None))
        return column_spans

    def _stop_checker(self, row: str):
        """Return True when the row marks the end of the table body."""
        return self._is_divider_row(row)

    def _row_splitter(self, row: str):
        """
        Cut a text row into stripped cell strings using the column spans.

        Raises:
            FileParsingException: if no column spans are defined.
        """
        if not self._column_spans:
            raise FileParsingException("Column spans not defined!")
        # first, pad the row with spaces in case end columns are left empty
        row = row.ljust(self._row_length)
        # then, find the columns in the row
        # NOTE(review): every ".." inside a cell is removed, not only a cell
        # that consists solely of ".." - confirm this is intended.
        return [
            row[col_start:col_stop].strip().replace("..", "")
            for col_start, col_stop in self._column_spans
        ]

    def _set_header_names_and_defaults(self, fields: List[str]):
        """
        Derive ``self.fields`` and ``self.defaults`` from the header cells.

        A header cell may be written as ``name = default``. The part before
        the first separator is the (normalised) field name and the part after
        it is the default value; the default is "" when no separator exists.
        """
        name_sets = [x.split(self.column_default_separator, 1) for x in fields]
        self.fields = [_safe_name(x[0].strip()) for x in name_sets]
        self.defaults = [x[1].strip() if len(x) > 1 else "" for x in name_sets]

    def _build_data(self):
        """
        Parse the header and data rows into ``Row`` objects in ``self.data``.

        Parsing stops at the first divider row. Text following " # " on a row
        is discarded as a comment, and empty cells take the column default.

        Raises:
            TabError: if a row contains a tab character.
            InvalidFileException: if a row cannot be split into one cell
                per field.
        """
        self._set_header_names_and_defaults(self._row_splitter(self._header))
        row_class = attr.make_class("Row", self.fields, hash=True)
        comment_marker = " {} ".format(self.comment_char)
        single_column = len(self._column_spans) == 1
        mismatch = "Row '{}' does not match field list '{}' length."
        for row in self._rows:
            if self._stop_checker(row):
                break
            if "\t" in row:
                raise TabError("Tabs are not supported in tables - use spaces only!")
            row = row.split(comment_marker)[0]
            if not (single_column or self.column_divider_char in row):
                # A multi-column row must contain a column divider. Reject it
                # here rather than comparing the string length with the field
                # count, which let a row whose character count happened to
                # equal the field count be read as one character per column.
                raise InvalidFileException(mismatch.format(row, self.fields))
            cells = self._row_splitter(row)
            if len(cells) != len(self.fields):
                raise InvalidFileException(mismatch.format(cells, self.fields))
            row_data = (
                value or default for default, value in zip(self.defaults, cells)
            )
            self.data.append(row_class(*row_data))

    def _filter_data(
        self, data: List[dict], filter_kwargs: dict, filter_func: Callable
    ):
        """
        Apply ``filter_func`` to ``data`` with a predicate built from kwargs.

        A callable kwarg value is used as a check as-is and is called with
        the whole row object; any other value becomes a case-insensitive
        attribute match. A row satisfies the predicate only when every check
        passes.
        """
        filters = [
            v if callable(v) else get_specific_attr_matcher(k, v)
            for k, v in filter_kwargs.items()
        ]
        data = filter_func(lambda x: all(f(x) for f in filters), data)
        return self.__class__.from_data(data)

    def matches_all(self, **kwargs):
        """
        Filter data for a positive match to conditions.

        Given a set of key/value filters,
        returns a new table object of the same class with the filtered data,
        that can be iterated over.

        Kwarg values may be a simple value (str, int)
        or a function that accepts a row and returns a boolean.

        Note: simple values are compared without regard to case. Keys are
        looked up as row attribute names exactly as given.
        """
        return self._filter_data(self.data, kwargs, filter)

    def exclude_by(self, **kwargs):
        """
        Filter data to exclude items matching conditions.

        Given a set of key/value filters,
        returns a new table object of the same class without the rows that
        match all of the conditions, that can be iterated over.

        Kwarg values may be a simple value (str, int)
        or a function that accepts a row and returns a boolean.

        Note: simple values are compared without regard to case. Keys are
        looked up as row attribute names exactly as given.
        """
        return self._filter_data(self.data, kwargs, filterfalse)

    def get_fields(self, *fields: str):
        """
        Get only specified fields from data.

        Given a set of fields, returns a list of those field values from each entry.
        A single field will return a list of values,
        Multiple fields will return a list of tuples of values.
        """
        return list(map(attrgetter(*fields), self.data))
class SimpleRSTReader(BaseRSTDataObject):
    """
    Represent all tables found in a RST file.

    Tables are stored in ``self.data``, an ordered mapping from table name
    to ``SimpleRSTTable``. A table is named after the most recent section
    header seen before it, or "Default" when there is none.
    """

    data_format = OrderedDict

    def __init__(self, rst_source: Union[FilePath, FileContents]):
        """
        Determine from where to parse RST content and then parse it.

        Args:
            rst_source: The source of the RST content to parse.
                This can either be a file path with a ``.rst`` extension,
                or a string containing the RST content.
                Any string ending in ``.rst`` (in any letter case) is
                treated as a file path.

        Raises:
            FileNotFoundError: if a file path is given that does not exist.
            InvalidFileException: if no table could be parsed.
        """
        super().__init__()
        rst_string = rst_source
        if rst_source.lower().endswith(".rst"):
            rst_string = self._read_file(rst_source)
        self._parse(rst_string)
        if not self.data:
            raise InvalidFileException("No tables could be parsed from the RST source.")

    @staticmethod
    def _read_file(file_path: FilePath):
        """Return the text of the file, or raise FileNotFoundError."""
        if not os.path.exists(file_path):
            raise FileNotFoundError("File not found: {}".format(file_path))
        with open(file_path, "r") as rst_fo:
            return rst_fo.read()

    @property
    def first(self):
        """Return the first table found in the document."""
        return list(self.data.values())[0]

    def _is_header_underline(self, row: str):
        """Return True when the row is one section marker character repeated."""
        return bool(row) and len(set(row)) == 1 and row[0] in self.header_markers

    def _name_if_header(self, four_rows: List[str]):
        """
        Return the section title if the window is centred on a section header.

        Args:
            four_rows: the lines above, at, below and two below the
                candidate title line, in that order.

        Returns:
            The title line, or None when it is not a section header.
        """
        above, header, below, tail = four_rows
        # Row below potential section header must be an underline row
        if not self._is_header_underline(below):
            return None
        # Row above should be a matching overline or empty
        if above and above != below:
            return None
        # the line below the underline should be empty
        # (this condition ensures we don't take a table header as a section name)
        if tail:
            return None
        return header

    def _table_name(self, section_header: Optional[str]):
        """
        Pick a name for the next table that is not already used.

        The section header (or "Default") is used as-is when free; otherwise
        "_2", "_3", ... is appended until an unused name is found.
        """
        base_name = section_header or "Default"
        if base_name not in self.data:
            return base_name
        name_number = 2
        while "{}_{}".format(base_name, name_number) in self.data:
            name_number += 1
        return "{}_{}".format(base_name, name_number)

    def _parse(self, rst_string: FileContents):
        """Scan the text line by line, recording headers and parsing tables."""
        # Drop a trailing "\r" so content with Windows line endings still has
        # its divider rows recognised.
        text_lines = [line.rstrip("\r") for line in rst_string.split("\n")]
        section_header_cursor = None
        i = 0
        while i < len(text_lines) - 1:
            # Lines i-1 .. i+2; positions outside the text read as "".
            sliding_window = [
                safe_list_index(text_lines, idx, default="")
                for idx in range(i - 1, i + 3)
            ]
            header_check = self._name_if_header(sliding_window)
            if header_check:
                section_header_cursor = header_check
                # skip past the section header AND the underline row
                i += 2
                continue
            if self._is_divider_row(text_lines[i]):
                header, rows = self._get_header_and_rows(text_lines[i:])
                table_name = self._table_name(section_header_cursor)
                self.data[table_name] = SimpleRSTTable(text_lines[i], header, rows)
                # The extra 4 rows 'skipped' are for the 3 divider rows and the header
                # (the increment below then also steps over the line that
                # immediately follows the table)
                i += len(rows) + 4
            i += 1

    def _get_header_and_rows(self, text_lines: List[str]):
        """
        Extract the header line and the data rows of the next table.

        Args:
            text_lines: the remaining lines, starting at or before the
                table's first divider row.

        Returns:
            tuple: the header line and the list of rows up to, but not
                including, the next divider row.

        Raises:
            InvalidFileException: if the divider row two lines below the
                first one is missing or different, or if nothing follows
                the header block.
        """
        header, rows = None, None
        # find the header
        for i, line in enumerate(text_lines):
            if not self._is_divider_row(line):
                continue
            # A divider too close to the end of the text has no partner row;
            # report that as an invalid file instead of leaking IndexError.
            if i + 2 >= len(text_lines) or line != text_lines[i + 2]:
                raise InvalidFileException("Column divider rows do not match!")
            header, rows = text_lines[i + 1], text_lines[i + 3 :]
            break
        # NOTE(review): the source's indentation was lost; this check is read
        # as the else-branch of "if rows", which matches its message.
        if not rows:
            raise InvalidFileException("Expected table rows could not be found!")
        # truncate remaining rows to just table contents
        for i, row in enumerate(rows):
            if self._is_divider_row(row):
                rows = rows[:i]
                break
        return header, rows

    @property
    def tables(self):
        """Get the list of table names found in the document."""
        return list(self.data)