Skip to content

QPI Python Client API Reference

qpi_client

QPI Client SDK for Python.

A client library for interacting with the QPI quantum computing platform. Provides both a low-level HTTP client and Qiskit-compatible backend/job classes.

Usage::

from qpi_client import QPIClient

# Low-level client
client = QPIClient("http://localhost:8090", api_token="my-token")
job_id = client.submit_job([{"circuit": "OPENQASM 3.0; ..."}])

# Qiskit integration (preferred)
backend = client.get_backend(name="mock")
job = backend.run(circuit=my_circuit, shots=1024)
result = job.result()

# Or submit raw QASM
job = backend.run(qasm="OPENQASM 3.0; ...", parameter_values=[[0.5]])
result = job.result()

# Retrieve a past job
past_job = client.job(job_id)
past_job = backend.job(job_id)

QPIBackend

Bases: BackendV2

A Qiskit BackendV2 that submits circuits to the QPI orchestrator.

Parameters:

Name Type Description Default
client QPIClient

An authenticated QPIClient instance.

required
name str

Human-readable backend name (default "qpi").

'qpi'
**kwargs Any

Forwarded to BackendV2.__init__.

{}
Source code in qpi-client/py/qpi_client/provider.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
class QPIBackend(BackendV2):
    """A Qiskit ``BackendV2`` that submits circuits to the QPI orchestrator.

    At construction time the QPU record called ``name`` is fetched through the
    client and its ``num_qubits`` is used to size the transpiler target.

    Args:
        client: An authenticated :class:`QPIClient` instance.
        name: Human-readable backend name (default ``"qpi"``).  It is also the
            QPU name used to look up ``num_qubits``.
        **kwargs: Forwarded to :class:`BackendV2.__init__`.

    Raises:
        RuntimeError: If the QPU record carries no usable ``num_qubits``.
    """

    def __init__(
        self,
        client: QPIClient,
        name: str = "qpi",
        **kwargs: Any,
    ) -> None:
        # BackendV2.__init__ already installs ``_default_options()`` and then
        # applies option overrides passed via ``kwargs``.  The options must
        # not be rebuilt afterwards, otherwise those overrides are lost.
        super().__init__(name=name, **kwargs)
        self._client = client
        self._num_qubits = self._resolve_num_qubits(name)
        self._target = Target(num_qubits=self._num_qubits)

    def _resolve_num_qubits(self, name: str) -> int:
        """Query the orchestrator for QPU info and return its num_qubits.

        Args:
            name: QPU name passed to :meth:`QPIClient.get_qpu`.

        Returns:
            int: The QPU's ``num_qubits`` converted with ``int()``.

        Raises:
            RuntimeError: If the QPU record is not a dict, lacks
                ``num_qubits``, or the value cannot be converted to ``int``.
        """
        qpu = self._client.get_qpu(name)
        # Read the raw value defensively so the error path below can never
        # itself fail on a non-dict response.
        raw = qpu.get("num_qubits") if isinstance(qpu, dict) else None
        try:
            return int(raw)
        except (TypeError, ValueError) as exp:
            # TypeError: missing/None value; ValueError: e.g. a non-numeric str.
            raise RuntimeError(
                f"QPU '{name}' has no valid num_qubits (got {raw!r})"
            ) from exp

    # -- BackendV2 required properties ---------------------------------------

    @property
    def target(self) -> Target:
        """Return the transpiler :class:`Target` for this backend."""
        return self._target

    @property
    def max_circuits(self) -> int | None:
        """No server-side limit on the number of circuits per job."""
        return None

    @classmethod
    def _default_options(cls) -> Options:
        """Return the default run options (shots, meas_level, meas_return)."""
        return Options(shots=1024, meas_level=2, meas_return="single")

    # -- execution -----------------------------------------------------------

    def run(
        self,
        circuit: QuantumCircuit | Sequence[QuantumCircuit] | None = None,
        qasm: str | None = None,
        shots: int = 1024,
        meas_level: int = 2,
        meas_return: str = "single",
        parameter_values: list[list[float]] | list[dict[Any, float]] | None = None,
        **kwargs: Any,
    ) -> QPIJob:
        """Submit a quantum job to QPI.

        Exactly one of ``circuit`` or ``qasm`` must be provided.

        Args:
            circuit: A single :class:`QuantumCircuit` or a sequence thereof.
            qasm: A raw OpenQASM string (alternative to ``circuit``).
            shots: Number of shots.
            meas_level: Measurement level (``2`` = classified bits).
            meas_return: ``"single"`` or ``"avg"``.
            parameter_values: Parameter bindings.  For circuits this may be a
                list of dicts mapping :class:`Parameter` objects to floats,
                one dict per circuit.  For raw QASM this should be a list of
                lists (``[[0.5, 1.0]]``); a flat list of numbers is wrapped
                into a single-row list.
            **kwargs: Accepted but not used by this method.

        Returns:
            QPIJob: A handle that can be polled or awaited.

        Raises:
            ValueError: If neither or both of ``circuit`` and ``qasm`` are
                supplied, or if ``circuit`` is an empty sequence.
        """
        if circuit is None and qasm is None:
            raise ValueError("Either 'circuit' or 'qasm' must be provided")
        if circuit is not None and qasm is not None:
            raise ValueError("Only one of 'circuit' or 'qasm' should be provided")

        pv = parameter_values
        circuit_payloads: list[dict[str, Any]] = []

        if circuit is not None:
            if isinstance(circuit, QuantumCircuit):
                circuits = [circuit]
            else:
                circuits = list(circuit)
            if not circuits:
                # Fail with a clear message instead of max()'s
                # "arg is an empty sequence".
                raise ValueError("At least one circuit must be provided")

            # Grow the transpiler target if the circuit is larger than expected
            max_qubits = max(qc.num_qubits for qc in circuits)
            if max_qubits > self._num_qubits:
                self._num_qubits = max_qubits
                self._target = Target(num_qubits=max_qubits)

            for idx, qc in enumerate(circuits):
                if pv and idx < len(pv):
                    pval = pv[idx]
                    if isinstance(pval, dict) and pval:
                        # Bind locally and also ship the values, ordered like
                        # ``qc.parameters``, alongside the bound program.
                        bound_qc = qc.assign_parameters(pval)
                        qasm_str = qasm3_dumps(bound_qc)
                        ordered_values = [float(pval[p]) for p in qc.parameters]
                        circuit_payloads.append(
                            {
                                "circuit": qasm_str,
                                "parameter_values": [ordered_values],
                            }
                        )
                        continue
                    # NOTE(review): non-dict (e.g. list) entries are ignored
                    # for circuits and the circuit is sent unbound -- confirm
                    # this is intended.

                qasm_str = qasm3_dumps(qc)
                circuit_payloads.append({"circuit": qasm_str})
        else:
            payload: dict[str, Any] = {"circuit": qasm}
            if pv:
                # Normalise single list to list of lists
                if isinstance(pv[0], (int, float)):
                    pv = [pv]  # type: ignore[assignment]
                payload["parameter_values"] = pv
            circuit_payloads.append(payload)

        # NOTE(review): the backend name is not passed as ``qpu_target``, so
        # routing is left to the orchestrator -- confirm this is intended.
        job_id = self._client.submit_job(
            circuits=circuit_payloads,
            shots=shots,
            meas_level=meas_level,
            meas_return=meas_return,
        )
        return QPIJob(self, job_id, self._client)

    def job(self, job_id: str) -> QPIJob:
        """Retrieve an existing job by ID.

        Args:
            job_id: The server-assigned job ID.

        Returns:
            QPIJob: A handle bound to this backend.
        """
        return QPIJob(self, job_id, self._client)

max_circuits property

No server-side limit on the number of circuits per job.

target property

Return the transpiler Target for this backend.

job(job_id)

Retrieve an existing job by ID.

Parameters:

Name Type Description Default
job_id str

The server-assigned job ID.

required

Returns:

Type Description
QPIJob

A QPIJob handle bound to this backend.

Source code in qpi-client/py/qpi_client/provider.py
354
355
356
357
358
359
360
361
362
363
def job(self, job_id: str) -> QPIJob:
    """Return a job handle for an already-submitted job.

    The handle is built from this backend, the given ID and the backend's
    client.

    Args:
        job_id: The server-assigned job ID.

    Returns:
        QPIJob: A handle bound to this backend.
    """
    handle = QPIJob(self, job_id, self._client)
    return handle

run(circuit=None, qasm=None, shots=1024, meas_level=2, meas_return='single', parameter_values=None, **kwargs)

Submit a quantum job to QPI.

Exactly one of circuit or qasm must be provided.

Parameters:

Name Type Description Default
circuit QuantumCircuit | Sequence[QuantumCircuit] | None

A single QuantumCircuit or a sequence of them.

None
qasm str | None

A raw OpenQASM string (alternative to circuit).

None
shots int

Number of shots.

1024
meas_level int

Measurement level (2 = classified bits).

2
meas_return str

"single" or "avg".

'single'
parameter_values list[list[float]] | list[dict[Any, float]] | None

Parameter bindings. For circuits this may be a list of dicts (one per circuit) mapping Parameter objects to floats. For raw QASM this should be a list of lists ([[0.5, 1.0]]).

None

Returns:

Type Description
QPIJob

A QPIJob handle that can be polled or awaited.

Raises:

Type Description
ValueError

If neither or both of circuit and qasm are supplied.

Source code in qpi-client/py/qpi_client/provider.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def run(
    self,
    circuit: QuantumCircuit | Sequence[QuantumCircuit] | None = None,
    qasm: str | None = None,
    shots: int = 1024,
    meas_level: int = 2,
    meas_return: str = "single",
    parameter_values: list[list[float]] | list[dict[Any, float]] | None = None,
    **kwargs: Any,
) -> QPIJob:
    """Submit a quantum job to QPI.

    Exactly one of ``circuit`` or ``qasm`` must be provided.

    Args:
        circuit: A single :class:`QuantumCircuit` or a sequence thereof.
        qasm: A raw OpenQASM string (alternative to ``circuit``).
        shots: Number of shots.
        meas_level: Measurement level (``2`` = classified bits).
        meas_return: ``"single"`` or ``"avg"``.
        parameter_values: Parameter bindings.  For circuits this may be a
            list of dicts mapping :class:`Parameter` objects to floats,
            one dict per circuit.  For raw QASM this should be a list of
            lists (``[[0.5, 1.0]]``); a flat list of numbers is wrapped
            into a single-row list.
        **kwargs: Accepted but not used by this method.

    Returns:
        QPIJob: A handle that can be polled or awaited.

    Raises:
        ValueError: If neither or both of ``circuit`` and ``qasm`` are
            supplied, or if ``circuit`` is an empty sequence.
    """
    if circuit is None and qasm is None:
        raise ValueError("Either 'circuit' or 'qasm' must be provided")
    if circuit is not None and qasm is not None:
        raise ValueError("Only one of 'circuit' or 'qasm' should be provided")

    pv = parameter_values
    circuit_payloads: list[dict[str, Any]] = []

    if circuit is not None:
        if isinstance(circuit, QuantumCircuit):
            circuits = [circuit]
        else:
            circuits = list(circuit)
        if not circuits:
            # Fail with a clear message instead of max()'s
            # "arg is an empty sequence".
            raise ValueError("At least one circuit must be provided")

        # Grow the transpiler target if the circuit is larger than expected
        max_qubits = max(qc.num_qubits for qc in circuits)
        if max_qubits > self._num_qubits:
            self._num_qubits = max_qubits
            self._target = Target(num_qubits=max_qubits)

        for idx, qc in enumerate(circuits):
            if pv and idx < len(pv):
                pval = pv[idx]
                if isinstance(pval, dict) and pval:
                    # Bind locally and also ship the values, ordered like
                    # ``qc.parameters``, alongside the bound program.
                    bound_qc = qc.assign_parameters(pval)
                    qasm_str = qasm3_dumps(bound_qc)
                    ordered_values = [float(pval[p]) for p in qc.parameters]
                    circuit_payloads.append(
                        {
                            "circuit": qasm_str,
                            "parameter_values": [ordered_values],
                        }
                    )
                    continue
                # NOTE(review): non-dict (e.g. list) entries are ignored for
                # circuits and the circuit is sent unbound -- confirm this is
                # intended.

            qasm_str = qasm3_dumps(qc)
            circuit_payloads.append({"circuit": qasm_str})
    else:
        payload: dict[str, Any] = {"circuit": qasm}
        if pv:
            # Normalise single list to list of lists
            if isinstance(pv[0], (int, float)):
                pv = [pv]  # type: ignore[assignment]
            payload["parameter_values"] = pv
        circuit_payloads.append(payload)

    job_id = self._client.submit_job(
        circuits=circuit_payloads,
        shots=shots,
        meas_level=meas_level,
        meas_return=meas_return,
    )
    return QPIJob(self, job_id, self._client)

QPIClient

Low-level HTTP wrapper for the QPI orchestrator API.

Parameters:

Name Type Description Default
base_url str

Root URL of the QPI orchestrator (e.g. "http://localhost:8090").

required
api_token str | None

Optional API token used for authentication via the X-API-Token header. When None, no token header is sent (useful for cookie/JWT-based auth in browser contexts).

None
Source code in qpi-client/py/qpi_client/client.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
class QPIClient:
    """Low-level HTTP wrapper for the QPI orchestrator API.

    Args:
        base_url: Root URL of the QPI orchestrator (e.g. ``"http://localhost:8090"``).
        api_token: Optional API token used for authentication via the
            ``X-API-Token`` header. When *None*, no token header is sent
            (useful for cookie/JWT-based auth in browser contexts).
    """

    def __init__(self, base_url: str, api_token: str | None = None) -> None:
        self.base_url: str = base_url.rstrip("/")
        self.api_token: str | None = api_token
        self._session: requests.Session = requests.Session()
        self._session.headers["Content-Type"] = "application/json"
        if api_token:
            self._session.headers["X-API-Token"] = api_token

    # -- public API ----------------------------------------------------------

    def submit_job(
        self,
        circuits: list[dict[str, Any]],
        shots: int = 1024,
        meas_level: int = 2,
        meas_return: str = "single",
        qpu_target: str = "",
    ) -> str:
        """Submit a quantum job to the orchestrator.

        Args:
            circuits: A list of circuit payload dicts.  Each dict **must**
                contain a ``"circuit"`` key whose value is an OpenQASM 3
                string.  Optional keys: ``"parameter_values"``, ``"shots"``.
            shots: Default number of shots for every circuit.
            meas_level: Measurement level (``2`` = classified bits).
            meas_return: ``"single"`` or ``"avg"``.
            qpu_target: Optional QPU routing hint.

        Returns:
            The server-assigned job ID as a string.

        Raises:
            requests.HTTPError: If the server returns a non-2xx status.
        """
        payload: dict[str, Any] = {
            "circuits": circuits,
            "shots": shots,
            "meas_level": meas_level,
            "meas_return": meas_return,
        }
        if qpu_target:
            payload["qpu_target"] = qpu_target

        resp = self._session.post(f"{self.base_url}/api/jobs", json=payload)
        resp.raise_for_status()
        data = resp.json()

        # The orchestrator may return the ID at the top level or nested.
        job_id: str = data.get("id") or data.get("job_id", "")
        if not job_id:
            raise ValueError(f"Server response did not contain a job ID: {data!r}")
        return job_id

    def get_job(self, job_id: str) -> dict[str, Any]:
        """Retrieve full details for *job_id*.

        Returns:
            A dict with at least ``"id"``, ``"status"``, ``"payload"``,
            ``"results"``, ``"created"``, and ``"updated"`` keys.

        Raises:
            requests.HTTPError: If the server returns a non-2xx status.
        """
        resp = self._session.get(f"{self.base_url}/api/jobs/{job_id}")
        resp.raise_for_status()
        return resp.json()

    def list_jobs(self) -> list[dict[str, Any]]:
        """List all jobs belonging to the authenticated user.

        Returns:
            A list of job-record dicts.

        Raises:
            requests.HTTPError: If the server returns a non-2xx status.
        """
        resp = self._session.get(f"{self.base_url}/api/jobs")
        resp.raise_for_status()
        data = resp.json()
        # The response might be a bare list or wrapped in {"jobs": [...]}.
        if isinstance(data, list):
            return data
        return data.get("jobs", [])

    def cancel_job(self, job_id: str) -> dict[str, Any]:
        """Request cancellation of *job_id*.

        Returns:
            The updated job-record dict.

        Raises:
            requests.HTTPError: If the server returns a non-2xx status.
        """
        resp = self._session.post(f"{self.base_url}/api/jobs/{job_id}/cancel")
        resp.raise_for_status()
        return resp.json()

    # -- high-level helpers --------------------------------------------------

    def get_backend(self, name: str = "qpi") -> "QPIBackend":
        """Return a :class:`QPIBackend` handle for the named QPU.

        Args:
            name: Backend / QPU name (e.g. ``"mock"``, ``"qiskit_aer"``).

        Returns:
            A configured :class:`QPIBackend` instance bound to this client.
        """
        from qpi_client.provider import QPIBackend

        return QPIBackend(self, name=name)

    def job(self, job_id: str) -> "QPIJob":
        """Retrieve an existing job by ID.

        Args:
            job_id: The server-assigned job ID.

        Returns:
            A :class:`QPIJob` handle (backend will be *None*).
        """
        from qpi_client.provider import QPIJob

        return QPIJob(backend=None, job_id=job_id, client=self)

    # -- QPU discovery -------------------------------------------------------

    def list_qpus(self) -> list[dict[str, Any]]:
        """List all online QPUs.

        Returns:
            A list of QPU record dicts.
        """
        resp = self._session.get(f"{self.base_url}/api/qpus")
        resp.raise_for_status()
        return resp.json()

    def get_qpu(self, name: str) -> dict[str, Any]:
        """Retrieve a single QPU by name.

        Args:
            name: The QPU's unique name.

        Returns:
            A QPU record dict.
        """
        resp = self._session.get(f"{self.base_url}/api/qpus/{name}")
        resp.raise_for_status()
        return resp.json()

    # -- QPU Registry & Toggles (admin-only) ---------------------------------

    def create_qpu(
        self,
        name: str,
        executor_type: str | None = None,
        num_qubits: int | None = None,
        enabled: bool | None = None,
    ) -> dict[str, Any]:
        """Create a new QPU record (admin-only).

        The server generates a random ``access_token`` and returns it in
        plain text exactly once; only the hash is persisted.

        Args:
            name: QPU name.
            executor_type: Type of executor.
            num_qubits: Number of qubits on the device.
            enabled: Whether the QPU should be enabled (default ``True``).

        Returns:
            A dict containing at least ``id``, ``name``, ``access_token``,
            ``executor_type``, ``status``, and ``enabled``.
        """
        payload: dict[str, Any] = {"name": name}
        if executor_type is not None:
            payload["executor_type"] = executor_type
        if num_qubits is not None:
            payload["num_qubits"] = num_qubits
        if enabled is not None:
            payload["enabled"] = enabled

        resp = self._session.post(f"{self.base_url}/api/op/qpus/create", json=payload)
        resp.raise_for_status()
        return resp.json()

    def connect_qpu(
        self,
        name: str,
        access_token: str,
        executor_type: str | None = None,
        device_config: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Connect a QPU driver node.

        Args:
            name: QPU name.
            access_token: The access token for the QPU.
            executor_type: Type of executor.
            device_config: Configuration dict for the device.

        Returns:
            The connection response dict with NNG port assignments.
        """
        payload: dict[str, Any] = {
            "name": name,
            "access_token": access_token,
        }
        if executor_type is not None:
            payload["executor_type"] = executor_type
        if device_config is not None:
            payload["device_config"] = device_config

        resp = self._session.post(f"{self.base_url}/api/op/qpus/connect", json=payload)
        resp.raise_for_status()
        return resp.json()

    def toggle_qpu(self, qpu_id: str, enabled: bool) -> dict[str, Any]:
        """Toggle QPU driver state (admin-only).

        Args:
            qpu_id: ID of the QPU.
            enabled: Whether the QPU should be enabled.

        Returns:
            Response dict.
        """
        resp = self._session.post(
            f"{self.base_url}/api/op/qpu/toggle",
            json={"id": qpu_id, "enabled": enabled},
        )
        resp.raise_for_status()
        return resp.json()

    # -- Notifications -------------------------------------------------------

    def list_notifications(self) -> list[dict[str, Any]]:
        """List notifications visible to the authenticated user.

        Returns:
            A list of notification record dicts.
        """
        resp = self._session.get(
            f"{self.base_url}/api/collections/notifications/records"
        )
        resp.raise_for_status()
        data = resp.json()
        if isinstance(data, list):
            return data
        return data.get("items", [])

    def dismiss_notification(self, notification_id: str) -> dict[str, Any]:
        """Dismiss a notification for the authenticated user.

        Args:
            notification_id: ID of the notification.

        Returns:
            Response dict.
        """
        resp = self._session.post(
            f"{self.base_url}/api/notifications/{notification_id}/dismiss"
        )
        resp.raise_for_status()
        return resp.json()

    # -- Booking Slots (time_slots) ------------------------------------------

    def list_time_slots(self) -> list[dict[str, Any]]:
        """List all booking slots.

        Returns:
            A list of booking slot record dicts.
        """
        resp = self._session.get(f"{self.base_url}/api/collections/time_slots/records")
        resp.raise_for_status()
        data = resp.json()
        return data.get("items", [])

    def create_time_slot(
        self,
        start_time: str,
        end_time: str,
        booked_by: str | None = None,
    ) -> dict[str, Any]:
        """Create a new booking slot.

        Args:
            start_time: Start time RFC3339 string.
            end_time: End time RFC3339 string.
            booked_by: Optional ID of the user booking the slot.

        Returns:
            The created booking slot dict.
        """
        payload = {"start_time": start_time, "end_time": end_time}
        if booked_by is not None:
            payload["booked_by"] = booked_by
        resp = self._session.post(
            f"{self.base_url}/api/collections/time_slots/records", json=payload
        )
        resp.raise_for_status()
        return resp.json()

    def update_time_slot(
        self,
        slot_id: str,
        start_time: str | None = None,
        end_time: str | None = None,
    ) -> dict[str, Any]:
        """Update an existing booking slot.

        Args:
            slot_id: ID of the booking slot.
            start_time: Optional start time RFC3339 string.
            end_time: Optional end time RFC3339 string.

        Returns:
            The updated booking slot dict.
        """
        payload = {}
        if start_time is not None:
            payload["start_time"] = start_time
        if end_time is not None:
            payload["end_time"] = end_time
        resp = self._session.patch(
            f"{self.base_url}/api/collections/time_slots/records/{slot_id}",
            json=payload,
        )
        resp.raise_for_status()
        return resp.json()

    def delete_time_slot(self, slot_id: str) -> None:
        """Delete a booking slot.

        Args:
            slot_id: ID of the booking slot.
        """
        resp = self._session.delete(
            f"{self.base_url}/api/collections/time_slots/records/{slot_id}"
        )
        resp.raise_for_status()

    # -- QPU Time Requests ---------------------------------------------------

    def list_time_requests(self) -> list[dict[str, Any]]:
        """List QPU time requests.

        Returns:
            A list of QPU time request record dicts.
        """
        resp = self._session.get(
            f"{self.base_url}/api/collections/qpu_time_requests/records"
        )
        resp.raise_for_status()
        data = resp.json()
        return data.get("items", [])

    def create_time_request(
        self, seconds: int, requested_reason: str | None = None
    ) -> dict[str, Any]:
        """Create a new QPU time request.

        Args:
            seconds: Requested duration in seconds.
            requested_reason: Optional explanation.

        Returns:
            The created QPU time request record dict.
        """
        payload = {"seconds": seconds}
        if requested_reason is not None:
            payload["requested_reason"] = requested_reason
        resp = self._session.post(
            f"{self.base_url}/api/collections/qpu_time_requests/records", json=payload
        )
        resp.raise_for_status()
        return resp.json()

    def update_time_request(
        self,
        request_id: str,
        status: str,
        rejection_reason: str | None = None,
    ) -> dict[str, Any]:
        """Update/Handle a QPU time request (admin-only).

        Args:
            request_id: ID of the time request.
            status: "approved" or "rejected".
            rejection_reason: Optional explanation if rejected.

        Returns:
            The updated request record dict.
        """
        payload = {"status": status}
        if rejection_reason is not None:
            payload["rejection_reason"] = rejection_reason
        resp = self._session.patch(
            f"{self.base_url}/api/collections/qpu_time_requests/records/{request_id}",
            json=payload,
        )
        resp.raise_for_status()
        return resp.json()

    # -- Admin User Management -----------------------------------------------

    def list_users(self) -> list[dict[str, Any]]:
        """List all registered users (admin-only).

        Returns:
            A list of user record dicts.
        """
        resp = self._session.get(f"{self.base_url}/api/collections/users/records")
        resp.raise_for_status()
        data = resp.json()
        return data.get("items", [])

    def allocate_qpu_time(self, user_id: str, seconds: int) -> dict[str, Any]:
        """Allocate QPU time to a user (admin-only).

        Args:
            user_id: ID of the user.
            seconds: Total allocated seconds.

        Returns:
            The updated user record dict.
        """
        resp = self._session.patch(
            f"{self.base_url}/api/collections/users/records/{user_id}",
            json={"qpu_seconds": seconds},
        )
        resp.raise_for_status()
        return resp.json()

    # -- Auth helpers --------------------------------------------------------

    def auth_with_password(self, identity: str, password: str) -> dict[str, Any]:
        """Authenticate as a regular user using email/password.

        Args:
            identity: Email or username.
            password: User password.

        Returns:
            The auth response payload including token and record.
        """
        resp = self._session.post(
            f"{self.base_url}/api/collections/users/auth-with-password",
            json={"identity": identity, "password": password},
        )
        resp.raise_for_status()
        data = resp.json()
        token = data.get("token")
        if token:
            self._session.headers["Authorization"] = f"Bearer {token}"
        return data

    # -- lifecycle -----------------------------------------------------------

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self._session.close()

    def __enter__(self) -> "QPIClient":
        return self

    def __exit__(self, *exc: object) -> None:
        self.close()

allocate_qpu_time(user_id, seconds)

Allocate QPU time to a user (admin-only).

Parameters:

Name Type Description Default
user_id str

ID of the user.

required
seconds int

Total allocated seconds.

required

Returns:

Type Description
dict[str, Any]

The updated user record dict.

Source code in qpi-client/py/qpi_client/client.py
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def allocate_qpu_time(self, user_id: str, seconds: int) -> dict[str, Any]:
    """Allocate QPU time to a user (admin-only).

    Sends a PATCH for the user's record with ``qpu_seconds`` set to
    ``seconds``.

    Args:
        user_id: ID of the user.
        seconds: Total allocated seconds.

    Returns:
        The updated user record dict.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
    """
    from urllib.parse import quote

    # Encode the ID so characters such as "/" or "?" cannot alter the path.
    record_id = quote(str(user_id), safe="")
    resp = self._session.patch(
        f"{self.base_url}/api/collections/users/records/{record_id}",
        json={"qpu_seconds": seconds},
    )
    resp.raise_for_status()
    return resp.json()

auth_with_password(identity, password)

Authenticate as a regular user using email/password.

Parameters:

Name Type Description Default
identity str

Email or username.

required
password str

User password.

required

Returns:

Type Description
dict[str, Any]

The auth response payload including token and record.

Source code in qpi-client/py/qpi_client/client.py
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def auth_with_password(self, identity: str, password: str) -> dict[str, Any]:
    """Log in as a regular user with an identity/password pair.

    When the response carries a non-empty ``token``, it is installed on the
    session as a ``Bearer`` ``Authorization`` header, so later calls made
    through this client are authenticated.

    Args:
        identity: Email or username.
        password: User password.

    Returns:
        The auth response payload including token and record.
    """
    auth_url = f"{self.base_url}/api/collections/users/auth-with-password"
    credentials = {"identity": identity, "password": password}
    response = self._session.post(auth_url, json=credentials)
    response.raise_for_status()
    payload = response.json()
    if bearer := payload.get("token"):
        self._session.headers["Authorization"] = f"Bearer {bearer}"
    return payload

cancel_job(job_id)

Request cancellation of job_id.

Returns:

Type Description
dict[str, Any]

The updated job-record dict.

Raises:

Type Description
HTTPError

If the server returns a non-2xx status.

Source code in qpi-client/py/qpi_client/client.py
112
113
114
115
116
117
118
119
120
121
122
123
def cancel_job(self, job_id: str) -> dict[str, Any]:
    """Ask the server to cancel *job_id*.

    Returns:
        The updated job-record dict.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
    """
    cancel_url = f"{self.base_url}/api/jobs/{job_id}/cancel"
    response = self._session.post(cancel_url)
    response.raise_for_status()
    return response.json()

close()

Close the underlying HTTP session.

Source code in qpi-client/py/qpi_client/client.py
488
489
490
def close(self) -> None:
    """Release the HTTP session owned by this client."""
    session = self._session
    session.close()

connect_qpu(name, access_token, executor_type=None, device_config=None)

Connect a QPU driver node.

Parameters:

Name Type Description Default
name str

QPU name.

required
access_token str

The access token for the QPU.

required
executor_type str | None

Type of executor.

None
device_config dict[str, Any] | None

Configuration dict for the device.

None

Returns:

Type Description
dict[str, Any]

The connection response dict with NNG port assignments.

Source code in qpi-client/py/qpi_client/client.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def connect_qpu(
    self,
    name: str,
    access_token: str,
    executor_type: str | None = None,
    device_config: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Connect a QPU driver node.

    Args:
        name: QPU name.
        access_token: The access token for the QPU.
        executor_type: Type of executor.
        device_config: Configuration dict for the device.

    Returns:
        The connection response dict with NNG port assignments.
    """
    payload: dict[str, Any] = {
        "name": name,
        "access_token": access_token,
    }
    if executor_type is not None:
        payload["executor_type"] = executor_type
    if device_config is not None:
        payload["device_config"] = device_config

    resp = self._session.post(f"{self.base_url}/api/op/qpus/connect", json=payload)
    resp.raise_for_status()
    return resp.json()

create_qpu(name, executor_type=None, num_qubits=None, enabled=None)

Create a new QPU record (admin-only).

The server generates a random access_token and returns it in plain text exactly once; only the hash is persisted.

Parameters:

Name Type Description Default
name str

QPU name.

required
executor_type str | None

Type of executor.

None
num_qubits int | None

Number of qubits on the device.

None
enabled bool | None

Whether the QPU should be enabled (default True).

None

Returns:

Type Description
dict[str, Any]

A dict containing at least id, name, access_token, executor_type, status, and enabled.

Source code in qpi-client/py/qpi_client/client.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
def create_qpu(
    self,
    name: str,
    executor_type: str | None = None,
    num_qubits: int | None = None,
    enabled: bool | None = None,
) -> dict[str, Any]:
    """Create a new QPU record (admin-only).

    The server generates a random ``access_token`` and returns it in
    plain text exactly once; only the hash is persisted.

    Args:
        name: QPU name.
        executor_type: Type of executor.
        num_qubits: Number of qubits on the device.
        enabled: Whether the QPU should be enabled (default ``True``).

    Returns:
        A dict containing at least ``id``, ``name``, ``access_token``,
        ``executor_type``, ``status``, and ``enabled``.
    """
    payload: dict[str, Any] = {"name": name}
    if executor_type is not None:
        payload["executor_type"] = executor_type
    if num_qubits is not None:
        payload["num_qubits"] = num_qubits
    if enabled is not None:
        payload["enabled"] = enabled

    resp = self._session.post(f"{self.base_url}/api/op/qpus/create", json=payload)
    resp.raise_for_status()
    return resp.json()

create_time_request(seconds, requested_reason=None)

Create a new QPU time request.

Parameters:

Name Type Description Default
seconds int

Requested duration in seconds.

required
requested_reason str | None

Optional explanation.

None

Returns:

Type Description
dict[str, Any]

The created QPU time request record dict.

Source code in qpi-client/py/qpi_client/client.py
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
def create_time_request(
    self, seconds: int, requested_reason: str | None = None
) -> dict[str, Any]:
    """Create a new QPU time request.

    Args:
        seconds: Requested duration in seconds.
        requested_reason: Optional explanation.

    Returns:
        The created QPU time request record dict.
    """
    payload = {"seconds": seconds}
    if requested_reason is not None:
        payload["requested_reason"] = requested_reason
    resp = self._session.post(
        f"{self.base_url}/api/collections/qpu_time_requests/records", json=payload
    )
    resp.raise_for_status()
    return resp.json()

create_time_slot(start_time, end_time, booked_by=None)

Create a new booking slot.

Parameters:

Name Type Description Default
start_time str

Start time RFC3339 string.

required
end_time str

End time RFC3339 string.

required
booked_by str | None

Optional ID of the user booking the slot.

None

Returns:

Type Description
dict[str, Any]

The created booking slot dict.

Source code in qpi-client/py/qpi_client/client.py
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def create_time_slot(
    self,
    start_time: str,
    end_time: str,
    booked_by: str | None = None,
) -> dict[str, Any]:
    """Create a new booking slot.

    Args:
        start_time: Start time RFC3339 string.
        end_time: End time RFC3339 string.
        booked_by: Optional ID of the user booking the slot.

    Returns:
        The created booking slot dict.
    """
    payload = {"start_time": start_time, "end_time": end_time}
    if booked_by is not None:
        payload["booked_by"] = booked_by
    resp = self._session.post(
        f"{self.base_url}/api/collections/time_slots/records", json=payload
    )
    resp.raise_for_status()
    return resp.json()

delete_time_slot(slot_id)

Delete a booking slot.

Parameters:

Name Type Description Default
slot_id str

ID of the booking slot.

required
Source code in qpi-client/py/qpi_client/client.py
360
361
362
363
364
365
366
367
368
369
def delete_time_slot(self, slot_id: str) -> None:
    """Remove a booking slot.

    Args:
        slot_id: ID of the booking slot.
    """
    slot_url = f"{self.base_url}/api/collections/time_slots/records/{slot_id}"
    response = self._session.delete(slot_url)
    # The response body is not used; only the status is checked.
    response.raise_for_status()

dismiss_notification(notification_id)

Dismiss a notification for the authenticated user.

Parameters:

Name Type Description Default
notification_id str

ID of the notification.

required

Returns:

Type Description
dict[str, Any]

Response dict.

Source code in qpi-client/py/qpi_client/client.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def dismiss_notification(self, notification_id: str) -> dict[str, Any]:
    """Mark a notification as dismissed for the authenticated user.

    Args:
        notification_id: ID of the notification.

    Returns:
        Response dict.
    """
    dismiss_url = f"{self.base_url}/api/notifications/{notification_id}/dismiss"
    response = self._session.post(dismiss_url)
    response.raise_for_status()
    return response.json()

get_backend(name='qpi')

Return a :class:QPIBackend handle for the named QPU.

Parameters:

Name Type Description Default
name str

Backend / QPU name (e.g. "mock", "qiskit_aer").

'qpi'

Returns:

Type Description
'QPIBackend'

A configured :class:QPIBackend instance bound to this client.

Source code in qpi-client/py/qpi_client/client.py
127
128
129
130
131
132
133
134
135
136
137
138
def get_backend(self, name: str = "qpi") -> "QPIBackend":
    """Build a :class:`QPIBackend` handle for the named QPU.

    Args:
        name: Backend / QPU name (e.g. ``"mock"``, ``"qiskit_aer"``).

    Returns:
        A :class:`QPIBackend` constructed with this client and *name*.
    """
    # Imported at call time rather than at module import time.
    from qpi_client.provider import QPIBackend

    backend = QPIBackend(self, name=name)
    return backend

get_job(job_id)

Retrieve full details for job_id.

Returns:

Type Description
dict[str, Any]

A dict with at least "id", "status", "payload", "results", "created", and "updated" keys.

Raises:

Type Description
HTTPError

If the server returns a non-2xx status.

Source code in qpi-client/py/qpi_client/client.py
81
82
83
84
85
86
87
88
89
90
91
92
93
def get_job(self, job_id: str) -> dict[str, Any]:
    """Fetch the full record for *job_id*.

    Returns:
        A dict with at least ``"id"``, ``"status"``, ``"payload"``,
        ``"results"``, ``"created"``, and ``"updated"`` keys.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
    """
    job_url = f"{self.base_url}/api/jobs/{job_id}"
    response = self._session.get(job_url)
    response.raise_for_status()
    return response.json()

get_qpu(name)

Retrieve a single QPU by name.

Parameters:

Name Type Description Default
name str

The QPU's unique name.

required

Returns:

Type Description
dict[str, Any]

A QPU record dict.

Source code in qpi-client/py/qpi_client/client.py
165
166
167
168
169
170
171
172
173
174
175
176
def get_qpu(self, name: str) -> dict[str, Any]:
    """Fetch a single QPU record by name.

    Args:
        name: The QPU's unique name.

    Returns:
        A QPU record dict.
    """
    qpu_url = f"{self.base_url}/api/qpus/{name}"
    response = self._session.get(qpu_url)
    response.raise_for_status()
    return response.json()

job(job_id)

Retrieve an existing job by ID.

Parameters:

Name Type Description Default
job_id str

The server-assigned job ID.

required

Returns:

Type Description
'QPIJob'

A :class:QPIJob handle (backend will be None).

Source code in qpi-client/py/qpi_client/client.py
140
141
142
143
144
145
146
147
148
149
150
151
def job(self, job_id: str) -> "QPIJob":
    """Wrap an existing job ID in a :class:`QPIJob` handle.

    No request is issued by this method itself; it only constructs the
    handle around *job_id* and this client.

    Args:
        job_id: The server-assigned job ID.

    Returns:
        A :class:`QPIJob` handle (backend will be *None*).
    """
    # Imported at call time rather than at module import time.
    from qpi_client.provider import QPIJob

    handle = QPIJob(backend=None, job_id=job_id, client=self)
    return handle

list_jobs()

List all jobs belonging to the authenticated user.

Returns:

Type Description
list[dict[str, Any]]

A list of job-record dicts.

Raises:

Type Description
HTTPError

If the server returns a non-2xx status.

Source code in qpi-client/py/qpi_client/client.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def list_jobs(self) -> list[dict[str, Any]]:
    """Return every job that belongs to the authenticated user.

    Returns:
        A list of job-record dicts.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
    """
    response = self._session.get(f"{self.base_url}/api/jobs")
    response.raise_for_status()
    payload = response.json()
    # Either a bare list or a {"jobs": [...]} envelope is accepted.
    return payload if isinstance(payload, list) else payload.get("jobs", [])

list_notifications()

List notifications visible to the authenticated user.

Returns:

Type Description
list[dict[str, Any]]

A list of notification record dicts.

Source code in qpi-client/py/qpi_client/client.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def list_notifications(self) -> list[dict[str, Any]]:
    """Return the notifications visible to the authenticated user.

    Returns:
        A list of notification record dicts.
    """
    records_url = f"{self.base_url}/api/collections/notifications/records"
    response = self._session.get(records_url)
    response.raise_for_status()
    payload = response.json()
    # Either a bare list or an {"items": [...]} envelope is accepted.
    return payload if isinstance(payload, list) else payload.get("items", [])

list_qpus()

List all online QPUs.

Returns:

Type Description
list[dict[str, Any]]

A list of QPU record dicts.

Source code in qpi-client/py/qpi_client/client.py
155
156
157
158
159
160
161
162
163
def list_qpus(self) -> list[dict[str, Any]]:
    """Return all online QPUs.

    The decoded JSON body is returned as-is, without unwrapping.

    Returns:
        A list of QPU record dicts.
    """
    qpus_url = f"{self.base_url}/api/qpus"
    response = self._session.get(qpus_url)
    response.raise_for_status()
    return response.json()

list_time_requests()

List QPU time requests.

Returns:

Type Description
list[dict[str, Any]]

A list of QPU time request record dicts.

Source code in qpi-client/py/qpi_client/client.py
373
374
375
376
377
378
379
380
381
382
383
384
def list_time_requests(self) -> list[dict[str, Any]]:
    """List QPU time requests.

    Returns:
        A list of QPU time request record dicts. An ``{"items": [...]}``
        envelope is unwrapped (a missing ``"items"`` key yields ``[]``);
        a bare list is returned unchanged.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
    """
    resp = self._session.get(
        f"{self.base_url}/api/collections/qpu_time_requests/records"
    )
    resp.raise_for_status()
    data = resp.json()
    # A bare list has no ``.get``; accept it just like list_notifications does.
    if isinstance(data, list):
        return data
    return data.get("items", [])

list_time_slots()

List all booking slots.

Returns:

Type Description
list[dict[str, Any]]

A list of booking slot record dicts.

Source code in qpi-client/py/qpi_client/client.py
296
297
298
299
300
301
302
303
304
305
def list_time_slots(self) -> list[dict[str, Any]]:
    """List all booking slots.

    Returns:
        A list of booking slot record dicts. An ``{"items": [...]}``
        envelope is unwrapped (a missing ``"items"`` key yields ``[]``);
        a bare list is returned unchanged.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
    """
    resp = self._session.get(f"{self.base_url}/api/collections/time_slots/records")
    resp.raise_for_status()
    data = resp.json()
    # A bare list has no ``.get``; accept it just like list_notifications does.
    if isinstance(data, list):
        return data
    return data.get("items", [])

list_users()

List all registered users (admin-only).

Returns:

Type Description
list[dict[str, Any]]

A list of user record dicts.

Source code in qpi-client/py/qpi_client/client.py
435
436
437
438
439
440
441
442
443
444
def list_users(self) -> list[dict[str, Any]]:
    """List all registered users (admin-only).

    Returns:
        A list of user record dicts. An ``{"items": [...]}`` envelope is
        unwrapped (a missing ``"items"`` key yields ``[]``); a bare list is
        returned unchanged.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
    """
    resp = self._session.get(f"{self.base_url}/api/collections/users/records")
    resp.raise_for_status()
    data = resp.json()
    # A bare list has no ``.get``; accept it just like list_notifications does.
    if isinstance(data, list):
        return data
    return data.get("items", [])

submit_job(circuits, shots=1024, meas_level=2, meas_return='single', qpu_target='')

Submit a quantum job to the orchestrator.

Parameters:

Name Type Description Default
circuits list[dict[str, Any]]

A list of circuit payload dicts. Each dict must contain a "circuit" key whose value is an OpenQASM 3 string. Optional keys: "parameter_values", "shots".

required
shots int

Default number of shots for every circuit.

1024
meas_level int

Measurement level (2 = classified bits).

2
meas_return str

"single" or "avg".

'single'
qpu_target str

Optional QPU routing hint.

''

Returns:

Type Description
str

The server-assigned job ID as a string.

Raises:

Type Description
HTTPError

If the server returns a non-2xx status.

Source code in qpi-client/py/qpi_client/client.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def submit_job(
    self,
    circuits: list[dict[str, Any]],
    shots: int = 1024,
    meas_level: int = 2,
    meas_return: str = "single",
    qpu_target: str = "",
) -> str:
    """Submit a quantum job to the orchestrator.

    Args:
        circuits: A list of circuit payload dicts.  Each dict **must**
            contain a ``"circuit"`` key whose value is an OpenQASM 3
            string.  Optional keys: ``"parameter_values"``, ``"shots"``.
        shots: Default number of shots for every circuit.
        meas_level: Measurement level (``2`` = classified bits).
        meas_return: ``"single"`` or ``"avg"``.
        qpu_target: Optional QPU routing hint; omitted from the request
            when empty.

    Returns:
        The server-assigned job ID as a string.

    Raises:
        requests.HTTPError: If the server returns a non-2xx status.
        ValueError: If the response carries neither a truthy ``"id"`` nor a
            truthy ``"job_id"``.
    """
    payload: dict[str, Any] = {
        "circuits": circuits,
        "shots": shots,
        "meas_level": meas_level,
        "meas_return": meas_return,
    }
    if qpu_target:
        payload["qpu_target"] = qpu_target

    resp = self._session.post(f"{self.base_url}/api/jobs", json=payload)
    resp.raise_for_status()
    data = resp.json()

    # The orchestrator may report the ID under either top-level key.
    raw_id = data.get("id") or data.get("job_id")
    if not raw_id:
        raise ValueError(f"Server response did not contain a job ID: {data!r}")
    # Coerce so that a numeric ID still honours the ``-> str`` contract.
    return str(raw_id)

toggle_qpu(qpu_id, enabled)

Toggle QPU driver state (admin-only).

Parameters:

Name Type Description Default
qpu_id str

ID of the QPU.

required
enabled bool

Whether the QPU should be enabled.

required

Returns:

Type Description
dict[str, Any]

Response dict.

Source code in qpi-client/py/qpi_client/client.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def toggle_qpu(self, qpu_id: str, enabled: bool) -> dict[str, Any]:
    """Enable or disable a QPU driver (admin-only).

    Args:
        qpu_id: ID of the QPU.
        enabled: Whether the QPU should be enabled.

    Returns:
        Response dict.
    """
    toggle_url = f"{self.base_url}/api/op/qpu/toggle"
    body = {"id": qpu_id, "enabled": enabled}
    response = self._session.post(toggle_url, json=body)
    response.raise_for_status()
    return response.json()

update_time_request(request_id, status, rejection_reason=None)

Update/Handle a QPU time request (admin-only).

Parameters:

Name Type Description Default
request_id str

ID of the time request.

required
status str

"approved" or "rejected".

required
rejection_reason str | None

Optional explanation if rejected.

None

Returns:

Type Description
dict[str, Any]

The updated request record dict.

Source code in qpi-client/py/qpi_client/client.py
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def update_time_request(
    self,
    request_id: str,
    status: str,
    rejection_reason: str | None = None,
) -> dict[str, Any]:
    """Update/Handle a QPU time request (admin-only).

    Args:
        request_id: ID of the time request.
        status: "approved" or "rejected".
        rejection_reason: Optional explanation if rejected.

    Returns:
        The updated request record dict.
    """
    payload = {"status": status}
    if rejection_reason is not None:
        payload["rejection_reason"] = rejection_reason
    resp = self._session.patch(
        f"{self.base_url}/api/collections/qpu_time_requests/records/{request_id}",
        json=payload,
    )
    resp.raise_for_status()
    return resp.json()

update_time_slot(slot_id, start_time=None, end_time=None)

Update an existing booking slot.

Parameters:

Name Type Description Default
slot_id str

ID of the booking slot.

required
start_time str | None

Optional start time RFC3339 string.

None
end_time str | None

Optional end time RFC3339 string.

None

Returns:

Type Description
dict[str, Any]

The updated booking slot dict.

Source code in qpi-client/py/qpi_client/client.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def update_time_slot(
    self,
    slot_id: str,
    start_time: str | None = None,
    end_time: str | None = None,
) -> dict[str, Any]:
    """Update an existing booking slot.

    Args:
        slot_id: ID of the booking slot.
        start_time: Optional start time RFC3339 string.
        end_time: Optional end time RFC3339 string.

    Returns:
        The updated booking slot dict.
    """
    payload = {}
    if start_time is not None:
        payload["start_time"] = start_time
    if end_time is not None:
        payload["end_time"] = end_time
    resp = self._session.patch(
        f"{self.base_url}/api/collections/time_slots/records/{slot_id}",
        json=payload,
    )
    resp.raise_for_status()
    return resp.json()

QPIJob

Bases: JobV1

A Qiskit-compatible job handle backed by the QPI REST API.

Instances are created by :meth:QPIBackend.run or :meth:QPIClient.job; you should not need to instantiate this class directly.

Source code in qpi-client/py/qpi_client/provider.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class QPIJob(JobV1):
    """A Qiskit-compatible job handle backed by the QPI REST API.

    Instances are created by :meth:`QPIBackend.run` or :meth:`QPIClient.job`;
    you should not need to instantiate this class directly.
    """

    # Server-side status strings mapped onto Qiskit job states.  Built once
    # at class creation instead of on every ``status()`` call.
    _STATUS_MAP: dict[str, JobStatus] = {
        "pending": JobStatus.QUEUED,
        "queued": JobStatus.QUEUED,
        "running": JobStatus.RUNNING,
        "completed": JobStatus.DONE,
        "failed": JobStatus.ERROR,
        "cancelled": JobStatus.CANCELLED,
    }

    def __init__(
        self,
        # The whole union is quoted: ``"QPIBackend" | None`` would apply ``|``
        # to a plain ``str`` and fail if the annotation is evaluated eagerly.
        backend: "QPIBackend | None",
        job_id: str,
        client: QPIClient,
        **kwargs: Any,
    ) -> None:
        """Bind the handle to a server job.

        Args:
            backend: The backend that produced the job, or ``None`` when the
                handle was created from a bare job ID.
            job_id: Server-assigned job ID.
            client: Client used for all status/result/cancel requests.
            **kwargs: Forwarded to the base-class initialiser.
        """
        super().__init__(backend, job_id, **kwargs)
        self._client = client
        # Cache for the built Result so ``result()`` polls at most once
        # after completion.
        self._result: Result | None = None

    @property
    def id(self) -> str:
        """Server-assigned job ID."""
        return self.job_id()

    # -- JobV1 interface -----------------------------------------------------

    def submit(self) -> None:
        """No-op — the job was already submitted by the backend."""

    def result(
        self,
        timeout: float | None = None,
        wait: float = 5.0,
    ) -> Result:
        """Block until the job completes and return a :class:`qiskit.result.Result`.

        The result is cached: once built, later calls return it without
        contacting the server again.

        Args:
            timeout: Maximum seconds to wait. *None* means wait indefinitely.
            wait: Polling interval in seconds.  When *timeout* is set, a
                single sleep never extends past the remaining time.

        Returns:
            A Qiskit :class:`Result` object populated with counts and
            (optionally) memory from the QPI server response.

        Raises:
            TimeoutError: If the job does not finish within *timeout* seconds.
            RuntimeError: If the job fails or is cancelled on the server.
        """
        if self._result is not None:
            return self._result

        start = time.monotonic()
        while True:
            data = self._client.get_job(self.job_id())
            status = data.get("status", "")
            if status in ("completed", "failed", "cancelled"):
                break
            pause = wait
            if timeout is not None:
                remaining = timeout - (time.monotonic() - start)
                if remaining < 0:
                    raise TimeoutError(
                        f"Job {self.job_id()} did not complete within {timeout}s"
                    )
                # Never sleep past the deadline: otherwise a short timeout
                # with a long polling interval blocks for a full ``wait``.
                pause = min(wait, remaining)
            time.sleep(pause)

        if status == "failed":
            error_msg = ""
            results_data = data.get("results")
            if isinstance(results_data, dict):
                error_msg = results_data.get("error", "")
            raise RuntimeError(f"Job {self.job_id()} failed: {error_msg}")

        if status == "cancelled":
            raise RuntimeError(f"Job {self.job_id()} was cancelled")

        self._result = self._build_result(data)
        return self._result

    def status(self) -> JobStatus:
        """Return the current server-side status of the job.

        A status string the client does not recognise is reported as
        ``JobStatus.ERROR``.
        """
        data = self._client.get_job(self.job_id())
        return self._STATUS_MAP.get(data.get("status", ""), JobStatus.ERROR)

    def cancel(self) -> None:
        """Request cancellation of this job on the server."""
        self._client.cancel_job(self.job_id())

    # -- internal helpers ----------------------------------------------------

    def _build_result(self, data: dict[str, Any]) -> Result:
        """Construct a :class:`qiskit.result.Result` from the API response.

        The server ``results`` payload may be:
        * A dict with a top-level ``"circuit_results"`` list (one entry per
          submitted circuit).
        * A single dict with ``"counts"``/``"hex_counts"``/``"memory"`` keys
          when only one circuit was submitted.
        * A bare list of per-circuit dicts.
        * ``None`` or empty (edge-case) — treated as one circuit result with
          empty counts, so a valid *Result* is still returned.

        Count keys are normalised to hex strings: ints via ``hex()``, keys
        already prefixed ``0x``/``0X`` unchanged, anything else parsed as a
        binary string (and kept verbatim if that parse fails).
        """
        results_payload: Any = data.get("results") or {}

        # Normalise to a list of per-circuit result dicts.
        if isinstance(results_payload, dict):
            circuit_results: list[dict[str, Any]] = results_payload.get(
                "circuit_results", []
            )
            if not circuit_results:
                # Treat the whole dict as a single-circuit result.
                circuit_results = [results_payload]
        elif isinstance(results_payload, list):
            circuit_results = results_payload
        else:
            circuit_results = []

        experiment_results: list[ExperimentResult] = []
        for cr in circuit_results:
            counts = cr.get("counts") or cr.get("hex_counts") or {}
            # Ensure keys are hex-string formatted ("0x…").
            hex_counts: dict[str, int] = {}
            for key, val in counts.items():
                if isinstance(key, int):
                    hex_counts[hex(key)] = int(val)
                elif key.startswith(("0x", "0X")):
                    hex_counts[key] = int(val)
                else:
                    # Assume binary string — convert to hex.
                    try:
                        hex_counts[hex(int(key, 2))] = int(val)
                    except ValueError:
                        hex_counts[key] = int(val)

            exp_data = ExperimentResultData(
                counts=hex_counts,
                memory=cr.get("memory"),
            )

            experiment_results.append(
                ExperimentResult(
                    # Fall back to the total count when the server omits shots.
                    shots=cr.get(
                        "shots", sum(hex_counts.values()) if hex_counts else 0
                    ),
                    success=True,
                    data=exp_data,
                    header=cr.get("header"),
                )
            )

        backend = self.backend()
        return Result(
            backend_name=backend.name if backend else "qpi",
            backend_version="0.1.0",
            qobj_id=None,
            job_id=self.job_id(),
            success=True,
            results=experiment_results,
        )

id property

Server-assigned job ID.

cancel()

Request cancellation of this job on the server.

Source code in qpi-client/py/qpi_client/provider.py
130
131
132
def cancel(self) -> None:
    """Ask the server to cancel this job."""
    client = self._client
    client.cancel_job(self.job_id())

result(timeout=None, wait=5.0)

Block until the job completes and return a :class:qiskit.result.Result.

Parameters:

Name Type Description Default
timeout float | None

Maximum seconds to wait. None means wait indefinitely.

None
wait float

Polling interval in seconds.

5.0

Returns:

Type Description
Result

A Qiskit :class:Result object populated with counts and (optionally) memory from the QPI server response.

Raises:

Type Description
TimeoutError

If the job does not finish within timeout seconds.

RuntimeError

If the job fails or is cancelled on the server.

Source code in qpi-client/py/qpi_client/provider.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def result(
    self,
    timeout: float | None = None,
    wait: float = 5.0,
) -> Result:
    """Block until the job completes and return a :class:`qiskit.result.Result`.

    The result is cached: once built, later calls return it without
    contacting the server again.

    Args:
        timeout: Maximum seconds to wait. *None* means wait indefinitely.
        wait: Polling interval in seconds.  When *timeout* is set, a single
            sleep never extends past the remaining time.

    Returns:
        A Qiskit :class:`Result` object populated with counts and
        (optionally) memory from the QPI server response.

    Raises:
        TimeoutError: If the job does not finish within *timeout* seconds.
        RuntimeError: If the job fails or is cancelled on the server.
    """
    if self._result is not None:
        return self._result

    start = time.monotonic()
    while True:
        data = self._client.get_job(self.job_id())
        status = data.get("status", "")
        if status in ("completed", "failed", "cancelled"):
            break
        pause = wait
        if timeout is not None:
            remaining = timeout - (time.monotonic() - start)
            if remaining < 0:
                raise TimeoutError(
                    f"Job {self.job_id()} did not complete within {timeout}s"
                )
            # Never sleep past the deadline: otherwise a short timeout with
            # a long polling interval blocks for a full ``wait``.
            pause = min(wait, remaining)
        time.sleep(pause)

    if status == "failed":
        error_msg = ""
        results_data = data.get("results")
        if isinstance(results_data, dict):
            error_msg = results_data.get("error", "")
        raise RuntimeError(f"Job {self.job_id()} failed: {error_msg}")

    if status == "cancelled":
        raise RuntimeError(f"Job {self.job_id()} was cancelled")

    self._result = self._build_result(data)
    return self._result

status()

Return the current server-side status of the job.

Source code in qpi-client/py/qpi_client/provider.py
117
118
119
120
121
122
123
124
125
126
127
128
def status(self) -> JobStatus:
    """Fetch the job record and translate its status into a ``JobStatus``.

    A server status string with no known translation (including a missing
    ``"status"`` key) is reported as ``JobStatus.ERROR``.
    """
    record = self._client.get_job(self.job_id())
    translation: dict[str, JobStatus] = {
        "pending": JobStatus.QUEUED,
        "queued": JobStatus.QUEUED,
        "running": JobStatus.RUNNING,
        "completed": JobStatus.DONE,
        "failed": JobStatus.ERROR,
        "cancelled": JobStatus.CANCELLED,
    }
    server_state = record.get("status", "")
    return translation.get(server_state, JobStatus.ERROR)

submit()

No-op — the job was already submitted by the backend.

Source code in qpi-client/py/qpi_client/provider.py
67
68
def submit(self) -> None:
    """No-op — the job was already submitted by the backend.

    The method has no body beyond this docstring and returns ``None``.
    """