|
1 | 1 | # emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*-
|
2 | 2 | # vi: set ft=python sts=4 ts=4 sw=4 et:
|
3 |
| -"""Tests for workflow callbacks |
4 |
| -""" |
| 3 | +"""Tests for workflow callbacks.""" |
| 4 | +from pathlib import Path |
5 | 5 | from time import sleep
|
| 6 | +import json |
6 | 7 | import pytest
|
7 | 8 | import nipype.interfaces.utility as niu
|
8 | 9 | import nipype.pipeline.engine as pe
|
@@ -71,22 +72,22 @@ def test_callback_exception(tmpdir, plugin, stop_on_first_crash):
|
71 | 72 |
|
72 | 73 | @pytest.mark.parametrize("plugin", ["Linear", "MultiProc", "LegacyMultiProc"])
|
73 | 74 | @pytest.mark.skipif(not has_pandas, reason="Test requires pandas")
|
74 |
| -def test_callback_gantt(tmpdir, plugin): |
| 75 | +def test_callback_gantt(tmp_path: Path, plugin: str) -> None: |
75 | 76 | import logging
|
76 | 77 |
|
77 | 78 | from os import path
|
78 | 79 |
|
79 | 80 | from nipype.utils.profiler import log_nodes_cb
|
80 | 81 | from nipype.utils.draw_gantt_chart import generate_gantt_chart
|
81 | 82 |
|
82 |
| -log_filename = path.join(tmpdir, "callback.log") |
| 83 | +log_filename = tmp_path / "callback.log" |
83 | 84 | logger = logging.getLogger("callback")
|
84 | 85 | logger.setLevel(logging.DEBUG)
|
85 | 86 | handler = logging.FileHandler(log_filename)
|
86 | 87 | logger.addHandler(handler)
|
87 | 88 |
|
88 | 89 | # create workflow
|
89 |
| -wf = pe.Workflow(name="test", base_dir=tmpdir.strpath) |
| 90 | +wf = pe.Workflow(name="test", base_dir=str(tmp_path)) |
90 | 91 | f_node = pe.Node(
|
91 | 92 | niu.Function(function=func, input_names=[], output_names=[]), name="f_node"
|
92 | 93 | )
|
@@ -98,7 +99,21 @@ def test_callback_gantt(tmpdir, plugin):
|
98 | 99 | plugin_args["n_procs"] = 8
|
99 | 100 | wf.run(plugin=plugin, plugin_args=plugin_args)
|
100 | 101 |
|
101 |
| -generate_gantt_chart( |
102 |
| -path.join(tmpdir, "callback.log"), 1 if plugin == "Linear" else 8 |
103 |
| -) |
104 |
| -assert path.exists(path.join(tmpdir, "callback.log.html")) |
| 102 | +with open(log_filename, "r") as _f: |
| 103 | +loglines = _f.readlines() |
| 104 | + |
| 105 | +# test missing duration |
| 106 | +first_line = json.loads(loglines[0]) |
| 107 | +if "duration" in first_line: |
| 108 | +del first_line["duration"] |
| 109 | +loglines[0] = f"{json.dumps(first_line)}\n" |
| 110 | + |
| 111 | +# test duplicate timestamp warning |
| 112 | +loglines.append(loglines[-1]) |
| 113 | + |
| 114 | +with open(log_filename, "w") as _f: |
| 115 | +_f.write("".join(loglines)) |
| 116 | + |
| 117 | +with pytest.warns(Warning): |
| 118 | +generate_gantt_chart(str(log_filename), 1 if plugin == "Linear" else 8) |
| 119 | +assert (tmp_path / "callback.log.html").exists() |
0 commit comments