Source code for qiskit.providers.ibmq.job.job_monitor

# -*- coding: utf-8 -*-

# This code is part of Qiskit.
#
# (C) Copyright IBM 2017, 2018.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.


"""A module for monitoring jobs."""

import sys
import time
from typing import TextIO, Optional

from .ibmqjob import IBMQJob
from ..utils.converters import duration_difference


def _text_checker(job: IBMQJob,
                  interval: float,
                  _interval_set: bool = False,
                  output: TextIO = sys.stdout) -> None:
    """A text-based job status checker.

    Args:
        job: The job to check.
        interval: The interval at which to check.
        _interval_set: Was interval time set by user?
        output: The file like object to write status messages to.
            By default this is sys.stdout.

    """
    status = job.status()
    msg = status.value
    prev_msg = msg
    msg_len = len(msg)
    prev_time_str = ''

    print('\r%s: %s' % ('Job Status', msg), end='', file=output)
    while status.name not in ['DONE', 'CANCELLED', 'ERROR']:
        time.sleep(interval)
        status = job.status()
        msg = status.value

        if status.name == 'QUEUED':
            queue_info = job.queue_info()

            if queue_info:
                if queue_info.estimated_start_time:
                    est_time = queue_info.estimated_start_time
                    time_str = duration_difference(est_time)
                    prev_time_str = time_str

            else:
                time_str = prev_time_str

            msg += ' ({queue}) [Est. wait time: {time}]'.format(queue=job.queue_position(),
                                                                time=time_str)

            if job.queue_position() is None:
                if not _interval_set:
                    interval = 2
            elif not _interval_set:
                interval = max(job.queue_position(), 2)
        else:
            if not _interval_set:
                interval = 2

        if status.name == 'RUNNING':
            msg = 'RUNNING'
            job_mode = job.scheduling_mode()
            if job_mode:
                msg += ' - {}'.format(job_mode)

        elif status.name == 'ERROR':
            msg = 'ERROR - {}'.format(job.error_message())

        # Adjust length of message so there are no artifacts
        if len(msg) < msg_len:
            msg += ' ' * (msg_len - len(msg))
        elif len(msg) > msg_len:
            msg_len = len(msg)

        if msg != prev_msg:
            print('\r%s: %s' % ('Job Status', msg), end='', file=output)
            prev_msg = msg

    print('', file=output)


[docs]def job_monitor(job: IBMQJob, interval: Optional[float] = None, output: TextIO = sys.stdout) -> None: """Monitor the status of an ``IBMQJob`` instance. Args: job: Job to monitor. interval: Time interval between status queries. output: The file like object to write status messages to. By default this is sys.stdout. """ if interval is None: _interval_set = False interval = 5 else: _interval_set = True _text_checker(job, interval, _interval_set, output=output)