Module tripleblind.job
A Job represents a computational run coordinated by the TripleBlind Router amongst one or more Access Points.
For example, a Job could train a neural network on one or many datasets; train a regression; or use an already built and trained neural network to perform an inference.
Jobs have four basic elements:
* An operation
* One or more input datasets
* A set of params to define options
* Optionally, a preprocessor to assist in prepping the input dataset
The operation fits into one of several classes. It can be a predefined protocol, commonly used when training a model. It can also be a predefined operator which executes a well-known procedure against the data, such as outlier detection. Finally, it can be the Asset identifier of a trained model, which will run the input data through that model to perform inference.
The typical workflow follows this pattern:
import tripleblind as tb

job = tb.create_job(
    job_name="...something unique for reference...",
    operation=op,            # protocol, operation or Asset of a model
    dataset=[data1, data2],  # filename(s) or Assets for input
    params={"security": "smpc", ...},  # options
)
if job.submit():
    job.wait_for_completion()
    if job.success:
        output = job.result.asset              # get the results
        output.retrieve(save_as="output.csv")  # download results locally
    else:
        print("Failed")
Functions
def create_job(job_name: str, operation: Union[Asset, Operation], dataset: Union[Asset, str, pathlib.Path, preprocessor.package.Package, List[Union[Asset, str, pathlib.Path, preprocessor.package.Package]], ForwardRef(None)] = None, preprocessor=None, params: Optional[dict] = None, session: Optional[Session] = None) -> Job
-
Create a job for execution.
Here is a job to train an already defined CNN network for 3 epochs on the standard MNIST dataset:

job = tb.create_job(
    job_name="train-mnist-cnn",
    operation=network_asset,
    dataset=[tb.DATASET_MNIST],
    params={"epochs": 3},
)
Args
job_name : str
- Plain name of the job (appears in UI)
operation : Asset or Operation
- An algorithm Asset or one of the built-in protocols, such as training a network.
dataset : Asset, str, Path, Package or list of same, optional
- A dataset or list of datasets. Datasets can be specified as Assets or as a filename. When a filename is given it will automatically be converted to a temporary Asset which gets deleted at the completion of the Job.
preprocessor : Builder
- A preprocessor builder that will transform input data assets before the operation.
params : dict, optional
- A dictionary containing parameters appropriate to the operation being invoked.
session : Session, optional
- A connection session. If not specified, the default session is used.
Raises
TripleblindAPIError
- Raised when the number of preprocessors is neither 1 nor equal to the number of datasets.
Returns
Job
- A job descriptor, or None if the operation is invalid
def fetch_asset(algorithm: Optional[Asset] = None, dataset: Union[Asset, List[Asset], ForwardRef(None)] = None, session: Optional[Session] = None) -> AssetUsageProps
-
Retrieve the permission information for an asset
Args
algorithm : Asset, optional
- An algorithm
dataset : Asset or List[Asset], optional
- A dataset
session : Session, optional
- A connection session. If not specified, the default session is used.
Returns
AssetUsageProps
- The asset description, or None if not found
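For illustration, a minimal sketch of checking an asset's permission requirements (assuming the module is reachable as tb.job and that network_asset and data_asset are existing Assets):

import tripleblind as tb

# Look up the usage properties of a dataset paired with an algorithm.
usage = tb.job.fetch_asset(algorithm=network_asset, dataset=data_asset)
if usage is None:
    print("Asset not found")
else:
    print(f"Required permission: {usage.status}")  # "both", "alg" or "data"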
def status_list(session: Optional[Session] = None, job_id: str = None) -> List[JobStatus]
-
Fetch the status reports of all jobs.
Args
session : Session, optional
- A connection session. If not specified, the default session is used.
job_id : str, optional
- ID of a specific job to fetch status for.
Returns
List[JobStatus]
- List of all available jobs (running or completed)
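A small sketch of summarizing recent activity (field names taken from JobStatus below):

import tripleblind as tb

# Print a one-line summary of every visible job, running or completed.
for s in tb.job.status_list():
    print(f"{s.when}  {s.job_id}  {s.status} (code {s.code})")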
Classes
class AssetUsageProps (status: str)
-
Properties for this usage of an asset in the context of a specific job
Class variables
var status : str
-
Required permission to use this asset: both | alg | data
class Job (id: uuid.UUID, job_name: str, metadata: object, created: datetime.datetime, owner: dict, dataset: Asset, preprocessor: str, operation: Asset, waiting_on: List[dict] = None, status: int = 0, model_status: dict = None)
-
A single operation involving an algorithm or training protocol, some form of data (either a data asset or an input file) and any parameters used by the algorithm/protocol.
Class variables
var created : datetime.datetime
-
When the job was started
var dataset : Asset
-
List of datasets used by this job.
var id : uuid.UUID
-
The unique identifier for this job
var job_name : str
-
Display name for the job
var metadata : object
-
Parameters and other properties associated with the job.
var model_status : dict
-
Additional metrics on model training
var operation : Asset
-
The algorithm or protocol being run in this job
var owner : dict
-
Data about the user that started the job
var preprocessor : str
-
Serialized string of the preprocessor that will transform input datasets before use on the Access Point side.
var status : int
-
Status code from AP status updates. Codes 1 (requested) and 2 (submitted) are never seen here, because only the marketplace status is checked until permissions are granted to the job. After that, self.update_status retrieves the status from the AP, starting at 3 (in progress) and completing with either 4 (complete) or 10 (error). See message_schemas/status.py for any additional error codes.
var waiting_on : List[dict]
-
List of required items needing permissions (valid when _market_status = 'perm')
Static methods
def active() -> Job
def find(job_name: Union[str, re.Pattern, ForwardRef(None)] = None, job_id: Optional[str] = None, job_status: Optional[str] = None, session: Optional[Session] = None) -> Job
-
Return a job with a name or id that matches the search pattern
Args
job_name : str, optional
- Name of job to find
job_id : str, optional
- ID of job to find
job_status : str, optional
- Status of the job being searched for. Options include 'active', 'waiting' and 'history'.
session : Session, optional
- A connection session. If not specified, the default session is used.
Raises
TripleblindProcessError
- Search matches multiple jobs
Returns
Job
- The searched-for Job object, or None if not found
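A usage sketch, assuming Job is exported at the package level as tb.Job:

import tripleblind as tb

# Look up a single job by its display name. find() returns None when
# nothing matches and raises TripleblindProcessError on ambiguity.
job = tb.Job.find(job_name="train-mnist-cnn")
if job is not None:
    print(job.id, job.job_name)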
def find_all(job_name: Union[str, re.Pattern, ForwardRef(None)] = None, job_id: Optional[str] = None, job_status: Optional[str] = None, session: Optional[Session] = None, max: Optional[int] = None) -> List[Job]
-
Return a list of jobs with a name or id that matches the search pattern
Args
job_name : str, optional
- Name of job to find
job_id : str, optional
- ID of job to find
job_status : str, optional
- Status of the job being searched for. Options include 'active', 'waiting' and 'history'.
session : Session, optional
- A connection session. If not specified, the default session is used.
max : int, optional
- Maximum number of jobs to return
Returns
List[Job]
- A list of found jobs, or None if no match found
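Because job_name also accepts a compiled regular expression, a sketch of a pattern search (again assuming tb.Job):

import re
import tripleblind as tb

# Fetch up to 10 completed jobs whose names start with "train-".
jobs = tb.Job.find_all(
    job_name=re.compile(r"^train-"),
    job_status="history",
    max=10,
)
for j in jobs or []:  # find_all may return None when nothing matches
    print(j.job_name, j.created)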
Instance variables
var result : JobResult
var success : bool
Methods
def cancel(self, session: Optional[Session] = None) -> bool
-
Cancel a pending (waiting on permissions or queued) job
This will NOT stop a job which has already begun calculation. Use Job.kill() if you wish to unconditionally stop a job which is waiting on permission, queued or already calculating.
Args
session : Session, optional
- A connection session. If not specified, the default session is used.
Returns
bool
- True if job was canceled.
def get_status_stream(self, session: Optional[Session] = None) -> StatusOutputStream
-
Obtain a generator which produces process status messages
Args
session : Session, optional
- A connection session. If not specified, the default session is used.
Returns
StatusOutputStream
- The status message generator.
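A minimal consumption sketch, assuming job is a previously submitted Job:

# Stream live status messages from a running job.
stream = job.get_status_stream()
try:
    for msg in stream.status():
        # msg is either a control string ("waiting on permission",
        # "starting") or a protocol-specific dict (see StatusOutputStream).
        print(msg)
except BaseException as e:
    stream.handle_exception(e)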
def handle_keyboard_interrupt(self, auto_cleanup: bool = True, session: Optional[Session] = None)
-
For interactive scripts, prompt user when a Ctrl+C is detected
When a local script that launches a process is stopped, the remote operation can be allowed to continue or can be stopped also. This method determines if that question is appropriate and acts accordingly.
Args
auto_cleanup : bool
- If True, temporary assets will be deleted.
session : Session, optional
- A connection session. If not specified, the default session is used.
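A plausible usage sketch in an interactive script, assuming job has already been submitted:

try:
    job.wait_for_completion()
except KeyboardInterrupt:
    # Decide whether the remote operation should be stopped as well;
    # temporary assets are deleted because auto_cleanup is True.
    job.handle_keyboard_interrupt(auto_cleanup=True)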
def kill(self, session: Optional[Session] = None) -> bool
-
Immediately terminate an active job.
This will also cancel a job which is waiting on an access request response or waiting in the queue for computational resources.
Args
session : Session, optional
- A connection session. If not specified, the default session is used.
Returns
bool
- True if job was terminated.
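A sketch of choosing between cancel() and kill(), using the waiting_on_* helpers documented below:

# cancel() only works before computation begins; kill() is unconditional.
if job.waiting_on_permission() or job.waiting_on_queue():
    job.cancel()
else:
    job.kill()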
def pretty_status_msg(self)
def submit(self, input_file=None, session: Optional[Session] = None) -> bool
-
Begin execution of a job.
To submit a file along with your job, you would do something like this:

with open("/path/to/myfile.png", "rb") as f:
    job.submit(input_file=f)
Args
input_file : file, optional
- File to submit with the job (deprecated)
session : Session, optional
- A connection session. If not specified, the default session is used.
Returns
bool
- True if successfully started, False if failed
def update_router_status(self, session: Session = None) -> bool
-
Fetch the latest status from the Router
NOTE: This updates both self._market_status and self.waiting_on.
Args
session : Session, optional
- A connection session. If not specified, the default session is used.
Returns
bool
- True if updated, False if unchanged
def update_status(self, session: Optional[Session] = None, update_throttle=None)
-
Fetch the latest status for this job
Args
session : Session, optional
- A connection session. If not specified, the default session is used.
update_throttle : float
- Number of seconds to wait before reaching out to the Router. Useful to avoid flooding the network needlessly. Default is a 1-second throttle.
Returns
bool
- True if found and updated, False otherwise
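A manual polling sketch; in practice wait_for_completion() below wraps this loop. The terminal codes follow the Job.status documentation above:

import time

# Poll until the job reaches a terminal status: 4 (complete) or 10 (error).
while job.status not in (4, 10):
    job.update_status(update_throttle=5.0)
    time.sleep(5)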
def wait_for_completion(self, silent: bool = False, wait_for_permission: bool = True, session: Optional[Session] = None)
-
Wait for a job to complete, optionally waiting for permissions.
Args
silent : bool, optional
- If True, suppress status messages during execution. Default is to show messages.
wait_for_permission : bool, optional
- When True this will block until a dataset owner grants permission. Otherwise it will return immediately if permission is required. Defaults to blocking behavior.
session : Session, optional
- A connection session. If not specified, the default session is used.
Raises
TripleblindProcessError
- Process failed or cancelled.
def waiting_on_permission(self) -> bool
-
Tests if the job is waiting for one or more access permission grant(s)
Returns
bool
- True if waiting, False otherwise
def waiting_on_queue(self) -> bool
-
Tests if the job is in the queue
Returns
bool
- True if queued, False otherwise
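Combined with update_router_status(), these helpers can explain why a pending job has not started; a small sketch:

# Report what a pending job is blocked on.
job.update_router_status()
if job.waiting_on_permission():
    print("Waiting on permission grants for:", job.waiting_on)
elif job.waiting_on_queue():
    print("Queued, waiting for computational resources")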
class JobResult (raw_status: object)
-
JobResult wraps the text output by a completed job.
Job results are all returned as simple strings. These strings can be parsed differently depending on the type of job. Some have simple text output (e.g. an inference), some contain the asset IDs of algorithms or datasets generated by the process.
Class variables
var raw_status : object
-
Raw, unparsed output from the job.
Instance variables
var asset : Asset
-
Identifier of the dataset created by the job.
var dataframe : pd.DataFrame
-
Dataframe of tabular output (shortcut for table.dataframe)
var table : TableAsset
-
Table version of the dataset created by the job.
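A sketch of the common ways to consume a result after a successful run:

if job.success:
    result = job.result
    print(result.raw_status)                     # raw text output
    df = result.dataframe                        # tabular output as a DataFrame
    result.asset.retrieve(save_as="output.csv")  # download the produced dataset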
class JobStatus (id: uuid.UUID, job_id: uuid.UUID, code: int, status: str, when: datetime.datetime, communications: int, error: str = '')
-
JobStatus is a snapshot of the state of a Job performing an operation
NOTE: This object is static, you will need to request a new status snapshot if the job is still running.
Class variables
var code : int
var communications : int
var error : str
var id : uuid.UUID
var job_id : uuid.UUID
var status : str
var when : datetime.datetime
Instance variables
var error_code : int
-
Failure code, None if the job succeeded.
var result : JobResult
-
A JobResult for a successful run, None if the job failed.
class RemoteStatusOutputStream (job_id: str, session=None)
-
Remote Continuous Inference Output Stream
Similar to the StatusOutputStream, but used when connecting to a Process initiated by another party (if allowed). See StatusOutputStream for more detail on the output format.
Methods
def handle_exception(self, e: BaseException)
def remote_status(self, max_retries: int = 5, initial_wait: float = 0.5, backoff_factor: int = 2) -> Generator[dict, None, Optional[Exception]]
-
Obtain a generator which produces process status messages
Note: Use StatusOutputStream.format_status_message() to convert the emitted status messages into nicely formatted output.
Args
max_retries : int
- How many times to re-attempt a connection if a network failure occurs.
initial_wait : float
- Seconds to wait in the initial retry.
backoff_factor : int
- Step factor used in retry pauses.
Returns
RemoteStatusOutputStream
- The status message generator.
class StatusOutputStream (job: Job, session: Session)
-
Job Output Status Message interface
During the running of a Job it can generate status messages to communicate intermediate results (e.g. in a Continuous Inference), checkpoint information (e.g. in a training at each epoch), or any kind of message the protocol wants to send.
Typical usage pattern is::
output_stream = model.infer(..., stream_output=True)
try:
    for status in output_stream.status():
        # status is either a string with control messages, or a dict
        if isinstance(status, dict):
            print(f"Calculated at: {status['data_gathered_timestamp']}")
            do_something(status["result"])
except BaseException as e:
    output_stream.handle_exception(e)
Each protocol has its own status output format. The basic format of the status messages is::
{ "__type": "MSG_TYPE" # custom fields for the type }
Below are details of formats from current protocols.
Continuous Federated Inference::

{
    "__type": "InferenceResults",
    # Protocol specific data
    "data_gathered_timestamp": 1680441227.5509229,
    "result": [[5.0], [5.0], [5.0], [5.0], [5.0],
               [5.0], [5.0], [5.0], [5.0], [5.0]],
}

Type = "SplitTrainMetrics"::

{
    '__type': 'SplitTrainMetrics',
    'test_results': 'Client0: Test records: 2500, Accuracy: 10.08%, Loss: 2.30\nClient1: Test records: 2500, Accuracy: 10.48%, Loss: 2.30\n',
    'train_reports': [
        {'accuracy': 10.035555555555556, 'loss': 2.4022168402777777,
         'client_id': 0, '__type': 'SplitTrainClassificationReport'},
        {'accuracy': 9.933333333333334, 'loss': 2.3709133680555556,
         'client_id': 1, '__type': 'SplitTrainClassificationReport'}
    ],
    'epoch_count': 1,
    'epoch': 0
}

Type = "FederatedTrainMetrics"::

{
    '__type': 'FederatedTrainMetrics',
}

Type = "NNSMPCMetrics"::

{
    '__type': 'NNSMPCMetrics',
}

Type = "NNFEDMetrics"::

{
    '__type': 'NNFEDMetrics',
}

Type = "RegressionTrainingMetrics"::

{
    '__type': 'RegressionTrainingMetrics',
}

Type = "PSIMetrics"::

{
    '__type': 'PSIMetrics',
    'hash_table_size': 100
}

Type = "RecommenderTrainMetrics"::

{
    '__type': 'RecommenderTrainMetrics',
}
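A sketch of dispatching on the "__type" field; handle_status is a hypothetical helper, and only two of the formats above are handled explicitly:

def handle_status(msg: dict):
    # Hypothetical dispatcher over the message formats listed above.
    msg_type = msg.get("__type")
    if msg_type == "InferenceResults":
        print(msg["data_gathered_timestamp"], msg["result"])
    elif msg_type == "SplitTrainMetrics":
        print(f"epoch {msg['epoch'] + 1} of {msg['epoch_count']}")
        print(msg["test_results"])
    else:
        print(msg)  # fall back to raw output for the other metric types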
Static methods
def format_status_message(msg: dict) -> List[str]
-
Convert a message from the status stream into a human-readable form
The msg dict is assumed to be in the standard output format returned by the Access Point for a running job.
Args
msg : dict
- The status message structure.
Returns
List[str], str
- The formatted output (may be multi-line) and the next single-line message
Methods
def handle_exception(self, e: BaseException, auto_cleanup: bool = True)
-
Generic handler for exceptions that can occur during stream handling
Args
e
:BaseException
- Any exception, including the expected ones of KeyboardInterrupt and StopIteration
Raises
e
- If the exception is truly unexpected, it is re-raised.
def status(self, max_retries: int = 5, initial_wait: float = 0.5, backoff_factor: int = 2) -> Generator[Union[dict, Literal['waiting on permission', 'starting']], None, Optional[Exception]]
-
Obtain a generator which produces process status messages
Note: Use the format_status_message() to convert the emitted status messages into nicely formatted output.
Args
max_retries : int
- How many times to re-attempt a connection if a network failure occurs.
initial_wait : float
- Seconds to wait in the initial retry.
backoff_factor : int
- Step factor used in retry pauses.
Returns
StatusOutputStream
- The status message generator.