"""
Functions for punching
Copyright (c) 2017 - Eindhoven University of Technology, The Netherlands
This software is made available under the terms of the MIT License.
"""
import copy
import re
from argparse import Namespace
from collections import defaultdict
from typing import Any, Tuple, List, Dict, Set, Optional
from nbformat import NotebookNode
from .notebook_io import cell_lines
TEST = False
[docs]class PunchBaseProcessor:
"""Base class for ``punch`` processors,
using the Template Method pattern.
The base class takes care of parsing the marking structure.
The concrete subclasses provide semantics for marking structure.
Template methods:
* ``punch_nb()``: main entry point; calls ``punch_line()`` and hook methods
* ``pre_cell()``,
* ``post_cell()``, and
* ``post_nb()``
* ``punch_line()``: calls hook methods
* ``punch_begin_marker_line()``,
* ``punch_end_marker_line()``,
* ``punch_inside_line()``, and
* ``punch_outside_line()``
Concrete processors override the following hook methods:
* ``__init__()`` to initialize; must call ``super()`` at begin
* ``pre_cell()``
* ``punch_begin_marker_line()``
* ``punch_end_marker_line()``
* ``punch_inside_line()``
* ``punch_outside_line()``
* ``post_cell()``
* ``post_nb()``
A concrete class is typically instantiated and used to process a notebook in one go:
* ``ConcretePunchProcessor(args, nb, name).punch_nb()``
"""
# TODO consider whether nb should be param of punch_nb(); this will require pre_nb() hook
# TODO consider whether punch_nb() should be named __call__()
def __init__(self, nb: NotebookNode, args: Namespace, name: str):
"""Initialize punch processor for given notebook and options.
Some PunchProcessors may modify args, e.g. add ``source_chads``.
:param nb: notebook to punch
:param args: options
:param name: notebook name used in messages
"""
self.args = args
self.nb = nb
self.name = name
# initialize punch counts
self.freq = defaultdict(int)
# initialize parsing state
self.inside = False # whether inside a hole
self.label = None # label of hole, when inside a hole
self.labels = set() # type: Set[str] # labels encountered so far
self.duplicate_labels = set() # type: Set[str]
# initialize marker line parsing results
self.marker = None
[docs] def pre_cell(self, cell: NotebookNode) -> None:
pass
[docs] def punch_begin_marker_line(self, cell: NotebookNode, line: str) -> None:
"""Process begin marker line."""
pass
[docs] def punch_end_marker_line(self, cell: NotebookNode, line: str) -> None:
"""Process end marker line."""
pass
[docs] def punch_inside_line(self, cell: NotebookNode, line: str) -> None:
"""Process line inside hole."""
pass
[docs] def punch_outside_line(self, cell: NotebookNode, line: str) -> None:
"""Process line outside hole."""
pass
[docs] def handle_error(self, message: str, cell: NotebookNode, ignore: bool = True) -> None:
"""Handle error with marker.
:param message: error message
:param cell: notebook cell with error
:param ignore: whether to ignore the marker line
"""
if self.args.allow_errors:
self.freq['errors'] += 1
print(message)
if self.inside:
self.punch_end_marker_line(cell, "# <<< WARNING: AUTO-GENERATED END MARKER >>>")
self.inside = False
self.label = None
if ignore:
self.marker = None
action = " Assuming correct end marker"
else:
action = " Inserting correct end marker"
else:
action = " Ignoring marker"
self.marker = None
print(action)
else:
raise AssertionError(message)
[docs] def parse_line(self, line: str, cell_type: str) -> None:
"""Parse given line for given cell type, and decide whether it is a marker line.
Set instance variable ``marker`` to dictionary with transition, label, and description.
Sets it to ``None`` if the line is not a marker line.
Sets ``marker['transition']`` to ``None`` if invalid marker line.
.. note:: **Modifies**: ``self.marker``
:param line: marker line
:param cell_type: type of cell containing marker line
"""
self.marker = None # anticipated
if not re.search(self.args.marker_regex[cell_type], line):
return
# recognized as marker line, parse it
# TODO maybe have separate begin/end regex?
m = re.search(self.args.marker_line_parsing_regex[cell_type], line)
# the ``m`` stands for 'match object'
if m:
if self.args.label_regex and not re.search(self.args.label_regex, m.group('label')):
return
self.marker = {
'transition': m.group('transition'),
'label': m.group('label'),
'description': m.group('description'),
}
if TEST:
print('transition: {transition}, label: {label}, description: {description}'
.format(**self.marker))
else: # invalid marker line
self.marker = {'transition': None}
[docs] def punch_line(self, cell: NotebookNode, cell_index: int, line: str, line_index: int) -> None:
"""Punch line.
"""
# if line is a marker line, process it and update state
# processing it, yields whether it is BEGIN or END, its label, and description
# label is needed when filling from source
if TEST:
print(' {:>3}| {}'.format(line_index + 1, line))
self.parse_line(line, cell.cell_type)
if self.marker: # it is a marker line
expected_transitions = sorted(self.args.marker_transitions.values())
expected_transition = self.args.marker_transitions['end' if self.inside else 'begin']
# check marker line validity
if not self.marker['transition']: # parse failed
self.handle_error('Invalid marker line in cell {} on line {}:\n {}'
.format(cell_index, line_index + 1, line),
cell)
elif self.marker['transition'] not in expected_transitions:
self.handle_error('Invalid transition in marker line of cell {} on line {}:\n'
' {}\n Expected {}, but found {transition}.'
.format(cell_index, line_index + 1,
line, expected_transitions, **self.marker),
cell)
elif self.marker['transition'] != expected_transition:
self.handle_error('Unexpected transition in marker line of cell {} on line {}:\n'
' {}\n Expected {}{}, but found {transition}.'
.format(cell_index, line_index + 1,
line, expected_transition,
' [{}]'.format(self.label) if self.inside else '', **self.marker),
cell, False)
elif self.inside and self.marker['label'] != self.label:
self.handle_error('Unexpected label in end marker line of cell {} on line {}:\n'
' {}\n Expected [{}], but found [{label}]'
.format(cell_index, line_index + 1, line, self.label, **self.marker),
cell)
if self.marker:
# update state
self.inside = not self.inside
if self.inside: # begin of hole
self.freq['holes'] += 1
self.label = self.marker['label']
if self.args.list:
print(' [{label}] {description}'.format(**self.marker))
# check uniqueness of label
if self.label in self.labels:
# label already appeared earlier
# TODO depending on purpose of hole collection: ignore, warn, error, or abort
# TODO maybe put this in a hook method?
# TODO maybe use warnings module?
if self.args.debug:
print('Duplicate label in cell {} on line {}:\n {}'
.format(cell_index, line_index + 1, line))
self.duplicate_labels.add(self.label)
self.labels.add(self.label)
self.punch_begin_marker_line(cell, line) # hook method
else: # end of hole
self.punch_end_marker_line(cell, line) # hook method
if not self.inside:
self.label = None
else: # not marker line, then it depends on the side
if self.inside:
self.punch_inside_line(cell, line) # hook method
else:
self.punch_outside_line(cell, line) # hook method
[docs] def post_cell(self, cell: NotebookNode) -> None:
pass
[docs] def post_nb(self) -> Any:
pass
[docs] def punch_nb(self) -> Any:
"""Template method; main entry point.
"""
# __init__() took care of initialization
if self.args.list:
print('Listing labels and descriptions for "{}"'.format(self.name))
# traverse cells of notebook
for cell_index, cell in enumerate(self.nb.cells):
if TEST:
ct = cell.cell_type
print('===== cell {}: {} ====='.format(cell_index, ct))
print('marker regex: {}'.format(self.args.marker_regex[ct]))
self.pre_cell(cell) # hook method
# process all lines in cell.source
for line_index, line in enumerate(cell_lines(cell)):
self.punch_line(cell, cell_index, line, line_index)
self.post_cell(cell) # hook method
# check for missing end marker
if self.inside:
message = 'Missing end marker line for last hole labeled [{}].'.format(self.label)
if self.args.allow_errors:
self.freq['errors'] += 1
print(message)
print(" Assuming correct end marker.")
# TODO call hook method that finishes filling in case of PunchedNotebookProcessor
# Without that, the filling may be incomplete
else:
raise AssertionError(message)
if self.duplicate_labels:
if self.args.verbose:
print('Labels with duplicates: {}'.format(sorted(self.duplicate_labels)))
self.freq['labels with duplicates'] = len(self.duplicate_labels)
return self.post_nb() # hook method
[docs]class PunchedNotebookProcessor(PunchBaseProcessor):
"""Create punched notebook.
self.nb_punched = result notebook under construction
self.cell_punched = next cell under construction
self.source_chad = chad being inserted in current hole
"""
def __init__(self, nb: NotebookNode, args: Namespace, name: str):
super().__init__(nb, args, name)
# initialize result notebook
self.nb_punched = copy.deepcopy(nb)
self.nb_punched.cells = [] # to be appended to
self.source_chad = [] # type: List[NotebookNode] # chad being inserted to fill current hole
self.cell_punched = None # type: Optional[NotebookNode] # next punched cell being collected
[docs] def pre_cell(self, cell: NotebookNode):
# create corresponding cell for punched notebook
# we preserve everything, except possibly the source lines (this could result in empty cells)
self.cell_punched = copy.deepcopy(cell)
self.cell_punched.source = [] # List[str] to be converted to single string in post_cell()
[docs] def punch_begin_marker_line(self, cell: NotebookNode, line: str):
# copy marker line to result notebook, if desired
if self.args.keep_marker_lines:
self.cell_punched.source.append(line)
# take care of filling, if desired
if self.args.fill:
# inject filling
if self.args.punch_source:
# use chads from source notebook, available in self.args.source_chads
if self.label in self.args.source_chads: # corresponding source chad is present
# copy the source chad,
self.source_chad = self.args.source_chads[self.label][:] # copy TODO deep?
# it will be injected possibly in multiple (up to three) steps
# (1) here, (2) in post_cell(), (3) in punch_end_marker_line()
if self.source_chad[0].cell_type == cell.cell_type:
# if equal begin cell types, then insert source from first cell here
# handle first cell of source chad here (N.B. source chad has first cell)
if TEST:
print('Copying from begin chad cell to begin hole cell: {}'
.format(repr(self.source_chad[0].source)))
self.cell_punched.source.append(self.source_chad[0].source)
del self.source_chad[0] # remove parts of source chad that have been handled
else: # begin cell types differ
# insert below this cell (handled in post_cell() and/or punch_end_marker_line())
pass
else: # corresponding source chad is not present
# warn that replacement is not present
# TODO find better way of warning
if self.args.debug:
print('No source chad for label [{}] (not filling this hole).'
.format(self.label))
self.source_chad = []
self.freq['unfilled holes'] += 1
else:
# use fixed content, substituting label and description (only useful with source option)
self.cell_punched.source.append(self.args.filling[cell.cell_type].format(**self.marker))
[docs] def punch_end_marker_line(self, cell: NotebookNode, line: str):
# check if source chad still has material; if so, handle it; could concern multiple cells
if self.source_chad:
# need to split if still in same cell as begin marker line
if self.cell_punched.source: # already collected some material preceding this hole
# append that as cell
self.cell_punched.source = '\n'.join(self.cell_punched.source)
self.nb_punched.cells.append(self.cell_punched)
# reset
self.cell_punched = copy.deepcopy(cell)
self.cell_punched.source = [] # to be converted to single string in post_cell()
# prepend material in source chad, except last if last has same cell type as this cell
if TEST:
print('Copying all-but-last cell from chad cell into hole: {} cells'
.format(len(self.source_chad[:-1])))
self.nb_punched.cells.extend(self.source_chad[:-1])
del self.source_chad[:-1]
# check if receiver and source end cell have same type
if self.args.source_chads[self.label][-1].cell_type == cell.cell_type:
# copy any remaining content
if TEST:
print('Copying from end chad cell to end hole cell: {}'
.format(repr(self.source_chad[-1].source)))
self.cell_punched.source.append(self.source_chad[-1].source)
else: # receiving and source end cell types differ; put in a separate cell preceding this one
if TEST:
print('Copying last cell from chad cell into hole: {} cells'
.format(len(self.source_chad[:-1])))
self.nb_punched.cells.append(self.source_chad[-1])
del self.source_chad[-1] # now it is empty
# copy marker line to result notebook, if desired
if self.args.keep_marker_lines:
self.cell_punched.source.append(line)
[docs] def punch_outside_line(self, cell: NotebookNode, line: str):
# copy line to result notebook
self.cell_punched.source.append(line)
[docs] def post_cell(self, cell: NotebookNode):
# join source list into single string
self.cell_punched.source = '\n'.join(self.cell_punched.source)
if self.cell_punched.source or not self.inside or not self.args.punch_source:
# transfer non-empty collected cell to result notebook
self.nb_punched.cells.append(self.cell_punched)
self.cell_punched = None
# TODO not satisfactory when not copying marker lines; empty begin cell is suppressed, empty end cell not
[docs] def post_nb(self):
unused_chads = sorted(set(self.args.source_chads.keys()).difference(self.labels))
if unused_chads:
self.freq['unused source chads'] = len(unused_chads)
if self.args.verbose:
print('Labels of unused source chads: {}'.format(unused_chads))
unfilled_labels = sorted(self.labels.difference(self.args.source_chads.keys()))
if self.args.punch_source and unfilled_labels:
if self.args.verbose:
print('Labels of unfilled holes: {}'.format(unfilled_labels))
return self.freq, self.nb_punched
[docs]class ChadsNotebookProcessor(PunchBaseProcessor):
"""Create chads notebook.
self.nb_chads = result notebook under construction
self.cell_chads = next cell under construction, with source as list of strings (to be compacted)
"""
def __init__(self, nb: NotebookNode, args: Namespace, name: str):
super().__init__(nb, args, name)
# initialize result notebook
self.nb_chads = copy.deepcopy(self.nb)
self.nb_chads.cells = [] # it needs its own list object (avoid aliasing)
self.cell_chads = None # type: Optional[NotebookNode]
[docs] def pre_cell(self, cell: NotebookNode):
# create corresponding cell for chads notebook
# we preserve everything, except possibly the source lines (this could result in empty cells)
self.cell_chads = copy.deepcopy(cell)
self.cell_chads.source = [] # to be converted to single string in post_cell()
[docs] def punch_begin_marker_line(self, cell: NotebookNode, line: str):
# copy marker line to result notebook, if desired
if self.args.keep_marker_lines:
self.cell_chads.source.append(line)
[docs] def punch_end_marker_line(self, cell: NotebookNode, line: str):
# copy marker line to result notebook, if desired
if self.args.keep_marker_lines:
self.cell_chads.source.append(line)
[docs] def punch_inside_line(self, cell: NotebookNode, line: str):
# copy line to result notebook
self.cell_chads.source.append(line)
[docs] def post_cell(self, cell: NotebookNode):
# join source list into single string
self.cell_chads.source = '\n'.join(self.cell_chads.source)
# transfer collected cell to result notebook
self.nb_chads.cells.append(self.cell_chads)
[docs] def post_nb(self):
return self.freq, self.nb_chads
[docs]class SourceChadsProcessor(PunchBaseProcessor):
"""Create chads for later use as source.
Result is delivered in ``args.source_chads``,
which is a dictionary that maps hole labels to hole content as list of cells.
self.source_chads = dictionary that maps label to list of cells
self.cell_source_chads = next cell under construction, with source as list of strings (to be compacted)
"""
def __init__(self, nb: NotebookNode, args: Namespace, name: str):
super().__init__(nb, args, name)
# initialize punched chads (which could serve as source chads in another punch)
# self.args.source_chads = {} # dictionary of label-chad pairs # commented out to allow multiple source files
self.cell_source_chads = None
[docs] def pre_cell(self, cell: NotebookNode):
# start collecting hole content for args.source_chads
if self.inside:
self.cell_source_chads = copy.deepcopy(cell)
self.cell_source_chads.source = [] # list of strings, to be consolidated into single string
[docs] def punch_begin_marker_line(self, cell: NotebookNode, line: str):
# start collecting hole content for args.source_chads
self.cell_source_chads = copy.deepcopy(cell)
self.cell_source_chads.source = [] # list of strings, to be consolidated into single string
# we don't copy marker lines into args.source_chads
self.args.source_chads[self.label] = [] # overwrites previous chad, if label not unique
[docs] def punch_end_marker_line(self, cell: NotebookNode, line: str):
# finish collecting hole content for args.source_chads
# we don't copy marker lines into args.source_chads
# join source list into single string
self.cell_source_chads.source = '\n'.join(self.cell_source_chads.source)
# transfer collected cell to result
self.args.source_chads[self.label].append(self.cell_source_chads)
self.cell_source_chads = None
[docs] def punch_inside_line(self, cell: NotebookNode, line: str):
# copy line to result
self.cell_source_chads.source.append(line)
[docs] def post_cell(self, cell: NotebookNode):
if self.inside:
# finish collecting hole content for args.source_chads
# join source list into single string
self.cell_source_chads.source = '\n'.join(self.cell_source_chads.source)
# transfer collected cell to result
self.args.source_chads[self.label].append(self.cell_source_chads)
[docs] def post_nb(self):
return self.freq