Source code for qiskit.primitives.primitive_job

# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""
Job implementation for the reference implementations of Primitives.
"""

import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Generic, TypeVar

from qiskit.providers import JobError, JobStatus, JobV1

from .base.base_result import BasePrimitiveResult

T = TypeVar("T", bound=BasePrimitiveResult)


class PrimitiveJob(JobV1, Generic[T]):
    """
    PrimitiveJob class for the reference implemetations of Primitives.
    """

    def __init__(self, function, *args, **kwargs):
        """
        Args:
            function: a callable function to execute the job.
        """
        job_id = str(uuid.uuid4())
        super().__init__(None, job_id)
        self._future = None
        self._function = function
        self._args = args
        self._kwargs = kwargs

    def submit(self):
        if self._future is not None:
            raise JobError("Primitive job has already been submitted.")

        executor = ThreadPoolExecutor(max_workers=1)  # pylint: disable=consider-using-with
        self._future = executor.submit(self._function, *self._args, **self._kwargs)
        executor.shutdown(wait=False)

    def result(self) -> T:
        """Return the results of the job."""
        self._check_submitted()
        return self._future.result()

    def cancel(self):
        self._check_submitted()
        return self._future.cancel()

    def status(self):
        self._check_submitted()
        if self._future.running():
            return JobStatus.RUNNING
        elif self._future.cancelled():
            return JobStatus.CANCELLED
        elif self._future.done() and self._future.exception() is None:
            return JobStatus.DONE
        return JobStatus.ERROR

    def _check_submitted(self):
        if self._future is None:
            raise JobError("Job not submitted yet!. You have to .submit() first!")