Skip to content

QPI Driver API Reference

qpi_driver

Executor

Bases: ABC

Source code in qpi-driver/qpi_driver/executors/base.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class Executor(ABC):
    """Abstract interface shared by all job executors.

    Concrete executors implement :meth:`execute` and :meth:`process_result`;
    :meth:`close` is an optional hook whose base implementation does nothing.
    """

    def __init__(self, name: str = "executor", **kwargs: Any) -> None:
        """Remember the executor's name.

        Args:
            name: Identifier stored on the instance as ``self.name``.
            **kwargs: Accepted so subclasses can forward extra options; the
                base class itself does not use them.
        """
        self.name = name

    @abstractmethod
    def execute(self, payload: JobPayload) -> xr.Dataset:
        """Run the quantum circuit/instructions described by ``payload``.

        Args:
            payload: JobPayload object containing circuit QASM, qubit count,
                shots, etc.

        Returns:
            xr.Dataset: Dataset mimicking the raw measurement counts and
            frequencies.
        """

    @abstractmethod
    def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
        """Turn the raw dataset from ``execute()`` into a Qiskit-compatible dict.

        Every executor understands its own data layout and is responsible for:

        - state discrimination when ``meas_level=2``
        - returning IQ memory when ``meas_level=1``
        - returning raw traces when ``meas_level=0``
        - applying ``meas_return`` averaging

        Args:
            dataset: The xr.Dataset returned by execute().
            job_id: The unique ID of the quantum job.

        Returns:
            dict: Qiskit-compatible result dict with keys like 'counts',
            'memory', 'shots', 'backend', 'success', etc.
        """

    def close(self) -> None:
        """Release resources; the base implementation is a no-op."""
        return None

close()

Release resources.

Source code in qpi-driver/qpi_driver/executors/base.py
155
156
157
def close(self) -> None:
    """Release resources; this base implementation is a no-op."""
    return None

execute(payload) abstractmethod

Execute the quantum circuit/instructions payload.

Parameters:

Name Type Description Default
payload JobPayload

JobPayload object containing circuit QASM, qubit count, shots, etc.

required

Returns:

Type Description
Dataset

xr.Dataset: Dataset mimicking the raw measurement counts and frequencies.

Source code in qpi-driver/qpi_driver/executors/base.py
123
124
125
126
127
128
129
130
131
132
133
@abstractmethod
def execute(self, payload: JobPayload) -> xr.Dataset:
    """Run the quantum circuit/instructions described by ``payload``.

    Args:
        payload: JobPayload object containing circuit QASM, qubit count,
            shots, etc.

    Returns:
        xr.Dataset: Dataset mimicking the raw measurement counts and
        frequencies.
    """

process_result(dataset, job_id) abstractmethod

Convert a raw xr.Dataset from execute() into a Qiskit-compatible result dict.

Each executor knows its own data format and how to:

- Perform state discrimination (if meas_level=2)
- Return IQ memory (if meas_level=1)
- Return raw traces (if meas_level=0)
- Handle meas_return averaging

Parameters:

Name Type Description Default
dataset Dataset

The xr.Dataset returned by execute().

required
job_id str

The unique ID of the quantum job.

required

Returns:

Name Type Description
dict dict

Qiskit-compatible result dict with keys like 'counts', 'memory', 'shots', 'backend', 'success', etc.

Source code in qpi-driver/qpi_driver/executors/base.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
@abstractmethod
def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
    """Turn the raw dataset from ``execute()`` into a Qiskit-compatible dict.

    Every executor understands its own data layout and is responsible for:

    - state discrimination when ``meas_level=2``
    - returning IQ memory when ``meas_level=1``
    - returning raw traces when ``meas_level=0``
    - applying ``meas_return`` averaging

    Args:
        dataset: The xr.Dataset returned by execute().
        job_id: The unique ID of the quantum job.

    Returns:
        dict: Qiskit-compatible result dict with keys like 'counts',
        'memory', 'shots', 'backend', 'success', etc.
    """

MockExecutor

Bases: Executor

Source code in qpi-driver/qpi_driver/executors/mock.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class MockExecutor(Executor):
    def __init__(self, name: str = "mock", **kwargs: Any):
        """Set up the mock executor with its own ``BasicSimulator`` instance.

        Args:
            name: Executor name, forwarded to the base class.
            **kwargs: Extra options, forwarded unchanged to the base class.
        """
        super().__init__(name=name, **kwargs)
        self._simulator = BasicSimulator()

    def execute(self, payload: JobPayload) -> xr.Dataset:
        """Execute quantum circuit simulation using Qiskit BasicSimulator.

        For multi-circuit payloads, results are concatenated along a ``circuit_index``
        dimension.  A single-circuit payload without parameter bindings returns the
        legacy flat format (no ``circuit_index`` dimension) for backward compatibility.

        Args:
            payload: JobPayload specifying shots and circuits.

        Returns:
            xr.Dataset: Dataset containing measured state outcomes.
        """
        sub_datasets: list[xr.Dataset] = []

        for circ in payload.circuits:
            circ_shots = circ.shots if circ.shots is not None else payload.shots
            qasm_str = circ.circuit
            circuit = load_qasm(qasm_str)

            param_sets = circ.parameter_values or [None]
            for param_vals in param_sets:
                bound_circuit = circuit
                if param_vals is not None and circuit.parameters:
                    bound_circuit = circuit.assign_parameters(param_vals)

                t_qc = transpile(bound_circuit, self._simulator)
                result = self._simulator.run(
                    t_qc, shots=circ_shots, memory=True
                ).result()
                memory = result.get_memory(t_qc)
                n_qubits = circuit.num_qubits

                ds = memory_to_dataset(memory, n_qubits, circ_shots, payload.meas_level)
                sub_datasets.append(ds)

        # Backward compatible: single result → flat dataset (no circuit_index)
        if len(sub_datasets) == 1:
            ds = sub_datasets[0]
            ds.attrs.update(
                {
                    "shots": circ_shots,
                    "n_qubits": n_qubits,
                    "backend": self.name,
                    "meas_level": payload.meas_level,
                    "meas_return": payload.meas_return,
                }
            )
            return ds

        # Multiple results → concat along circuit_index
        combined = xr.concat(sub_datasets, dim="circuit_index")
        combined.attrs.update(
            {
                "shots": payload.shots,
                "n_qubits": n_qubits,
                "backend": self.name,
                "meas_level": payload.meas_level,
                "meas_return": payload.meas_return,
            }
        )
        return combined

    def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
        """Build a Qiskit-compatible result dict from the simulator dataset.

        The measurement level, return mode and backend name are read from
        ``dataset.attrs`` (defaulting to 2, ``"single"`` and ``self.name``).
        A dataset with a ``circuit_index`` dimension is converted slice by
        slice; a flat dataset is treated as one circuit.

        Args:
            dataset: xr.Dataset from execute().
            job_id: Unique job ID.

        Returns:
            dict: Qiskit-compatible result dict.
        """
        attrs = dataset.attrs
        level = int(attrs.get("meas_level", 2))
        return_mode = str(attrs.get("meas_return", "single"))
        backend_name = attrs.get("backend", self.name)

        if "circuit_index" not in dataset.dims:
            # Flat layout: a single circuit
            only = self._single_dataset_to_result(dataset, level, return_mode)
            return build_qiskit_result([only], job_id, backend_name)

        per_circuit = []
        for idx in range(dataset.sizes["circuit_index"]):
            circuit_ds = dataset.isel(circuit_index=idx)
            # Make sure each slice carries the job-level metadata
            circuit_ds.attrs.update(dataset.attrs)
            per_circuit.append(
                self._single_dataset_to_result(circuit_ds, level, return_mode)
            )
        return build_qiskit_result(per_circuit, job_id, backend_name)

    @staticmethod
    def _single_dataset_to_result(
        dataset: xr.Dataset, meas_level: int, meas_return: str
    ) -> dict:
        """Extract counts or IQ memory from a single-circuit dataset slice."""
        # Identify qubit variables (integer-named data vars)
        qubit_vars = []
        for var in dataset.data_vars:
            try:
                qubit_vars.append(int(var))
            except ValueError:
                pass
        if not qubit_vars:
            return {"raw": str(dataset), "shots": 0}

        qubit_vars.sort()
        q0_key = str(qubit_vars[0])
        shots = int(dataset.attrs.get("shots", len(dataset[q0_key])))

        # meas_level=1: return IQ memory
        if meas_level == 1:
            memory: list[list[list[float]]] = []
            num_samples = len(dataset[q0_key])
            for s in range(num_samples):
                shot_iq = []
                for q_idx in qubit_vars:
                    val = dataset[str(q_idx)].values[s]
                    shot_iq.append([float(val.real), float(val.imag)])
                memory.append(shot_iq)

            if meas_return == "avg" and memory:
                memory = iq_memory_avg(memory, len(qubit_vars))

            return {"memory": memory, "shots": shots}

        # meas_level=2: classified counts
        n_qubits = len(qubit_vars)
        counts_dict: dict[str, int] = {}
        num_samples = len(dataset[q0_key])
        for s in range(num_samples):
            state_chars = []
            for q_idx in reversed(qubit_vars):
                val = dataset[str(q_idx)].values[s]
                state_chars.append("1" if val.real > 0.5 else "0")
            state_str = "".join(state_chars)
            counts_dict[state_str] = counts_dict.get(state_str, 0) + 1

        if num_samples == 1 and shots > 1:
            state_str = list(counts_dict.keys())[0]
            counts_dict = {state_str: shots}

        # Pad with 0 counts for all 2^N states
        states_list = [format(i, f"0{n_qubits}b") for i in range(2**n_qubits)]
        for st in states_list:
            if st not in counts_dict:
                counts_dict[st] = 0

        return {"counts": counts_dict, "shots": shots}

execute(payload)

Execute quantum circuit simulation using Qiskit BasicSimulator.

For multi-circuit payloads, results are concatenated along a circuit_index dimension. When the payload produces exactly one result (a single circuit with at most one parameter set), the legacy flat format (no circuit_index dimension) is returned for backward compatibility.

Parameters:

Name Type Description Default
payload JobPayload

JobPayload specifying shots and circuits.

required

Returns:

Type Description
Dataset

xr.Dataset: Dataset containing measured state outcomes.

Source code in qpi-driver/qpi_driver/executors/mock.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def execute(self, payload: JobPayload) -> xr.Dataset:
    """Simulate every circuit in ``payload`` with Qiskit's BasicSimulator.

    Each circuit is run once per entry of its ``parameter_values`` (or once,
    unbound, when it has none).  When exactly one run results, its dataset is
    returned in the legacy flat format (no ``circuit_index`` dimension) for
    backward compatibility; otherwise the per-run datasets are concatenated
    along a ``circuit_index`` dimension.

    Args:
        payload: JobPayload specifying shots and circuits.

    Returns:
        xr.Dataset: Dataset containing measured state outcomes, with
        ``shots``, ``n_qubits``, ``backend``, ``meas_level`` and
        ``meas_return`` stored in ``attrs``.

    Raises:
        ValueError: If ``payload.circuits`` is empty.
    """
    if not payload.circuits:
        # Fail early with a clear message rather than an opaque error from
        # concatenating an empty list further down.
        raise ValueError("payload contains no circuits to execute")

    sub_datasets: list[xr.Dataset] = []

    for circ in payload.circuits:
        # A per-circuit shot count overrides the payload-wide default.
        circ_shots = circ.shots if circ.shots is not None else payload.shots
        circuit = load_qasm(circ.circuit)
        n_qubits = circuit.num_qubits  # same for every parameter binding

        for param_vals in circ.parameter_values or [None]:
            bound_circuit = circuit
            if param_vals is not None and circuit.parameters:
                bound_circuit = circuit.assign_parameters(param_vals)

            t_qc = transpile(bound_circuit, self._simulator)
            result = self._simulator.run(
                t_qc, shots=circ_shots, memory=True
            ).result()
            memory = result.get_memory(t_qc)

            sub_datasets.append(
                memory_to_dataset(memory, n_qubits, circ_shots, payload.meas_level)
            )

    if len(sub_datasets) == 1:
        # Backward compatible: single result → flat dataset (no circuit_index)
        combined = sub_datasets[0]
        shots_attr = circ_shots
    else:
        # Multiple results → concat along circuit_index.
        # NOTE(review): attrs record payload.shots and the last circuit's
        # n_qubits here, so per-circuit overrides are not reflected — confirm
        # this is intended.
        combined = xr.concat(sub_datasets, dim="circuit_index")
        shots_attr = payload.shots

    combined.attrs.update(
        {
            "shots": shots_attr,
            "n_qubits": n_qubits,
            "backend": self.name,
            "meas_level": payload.meas_level,
            "meas_return": payload.meas_return,
        }
    )
    return combined

process_result(dataset, job_id)

Convert the simulator's xr.Dataset into a Qiskit-compatible result dict.

Supports meas_level 1 (IQ memory) and 2 (classified counts). meas_level 0 is not supported for simulators.

Parameters:

Name Type Description Default
dataset Dataset

xr.Dataset from execute().

required
job_id str

Unique job ID.

required

Returns:

Name Type Description
dict dict

Qiskit-compatible result dict.

Source code in qpi-driver/qpi_driver/executors/mock.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
    """Build a Qiskit-compatible result dict from the simulator dataset.

    The measurement level, return mode and backend name are read from
    ``dataset.attrs`` (defaulting to 2, ``"single"`` and ``self.name``).
    A dataset with a ``circuit_index`` dimension is converted slice by
    slice; a flat dataset is treated as one circuit.

    Args:
        dataset: xr.Dataset from execute().
        job_id: Unique job ID.

    Returns:
        dict: Qiskit-compatible result dict.
    """
    attrs = dataset.attrs
    level = int(attrs.get("meas_level", 2))
    return_mode = str(attrs.get("meas_return", "single"))
    backend_name = attrs.get("backend", self.name)

    if "circuit_index" not in dataset.dims:
        # Flat layout: a single circuit
        only = self._single_dataset_to_result(dataset, level, return_mode)
        return build_qiskit_result([only], job_id, backend_name)

    per_circuit = []
    for idx in range(dataset.sizes["circuit_index"]):
        circuit_ds = dataset.isel(circuit_index=idx)
        # Make sure each slice carries the job-level metadata
        circuit_ds.attrs.update(dataset.attrs)
        per_circuit.append(
            self._single_dataset_to_result(circuit_ds, level, return_mode)
        )
    return build_qiskit_result(per_circuit, job_id, backend_name)

PrestoExecutor

Bases: Executor

Executor subclass for interacting with Presto RF signal generators.

Source code in qpi-driver/qpi_driver/executors/presto.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class PrestoExecutor(Executor):
    """Placeholder executor for Presto RF signal generators.

    Both interface methods currently raise ``NotImplementedError``.
    """

    def execute(self, payload: dict) -> xr.Dataset:
        """Run quantum instructions on Presto hardware (not implemented).

        Args:
            payload: Dictionary containing circuit execution options.

        Returns:
            xr.Dataset: The control acquisition dataset.

        Raises:
            NotImplementedError: Always, because this executor is a stub.
        """
        reason = "PrestoExecutor is not implemented yet."
        raise NotImplementedError(reason)

    def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
        """Convert a raw dataset to a Qiskit-compatible dict (not implemented).

        Args:
            dataset: The xr.Dataset returned by execute().
            job_id: The unique ID of the quantum job.

        Returns:
            dict: Qiskit-compatible result dict.

        Raises:
            NotImplementedError: Always, because this executor is a stub.
        """
        reason = "PrestoExecutor is not implemented yet."
        raise NotImplementedError(reason)

execute(payload)

Execute quantum instructions on Presto hardware.

Parameters:

Name Type Description Default
payload dict

Dictionary containing circuit execution options.

required

Returns:

Type Description
Dataset

xr.Dataset: The control acquisition dataset.

Raises:

Type Description
NotImplementedError

Always raised since the executor is a placeholder.

Source code in qpi-driver/qpi_driver/executors/presto.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
def execute(self, payload: dict) -> xr.Dataset:
    """Run quantum instructions on Presto hardware (not implemented).

    Args:
        payload: Dictionary containing circuit execution options.

    Returns:
        xr.Dataset: The control acquisition dataset.

    Raises:
        NotImplementedError: Always, because this executor is a stub.
    """
    reason = "PrestoExecutor is not implemented yet."
    raise NotImplementedError(reason)

process_result(dataset, job_id)

Convert raw dataset to Qiskit-compatible result dict.

Parameters:

Name Type Description Default
dataset Dataset

The xr.Dataset returned by execute().

required
job_id str

The unique ID of the quantum job.

required

Returns:

Name Type Description
dict dict

Qiskit-compatible result dict.

Raises:

Type Description
NotImplementedError

Always raised since the executor is a placeholder.

Source code in qpi-driver/qpi_driver/executors/presto.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
    """Convert a raw dataset to a Qiskit-compatible dict (not implemented).

    Args:
        dataset: The xr.Dataset returned by execute().
        job_id: The unique ID of the quantum job.

    Returns:
        dict: Qiskit-compatible result dict.

    Raises:
        NotImplementedError: Always, because this executor is a stub.
    """
    reason = "PrestoExecutor is not implemented yet."
    raise NotImplementedError(reason)

QbloxExecutor

Bases: Executor

Executor subclass for interacting with Qblox instruments and modules via qblox-scheduler.

Source code in qpi-driver/qpi_driver/executors/qblox.py
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
class QbloxExecutor(Executor):
    """Executor subclass for interacting with Qblox instruments and modules via qblox-scheduler."""

    def __new__(cls, *args, **kwargs):
        """Create the instance only when qblox-scheduler is available.

        Raises:
            ImportError: If ``IS_QBLOX_SCHEDULER_INSTALLED`` is false.
        """
        if IS_QBLOX_SCHEDULER_INSTALLED:
            return super().__new__(cls)
        raise ImportError(
            "qblox-scheduler is not installed. Install the [qblox] extra to use QbloxExecutor."
        )

    def __init__(
        self,
        name: str = "qblox",
        quantify_hardware_config: QbloxHardwareCompilationConfig | Path | dict = Path(
            "quantify.hardware.json"
        ),
        quantify_device_config: Path | dict = Path("quantify.device.json"),
        is_dummy: bool = False,
        data_dir: Path = Path("data"),
        acquisition_timeout: int = 10,
        **kwargs: Any,
    ) -> None:
        """Initialize the QbloxExecutor.

        Closes all previously registered instruments, loads the hardware and
        device configurations, and creates the ``HardwareAgent`` used to run
        schedules.

        Args:
            name: The name of the executor; also used as the quantum device name.
            quantify_hardware_config: Hardware-layer configuration, given as a
                ``QbloxHardwareCompilationConfig``, a file path, or a dict.
            quantify_device_config: Device-layer configuration, given as a file
                path or a dict.
            is_dummy: If True, uses a dummy Cluster instrument (passed to the
                agent as ``create_dummy_connections``).
            data_dir: Directory where data is temporarily stored (passed to the
                agent as ``output_dir``).
            acquisition_timeout: Timeout in seconds to wait for acquisition.
            **kwargs: Arbitrary keyword arguments passed to the base class.
        """
        super().__init__(name, **kwargs)
        # Clean up any previously registered instruments to avoid name collision
        # errors in QCoDeS. Best-effort: any failure while closing is ignored.
        with suppress(Exception):
            Instrument.close_all()

        self._data_dir = data_dir
        self._is_dummy = is_dummy
        self._acquisition_timeout = acquisition_timeout
        self._hardware_config = _load_quantify_hardware_config(quantify_hardware_config)
        self._device = _load_quantum_device(name=name, config=quantify_device_config)
        self._agent = HardwareAgent(
            hardware_configuration=self._hardware_config,
            quantum_device_configuration=self._device,
            create_dummy_connections=is_dummy,
            output_dir=data_dir,
        )

    @property
    def hardware_config(self) -> QbloxHardwareCompilationConfig:
        """Return the hardware configuration loaded when the executor was created."""
        return self._hardware_config

    def execute(self, payload: JobPayload) -> xr.Dataset:
        """Compile the payload's circuit to a schedule, run it and tag the result.

        Args:
            payload: JobPayload; this method reads its ``qasm``, ``shots``,
                ``id``, ``meas_level`` and ``meas_return`` fields (plus whatever
                ``_resolve_acq_protocol`` needs).

        Returns:
            xr.Dataset: Raw acquisition dataset whose ``attrs`` carry the run
            metadata, including ``acq_rotation``/``acq_threshold`` when they
            were part of the acquisition settings.
        """
        # NOTE(review): reads payload.qasm (a single circuit) — confirm this
        # matches the JobPayload schema used by the other executors.
        qc = load_qasm(payload.qasm)
        shot_count = payload.shots
        qubit_count = qc.num_qubits

        # The acquisition protocol follows meas_level and any payload overrides
        protocol, protocol_kwargs = self._resolve_acq_protocol(payload)

        sched = _generate_schedule(
            name=payload.id,
            circuit=qc,
            shots=shot_count,
            acq_protocol=protocol,
            acq_kwargs=protocol_kwargs,
        )
        dataset = self._agent.run(sched, timeout=self._acquisition_timeout)

        metadata = {
            "shots": shot_count,
            "n_qubits": qubit_count,
            "backend": self.name,
            "meas_level": payload.meas_level,
            "meas_return": payload.meas_return,
            "acq_protocol": protocol,
        }
        dataset.attrs.update(metadata)
        # Record discrimination settings so process_result() can reuse them
        for key in ("acq_rotation", "acq_threshold"):
            if key in protocol_kwargs:
                dataset.attrs[key] = protocol_kwargs[key]
        return dataset

    def _resolve_acq_protocol(self, payload: JobPayload) -> tuple[str, dict]:
        """Determine the qblox-scheduler acquisition protocol for the given meas_level.

        Args:
            payload: JobPayload specifying meas_level, acq_threshold, acq_rotation, etc.

        Returns:
            Tuple of (protocol_name, extra_kwargs_for_Measure).
        """
        if payload.meas_level == 0:
            return "Trace", {}

        if payload.meas_level == 1:
            return "SSBIntegrationComplex", {}

        # meas_level == 2: try ThresholdedAcquisition if device or payload has threshold params
        dev_params = self._get_threshold_params()
        if payload.acq_rotation is not None:
            dev_params["acq_rotation"] = payload.acq_rotation
        if payload.acq_threshold is not None:
            dev_params["acq_threshold"] = payload.acq_threshold

        if "acq_rotation" in dev_params and "acq_threshold" in dev_params:
            return "ThresholdedAcquisition", dev_params

        # Fallback to SSBIntegrationComplex + software discrimination in process_result()
        return "SSBIntegrationComplex", dev_params

    def _get_threshold_params(self) -> dict:
        """Extract acq_threshold and acq_rotation from the first device element that has them.

        Returns:
            Dict with 'acq_rotation' and 'acq_threshold' if found, else empty dict.
        """
        elements = self._device.elements
        if callable(elements):
            element_names = elements()
        else:
            element_names = list(elements.keys())

        for element_name in element_names:
            if hasattr(self._device, "get_element"):
                element = self._device.get_element(element_name)
            else:
                element = self._device.elements[element_name]
            try:
                threshold = element.measure.acq_threshold
                rotation = element.measure.acq_rotation
                if callable(threshold):
                    threshold = threshold()
                if callable(rotation):
                    rotation = rotation()
                if threshold is not None and rotation is not None:
                    return {
                        "acq_threshold": float(threshold),
                        "acq_rotation": float(rotation),
                    }
            except (AttributeError, KeyError):
                continue
        return {}

    def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
        """Build a Qiskit-compatible result dict from an acquisition dataset.

        The conversion depends on the metadata stored in ``dataset.attrs``:

        - meas_level=0 (Trace): raw complex waveform as [[real, imag], ...].
        - meas_level=1 (SSBIntegrationComplex): IQ values per shot per qubit.
        - meas_level=2 with ThresholdedAcquisition: 0/1 values aggregated into
          a counts dict.
        - meas_level=2 with SSBIntegrationComplex: software discrimination
          using acq_threshold and acq_rotation.

        A dataset with a ``circuit_index`` dimension is converted slice by slice.

        Args:
            dataset: xr.Dataset from execute().
            job_id: Unique job ID.

        Returns:
            dict: Qiskit-compatible result dict.
        """
        from qpi_driver.executors.utils.result import build_qiskit_result

        attrs = dataset.attrs
        level = int(attrs.get("meas_level", 2))
        return_mode = str(attrs.get("meas_return", "single"))
        protocol = str(attrs.get("acq_protocol", "SSBIntegrationComplex"))
        backend_name = attrs.get("backend", self.name)

        if "circuit_index" not in dataset.dims:
            only = self._single_dataset_to_result(dataset, level, return_mode, protocol)
            return build_qiskit_result([only], job_id, backend_name)

        per_circuit = []
        for idx in range(dataset.sizes["circuit_index"]):
            circuit_ds = dataset.isel(circuit_index=idx)
            # Make sure each slice carries the job-level metadata
            circuit_ds.attrs.update(dataset.attrs)
            per_circuit.append(
                self._single_dataset_to_result(
                    circuit_ds, level, return_mode, protocol
                )
            )
        return build_qiskit_result(per_circuit, job_id, backend_name)

    def _single_dataset_to_result(
        self, dataset: xr.Dataset, meas_level: int, meas_return: str, acq_protocol: str
    ) -> dict:
        """Extract result data from a single-circuit quantify dataset."""
        qubit_vars, q0_key, shots = self._extract_qubit_vars(dataset)
        if not qubit_vars:
            return {"raw": str(dataset), "shots": 0}

        if meas_level == 0:
            return self._process_meas_level_0(dataset, qubit_vars, shots)
        if meas_level == 1:
            return self._process_meas_level_1(dataset, qubit_vars, shots, meas_return)
        return self._process_meas_level_2(dataset, qubit_vars, shots, acq_protocol)

    def _extract_qubit_vars(self, dataset: xr.Dataset) -> tuple[list[int], str, int]:
        """Identify qubit variables (integer-named data vars) and shots."""
        qubit_vars = []
        for var in dataset.data_vars:
            try:
                qubit_vars.append(int(var))
            except ValueError:
                pass
        if not qubit_vars:
            return [], "", 0
        qubit_vars.sort()
        q0_key = qubit_vars[0] if qubit_vars[0] in dataset else str(qubit_vars[0])
        shots = int(dataset.attrs.get("shots", len(dataset[q0_key])))
        return qubit_vars, q0_key, shots

    def _process_meas_level_0(
        self, dataset: xr.Dataset, qubit_vars: list[int], shots: int
    ) -> dict:
        """Extract raw complex trace data (meas_level=0)."""
        memory: list[list[list[float]]] = []
        for q_idx in qubit_vars:
            var_key = q_idx if q_idx in dataset else str(q_idx)
            trace = dataset[var_key].values.flatten()
            qubit_trace = [[float(v.real), float(v.imag)] for v in trace]
            memory.append(qubit_trace)
        return {"memory": memory, "shots": shots}

    def _process_meas_level_1(
        self, dataset: xr.Dataset, qubit_vars: list[int], shots: int, meas_return: str
    ) -> dict:
        """Return integrated IQ memory as [real, imag] per qubit per sample.

        Missing samples (``None``) and NaN components are reported as 0.0.
        When ``meas_return`` is ``"avg"`` and there is data, the memory is
        passed through ``iq_memory_avg``.

        Args:
            dataset: Dataset holding one variable per qubit.
            qubit_vars: Qubit indices, in output order.
            shots: Shot count copied into the result unchanged.
            meas_return: ``"avg"`` to average, anything else for per-sample data.

        Returns:
            dict: ``{"memory": ..., "shots": shots}``.
        """
        from qpi_driver.executors.utils.result import iq_memory_avg

        def _key(q_idx):
            # Variables may be keyed by int or by the string form of the index
            return q_idx if q_idx in dataset else str(q_idx)

        def _component(sample, attr):
            if sample is None:
                return 0.0
            part = getattr(sample, attr)
            return 0.0 if np.isnan(part) else float(part)

        num_samples = len(dataset[_key(qubit_vars[0])])
        columns = [dataset[_key(q_idx)].values for q_idx in qubit_vars]
        memory = [
            [[_component(col[s], "real"), _component(col[s], "imag")] for col in columns]
            for s in range(num_samples)
        ]

        if meas_return == "avg" and memory:
            memory = iq_memory_avg(memory, len(qubit_vars))
        return {"memory": memory, "shots": shots}

    def _process_meas_level_2(
        self, dataset: xr.Dataset, qubit_vars: list[int], shots: int, acq_protocol: str
    ) -> dict:
        """Extract classified counts (meas_level=2) performing software discrimination if needed."""
        q0_key = qubit_vars[0] if qubit_vars[0] in dataset else str(qubit_vars[0])
        num_samples = len(dataset[q0_key])
        n_qubits = len(qubit_vars)
        counts_dict: dict[str, int] = {}

        if acq_protocol == "ThresholdedAcquisition":
            for s in range(num_samples):
                state_chars = []
                for q_idx in reversed(qubit_vars):
                    var_key = q_idx if q_idx in dataset else str(q_idx)
                    val = dataset[var_key].values[s]
                    r = val.real if val is not None and not np.isnan(val.real) else 0.0
                    state_chars.append(str(1 if r >= 1 else 0))
                state_str = "".join(state_chars)
                counts_dict[state_str] = counts_dict.get(state_str, 0) + 1
        else:
            # SSBIntegrationComplex software discrimination.
            # Check dataset.attrs first, fallback to device config, default to 0.0.
            acq_rotation = dataset.attrs.get("acq_rotation")
            acq_threshold = dataset.attrs.get("acq_threshold")
            if acq_rotation is None or acq_threshold is None:
                dev_params = self._get_threshold_params()
                acq_rotation = (
                    dev_params.get("acq_rotation", 0.0)
                    if acq_rotation is None
                    else acq_rotation
                )
                acq_threshold = (
                    dev_params.get("acq_threshold", 0.0)
                    if acq_threshold is None
                    else acq_threshold
                )

            rot_rad = np.radians(acq_rotation)
            for s in range(num_samples):
                state_chars = []
                for q_idx in reversed(qubit_vars):
                    var_key = q_idx if q_idx in dataset else str(q_idx)
                    val = dataset[var_key].values[s]
                    if val is None or (np.isnan(val.real) and np.isnan(val.imag)):
                        state_chars.append("0")
                    else:
                        rotated = val * np.exp(1j * rot_rad)
                        state_chars.append("1" if rotated.real > acq_threshold else "0")
                state_str = "".join(state_chars)
                counts_dict[state_str] = counts_dict.get(state_str, 0) + 1

        if num_samples == 1 and shots > 1:
            state_str = list(counts_dict.keys())[0]
            counts_dict = {state_str: shots}

        # Pad with 0 counts for all 2^N states
        states_list = [format(i, f"0{n_qubits}b") for i in range(2**n_qubits)]
        for st in states_list:
            if st not in counts_dict:
                counts_dict[st] = 0

        return {"counts": counts_dict, "shots": shots}

    def close(self) -> None:
        """Release resources."""
        with suppress(Exception):
            self._agent.instrument_coordinator.close()

__init__(name='qblox', quantify_hardware_config=Path('quantify.hardware.json'), quantify_device_config=Path('quantify.device.json'), is_dummy=False, data_dir=Path('data'), acquisition_timeout=10, **kwargs)

Initialize the QbloxExecutor.

Parameters:

Name Type Description Default
name str

The name of the executor.

'qblox'
quantify_hardware_config QbloxHardwareCompilationConfig | Path | dict

Hardware-layer configuration, given as a QbloxHardwareCompilationConfig, a file path, or a dict.

Path('quantify.hardware.json')
quantify_device_config Path | dict

Device-layer configuration, given as a file path or a dict.

Path('quantify.device.json')
is_dummy bool

If True, uses a dummy Cluster instrument.

False
data_dir Path

Directory where data is temporarily stored.

Path('data')
acquisition_timeout int

Timeout in seconds to wait for acquisition.

10
**kwargs Any

Arbitrary keyword arguments passed to the base class.

{}
Source code in qpi-driver/qpi_driver/executors/qblox.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def __init__(
    self,
    name: str = "qblox",
    quantify_hardware_config: QbloxHardwareCompilationConfig | Path | dict = Path(
        "quantify.hardware.json"
    ),
    quantify_device_config: Path | dict = Path("quantify.device.json"),
    is_dummy: bool = False,
    data_dir: Path = Path("data"),
    acquisition_timeout: int = 10,
    **kwargs: Any,
) -> None:
    """Set up the executor: load both configurations and build the hardware agent.

    Args:
        name: Name of the executor; also used as the name of the loaded device.
        quantify_hardware_config: Hardware-layer configuration (config object,
            file path, or dict), handed to ``_load_quantify_hardware_config``.
        quantify_device_config: Device-layer configuration (file path or dict),
            handed to ``_load_quantum_device``.
        is_dummy: Forwarded to ``HardwareAgent`` as ``create_dummy_connections``.
        data_dir: Forwarded to ``HardwareAgent`` as ``output_dir``.
        acquisition_timeout: Timeout (seconds) stored for later acquisitions.
        **kwargs: Arbitrary keyword arguments passed to the base class.
    """
    super().__init__(name, **kwargs)

    # Clean up any previously registered instruments to avoid name collision
    # errors in QCoDeS. Failures during this cleanup are ignored on purpose.
    try:
        Instrument.close_all()
    except Exception:
        pass

    self._data_dir, self._is_dummy = data_dir, is_dummy
    self._acquisition_timeout = acquisition_timeout

    hardware_config = _load_quantify_hardware_config(quantify_hardware_config)
    self._hardware_config = hardware_config
    device = _load_quantum_device(name=name, config=quantify_device_config)
    self._device = device

    self._agent = HardwareAgent(
        hardware_configuration=hardware_config,
        quantum_device_configuration=device,
        create_dummy_connections=is_dummy,
        output_dir=data_dir,
    )

close()

Release resources.

Source code in qpi-driver/qpi_driver/executors/qblox.py
382
383
384
385
def close(self) -> None:
    """Close the agent's instrument coordinator on a best-effort basis.

    Any ``Exception`` raised while looking up or closing the coordinator
    is deliberately swallowed.
    """
    try:
        coordinator = self._agent.instrument_coordinator
        coordinator.close()
    except Exception:
        # Best-effort cleanup: a failure while releasing resources is ignored.
        pass

execute(payload)

Execute quantum instructions using the Qblox scheduler.

Parameters:

Name Type Description Default
payload JobPayload

JobPayload specifying shots, circuits, meas_level, etc.

required

Returns:

Type Description
Dataset

xr.Dataset: Raw acquisition dataset.

Source code in qpi-driver/qpi_driver/executors/qblox.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
def execute(self, payload: JobPayload) -> xr.Dataset:
    """Run the payload's QASM circuit through the hardware agent.

    The circuit is parsed from ``payload.qasm``, turned into a schedule by
    ``_generate_schedule`` and executed with ``self._agent.run``. The returned
    dataset is annotated with the run metadata that ``process_result`` reads.

    Args:
        payload: JobPayload specifying shots, circuits, meas_level, etc.

    Returns:
        xr.Dataset: Raw acquisition dataset with ``shots``, ``n_qubits``,
        ``backend``, ``meas_level``, ``meas_return`` and ``acq_protocol``
        attrs, plus ``acq_rotation`` / ``acq_threshold`` when resolved.
    """
    circuit = load_qasm(payload.qasm)
    shots = payload.shots
    n_qubits = circuit.num_qubits

    # Acquisition protocol and its parameters depend on meas_level and on
    # any overrides carried by the payload.
    acq_protocol, acq_kwargs = self._resolve_acq_protocol(payload)

    dataset = self._agent.run(
        _generate_schedule(
            name=payload.id,
            circuit=circuit,
            shots=shots,
            acq_protocol=acq_protocol,
            acq_kwargs=acq_kwargs,
        ),
        timeout=self._acquisition_timeout,
    )

    metadata = {
        "shots": shots,
        "n_qubits": n_qubits,
        "backend": self.name,
        "meas_level": payload.meas_level,
        "meas_return": payload.meas_return,
        "acq_protocol": acq_protocol,
    }
    # Discrimination parameters are only recorded when they were resolved.
    for key in ("acq_rotation", "acq_threshold"):
        if key in acq_kwargs:
            metadata[key] = acq_kwargs[key]
    dataset.attrs.update(metadata)
    return dataset

process_result(dataset, job_id)

Convert a qblox-scheduler acquisition dataset into a Qiskit-compatible result dict.

Handles all meas_levels:

- meas_level=0 (Trace): Returns raw complex waveform data as [[real, imag], ...] per time sample.
- meas_level=1 (SSBIntegrationComplex): Returns IQ values as [[real, imag]] per shot per qubit.
- meas_level=2 with ThresholdedAcquisition: Aggregates 0/1 values into counts dict.
- meas_level=2 with SSBIntegrationComplex: Performs software discrimination using acq_threshold and acq_rotation from the device config.

Parameters:

Name Type Description Default
dataset Dataset

xr.Dataset from execute().

required
job_id str

Unique job ID.

required

Returns:

Name Type Description
dict dict

Qiskit-compatible result dict.

Source code in qpi-driver/qpi_driver/executors/qblox.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
    """Turn an acquisition dataset from ``execute()`` into a Qiskit-style result dict.

    The measurement settings are read from ``dataset.attrs`` (defaults:
    ``meas_level=2``, ``meas_return="single"``,
    ``acq_protocol="SSBIntegrationComplex"``, backend ``self.name``). A dataset
    with a ``circuit_index`` dimension is split into one result per index;
    otherwise the dataset is treated as a single circuit.

    Args:
        dataset: xr.Dataset from execute().
        job_id: Unique job ID.

    Returns:
        dict: Whatever ``build_qiskit_result`` builds from the per-circuit results.
    """
    from qpi_driver.executors.utils.result import build_qiskit_result

    attrs = dataset.attrs
    meas_level = int(attrs.get("meas_level", 2))
    meas_return = str(attrs.get("meas_return", "single"))
    acq_protocol = str(attrs.get("acq_protocol", "SSBIntegrationComplex"))
    backend = attrs.get("backend", self.name)

    # Single-circuit dataset: one result entry.
    if "circuit_index" not in dataset.dims:
        only_result = self._single_dataset_to_result(
            dataset, meas_level, meas_return, acq_protocol
        )
        return build_qiskit_result([only_result], job_id, backend)

    # Multi-circuit dataset: one result entry per circuit_index slice.
    per_circuit = []
    for index in range(dataset.sizes["circuit_index"]):
        circuit_ds = dataset.isel(circuit_index=index)
        circuit_ds.attrs.update(dataset.attrs)
        per_circuit.append(
            self._single_dataset_to_result(
                circuit_ds, meas_level, meas_return, acq_protocol
            )
        )
    return build_qiskit_result(per_circuit, job_id, backend)

QiskitAerExecutor

Bases: Executor

Source code in qpi-driver/qpi_driver/executors/qiskit_aer.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
class QiskitAerExecutor(Executor):
    """Executor that runs job payloads on a local ``AerSimulator`` instance."""

    def __init__(self, name: str = "qiskit_aer", **kwargs: Any):
        """Create the executor and its ``AerSimulator``.

        Args:
            name: The name of the executor.
            **kwargs: Arbitrary keyword arguments passed to the base class.

        Raises:
            ImportError: If qiskit-aer is not installed.
        """
        if not IS_AER_INSTALLED:
            raise ImportError(
                "qiskit-aer is not installed. Install the [aer] extra to use QiskitAerExecutor."
            )

        super().__init__(name, **kwargs)
        self._simulator = AerSimulator()

    def execute(self, payload: JobPayload) -> xr.Dataset:
        """Run quantum circuit simulation using Qiskit Aer backend.

        Every circuit in ``payload.circuits`` is simulated once per parameter
        set (or once, unbound, when it has none). A single simulation returns
        the legacy flat dataset (no ``circuit_index`` dimension) for backward
        compatibility; several simulations are concatenated along a
        ``circuit_index`` dimension.

        Args:
            payload: JobPayload specifying shots and circuits.

        Returns:
            xr.Dataset: Dataset built by ``memory_to_dataset``, annotated with
            ``shots``, ``n_qubits``, ``backend``, ``meas_level`` and
            ``meas_return`` attrs.

        Raises:
            ValueError: If the payload contains no circuits. ``load_qasm`` may
                raise as well for QASM it cannot load (not visible here).
        """
        sub_datasets: list[xr.Dataset] = []

        for circ in payload.circuits:
            # A per-circuit shot count takes precedence over the payload default.
            circ_shots = circ.shots if circ.shots is not None else payload.shots
            circuit = load_qasm(circ.circuit)
            n_qubits = circuit.num_qubits

            for param_vals in circ.parameter_values or [None]:
                bound_circuit = circuit
                if param_vals is not None and circuit.parameters:
                    bound_circuit = circuit.assign_parameters(param_vals)

                t_qc = transpile(bound_circuit, self._simulator)
                result = self._simulator.run(
                    t_qc, shots=circ_shots, memory=True
                ).result()
                memory = result.get_memory(t_qc)
                sub_datasets.append(
                    memory_to_dataset(memory, n_qubits, circ_shots, payload.meas_level)
                )

        # Fail with a clear message instead of an opaque error further down
        # (``xr.concat`` on an empty list / unbound loop variables).
        if not sub_datasets:
            raise ValueError("JobPayload contains no circuits to execute.")

        # NOTE: n_qubits is the qubit count of the last circuit processed.
        common_attrs = {
            "n_qubits": n_qubits,
            "backend": self.name,
            "meas_level": payload.meas_level,
            "meas_return": payload.meas_return,
        }

        # Backward compatible: single result → flat dataset (no circuit_index)
        if len(sub_datasets) == 1:
            ds = sub_datasets[0]
            ds.attrs.update({"shots": circ_shots, **common_attrs})
            return ds

        # Multiple results → concat along circuit_index
        # NOTE(review): the combined dataset records ``payload.shots``; per-circuit
        # ``shots`` overrides are not reflected in the attrs — confirm intended.
        combined = xr.concat(sub_datasets, dim="circuit_index")
        combined.attrs.update({"shots": payload.shots, **common_attrs})
        return combined

    def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
        """Convert the simulator's xr.Dataset into a Qiskit-compatible result dict.

        ``meas_level == 1`` yields IQ memory; any other value is handled as
        classified counts. Settings are read from ``dataset.attrs`` with the
        defaults ``meas_level=2``, ``meas_return="single"`` and backend
        ``self.name``.

        Args:
            dataset: xr.Dataset from execute().
            job_id: Unique job ID.

        Returns:
            dict: Whatever ``build_qiskit_result`` builds from the per-circuit results.
        """
        meas_level = int(dataset.attrs.get("meas_level", 2))
        meas_return = str(dataset.attrs.get("meas_return", "single"))
        backend = dataset.attrs.get("backend", self.name)

        if "circuit_index" in dataset.dims:
            # Multi-circuit dataset: one slice per circuit_index entry.
            slices = []
            for ci in range(dataset.sizes["circuit_index"]):
                sub_ds = dataset.isel(circuit_index=ci)
                sub_ds.attrs.update(dataset.attrs)
                slices.append(sub_ds)
        else:
            slices = [dataset]

        circuit_results = [
            self._single_dataset_to_result(ds, meas_level, meas_return)
            for ds in slices
        ]
        return build_qiskit_result(circuit_results, job_id, backend)

    @staticmethod
    def _single_dataset_to_result(
        dataset: xr.Dataset, meas_level: int, meas_return: str
    ) -> dict:
        """Extract counts or IQ memory from a single-circuit dataset slice.

        Args:
            dataset: Dataset whose qubit data variables have integer-like names.
            meas_level: 1 returns IQ memory; anything else returns counts.
            meas_return: ``"avg"`` averages the IQ memory via ``iq_memory_avg``.

        Returns:
            dict: ``{"memory": ..., "shots": ...}`` for meas_level 1,
            ``{"counts": ..., "shots": ...}`` otherwise, or
            ``{"raw": str(dataset), "shots": 0}`` when no qubit variable exists.
        """
        # Identify qubit variables (integer-named data vars)
        qubit_vars = []
        for var in dataset.data_vars:
            try:
                qubit_vars.append(int(var))
            except ValueError:
                pass
        if not qubit_vars:
            return {"raw": str(dataset), "shots": 0}

        qubit_vars.sort()

        # Materialise each qubit's samples once instead of re-indexing the
        # dataset for every (shot, qubit) pair.
        columns = [dataset[str(q_idx)].values for q_idx in qubit_vars]
        num_samples = len(columns[0])
        shots = int(dataset.attrs.get("shots", num_samples))

        # meas_level=1: return IQ memory
        if meas_level == 1:
            memory: list[list[list[float]]] = [
                [[float(col[s].real), float(col[s].imag)] for col in columns]
                for s in range(num_samples)
            ]
            if meas_return == "avg" and memory:
                memory = iq_memory_avg(memory, len(qubit_vars))
            return {"memory": memory, "shots": shots}

        # meas_level=2: classified counts. The bitstring lists the
        # highest-numbered qubit first.
        n_qubits = len(qubit_vars)
        msb_first = columns[::-1]
        counts_dict: dict[str, int] = {}
        for s in range(num_samples):
            state_str = "".join(
                "1" if col[s].real > 0.5 else "0" for col in msb_first
            )
            counts_dict[state_str] = counts_dict.get(state_str, 0) + 1

        # A single stored sample standing in for several shots: attribute all
        # shots to that one state.
        if num_samples == 1 and shots > 1:
            counts_dict = {next(iter(counts_dict)): shots}

        # Pad with 0 counts for all 2^N states
        for i in range(2**n_qubits):
            counts_dict.setdefault(format(i, f"0{n_qubits}b"), 0)

        return {"counts": counts_dict, "shots": shots}

execute(payload)

Run quantum circuit simulation using Qiskit Aer backend.

For multi-circuit payloads, results are concatenated along a circuit_index dimension. A single-circuit payload without parameter bindings returns the legacy flat format (no circuit_index dimension) for backward compatibility.

Parameters:

Name Type Description Default
payload JobPayload

JobPayload specifying shots and circuits.

required

Returns:

Type Description
Dataset

xr.Dataset: Dataset containing measured state outcomes, counts, and frequencies.

Raises:

Type Description
ImportError

If qiskit-aer is not installed.

ValueError

If the provided QASM circuit cannot be loaded.

Source code in qpi-driver/qpi_driver/executors/qiskit_aer.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def execute(self, payload: JobPayload) -> xr.Dataset:
    """Run quantum circuit simulation using Qiskit Aer backend.

    Every circuit in ``payload.circuits`` is simulated once per parameter
    set (or once, unbound, when it has none). A single simulation returns
    the legacy flat dataset (no ``circuit_index`` dimension) for backward
    compatibility; several simulations are concatenated along a
    ``circuit_index`` dimension.

    Args:
        payload: JobPayload specifying shots and circuits.

    Returns:
        xr.Dataset: Dataset built by ``memory_to_dataset``, annotated with
        ``shots``, ``n_qubits``, ``backend``, ``meas_level`` and
        ``meas_return`` attrs.

    Raises:
        ValueError: If the payload contains no circuits. ``load_qasm`` may
            raise as well for QASM it cannot load (not visible here).
    """
    sub_datasets: list[xr.Dataset] = []

    for circ in payload.circuits:
        # A per-circuit shot count takes precedence over the payload default.
        circ_shots = circ.shots if circ.shots is not None else payload.shots
        circuit = load_qasm(circ.circuit)
        n_qubits = circuit.num_qubits

        for param_vals in circ.parameter_values or [None]:
            bound_circuit = circuit
            if param_vals is not None and circuit.parameters:
                bound_circuit = circuit.assign_parameters(param_vals)

            t_qc = transpile(bound_circuit, self._simulator)
            result = self._simulator.run(
                t_qc, shots=circ_shots, memory=True
            ).result()
            memory = result.get_memory(t_qc)
            sub_datasets.append(
                memory_to_dataset(memory, n_qubits, circ_shots, payload.meas_level)
            )

    # Fail with a clear message instead of an opaque error further down
    # (``xr.concat`` on an empty list / unbound loop variables).
    if not sub_datasets:
        raise ValueError("JobPayload contains no circuits to execute.")

    # NOTE: n_qubits is the qubit count of the last circuit processed.
    common_attrs = {
        "n_qubits": n_qubits,
        "backend": self.name,
        "meas_level": payload.meas_level,
        "meas_return": payload.meas_return,
    }

    # Backward compatible: single result → flat dataset (no circuit_index)
    if len(sub_datasets) == 1:
        ds = sub_datasets[0]
        ds.attrs.update({"shots": circ_shots, **common_attrs})
        return ds

    # Multiple results → concat along circuit_index
    # NOTE(review): the combined dataset records ``payload.shots``; per-circuit
    # ``shots`` overrides are not reflected in the attrs — confirm intended.
    combined = xr.concat(sub_datasets, dim="circuit_index")
    combined.attrs.update({"shots": payload.shots, **common_attrs})
    return combined

process_result(dataset, job_id)

Convert the simulator's xr.Dataset into a Qiskit-compatible result dict.

Supports meas_level 1 (IQ memory) and 2 (classified counts). meas_level 0 is not supported for simulators.

Parameters:

Name Type Description Default
dataset Dataset

xr.Dataset from execute().

required
job_id str

Unique job ID.

required

Returns:

Name Type Description
dict dict

Qiskit-compatible result dict.

Source code in qpi-driver/qpi_driver/executors/qiskit_aer.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
    """Turn the simulator dataset from ``execute()`` into a Qiskit-style result dict.

    Settings are read from ``dataset.attrs`` with the defaults
    ``meas_level=2``, ``meas_return="single"`` and backend ``self.name``.
    A dataset with a ``circuit_index`` dimension yields one result per index;
    otherwise the dataset is treated as a single circuit.

    Args:
        dataset: xr.Dataset from execute().
        job_id: Unique job ID.

    Returns:
        dict: Whatever ``build_qiskit_result`` builds from the per-circuit results.
    """
    attrs = dataset.attrs
    meas_level = int(attrs.get("meas_level", 2))
    meas_return = str(attrs.get("meas_return", "single"))
    backend = attrs.get("backend", self.name)

    # Single circuit: exactly one result entry.
    if "circuit_index" not in dataset.dims:
        only_result = self._single_dataset_to_result(dataset, meas_level, meas_return)
        return build_qiskit_result([only_result], job_id, backend)

    # Multi-circuit: one result entry per circuit_index slice.
    per_circuit = []
    for index in range(dataset.sizes["circuit_index"]):
        circuit_ds = dataset.isel(circuit_index=index)
        circuit_ds.attrs.update(dataset.attrs)
        per_circuit.append(
            self._single_dataset_to_result(circuit_ds, meas_level, meas_return)
        )
    return build_qiskit_result(per_circuit, job_id, backend)

QuantifyExecutor

Bases: Executor

Executor subclass for interacting with Quantify-scheduler acquisition backends.

Source code in qpi-driver/qpi_driver/executors/quantify.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
class QuantifyExecutor(Executor):
    """Executor subclass for interacting with Quantify-scheduler acquisition backends."""

    def __new__(cls, *args, **kwargs):
        """Allocate the instance only when ``IS_QUANTIFY_INSTALLED`` is truthy.

        Raises:
            ImportError: If quantify-scheduler is not installed.
        """
        if IS_QUANTIFY_INSTALLED:
            return super().__new__(cls)
        raise ImportError(
            "quantify-scheduler is not installed. Install the [quantify] extra to use QuantifyExecutor."
        )

    def __init__(
        self,
        name: str = "quantify",
        quantify_hardware_config: QbloxHardwareCompilationConfig | Path | dict = Path(
            "quantify.hardware.json"
        ),
        quantify_device_config: Path | dict = Path("quantify.device.json"),
        is_dummy: bool = False,
        data_dir: Path = Path("data"),
        acquisition_timeout: int = 10,
        **kwargs: Any,
    ) -> None:
        """Set up the executor: data dir, configurations, coordinator and compiler.

        Args:
            name: Name of the executor; also the prefix of the instrument
                coordinator (``<name>_ic``) and compiler (``<name>_compiler``).
            quantify_hardware_config: Hardware-layer configuration (config
                object, file path, or dict).
            quantify_device_config: Device-layer configuration (file path or dict).
            is_dummy: Forwarded to ``_load_instrument_coordinator``.
            data_dir: Directory passed to ``set_datadir``.
            acquisition_timeout: Timeout in seconds to wait for acquisition.
            **kwargs: Arbitrary keyword arguments passed to the base class.
        """
        super().__init__(name, **kwargs)
        set_datadir(data_dir)

        # Clean up any previously registered instruments to avoid name collision
        # errors in QCoDeS. Failures during this cleanup are ignored on purpose.
        try:
            Instrument.close_all()
        except Exception:
            pass

        self._is_dummy, self._acquisition_timeout = is_dummy, acquisition_timeout
        self._hardware_config = _load_quantify_hardware_config(
            quantify_hardware_config
        )
        self._device = _load_quantum_device(name=name, config=quantify_device_config)
        self._instrument_coordinator = _load_instrument_coordinator(
            f"{name}_ic", hardware_config=self._hardware_config, is_dummy=is_dummy
        )
        # Attach the hardware configuration to the device before it is compiled against.
        self._device.hardware_config(self._hardware_config)
        self._compiler = SerialCompiler(
            name=f"{name}_compiler", quantum_device=self._device
        )

    @property
    def hardware_config(self) -> QbloxHardwareCompilationConfig:
        """Return the hardware configuration stored on this executor (``_hardware_config``)."""
        return self._hardware_config

    def execute(self, payload: JobPayload) -> xr.Dataset:
        """Execute quantum instructions using the Quantify scheduler.

        The acquisition protocol is chosen by ``_resolve_acq_protocol`` from
        ``payload.meas_level``:

        * ``meas_level=0`` → ``Trace`` (raw waveform)
        * ``meas_level=1`` → ``SSBIntegrationComplex`` (kerneled IQ)
        * ``meas_level=2`` → ``ThresholdedAcquisition`` if both threshold
          params are available, else ``SSBIntegrationComplex``
          (software discrimination deferred to ``process_result()``).

        Args:
            payload: JobPayload specifying shots, circuits, meas_level, etc.

        Returns:
            xr.Dataset: Raw acquisition dataset annotated with ``shots``,
            ``n_qubits``, ``backend``, ``meas_level``, ``meas_return`` and
            ``acq_protocol`` attrs, plus ``acq_rotation`` / ``acq_threshold``
            when resolved.
        """
        # Parse QASM circuit
        circuit = load_qasm(payload.qasm)
        shots = payload.shots
        n_qubits = circuit.num_qubits

        # Determine acquisition protocol and parameters from meas_level and payload overrides
        acq_protocol, acq_kwargs = self._resolve_acq_protocol(payload)

        # Loop-invariant: import once and build the type tuple once rather
        # than on every instruction.
        import qiskit

        parallel_op_types = (
            qiskit_library.Measure,
            qiskit.circuit.Delay,
            qiskit_library.Barrier,
        )

        # Translate Qiskit QuantumCircuit to Quantify Schedule
        schedule = Schedule(name=payload.id, repetitions=shots)
        acq_indices: dict = {}

        for instruction in circuit.data:
            parsed_ops = _to_quantify_gates(
                circuit=circuit,
                instruction=instruction,
                acq_indices=acq_indices,
                acq_protocol=acq_protocol,
                acq_kwargs=acq_kwargs,
            )

            if parsed_ops and isinstance(instruction.operation, parallel_op_types):
                # Measure/Delay/Barrier: anchor every further op to the start
                # of the first one so they begin together.
                first_op = schedule.add(parsed_ops[0])
                for op in parsed_ops[1:]:
                    schedule.add(op, ref_op=first_op, ref_pt="start")
            else:
                # Everything else is appended in order.
                for op in parsed_ops:
                    schedule.add(op)

        compiled_sched = self._compiler.compile(schedule=schedule)

        self._instrument_coordinator.prepare(compiled_sched)
        self._instrument_coordinator.start()
        self._instrument_coordinator.wait_done(timeout_sec=self._acquisition_timeout)
        dataset = self._instrument_coordinator.retrieve_acquisition()
        dataset.attrs.update(
            {
                "shots": shots,
                "n_qubits": n_qubits,
                "backend": self.name,
                "meas_level": payload.meas_level,
                "meas_return": payload.meas_return,
                "acq_protocol": acq_protocol,
            }
        )
        # Discrimination parameters are only recorded when they were resolved.
        for key in ("acq_rotation", "acq_threshold"):
            if key in acq_kwargs:
                dataset.attrs[key] = acq_kwargs[key]
        return dataset

    def _resolve_acq_protocol(self, payload: JobPayload) -> tuple[str, dict]:
        """Pick the acquisition protocol (and its kwargs) for the payload's meas_level.

        Args:
            payload: JobPayload providing ``meas_level`` and the optional
                ``acq_rotation`` / ``acq_threshold`` overrides.

        Returns:
            Tuple of (protocol_name, extra_kwargs_for_Measure).
        """
        # Levels 0 and 1 map to a fixed protocol without extra parameters.
        for level, protocol in ((0, "Trace"), (1, "SSBIntegrationComplex")):
            if payload.meas_level == level:
                return protocol, {}

        # Any other level: start from the device's threshold parameters and
        # let non-None payload values override them.
        params = self._get_threshold_params()
        for key in ("acq_rotation", "acq_threshold"):
            override = getattr(payload, key)
            if override is not None:
                params[key] = override

        # ThresholdedAcquisition needs both parameters; otherwise fall back to
        # SSBIntegrationComplex + software discrimination in process_result().
        has_both = "acq_rotation" in params and "acq_threshold" in params
        protocol = "ThresholdedAcquisition" if has_both else "SSBIntegrationComplex"
        return protocol, params

    def _get_threshold_params(self) -> dict:
        """Return the threshold parameters of the first device element defining both.

        Elements whose ``measure`` lookup raises ``AttributeError`` or
        ``KeyError``, or where either value is ``None``, are skipped.

        Returns:
            Dict with 'acq_threshold' and 'acq_rotation' if found, else empty dict.
        """
        device = self._device
        for name in device.elements():
            element = device.get_element(name)
            try:
                acq_threshold = element.measure.acq_threshold()
                acq_rotation = element.measure.acq_rotation()
            except (AttributeError, KeyError):
                # This element exposes no usable measure parameters.
                continue
            if acq_threshold is None or acq_rotation is None:
                continue
            return {"acq_threshold": acq_threshold, "acq_rotation": acq_rotation}
        return {}

    def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
        """Turn an acquisition dataset from ``execute()`` into a Qiskit-style result dict.

        The measurement settings are read from ``dataset.attrs`` (defaults:
        ``meas_level=2``, ``meas_return="single"``,
        ``acq_protocol="SSBIntegrationComplex"``, backend ``self.name``). A
        dataset with a ``circuit_index`` dimension is split into one result per
        index; otherwise the dataset is treated as a single circuit.

        Args:
            dataset: xr.Dataset from execute().
            job_id: Unique job ID.

        Returns:
            dict: Whatever ``build_qiskit_result`` builds from the per-circuit results.
        """
        from qpi_driver.executors.utils.result import build_qiskit_result

        attrs = dataset.attrs
        meas_level = int(attrs.get("meas_level", 2))
        meas_return = str(attrs.get("meas_return", "single"))
        acq_protocol = str(attrs.get("acq_protocol", "SSBIntegrationComplex"))
        backend = attrs.get("backend", self.name)

        # Single-circuit dataset: one result entry.
        if "circuit_index" not in dataset.dims:
            only_result = self._single_dataset_to_result(
                dataset, meas_level, meas_return, acq_protocol
            )
            return build_qiskit_result([only_result], job_id, backend)

        # Multi-circuit dataset: one result entry per circuit_index slice.
        per_circuit = []
        for index in range(dataset.sizes["circuit_index"]):
            circuit_ds = dataset.isel(circuit_index=index)
            circuit_ds.attrs.update(dataset.attrs)
            per_circuit.append(
                self._single_dataset_to_result(
                    circuit_ds, meas_level, meas_return, acq_protocol
                )
            )
        return build_qiskit_result(per_circuit, job_id, backend)

    def _single_dataset_to_result(
        self, dataset: xr.Dataset, meas_level: int, meas_return: str, acq_protocol: str
    ) -> dict:
        """Dispatch a single-circuit dataset to the handler for its meas_level.

        Level 0 and 1 have dedicated handlers; every other value is processed
        as meas_level 2. A dataset without qubit variables yields
        ``{"raw": str(dataset), "shots": 0}``.
        """
        qubit_vars, _first_key, shots = self._extract_qubit_vars(dataset)
        if not qubit_vars:
            return {"raw": str(dataset), "shots": 0}

        if meas_level == 0:
            result = self._process_meas_level_0(dataset, qubit_vars, shots)
        elif meas_level == 1:
            result = self._process_meas_level_1(dataset, qubit_vars, shots, meas_return)
        else:
            result = self._process_meas_level_2(dataset, qubit_vars, shots, acq_protocol)
        return result

    def _extract_qubit_vars(
        self, dataset: xr.Dataset
    ) -> tuple[list[int], int | str, int]:
        """Identify qubit variables (integer-named data vars) and shots.

        Args:
            dataset: Dataset whose data variables are scanned.

        Returns:
            ``(qubit_vars, q0_key, shots)``: the sorted integer indices, the
            key under which the lowest index is stored (the int itself if the
            dataset contains it, else its string form), and the shot count
            from ``dataset.attrs["shots"]`` (falling back to the length of
            that variable). ``([], "", 0)`` when no qubit variable exists.
        """
        qubit_vars: list[int] = []
        for var in dataset.data_vars:
            try:
                qubit_vars.append(int(var))
            except (TypeError, ValueError):
                # ValueError: non-numeric string name. TypeError: a name that
                # int() cannot handle at all (e.g. a tuple). Neither is a
                # qubit channel, so skip it instead of crashing.
                continue
        if not qubit_vars:
            return [], "", 0

        qubit_vars.sort()
        first = qubit_vars[0]
        # Variables may be stored under the int itself or under its string form.
        q0_key = first if first in dataset else str(first)
        shots = int(dataset.attrs.get("shots", len(dataset[q0_key])))
        return qubit_vars, q0_key, shots

    def _process_meas_level_0(
        self, dataset: xr.Dataset, qubit_vars: list[int], shots: int
    ) -> dict:
        """Return each qubit's flattened trace as ``[real, imag]`` pairs (meas_level=0).

        The outer list of ``memory`` follows the order of ``qubit_vars``.
        """
        memory: list[list[list[float]]] = [
            [
                [float(sample.real), float(sample.imag)]
                # Variables may be keyed by the int itself or by its string form.
                for sample in dataset[q if q in dataset else str(q)].values.flatten()
            ]
            for q in qubit_vars
        ]
        return {"memory": memory, "shots": shots}

    def _process_meas_level_1(
        self, dataset: xr.Dataset, qubit_vars: list[int], shots: int, meas_return: str
    ) -> dict:
        """Extract integrated IQ memory (meas_level=1).

        Args:
            dataset: Single-circuit acquisition dataset.
            qubit_vars: Sorted qubit indices present in ``dataset``.
            shots: Shot count reported in the result.
            meas_return: ``"avg"`` averages the memory via ``iq_memory_avg``.

        Returns:
            dict: ``{"memory": ..., "shots": shots}`` where, before averaging,
            ``memory[s][k]`` is the ``[real, imag]`` pair of sample ``s`` for
            the k-th entry of ``qubit_vars``.
        """
        from qpi_driver.executors.utils.result import iq_memory_avg

        # Resolve and materialise each qubit's samples once, instead of
        # re-indexing the dataset for every (sample, qubit) pair.
        columns = [
            dataset[q_idx if q_idx in dataset else str(q_idx)].values
            for q_idx in qubit_vars
        ]
        num_samples = len(columns[0])

        memory = [
            [[float(col[s].real), float(col[s].imag)] for col in columns]
            for s in range(num_samples)
        ]

        if meas_return == "avg" and memory:
            memory = iq_memory_avg(memory, len(qubit_vars))
        return {"memory": memory, "shots": shots}

    def _process_meas_level_2(
        self, dataset: xr.Dataset, qubit_vars: list[int], shots: int, acq_protocol: str
    ) -> dict:
        """Extract classified counts (meas_level=2) performing software discrimination if needed."""
        q0_key = qubit_vars[0] if qubit_vars[0] in dataset else str(qubit_vars[0])
        num_samples = len(dataset[q0_key])
        n_qubits = len(qubit_vars)
        counts_dict: dict[str, int] = {}

        if acq_protocol == "ThresholdedAcquisition":
            for s in range(num_samples):
                state_chars = []
                for q_idx in reversed(qubit_vars):
                    var_key = q_idx if q_idx in dataset else str(q_idx)
                    val = dataset[var_key].values[s]
                    r = val.real if val is not None and not np.isnan(val.real) else 0.0
                    state_chars.append(str(1 if r >= 1 else 0))
                state_str = "".join(state_chars)
                counts_dict[state_str] = counts_dict.get(state_str, 0) + 1
        else:
            # SSBIntegrationComplex software discrimination.
            # Check dataset.attrs first, fallback to device config, default to 0.0.
            acq_rotation = dataset.attrs.get("acq_rotation")
            acq_threshold = dataset.attrs.get("acq_threshold")
            if acq_rotation is None or acq_threshold is None:
                dev_params = self._get_threshold_params()
                acq_rotation = (
                    dev_params.get("acq_rotation", 0.0)
                    if acq_rotation is None
                    else acq_rotation
                )
                acq_threshold = (
                    dev_params.get("acq_threshold", 0.0)
                    if acq_threshold is None
                    else acq_threshold
                )

            rot_rad = np.radians(acq_rotation)
            for s in range(num_samples):
                state_chars = []
                for q_idx in reversed(qubit_vars):
                    var_key = q_idx if q_idx in dataset else str(q_idx)
                    val = dataset[var_key].values[s]
                    rotated = val * np.exp(1j * rot_rad)
                    state_chars.append("1" if rotated.real > acq_threshold else "0")
                state_str = "".join(state_chars)
                counts_dict[state_str] = counts_dict.get(state_str, 0) + 1

        if num_samples == 1 and shots > 1:
            state_str = list(counts_dict.keys())[0]
            counts_dict = {state_str: shots}

        # Pad with 0 counts for all 2^N states
        states_list = [format(i, f"0{n_qubits}b") for i in range(2**n_qubits)]
        for st in states_list:
            if st not in counts_dict:
                counts_dict[st] = 0

        return {"counts": counts_dict, "shots": shots}

    def close(self) -> None:
        """Release resources.

        Detaches and closes every component registered on the instrument
        coordinator, then closes the coordinator itself. Errors raised while
        closing the coordinator are suppressed.
        """
        coordinator = self._instrument_coordinator
        # Iterate over a snapshot: remove_component() may shrink the very
        # collection being walked, which would make the loop skip entries.
        for component in list(coordinator.components):
            coordinator.remove_component(component.name)
            component.close()

        with suppress(Exception):
            coordinator.close()

__init__(name='quantify', quantify_hardware_config=Path('quantify.hardware.json'), quantify_device_config=Path('quantify.device.json'), is_dummy=False, data_dir=Path('data'), acquisition_timeout=10, **kwargs)

Initialize the QuantifyExecutor.

Parameters:

Name Type Description Default
name str

the name of the executor

'quantify'
quantify_hardware_config QbloxHardwareCompilationConfig | Path | dict

Hardware-layer configuration, given as a compilation-config object, a file path, or a dict.

Path('quantify.hardware.json')
quantify_device_config Path | dict

Device-layer configuration, given as a file path or a dict.

Path('quantify.device.json')
is_dummy bool

If True, uses a dummy Cluster instrument.

False
data_dir Path

Directory where data is temporarily stored.

Path('data')
acquisition_timeout int

Timeout in seconds to wait for acquisition.

10
**kwargs Any

Arbitrary keyword arguments passed to the base class.

{}
Source code in qpi-driver/qpi_driver/executors/quantify.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def __init__(
    self,
    name: str = "quantify",
    quantify_hardware_config: QbloxHardwareCompilationConfig | Path | dict = Path(
        "quantify.hardware.json"
    ),
    quantify_device_config: Path | dict = Path("quantify.device.json"),
    is_dummy: bool = False,
    data_dir: Path = Path("data"),
    acquisition_timeout: int = 10,
    **kwargs: Any,
) -> None:
    """Set up the device, instrument coordinator and compiler for this executor.

    Args:
        name: Name of the executor; also used as the prefix for the
            instrument coordinator (``<name>_ic``) and compiler
            (``<name>_compiler``) names.
        quantify_hardware_config: Hardware-layer configuration, as a
            compilation-config object, a file path, or a dict.
        quantify_device_config: Device-layer configuration, as a file path
            or a dict.
        is_dummy: If True, uses a dummy Cluster instrument.
        data_dir: Directory where data is temporarily stored.
        acquisition_timeout: Timeout in seconds to wait for acquisition.
        **kwargs: Arbitrary keyword arguments passed to the base class.
    """
    super().__init__(name, **kwargs)
    set_datadir(data_dir)

    # QCoDeS refuses to register two instruments with the same name, so drop
    # whatever a previous executor may have left behind (best effort).
    with suppress(Exception):
        Instrument.close_all()

    self._is_dummy = is_dummy
    self._acquisition_timeout = acquisition_timeout

    self._hardware_config = _load_quantify_hardware_config(quantify_hardware_config)
    self._device = _load_quantum_device(name=name, config=quantify_device_config)
    self._instrument_coordinator = _load_instrument_coordinator(
        f"{name}_ic",
        hardware_config=self._hardware_config,
        is_dummy=is_dummy,
    )

    # Attach the hardware layer to the device before building the compiler.
    self._device.hardware_config(self._hardware_config)
    self._compiler = SerialCompiler(
        name=f"{name}_compiler",
        quantum_device=self._device,
    )

close()

Release resources.

Source code in qpi-driver/qpi_driver/executors/quantify.py
399
400
401
402
403
404
405
406
def close(self) -> None:
    """Release resources.

    Detaches and closes every component registered on the instrument
    coordinator, then closes the coordinator itself. Errors raised while
    closing the coordinator are suppressed.
    """
    coordinator = self._instrument_coordinator
    # Iterate over a snapshot: remove_component() may shrink the very
    # collection being walked, which would make the loop skip entries.
    for component in list(coordinator.components):
        coordinator.remove_component(component.name)
        component.close()

    with suppress(Exception):
        coordinator.close()

execute(payload)

Execute quantum instructions using the Quantify scheduler.

The acquisition protocol is selected based on payload.meas_level:

  • meas_level=0 → Trace (raw waveform)
  • meas_level=1 → SSBIntegrationComplex (kerneled IQ)
  • meas_level=2 → ThresholdedAcquisition if threshold params are configured on the device elements, else SSBIntegrationComplex (software discrimination deferred to process_result()).

Parameters:

Name Type Description Default
payload JobPayload

JobPayload specifying shots, circuits, meas_level, etc.

required

Returns:

Type Description
Dataset

xr.Dataset: Raw acquisition dataset.

Source code in qpi-driver/qpi_driver/executors/quantify.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def execute(self, payload: JobPayload) -> xr.Dataset:
    """Execute quantum instructions using the Quantify scheduler.

    The acquisition protocol is selected based on ``payload.meas_level``:

    * ``meas_level=0`` → ``Trace`` (raw waveform)
    * ``meas_level=1`` → ``SSBIntegrationComplex`` (kerneled IQ)
    * ``meas_level=2`` → ``ThresholdedAcquisition`` if threshold params
      are configured on the device elements, else ``SSBIntegrationComplex``
      (software discrimination deferred to ``process_result()``).

    Args:
        payload: JobPayload specifying shots, circuits, meas_level, etc.

    Returns:
        xr.Dataset: Raw acquisition dataset. Its ``attrs`` carry ``shots``,
        ``n_qubits``, ``backend``, ``meas_level``, ``meas_return`` and
        ``acq_protocol``, plus ``acq_rotation`` / ``acq_threshold`` when the
        resolved acquisition kwargs contain them.
    """
    # Imported once per call instead of once per circuit instruction.
    import qiskit

    # Parse QASM circuit
    circuit = load_qasm(payload.qasm)
    shots = payload.shots
    n_qubits = circuit.num_qubits

    # Determine acquisition protocol and parameters from meas_level and payload overrides
    acq_protocol, acq_kwargs = self._resolve_acq_protocol(payload)

    # Translate Qiskit QuantumCircuit to Quantify Schedule
    schedule = Schedule(name=payload.id, repetitions=shots)
    acq_indices: dict = {}

    # Operations of these kinds have all their parsed parts scheduled to start
    # together; every other instruction is appended sequentially.
    parallel_op_types = (
        qiskit_library.Measure,
        qiskit.circuit.Delay,
        qiskit_library.Barrier,
    )

    for instruction in circuit.data:
        parsed_ops = _to_quantify_gates(
            circuit=circuit,
            instruction=instruction,
            acq_indices=acq_indices,
            acq_protocol=acq_protocol,
            acq_kwargs=acq_kwargs,
        )

        if parsed_ops and isinstance(instruction.operation, parallel_op_types):
            # Anchor the remaining operations to the start of the first one.
            first_op = schedule.add(parsed_ops[0])
            for op in parsed_ops[1:]:
                schedule.add(op, ref_op=first_op, ref_pt="start")
        else:
            for op in parsed_ops:
                schedule.add(op)

    compiled_sched = self._compiler.compile(schedule=schedule)

    self._instrument_coordinator.prepare(compiled_sched)
    self._instrument_coordinator.start()
    self._instrument_coordinator.wait_done(timeout_sec=self._acquisition_timeout)
    dataset = self._instrument_coordinator.retrieve_acquisition()

    # Record the job context on the dataset so later processing can read it
    # back from the attributes.
    dataset.attrs.update(
        {
            "shots": shots,
            "n_qubits": n_qubits,
            "backend": self.name,
            "meas_level": payload.meas_level,
            "meas_return": payload.meas_return,
            "acq_protocol": acq_protocol,
        }
    )
    for key in ("acq_rotation", "acq_threshold"):
        if key in acq_kwargs:
            dataset.attrs[key] = acq_kwargs[key]
    return dataset

process_result(dataset, job_id)

Convert a quantify-scheduler acquisition dataset into a Qiskit-compatible result dict.

Handles all meas_levels:

  • meas_level=0 (Trace): Returns raw complex waveform data as [[real, imag], ...] per time sample.
  • meas_level=1 (SSBIntegrationComplex): Returns IQ values as [[real, imag]] per shot per qubit.
  • meas_level=2 with ThresholdedAcquisition: Aggregates 0/1 values into counts dict.
  • meas_level=2 with SSBIntegrationComplex: Performs software discrimination using acq_threshold and acq_rotation from the device config.

Parameters:

Name Type Description Default
dataset Dataset

xr.Dataset from execute().

required
job_id str

Unique job ID.

required

Returns:

Name Type Description
dict dict

Qiskit-compatible result dict.

Source code in qpi-driver/qpi_driver/executors/quantify.py
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def process_result(self, dataset: xr.Dataset, job_id: str) -> dict:
    """Turn an acquisition dataset into a Qiskit-compatible result dict.

    ``meas_level``, ``meas_return``, ``acq_protocol`` and ``backend`` are read
    from ``dataset.attrs`` (defaults: ``2``, ``"single"``,
    ``"SSBIntegrationComplex"`` and this executor's name). A dataset with a
    ``circuit_index`` dimension is split into one sub-dataset per circuit;
    otherwise the dataset is treated as a single circuit. Each dataset is
    converted by ``_single_dataset_to_result`` and the results are assembled
    by ``build_qiskit_result``.

    Args:
        dataset: xr.Dataset from execute().
        job_id: Unique job ID.

    Returns:
        dict: Qiskit-compatible result dict.
    """
    from qpi_driver.executors.utils.result import build_qiskit_result

    attrs = dataset.attrs
    meas_level = int(attrs.get("meas_level", 2))
    meas_return = str(attrs.get("meas_return", "single"))
    acq_protocol = str(attrs.get("acq_protocol", "SSBIntegrationComplex"))
    backend = attrs.get("backend", self.name)

    def _convert(ds):
        return self._single_dataset_to_result(
            ds, meas_level, meas_return, acq_protocol
        )

    # Single-circuit dataset: one result entry.
    if "circuit_index" not in dataset.dims:
        return build_qiskit_result([_convert(dataset)], job_id, backend)

    # Multi-circuit dataset: one result entry per circuit index.
    per_circuit = []
    for ci in range(dataset.sizes["circuit_index"]):
        sub_ds = dataset.isel(circuit_index=ci)
        # Make sure the slice carries the parent's attributes.
        sub_ds.attrs.update(dataset.attrs)
        per_circuit.append(_convert(sub_ds))
    return build_qiskit_result(per_circuit, job_id, backend)

run_driver(qpi_addr='http://127.0.0.1:8090', token='', name='qpu_sim_01', executor='mock', custom_executors=None, data_dir=Path('bin/data'), ca_fingerprint='', ca_file_path=Path('./bin/qpi.ca.pem'), **executor_options)

Run the QPI Python hardware driver.

Parameters:

Name Type Description Default
qpi_addr str

Full URL of the QPI orchestrator (e.g. "http://localhost:8090" or "https://qpi.example.com").

'http://127.0.0.1:8090'
token str

QPU access token.

''
name str

Human-readable name for this QPU.

'qpu_sim_01'
executor str | type[Executor] | Executor

Executor specification (string key, class, or instance).

'mock'
custom_executors dict[str, type[Executor]] | None

Optional dict of custom executors for resolving string keys.

None
data_dir Path

Directory for executor working data.

Path('bin/data')
ca_fingerprint str

Fingerprint used to verify that the downloaded CA file is the right one.

''
ca_file_path Path

Path to the CA certificate file for TLS connections.

Path('./bin/qpi.ca.pem')
executor_options Any

other options to pass to the executor.

{}
Source code in qpi-driver/qpi_driver/driver.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
def run_driver(
    qpi_addr: str = "http://127.0.0.1:8090",
    token: str = "",
    name: str = "qpu_sim_01",
    executor: str | type[Executor] | Executor = "mock",
    custom_executors: dict[str, type[Executor]] | None = None,
    data_dir: Path = Path("bin/data"),
    ca_fingerprint: str = "",
    ca_file_path: Path = Path("./bin/qpi.ca.pem"),
    **executor_options: Any,
) -> None:
    """Run the QPI Python hardware driver.

    Performs the handshake with the orchestrator, starts a worker process
    (``execute_job``) and a result-sender process (``send_results``) connected
    through two multiprocessing queues, then receives jobs on an NNG PULL
    socket in the calling process and forwards them to the job queue. The
    receive loop runs until interrupted; on exit both processes are asked to
    stop and are terminated if they do not finish within two seconds.

    Args:
        qpi_addr: Full URL of the QPI orchestrator
            (e.g. ``"http://localhost:8090"`` or ``"https://qpi.example.com"``).
        token: QPU access token.
        name: Human-readable name for this QPU. Also passed to the executor
            as its ``name`` option.
        executor: Executor specification (string key, class, or instance).
        custom_executors: Optional dict of custom executors for resolving string keys.
        data_dir: Directory for executor working data.
        ca_fingerprint: Fingerprint used to verify that the downloaded CA file is the right one.
        ca_file_path: Path to the CA certificate file for TLS connections.
        executor_options: Other options to pass to the executor.
    """
    qpi_addr = _normalize_qpi_addr(qpi_addr)

    # Determine executor type string for registration:
    # the key itself for a string, else the object's ``name`` attribute,
    # else its ``__name__``; empty when none of these apply.
    executor_type_str = ""
    if isinstance(executor, str):
        executor_type_str = executor
    elif hasattr(executor, "name"):
        executor_type_str = f"{executor.name}"
    elif hasattr(executor, "__name__"):
        executor_type_str = executor.__name__

    # Extract device config from executor options if present
    device_config = executor_options.get("device_config")
    if device_config is None:
        # Try to build a minimal config from known options
        cfg: dict[str, Any] = {}
        for key in ("quantify_hardware_config", "quantify_device_config", "is_dummy"):
            if key in executor_options:
                val = executor_options[key]
                # Path-like values are sent as plain strings.
                if hasattr(val, "__fspath__"):
                    cfg[key] = str(val)
                else:
                    cfg[key] = val
        if cfg:
            device_config = cfg

    # do_handshake returns strongly typed dataclass
    info = do_handshake(
        qpi_addr,
        token,
        name,
        executor_type=executor_type_str,
        device_config=device_config,
    )
    # The NNG endpoints use the orchestrator's host with the ports from the handshake.
    nng_host = _extract_host(qpi_addr)
    cmd_port = info.nng_command_port
    res_port = info.nng_result_port

    # Create queues
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    # add extra args to be passed to the executor
    executor_options.update(dict(name=name))

    # Start Worker Process
    worker = multiprocessing.Process(
        target=execute_job,
        kwargs={
            "job_queue": job_queue,
            "result_queue": result_queue,
            "executor": executor,
            "custom_executors": custom_executors,
            "data_dir": data_dir,
            **executor_options,
        },
        name="QPI-Worker",
        daemon=True,
    )
    worker.start()

    # Start Result Sender Process
    result_sender = multiprocessing.Process(
        target=send_results,
        kwargs={
            "result_queue": result_queue,
            "res_port": res_port,
            "nng_host": nng_host,
            "qpi_addr": qpi_addr,
            "ca_fingerprint": ca_fingerprint,
            "ca_file_path": ca_file_path,
        },
        name="QPI-ResultSender",
        daemon=True,
    )
    result_sender.start()

    # Start NNG PULL (commands) in Main Process
    addr = f"tls+tcp://{nng_host}:{cmd_port}"
    log.info("Connecting NNG PULL → %s", addr)
    tls_config = _get_tls_config(
        qpi_addr, ca_fingerprint=ca_fingerprint, ca_file_path=ca_file_path
    )

    try:
        with pynng.Pull0(tls_config=tls_config) as sock:
            sock.dial(addr, block=True)
            log.info("NNG PULL connected to %s", addr)
            while True:
                try:
                    raw = sock.recv()
                    job = json.loads(raw)
                    log.info("Received job %s", job.get("job_id"))
                    job_queue.put(job)
                except Exception as exc:
                    # A failed receive or malformed message is logged and the
                    # loop keeps running.
                    log.error("PULL error: %s", exc)
                    # Retry with 0.2s delay for faster reconnection times
                    time.sleep(0.2)
    except KeyboardInterrupt:
        log.info("Shutdown signal received")
    finally:
        log.info("Shutting down worker and result sender processes...")
        # Note: We use poison pills (None sentinel values) to cleanly and instantly unblock
        # queue get() calls in the worker and result sender processes without wasteful CPU polling.
        try:
            job_queue.put(None)
        except Exception:
            pass
        try:
            result_queue.put(None)
        except Exception:
            pass

        # Give processes time to shut down cleanly, or terminate them
        worker.join(timeout=2)
        if worker.is_alive():
            log.warning("Terminating worker process...")
            worker.terminate()
            worker.join()

        result_sender.join(timeout=2)
        if result_sender.is_alive():
            log.warning("Terminating result sender process...")
            result_sender.terminate()
            result_sender.join()

        log.info("Shutdown complete.")