Aller au contenu principal

Gérer les ressources de calcul et de données Qiskit Serverless

Versions des packages

Le code de cette page a été développé avec les dépendances suivantes. Nous recommandons d'utiliser ces versions ou des versions plus récentes.

qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0

Avec Qiskit Serverless, tu peux gérer les ressources de calcul et les données dans ton pattern Qiskit, notamment les CPU, les QPU et d'autres accélérateurs de calcul.

Définir des statuts détaillés

Les workloads Serverless traversent plusieurs étapes au cours d'un workflow. Par défaut, les statuts suivants sont consultables via job.status() :

  • QUEUED : le workload est en file d'attente pour les ressources classiques
  • INITIALIZING : le workload est en cours de configuration
  • RUNNING : le workload s'exécute actuellement sur des ressources classiques
  • DONE : le workload s'est terminé avec succès

Tu peux également définir des statuts personnalisés pour décrire plus précisément l'étape en cours dans le workflow, comme indiqué ci-dessous.

# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path

Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py

from qiskit_serverless import update_status, Job

## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)

## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)

## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)

## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)

## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py

Une fois le workload terminé avec succès (via save_result()), ce statut sera automatiquement mis à jour à DONE.

Workflows parallèles

Pour les tâches classiques pouvant être parallélisées, utilise le décorateur @distribute_task pour définir les besoins en ressources de calcul nécessaires à l'exécution d'une tâche. Commence par reprendre l'exemple transpile_remote.py du guide Écrire ton premier programme Qiskit Serverless avec le code suivant.

Le code ci-dessous nécessite que tu aies déjà sauvegardé tes identifiants.

%%writefile ./source_files/transpile_remote.py

from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task

service = QiskitRuntimeService()

@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py

Dans cet exemple, tu as décoré la fonction transpile_remote() avec @distribute_task(target={"cpu": 1}). À l'exécution, cela crée une tâche parallèle asynchrone utilisant un seul cœur CPU, et renvoie une référence permettant de suivre le worker. Pour récupérer le résultat, passe la référence à la fonction get(). On peut ainsi lancer plusieurs tâches en parallèle :

%%writefile --append ./source_files/transpile_remote.py

from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job

# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation

update_status(Job.OPTIMIZING_HARDWARE)

start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]

transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata

result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}

save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.

def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid

title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")

test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

Explorer différentes configurations de tâches

Tu peux allouer de manière flexible des CPU, des GPU et de la mémoire à tes tâches via @distribute_task(). Sur IBM Quantum® Platform, chaque programme Qiskit Serverless dispose de 16 cœurs CPU et de 32 Go de RAM, qui peuvent être alloués dynamiquement selon les besoins.

Les cœurs CPU peuvent être alloués en unités entières ou même fractionnaires, comme illustré ci-dessous.

La mémoire est allouée en octets. Rappelle-toi qu'il y a 1 024 octets dans un kilo-octet, 1 024 kilo-octets dans un méga-octet et 1 024 méga-octets dans un giga-octet. Pour allouer 2 Go de mémoire à ton worker, il faut spécifier "mem": 2 * 1024 * 1024 * 1024.

%%writefile --append ./source_files/transpile_remote.py

@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully

Gérer les données dans ton programme

Qiskit Serverless te permet de gérer des fichiers dans le répertoire /data, accessible à tous tes programmes. Cette fonctionnalité comporte quelques limitations :

  • Seuls les fichiers tar et h5 sont pris en charge pour le moment
  • Il s'agit uniquement d'un stockage plat /data ; les sous-répertoires du type /data/dossier/ ne sont pas supportés

Le code ci-dessous montre comment uploader des fichiers. Assure-toi d'avoir authentifié Qiskit Serverless avec ton compte IBM Quantum (voir Déployer sur IBM Quantum Platform pour les instructions).

import tarfile
from qiskit_serverless import IBMServerlessClient

# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()

# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)

# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'

Tu peux ensuite lister tous les fichiers présents dans ton répertoire data. Ces données sont accessibles à tous les programmes.

serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']

Depuis un programme, utilise file_download() pour télécharger le fichier dans l'environnement du programme, puis décompresse le tar.

%%writefile ./source_files/extract_tarfile.py

import tarfile
from qiskit_serverless import IBMServerlessClient

serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)

with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()

À ce stade, ton programme peut interagir avec les fichiers comme lors d'une expérience locale. file_upload(), file_download() et file_delete() peuvent être appelés depuis ton expérience locale ou depuis ton programme uploadé, pour une gestion des données cohérente et flexible.

Prochaines étapes

Recommandations