Custom block allows execution of arbitrary code as part of the Workflow execution. Currently there are bindings available for Python (https://www.python.org/) and R (https://www.r-project.org/) programming languages.


Executing a custom block consists of establishing a connection with the Omniscope instance, communicating with Omniscope by reading block inputs, option values and writing block output, finally shutting down the connection, or aborting the execution.


Setup


Communication with the Omniscope instance happens via an instance of the OmniscopeApi class.

Only one instance of this class is allowed per script execution. The instance must be disposed properly (see Shutting down: Commit/Cancel/Abort section)


The OmniscopeApi class is part of the omniscope.api library, which is installed automatically on first execution. 

Creating an OmniscopeApi object requires no parameters: 


Python

from omniscope.api import OmniscopeApi
omniscope_api = OmniscopeApi()



R

library(omniscope.api)
omni.api = omniscope.api()



Reading block inputs


The custom script can access data connected to block inputs. Data is read from upstream blocks and provided as a pandas.DataFrame instance (Python) or data.frame instances (R) depending on the language.


It is possible to specify which of the block inputs to read from via the input_number keyword argument (first input if left unspecified). If no block is connected to the selected input, the data will be None (Python) or NULL (R)


Reading the same input multiple times will produce the same data frame.



Python

from omniscope.api import OmniscopeApi
omniscope_api = OmniscopeApi()

# read the records associated to the first block input
first_block_input_data = omniscope_api.read_input_records(input_number=0)

# read the records associated to the second block input
second_block_input_data = omniscope_api.read_input_records(input_number=1)

if (second_block_input_data is None):
    # handle second input not connected to the block
   omniscope_api.abort(message="Second block input must  be connected!")

omniscope_api.commit()


R

library(omniscope.api)

omni.api = omniscope.api()

# read the records associated to the first block input
first.block.input.data =  read.input.records(omni.api, input.number=1)

# read the records associated to the second block input
second_block_input_data = read.input.records(omni.api, input.number=2)

if (is.null(second_block_input_data)) {
    # handle second input not connected to the block
   abort(omni.api, message="Second block input must  be connected!")
}

commit(omni.api)


Note that, according to the languages conventions, Python is 0 based, while R is 1 based.


Writing block outputs


The script can publish its results by writing block outputs. These records can be later processed using Omniscope/ visualised or passed as block inputs to other custom blocks. 

The custom block has two outputs: it is currently not possible to add more outputs. 


The method arguments are the data itself represented as a pandas.DataFrame (Python) or a data.frame instance (R), and an optional output index (by default records are published in the first block output)


Python

from omniscope.api import OmniscopeApi
import pandas as pd

omniscope_api = OmniscopeApi()

output_data = pd.DataFrame(data= {'Id': [1, 2], 'Name': ["Paul", "John"]})
omniscope_api.write_output_records(output_data, output_number=0)

omniscope_api.commit()



R

library(omniscope.api)

omni.api = omniscope.api()

output.data = data.frame("Id" = c(1,2), "Name" = c("Paul", "John"))
write.output.records(omni.api, output.data, output.number=1)

commit(omni.api)


Note that, according to the languages conventions, Python is zero-based, while R is 1 based.


Once data has been written (and the OmniscopeApi instance closed), data will be visible in the data tab, as well as accessible by downstream blocks.




Custom outputs

Starting from version 2020.4 b21066 it is possible to customise the number and labels of outputs in a custom block:


 

In the script, It is still possible to write data to these outputs by their index, but a new api has been introduced to allow write records via their IDs. For example, to write to the 'roc' output in Python:


omniscope_api.write_output(output_data, "roc")

The function call is equivalent to 

omniscope_api.write_output(output_data, 2)

or

omniscope_api.write.output.records(omni.api, output.data, 2)


Similarly using R:

write.output(omni.api, output.data, "roc")

which is equivalent to

write.output(omni.api, output.data, 3)

or 

write.output.records(omni.api, output.data, output.number=3)


Writing large datasets

Both  `write_ouput`  and `write_output_records` methods accept a dataframe as one of their arguments. The dataframe effectively represents the data to write on a certain output. In cases where the data to write is large however, it might not be possible to have the entire dataset represented as a single data frame loaded in memory.  


In custom block scripts, it is possible to write large datasets by calling the method 'write_output_recordsor write_output  multiple times, typically in a for loop, using multiple smaller DataFrames as "chunks" of data to write to Omniscope. 


Each time  the method 'write_output_recordsor 'write_output' the data frame passed in will be appended to the block output. As such the the number of columns, name of the columns, and types of the columns must not change between different invocations.


For example, consider a scenario where a big dataset that has been split in various parquet files. You can learn more about parquet file format here: https://parquet.apache.org/documentation/latest/ .
Assume that the dataset is too big to be fully loaded in memory, but it is not a problem to load a single parquet file at a time. Because each parquet file is a fragment of the same dataset, we know the columns will be the same in every file.

We can create a custom block to read the entire dataset by retrieve all the parquet files that we need to read, one then load file at a time, and writing its content to an Omniscope output.


This example uses Python, but similar result can be obtained using R. To read the parquet file we are using the pyarrow library py-arrow: https://pypi.org/project/pyarrow/


from omniscope.api import OmniscopeApi
import pyarrow.parquet as pq
import os


def is_parquet_file(filename, folder):
return os.path.isfile(os.path.join(folder, filename)) and filename.split(".")[-1] == "parquet"

omniscope_api = OmniscopeApi()

# retrieve the folder where the parquet file are located from the options.
folder = omniscope_api.get_option("folder")

# retrieve the list of parquet files
parquet_files = [f for f in os.listdir(folder) if is_parquet_file(f, folder)]

for parquet_file in parquet_files:
# read each parquet file in a data frame
df = pq.read_pandas(os.path.join(folder, parquet_file)).to_pandas()
# append each dataframe to the Omniscope ouput named 'Output'
omniscope_api.write_output(df, "Output")
omniscope_api.close()



The custom block is attached at the end of this article.


Reading Option values


The script can access the values of the options defined in the custom block. Option values are retrieved via get_option (Python) or get.option (R) methods by passing the Name of the option in the custom block. 


For example:



Your script can access the value of the option named "currency" option via 


Python 

from omniscope.api import OmniscopeApi
import pandas as pd

omniscope_api = OmniscopeApi()

currency = omniscope_api.get_option("currency")
if (currency == "USD"):
pass #...
if (currency == "GBP"):
pass #...
#...
omniscope_api.commit()



R

library(omniscope.api)

omni.api = omniscope.api()

input.data = read.input.records(omni.api, input.number=1)
# my.option = get.option(omni.api, "my_option")

currency = get.option(omni.api, "currency")

if(currency == "USD") {
# ...
} else if (currency == "GBP") {
# ...
}
#...
commit(omni.api)


The return type of the get_option method depends on the option type. For example, a TEXT option returns values as strings, a boolean option returns values that are either True or False.


More information about custom block options can be found here.

Shutting down: Commit/Cancel/Abort 


Instances of the OmniscopeApi represent a connection to a running Omniscope instance.  The instance must be disposed properly to notify Omniscope that new data is available, or that some error has occurred.


the commit method is used to mark the end of the script, and publish the data. Your script will continue running after a call to close, but it is not allowed to use the OmniscopeApi instance anymore. It is possible to pass an optional message argument that will be displayed in the block 


For example:


Python

from omniscope.api import OmniscopeApi
omniscope_api = OmniscopeApi()

#...

omniscope_api.commit(message="No anomalies found in the dataset")



R

library(omniscope.api)

omni.api = omniscope.api()

input.data = read.input.records(omni.api, input.number=1)

# my.option = get.option(omni.api, "my_option")


commit(omni.api, message="No anomalies found in the dataset")



It is also possible to mark the script execution as failed, and to disregard any data written up to that point.

One can do so via a call to cancel or abort methods. The difference between cancel and abort is that cancel will continue execution your script normally, whereas abort will trigger a system exit. 

Both methods will take an optional message displayed in the block error section


Python

from omniscope.api import OmniscopeApi

omniscope_api = OmniscopeApi()

# ...

omniscope_api.abort(message="Second block input not connected")


R

library(omniscope.api)

omni.api = omniscope.api()

# ...

abort(omni.api, "Second block input not connected")



N.B.


Any non zero return status returned from the interpreter execution (e.g., a crash or an uncaught exception) will be considered as a block execution error, and the published data will be disregarded. 

Logs


Standard outputs and Standard errors of your script will be attached to custom_block_executions_log.txt log file found in the Admin app -> Logs section and in the  logs folder within the omniscope-server folder. Standard outputs and Standard errors will be attached to the log file after the script has terminated its execution.


Python

from omniscope.api import OmniscopeApi
omniscope_api = OmniscopeApi()

print("Hello there")

omniscope_api.close()

R

library(omniscope.api)

omni.api = omniscope.api()

print("Hello there")

close(omni.api)



To monitor the file you can use command line tools (e.g., tail) or the Omniscope admin page

2020-03-18T16:45:57.221+0000 INFO  [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: execution started
2020-03-18T16:45:57.222+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: transaction id = 026377c28c4225490a745f1e31a3d0a6-Custom-f9db02b3-92a8-41f9-ba1f-e5db169d3d3b
2020-03-18T16:45:58.252+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: process execution completed successfully
2020-03-18T16:45:58.252+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: process std out: Hello there
2020-03-18T16:45:58.252+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: process std err:
2020-03-18T16:45:58.257+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: execution completed
2020-03-18T16:46:35.962+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: execution started
2020-03-18T16:46:35.962+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: transaction id = 026377c28c4225490a745f1e31a3d0a6-Custom-583ee039-6b35-43ad-aec7-236fff74819b


It is possible to update the message displayed in the block bar by using the method update_message (Python) or update.message (R).


For example, while reading a sequence of files, we could display the file we are currently reading via


# ...
for parquet_file in parquet_files:
# Update UI message
omniscope_api.update_message("Reading "+str(parquet_file))
# ...


In R:


# ...
for (val parquet.file in parquet.files) {
update.message(omni.api, paste("Reading", "parquet.file", sep=" "))
# ...



During execution It will be rendered as 




R Api Reference

omniscope.api()

The constructor fpr the OmniscopeApi instance. This object is used to represents a custom block execution. The execution is modelled as a transaction:

  • The transaction is open when this object is created
  • Block options values can be read
  • Input records can be read from block inputs
  • Output records can be appended to block outputs
  • The transaction is closed/committed or cancelled

It is not possible to re-use the same transactions for multiple executions.


abort(omni.api, message='')

Marks the custom block execution as completed unsuccessfully. No other interaction with this instance is allowed. The script execution will terminate immediately.

  • omni.api (OmniscopeApi) the omniscope api instance)
  • message (str) – an optional message to be displayed in the block summary.

cancel(omni.api, message='')

Marks the custom block execution as completed unsuccessfully. No other interaction with this instance is allowed. Unlike abort() the script execution will proceed normally after this method has been called.

  • omni.api (OmniscopeApi) the omniscope api instance)
  • message (str) – an optional message to be displayed in the block summary.

close(omni.api, message='')

Marks the custom block execution as completed successfully. No other interaction with this instance is allowed. This is equivalent to commit()

  • omni.api (OmniscopeApi) the omniscope api instance)
  • message (str) – an optional message to be displayed in the block summary.

commit(omni.api, message='')

Marks the custom block execution as completed successfully. No other interaction with this instance is allowed.

  • omni.api (OmniscopeApi) the omniscope api instance)
  • message (str) – an optional message to be displayed in the block summary.

get.option(omni.api, option.name)

  • omni.api (OmniscopeApi) the omniscope api instance)
  • option.name (str) – the name of the parameter as defined in the block options.

Returns the value of the option with a given name or None if the option does not exist.


is.docker(omni.api)

Returns TRUE if the script is executing with the custom block execution environment set to 'DOCKER' in the Omniscope settings page. If TRUE, the script should assume it is running in its dedicated docker container.

  • omni.api (OmniscopeApi) the omniscope api instance)

read.input.records(omni.api, input.number=1)

  • omni.api (OmniscopeApi) the omniscope api instance)
  • input.number (int) – identifies which of the block input to read. The number is one-based. That is to input.number is one to read the first input.

Returns A data.frame containing all the data read from the block input.


update.message(omni.api, message)

Updates the block summary with a message

  • omni.api (OmniscopeApi) the omniscope api instance)
  • message (str) – the message to be displayed in the block summary in the block UI. Useful to display updates to the user as the block is executing.

write.output(omni.api, output.data, output)

Appends output.data to the output identified by output.

  • omni.api (OmniscopeApi) the omniscope api instance)
  • output.data (data.frame) – A data frame containing the data to be appended to the block output. The fields and fields types much be the same across different invocations.
  • output (str or int) – the the number or the name of the output the data should be appended to. If a number, it is zero based that is 1 append the data to the first output.

write.output.records(omni.api, output.data, output.number=1)

Appends output.data to the output identified by output.

  • omni.api (OmniscopeApi) the omniscope api instance)
  • output.data (data.frame) – a data frame containing the data to be appended to the block output. The fields and fields types much be the same across different invocations.
  • output (int) – the the number of the output the data should be appended to. It is one-based that is 1 to append the data to the first output.

process.stream(stream.processing.function)

Applies param stream_processing_function to every chunk of data streamed from the upstream block

  • stream.processing.function is used to transform input data chunks into output data chunks: the function will receive a data frame as input and it must produce a data frame as output. The function is not allowed to return N. The schema of the output data frame must remain the same across function calls. The column names and column types must be the same across invocations.

    N.B. param stream.processing.function is intended to be a stateless function, it is possible for the function to be invoked multiple times with the same input data, and the same data is expected in the output.

When this method returns, the block execution is over, and no other operation is allowed on the OmniscopeApi instance.

This method is allowed only if streaming apis are enabled in the block.



Python Api Reference


omniscope.api.OmniscopeApi

The OmniscopeApi class represents a custom block execution. The execution is modelled as a transaction:

  • The transaction is open when this object is created
  • Block options values can be read
  • Input records can be read from block inputs
  • Output records can be appended to block outputs
  • The transaction is closed/committed or cancelled

It is not possible to re-use the same transactions for multiple executions.


omniscope.api.OmniscopeApi()

Constructor for the omniscope.api.OmniscopeApi class. By default details on how to connect to the Omniscope Evo server are read from environment variables controlled by Omniscope Evo.


abort(message='')

Marks the custom block execution as completed unsuccessfully. No other interaction with this instance is allowed. The script execution will terminate immediately.

  • message (str) – an optional message to be displayed in the block summary.

cancel(message='')

Marks the custom block execution as completed unsuccessfully. No other interaction with this instance is allowed. Unlike abort() the script execution will proceed normally after this method has been called.

  • message (str) – an optional message to be displayed in the block summary.

close(message='')

Marks the custom block execution as completed successfully. No other interaction with this instance is allowed. This is equivalent to commit().

  • message (str) – an optional message to be displayed in the block summary.

commit(message='')

Marks the custom block execution as completed successfully. No other interaction with this instance is allowed.

  • message (str) – an optional message to be displayed in the block summary.

get_option(option_name)

  • option_name (str) – the name of the parameter as defined in the block options.

Returns the value of the option with a given name or None if the option does not exist.


is_docker()

Returns True if the script is executing with the custom block execution environment set to 'DOCKER' in the Omniscope settings page. If True, the script should assume it is running in its dedicated docker container.


read_input_records(input_number=0)

  • input_number (int) – identifies which of the block input to read. The number is zero-based. That is to input_number is zero to read the first input.

Returns A pandas.core.frame.DataFrame containing all the data read from the block input.


update_message(message)

Updates the block summary with a message.

  • message (str) – the message to be displayed in the block summary in the block UI. Useful to display updates to the user as the block is executing.

write_output(output_data, output)

Appends output_data to the output identified by output.

  • output_data (pandas.core.frame.DataFrame) – A data frame containing the data to be appended to the block output. The fields and fields types much be the same across different invocations.

  • output (str or int) – the the number or the name of the output the data should be appended to. If a number, it is zero based that is 1 append the data to the first output.


write_output_records(output_data, output_number=0)

Appends output_data to the output identified by output.

  • output_data (pandas.core.frame.DataFrame) – a data frame containing the data to be appended to the block output. The fields and fields types much be the same across different invocations.

  • output (int) – the the number of the output the data should be appended to. It is zero based that is 1 append the data to the first output.



process_stream(stream_processing_function)

Applies param stream_processing_function to every chunk of data streamed from the upstream block

  • stream_processing_function is used to transform input data chunks into output data chunks: the function will receive a pandas.DataFrame as input and it must produce a pandas.DataFrame as output. The function is not allowed to return None. The schema of the output data frame must remain the same across function calls. The column names and column types must be the same across invocations.

    N.B. param stream_processing_function is intended to be a stateless function, it is possible for the function to be invoked multiple times with the same input data, and the same data is expected in the output.

When this method returns, the block execution is over, and no other operation is allowed on the OmniscopeApi instance.

This method is allowed only if streaming apis are enabled in the block.