Skip to content

Operations

Operations are functions that operate on either a list of examples or a single example. If the function operates on a single example, Recon will take care of applying it to all examples in a dataset.

The following operations are built into Recon

Error

... full list of operations to come

Operation

Operation class that takes care of calling and reporting the results of an operation on a Dataset

Source code in recon/operations.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
class Operation:
    """Operation class that takes care of calling and reporting
    the results of an operation on a Dataset

    Calling an instance runs the wrapped ``op`` on every example of a dataset
    and records, by example hash, which examples were added, removed or
    changed.
    """

    def __init__(
        self,
        name: str,
        pre: List[PreProcessor],
        op: OperationProtocol,
        handles_tokens: bool,
        augmentation: bool,
    ):
        """Initialize an Operation instance

        Args:
            name (str): Name of operation
            pre (List[PreProcessor]): List of preprocessors to run
            op (OperationProtocol): Operation callable
            handles_tokens (bool): Whether the op can work on data that
                already has tokens. When False, calling the operation on
                such data emits a warning.
            augmentation (bool): Stored on the instance; not read anywhere
                in this class.
        """
        self.name = name
        self.pre = pre
        self.op = op
        self.handles_tokens = handles_tokens
        self.augmentation = augmentation

    def __call__(
        self,
        dataset: "Dataset",
        *args: Any,
        verbose: bool = False,
        initial_state: Optional[OperationState] = None,
        **kwargs: Any,
    ) -> OperationResult:
        """Runs op on a dataset and records the results

        Added and changed examples are also stored in
        ``dataset.example_store``; ``dataset.data`` itself is only read.

        Args:
            dataset (Dataset): Dataset to operate on. An empty dataset is
                allowed and produces an empty result.
            *args (Any): Extra positional arguments forwarded to the op
            verbose (bool): Show progress bars and preprocessor messages
            initial_state (Optional[OperationState]): State to start from.
                It is deep-copied rather than mutated; a fresh state is
                created when it is omitted.
            **kwargs (Any): Extra keyword arguments forwarded to the op

        Raises:
            TypeError: if args/kwargs cannot be bound to the op's signature
            KeyError: if the op has no parameter named "example"
            AssertionError: if the op returns something other than None,
                a list or an Example

        Returns:
            OperationResult: Container holding new data and the state of the Operation
        """
        if not initial_state:
            initial_state = OperationState(name=self.name)
        state = initial_state.model_copy(deep=True)

        if state.status == OperationStatus.NOT_STARTED:
            state.status = OperationStatus.IN_PROGRESS

        state.args = args
        state.kwargs = kwargs

        def track_add_example(new_example: Example) -> None:
            state.transformations.append(
                Transformation(
                    example=hash(new_example), type=TransformationType.EXAMPLE_ADDED
                )
            )
            dataset.example_store.add(new_example)

        def track_remove_example(orig_example_hash: int) -> None:
            state.transformations.append(
                Transformation(
                    prev_example=orig_example_hash,
                    type=TransformationType.EXAMPLE_REMOVED,
                )
            )

        def track_change_example(orig_example_hash: int, new_example: Example) -> None:
            state.transformations.append(
                Transformation(
                    prev_example=orig_example_hash,
                    example=hash(new_example),
                    type=TransformationType.EXAMPLE_CHANGED,
                )
            )
            dataset.example_store.add(new_example)

        # Stop at the first example that carries tokens or token offsets.
        has_tokens = False
        for e in dataset.data:
            if e.tokens or any((s.token_start or s.token_end) for s in e.spans):
                has_tokens = True
                break

        if has_tokens and not self.handles_tokens:
            warnings.warn(
                # fmt: off
                "This dataset seems to have preset tokens. "
                f"Operation: {self.name} is not currently capable of "
                "handling tokens and you will "
                "need to reset tokenization after this operation. "
                "Applying the `recon.add_tokens.v1` operation after this "
                "operation is complete will get you back to a clean state."
                # fmt: on
            )
            # NOTE(review): this status is unconditionally replaced by
            # COMPLETED at the end of the call - confirm that is intended.
            state.status = OperationStatus.NEEDS_TOKENIZATION

        sig = inspect.signature(self.op)
        # The first argument only occupies the "example" slot while binding
        # and is deleted right after, so a plain placeholder is enough. This
        # avoids indexing dataset.data[0], which fails on an empty dataset.
        bound_args = sig.bind_partial(None, *state.args, **state.kwargs)
        bound_args.apply_defaults()
        # The real example is passed explicitly for every call below.
        del bound_args.arguments["example"]

        new_data = []
        with tqdm(total=len(dataset), disable=(not verbose)) as pbar:
            it = op_iter(dataset.data, self.pre, verbose=verbose)
            for orig_example_hash, example, preprocessed_outputs in it:
                if "preprocessed_outputs" in sig.parameters:
                    bound_args.arguments["preprocessed_outputs"] = preprocessed_outputs
                res = self.op(example, *bound_args.args, **bound_args.kwargs)
                if res is None:
                    # The op dropped the example.
                    track_remove_example(orig_example_hash)
                elif isinstance(res, list):
                    # The op returned several examples: everything that does
                    # not hash like the original counts as added, and the
                    # original counts as removed unless it is still present.
                    old_example_present = False
                    for new_example in res:
                        new_data.append(new_example)
                        if hash(new_example) == orig_example_hash:
                            old_example_present = True
                        else:
                            track_add_example(new_example.model_copy(deep=True))
                    if not old_example_present:
                        track_remove_example(orig_example_hash)
                else:
                    assert isinstance(res, Example)
                    new_data.append(res)
                    # An unchanged hash means the op left the example as is.
                    if hash(res) != orig_example_hash:
                        track_change_example(orig_example_hash, res)

                pbar.update(1)

        transformation_counts = Counter(t.type for t in state.transformations)
        state.examples_added = transformation_counts[TransformationType.EXAMPLE_ADDED]
        state.examples_removed = transformation_counts[
            TransformationType.EXAMPLE_REMOVED
        ]
        state.examples_changed = transformation_counts[
            TransformationType.EXAMPLE_CHANGED
        ]
        state.status = OperationStatus.COMPLETED
        return OperationResult(data=new_data, state=state)

    def register(self) -> None:
        """Register this instance in the operations registry under its name."""
        registry.operations.register(self.name)(self)

__call__(dataset, *args, verbose=False, initial_state=None, **kwargs)

Runs op on a dataset and records the results

Parameters:

Name Type Description Default
dataset Dataset

Dataset to operate on

required

Raises:

Type Description
ValueError

if track_example is called in the op with no data

Returns:

Name Type Description
OperationResult OperationResult

Container holding new data and the state of the Operation

Source code in recon/operations.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
def __call__(
    self,
    dataset: "Dataset",
    *args: Any,
    verbose: bool = False,
    initial_state: Optional[OperationState] = None,
    **kwargs: Any,
) -> OperationResult:
    """Runs op on a dataset and records the results

    Added and changed examples are also stored in
    ``dataset.example_store``; ``dataset.data`` itself is only read.

    Args:
        dataset (Dataset): Dataset to operate on. An empty dataset is
            allowed and produces an empty result.
        *args (Any): Extra positional arguments forwarded to the op
        verbose (bool): Show progress bars and preprocessor messages
        initial_state (Optional[OperationState]): State to start from.
            It is deep-copied rather than mutated; a fresh state is
            created when it is omitted.
        **kwargs (Any): Extra keyword arguments forwarded to the op

    Raises:
        TypeError: if args/kwargs cannot be bound to the op's signature
        KeyError: if the op has no parameter named "example"
        AssertionError: if the op returns something other than None,
            a list or an Example

    Returns:
        OperationResult: Container holding new data and the state of the Operation
    """
    if not initial_state:
        initial_state = OperationState(name=self.name)
    state = initial_state.model_copy(deep=True)

    if state.status == OperationStatus.NOT_STARTED:
        state.status = OperationStatus.IN_PROGRESS

    state.args = args
    state.kwargs = kwargs

    def track_add_example(new_example: Example) -> None:
        state.transformations.append(
            Transformation(
                example=hash(new_example), type=TransformationType.EXAMPLE_ADDED
            )
        )
        dataset.example_store.add(new_example)

    def track_remove_example(orig_example_hash: int) -> None:
        state.transformations.append(
            Transformation(
                prev_example=orig_example_hash,
                type=TransformationType.EXAMPLE_REMOVED,
            )
        )

    def track_change_example(orig_example_hash: int, new_example: Example) -> None:
        state.transformations.append(
            Transformation(
                prev_example=orig_example_hash,
                example=hash(new_example),
                type=TransformationType.EXAMPLE_CHANGED,
            )
        )
        dataset.example_store.add(new_example)

    # Stop at the first example that carries tokens or token offsets.
    has_tokens = False
    for e in dataset.data:
        if e.tokens or any((s.token_start or s.token_end) for s in e.spans):
            has_tokens = True
            break

    if has_tokens and not self.handles_tokens:
        warnings.warn(
            # fmt: off
            "This dataset seems to have preset tokens. "
            f"Operation: {self.name} is not currently capable of "
            "handling tokens and you will "
            "need to reset tokenization after this operation. "
            "Applying the `recon.add_tokens.v1` operation after this "
            "operation is complete will get you back to a clean state."
            # fmt: on
        )
        # NOTE(review): this status is unconditionally replaced by
        # COMPLETED at the end of the call - confirm that is intended.
        state.status = OperationStatus.NEEDS_TOKENIZATION

    sig = inspect.signature(self.op)
    # The first argument only occupies the "example" slot while binding
    # and is deleted right after, so a plain placeholder is enough. This
    # avoids indexing dataset.data[0], which fails on an empty dataset.
    bound_args = sig.bind_partial(None, *state.args, **state.kwargs)
    bound_args.apply_defaults()
    # The real example is passed explicitly for every call below.
    del bound_args.arguments["example"]

    new_data = []
    with tqdm(total=len(dataset), disable=(not verbose)) as pbar:
        it = op_iter(dataset.data, self.pre, verbose=verbose)
        for orig_example_hash, example, preprocessed_outputs in it:
            if "preprocessed_outputs" in sig.parameters:
                bound_args.arguments["preprocessed_outputs"] = preprocessed_outputs
            res = self.op(example, *bound_args.args, **bound_args.kwargs)
            if res is None:
                # The op dropped the example.
                track_remove_example(orig_example_hash)
            elif isinstance(res, list):
                # The op returned several examples: everything that does
                # not hash like the original counts as added, and the
                # original counts as removed unless it is still present.
                old_example_present = False
                for new_example in res:
                    new_data.append(new_example)
                    if hash(new_example) == orig_example_hash:
                        old_example_present = True
                    else:
                        track_add_example(new_example.model_copy(deep=True))
                if not old_example_present:
                    track_remove_example(orig_example_hash)
            else:
                assert isinstance(res, Example)
                new_data.append(res)
                # An unchanged hash means the op left the example as is.
                if hash(res) != orig_example_hash:
                    track_change_example(orig_example_hash, res)

            pbar.update(1)

    transformation_counts = Counter(t.type for t in state.transformations)
    state.examples_added = transformation_counts[TransformationType.EXAMPLE_ADDED]
    state.examples_removed = transformation_counts[
        TransformationType.EXAMPLE_REMOVED
    ]
    state.examples_changed = transformation_counts[
        TransformationType.EXAMPLE_CHANGED
    ]
    state.status = OperationStatus.COMPLETED
    return OperationResult(data=new_data, state=state)

__init__(name, pre, op, handles_tokens, augmentation)

Initialize an Operation instance

Parameters:

Name Type Description Default
name str

Name of operation

required
pre List[PreProcessor]

List of preprocessors to run

required
op Op

Operation callable

required
Source code in recon/operations.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def __init__(
    self,
    name: str,
    pre: List[PreProcessor],
    op: OperationProtocol,
    handles_tokens: bool,
    augmentation: bool,
):
    """Store the configuration of an Operation on the instance

    Args:
        name (str): Name of operation
        pre (List[PreProcessor]): List of preprocessors to run
        op (OperationProtocol): Operation callable
        handles_tokens (bool): Flag kept as ``self.handles_tokens``
        augmentation (bool): Flag kept as ``self.augmentation``
    """
    # Identity and callable pipeline first, then the two behaviour flags.
    self.name, self.pre, self.op = name, pre, op
    self.handles_tokens, self.augmentation = handles_tokens, augmentation

operation

Decorator for a Recon Operation. An Operation is a Python function that Recon maps over each example in a dataset, tracking changes made to examples by hash so that dataset changes can be traced back. An operation has 1 required positional argument called "example" with the "recon.types.Example" type. Any other arguments are allowed and can be provided by passing them to Dataset.apply_.

Example operation:

@operation("my_custom_operation")
def rename_labels(example: Example, *, my_kwarg1: str, my_kwarg2: str) -> Example: ...
Source code in recon/operations.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class operation:
    """Decorator for a Recon Operation. An Operation is a Python function that
    Recon maps over each example in a dataset, tracking changes made to
    examples by hash so that dataset changes can be traced back.
    An operation has 1 required positional argument called "example" with the
    "recon.types.Example" type.
    Any other arguments are allowed and can be provided by
    passing them to `Dataset.apply_`

    Example operation:

    ```
    @operation("my_custom_operation")
    def rename_labels(example: Example, *, my_kwarg1: str, my_kwarg2: str) -> Example: ...
    ```
    """

    def __init__(
        self,
        name: str,
        *,
        pre: List[Union[str, PreProcessor]] = [],
        handles_tokens: bool = True,
        factory: bool = False,
        augmentation: bool = False,
    ):
        """Decorate an operation that makes some changes to a dataset.

        Args:
            name (str): Operation name.
            pre (List[Union[str, PreProcessor]]): List of preprocessors to run,
                given as instances or as names registered in the
                preprocessor registry
            handles_tokens (bool): Passed on to the created Operation
            factory (bool): Stored on the decorator; not read anywhere in
                this class
            augmentation (bool): Passed on to the created Operation
        """
        self.name = name
        # Keep a private copy: the default list object is shared by every
        # call that omits `pre`, so storing it directly would let a mutation
        # of one decorator's `pre` leak into all the others.
        self.pre = list(pre)
        self.handles_tokens = handles_tokens
        self.factory = factory
        self.augmentation = augmentation

    def __call__(self, op: OperationProtocol) -> OperationProtocol:
        """Decorator for an operation.
        The first arg to the op callable needs to be an example.
        Recon will take care of applying it to a full Dataset

        Args:
            op (OperationProtocol): The function to decorate

        Returns:
            OperationProtocol: The original function
        """
        pre: List[PreProcessor] = []

        for preprocessor in self.pre:
            # Names are resolved through the preprocessor registry.
            if isinstance(preprocessor, str):
                preprocessor = pre_registry.preprocessors.get(preprocessor)
            assert isinstance(preprocessor, PreProcessor)
            pre.append(preprocessor)

        # Only the Operation wrapper is registered; the decorated function
        # itself is handed back untouched.
        registry.operations.register(self.name)(
            Operation(
                self.name, pre, op, self.handles_tokens, augmentation=self.augmentation
            )
        )
        return op

__call__(op)

Decorator for an operation. The first arg to the op callable needs to be an example. Recon will take care of applying it to a full Dataset.

Parameters:

Name Type Description Default
op Op

First arg is the function to decorate

required

Returns:

Name Type Description
Op OperationProtocol

The original function

Source code in recon/operations.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def __call__(self, op: OperationProtocol) -> OperationProtocol:
    """Register ``op`` as a Recon Operation under this decorator's name.

    The op callable needs to take an example as its first arg; Recon will
    take care of applying it to a full Dataset.

    Args:
        op (OperationProtocol): The function to decorate

    Returns:
        OperationProtocol: The original function
    """
    resolved: List[PreProcessor] = []
    for entry in self.pre:
        # String entries are looked up in the preprocessor registry; any
        # other entry is used as given.
        candidate = (
            pre_registry.preprocessors.get(entry)
            if isinstance(entry, str)
            else entry
        )
        assert isinstance(candidate, PreProcessor)
        resolved.append(candidate)

    register = registry.operations.register(self.name)
    register(
        Operation(
            self.name,
            resolved,
            op,
            self.handles_tokens,
            augmentation=self.augmentation,
        )
    )
    return op

__init__(name, *, pre=[], handles_tokens=True, factory=False, augmentation=False)

Decorate an operation that makes some changes to a dataset.

Parameters:

Name Type Description Default
name str

Operation name.

required
pre Union[List[str], List[PreProcessor]]

List of preprocessors to run

[]
Source code in recon/operations.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def __init__(
    self,
    name: str,
    *,
    pre: List[Union[str, PreProcessor]] = [],
    handles_tokens: bool = True,
    factory: bool = False,
    augmentation: bool = False,
):
    """Decorate an operation that makes some changes to a dataset.

    Args:
        name (str): Operation name.
        pre (List[Union[str, PreProcessor]]): List of preprocessors to run
        handles_tokens (bool): Stored as ``self.handles_tokens``
        factory (bool): Stored as ``self.factory``
        augmentation (bool): Stored as ``self.augmentation``
    """
    self.name = name
    # Keep a private copy: the default list object is shared by every call
    # that omits `pre`, so storing it directly would let a mutation of one
    # decorator's `pre` leak into all the others.
    self.pre = list(pre)
    self.handles_tokens = handles_tokens
    self.factory = factory
    self.augmentation = augmentation

op_iter(data, pre, verbose=True)

Iterate over list of examples for an operation yielding tuples of (example hash, example, preprocessed outputs)

Parameters:

Name Type Description Default
data List[Example]

List of examples to iterate

required
pre List[PreProcessor]

List of preprocessors to run

required
verbose bool

Show verbose output.

True

Yields:

Type Description
int

Iterator[Tuple[int, Example, Dict[str, Any]]]: Tuples of (example hash, example, preprocessed outputs)

Source code in recon/operations.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def op_iter(
    data: List[Example], pre: List[PreProcessor], verbose: bool = True
) -> Iterator[Tuple[int, Example, Dict[str, Any]]]:
    """Run the preprocessors over the examples, then walk the examples.

    Every preprocessor is called once with the full list of examples and its
    outputs are paired with the examples by position. The outputs are then
    grouped per example under the preprocessor's name.

    Args:
        data (List[Example]): List of examples to iterate
        pre (List[PreProcessor]): List of preprocessors to run
        verbose (bool, optional): Show verbose output.

    Yields:
        Tuple[int, Example, Dict[str, Any]]: Tuples of (example hash, example,
            preprocessor outputs keyed by preprocessor name)
    """
    quiet = not verbose
    printer = Printer(no_print=quiet, hide_animation=quiet)

    # Examples are used as keys; an example with no preprocessor output
    # still resolves to an empty dict.
    outputs_by_example: Dict[Example, Dict[str, Any]] = defaultdict(dict)

    for processor in pre:
        printer.info(f"\t=> Running preprocessor {processor.name}")
        paired = zip(data, processor(data))
        progress = tqdm(paired, total=len(data), disable=quiet, leave=False)
        for example, output in progress:
            outputs_by_example[example][processor.name] = output

    yield from (
        (hash(example), example, outputs_by_example[example]) for example in data
    )