A custom block allows execution of arbitrary code as part of the Workflow execution. Bindings are currently available for the Python (https://www.python.org/) and R (https://www.r-project.org/) programming languages.


Executing a custom block consists of establishing a connection with the Omniscope instance, communicating with Omniscope (reading block inputs and option values, and writing block outputs), and finally either shutting down the connection or aborting the execution.


Setup


Communication with the Omniscope instance happens via an instance of the OmniscopeApi class.

Only one instance of this class is allowed per script execution. The instance must be disposed of properly (see the Shutting down: Commit/Cancel/Abort section).


The OmniscopeApi class is part of the omniscope.api library, which is installed automatically on first execution. 

Creating an OmniscopeApi object requires no parameters: 


Python

from omniscope.api import OmniscopeApi
omniscope_api = OmniscopeApi()



R

library(omniscope.api)
omni.api = omniscope.api()



Reading block inputs


The custom script can access data connected to block inputs. Data is read from upstream blocks and provided as a pandas.DataFrame instance (Python) or a data.frame instance (R).


It is possible to specify which of the block inputs to read from via the input_number keyword argument in Python, or input.number in R (the first input is read if left unspecified). If no block is connected to the selected input, the data will be None (Python) or NULL (R).


Reading the same input multiple times will produce the same data frame.



Python

from omniscope.api import OmniscopeApi
omniscope_api = OmniscopeApi()

# read the records associated to the first block input
first_block_input_data = omniscope_api.read_input_records(input_number=0)

# read the records associated to the second block input
second_block_input_data = omniscope_api.read_input_records(input_number=1)

if second_block_input_data is None:
    # handle second input not connected to the block
    omniscope_api.abort(message="Second block input must be connected!")

omniscope_api.commit()


R

library(omniscope.api)

omni.api = omniscope.api()

# read the records associated to the first block input
first.block.input.data = read.input.records(omni.api, input.number=1)

# read the records associated to the second block input
second.block.input.data = read.input.records(omni.api, input.number=2)

if (is.null(second.block.input.data)) {
    # handle second input not connected to the block
    abort(omni.api, message="Second block input must be connected!")
}

commit(omni.api)


Note that, following each language's conventions, input numbers are zero-based in Python and one-based in R.
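
The not-connected check used in the examples above can be applied to several inputs at once. The following is a minimal sketch, not part of the omniscope.api library: missing_inputs is a hypothetical helper name, and it relies only on the behaviour described above, namely that an unconnected input is returned as None in Python. The dictionary values are stand-ins so the snippet runs on its own.

```python
def missing_inputs(inputs):
    """Return the labels of the inputs whose data is None (not connected)."""
    return [label for label, data in inputs.items() if data is None]


# Stand-in values for illustration. In a custom block each value would come
# from omniscope_api.read_input_records(input_number=...).
inputs = {"first input": [{"Id": 1}], "second input": None}

not_connected = missing_inputs(inputs)
if not_connected:
    # In a custom block this text could be passed to
    # omniscope_api.abort(message=...) instead of being printed.
    print("Not connected: " + ", ".join(not_connected))
```

Collecting all missing inputs before aborting lets a single message name every input that still needs to be connected.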


Writing block outputs


The script can publish its results by writing block outputs. These records can later be processed in Omniscope, visualised, or passed as block inputs to other custom blocks.

By default the custom block has two outputs. Before version 2020.4 b21066 it was not possible to add more; see the Custom outputs section below.


The arguments of write_output_records (Python) or write.output.records (R) are the data itself, represented as a pandas.DataFrame (Python) or a data.frame instance (R), and an optional output number (by default records are published to the first block output).


Python

from omniscope.api import OmniscopeApi
import pandas as pd

omniscope_api = OmniscopeApi()

output_data = pd.DataFrame(data={'Id': [1, 2], 'Name': ["Paul", "John"]})
omniscope_api.write_output_records(output_data, output_number=0)

omniscope_api.commit()



R

library(omniscope.api)

omni.api = omniscope.api()

output.data = data.frame("Id" = c(1,2), "Name" = c("Paul", "John"))
write.output.records(omni.api, output.data, output.number=1)

commit(omni.api)


Note that, following each language's conventions, output numbers are zero-based in Python and one-based in R.


Once data has been written (and the OmniscopeApi instance closed), data will be visible in the data tab, as well as accessible by downstream blocks.




Custom outputs

Starting from version 2020.4 b21066 it is possible to customise the number and labels of outputs in a custom block.


 

In the script, it is still possible to write data to these outputs by their index, but a new API has been introduced that allows writing records to an output by its ID. For example, to write to the 'roc' output in Python:


omniscope_api.write_output(output_data, "roc")

The function call is equivalent to 

omniscope_api.write_output(output_data, 2)

or

omniscope_api.write_output_records(output_data, output_number=2)


Similarly using R:

write.output(omni.api, output.data, "roc")

which is equivalent to

write.output(omni.api, output.data, 3)

or 

write.output.records(omni.api, output.data, output.number=3)


Writing large datasets

Both the `write_output` and `write_output_records` methods accept a data frame as one of their arguments. The data frame represents the data to write to a given output. When the data is large, however, it might not be possible to hold the entire dataset in memory as a single data frame.


In custom block scripts, it is possible to write large datasets by calling write_output_records or write_output multiple times, typically in a for loop, passing smaller DataFrames as "chunks" of data to write to Omniscope.


Each time write_output_records or write_output is called, the data frame passed in is appended to the block output. As such, the number, names, and types of the columns must not change between invocations.
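
The requirement that every chunk has the same columns can be checked before each write. The following is a minimal sketch, assuming pandas is installed. schema_of and write_in_chunks are hypothetical helper names, not part of the omniscope.api library, and the write argument stands in for a call such as omniscope_api.write_output(df, "Output").

```python
import pandas as pd


def schema_of(df):
    """Return the column names and dtypes, in order, as a comparable tuple."""
    return tuple((str(name), str(dtype)) for name, dtype in df.dtypes.items())


def write_in_chunks(chunks, write):
    """Pass each chunk to `write`, rejecting chunks whose schema differs from the first.

    Returns the total number of rows written.
    """
    expected = None
    rows = 0
    for chunk in chunks:
        current = schema_of(chunk)
        if expected is None:
            expected = current
        elif current != expected:
            raise ValueError(f"Chunk schema {current} differs from {expected}")
        write(chunk)
        rows += len(chunk)
    return rows


# Stand-in writer for illustration: collect the chunks in a list.
# In a custom block, `write` could be
#   lambda df: omniscope_api.write_output(df, "Output")
frames = [
    pd.DataFrame({"Id": [1, 2], "Name": ["Paul", "John"]}),
    pd.DataFrame({"Id": [3], "Name": ["George"]}),
]
written = []
total = write_in_chunks(frames, written.append)
```

Because chunks is consumed lazily, it can also be a generator that loads one file at a time, so only one chunk needs to be in memory at once.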


For example, consider a scenario where a big dataset has been split into several parquet files. You can learn more about the parquet file format here: https://parquet.apache.org/documentation/latest/ .
Assume that the dataset is too big to be fully loaded in memory, but that loading a single parquet file at a time is not a problem. Because each parquet file is a fragment of the same dataset, we know the columns will be the same in every file.

We can create a custom block that reads the entire dataset by retrieving the list of parquet files, then loading one file at a time and writing its content to an Omniscope output.


This example uses Python, but a similar result can be obtained using R. To read the parquet files we use the pyarrow library: https://pypi.org/project/pyarrow/


from omniscope.api import OmniscopeApi
import pyarrow.parquet as pq
import os


def is_parquet_file(filename, folder):
    return os.path.isfile(os.path.join(folder, filename)) and filename.split(".")[-1] == "parquet"

omniscope_api = OmniscopeApi()

# retrieve the folder where the parquet files are located from the options
folder = omniscope_api.get_option("folder")

# retrieve the list of parquet files
parquet_files = [f for f in os.listdir(folder) if is_parquet_file(f, folder)]

for parquet_file in parquet_files:
    # read each parquet file in a data frame
    df = pq.read_pandas(os.path.join(folder, parquet_file)).to_pandas()
    # append each data frame to the Omniscope output named 'Output'
    omniscope_api.write_output(df, "Output")
omniscope_api.close()



The custom block is attached at the end of this article.
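
os.listdir returns entries in arbitrary order, so the order in which the files are appended to the output may differ between runs. The following is a sketch of a deterministic alternative to the file listing in the example above, using only the standard library. list_parquet_files is a hypothetical helper name, not part of the omniscope.api library.

```python
from pathlib import Path


def list_parquet_files(folder):
    """Return the .parquet files directly inside `folder`, sorted by path."""
    return sorted(
        path
        for path in Path(folder).iterdir()
        if path.is_file() and path.suffix == ".parquet"
    )


# In the example above, the loop could then become:
#   for parquet_file in list_parquet_files(folder):
#       df = pq.read_pandas(parquet_file).to_pandas()
#       omniscope_api.write_output(df, "Output")
```

Sorting by path keeps the row order of the output stable across executions, which makes results easier to compare.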


Reading Option values


The script can access the values of the options defined in the custom block. Option values are retrieved via the get_option (Python) or get.option (R) methods by passing the name of the option as defined in the custom block.


For example:



Your script can access the value of the option named "currency" via:


Python 

from omniscope.api import OmniscopeApi
import pandas as pd

omniscope_api = OmniscopeApi()

currency = omniscope_api.get_option("currency")
if currency == "USD":
  pass #...
elif currency == "GBP":
  pass #...
#...
omniscope_api.commit()



R

library(omniscope.api)

omni.api = omniscope.api()

input.data = read.input.records(omni.api, input.number=1)
# my.option = get.option(omni.api, "my_option")

currency = get.option(omni.api, "currency")

if(currency == "USD") {
  # ...
} else if (currency == "GBP") {
  # ...
}
#...
commit(omni.api)


The return type of the get_option method depends on the option type. For example, a TEXT option returns a string, while a boolean option returns either True or False.
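
When a script only supports a fixed set of option values, it can help to validate the value as soon as it is read. The following is a minimal sketch, not part of the omniscope.api library: require_choice is a hypothetical helper name, and the literal "GBP" stands in for the value returned by omniscope_api.get_option("currency").

```python
def require_choice(name, value, allowed):
    """Return `value` if it is one of `allowed`, otherwise raise ValueError."""
    if value not in allowed:
        raise ValueError(
            f"Option '{name}' must be one of {sorted(allowed)}, got {value!r}"
        )
    return value


# Stand-in value for illustration. In a custom block the second argument
# would be omniscope_api.get_option("currency").
currency = require_choice("currency", "GBP", {"USD", "GBP"})
```

In a custom block, the ValueError could be caught and its text passed to omniscope_api.abort(message=...), so that an unsupported value is reported in the block rather than silently ignored.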


More information about custom block options can be found here.

Shutting down: Commit/Cancel/Abort 


Instances of the OmniscopeApi represent a connection to a running Omniscope instance.  The instance must be disposed properly to notify Omniscope that new data is available, or that some error has occurred.


The commit method is used to mark the end of the script and publish the data. Your script will continue running after a call to commit, but it is not allowed to use the OmniscopeApi instance anymore. It is possible to pass an optional message argument that will be displayed in the block.


For example:


Python

from omniscope.api import OmniscopeApi
omniscope_api = OmniscopeApi()

#...

omniscope_api.commit(message="No anomalies found in the dataset")



R

library(omniscope.api)

omni.api = omniscope.api()

input.data = read.input.records(omni.api, input.number=1)

# my.option = get.option(omni.api, "my_option")


commit(omni.api, message="No anomalies found in the dataset")



It is also possible to mark the script execution as failed, and to disregard any data written up to that point.

This is done by calling the cancel or abort method. The difference is that after cancel your script continues executing normally, whereas abort triggers a system exit.

Both methods take an optional message, which is displayed in the block error section.


Python

from omniscope.api import OmniscopeApi

omniscope_api = OmniscopeApi()

# ...

omniscope_api.abort(message="Second block input not connected")


R

library(omniscope.api)

omni.api = omniscope.api()

# ...

abort(omni.api, "Second block input not connected")



N.B.


Any non-zero exit status returned by the interpreter (e.g., after a crash or an uncaught exception) is treated as a block execution error, and the published data is disregarded.
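
One way to make sure a failure is reported with a readable message is to catch exceptions in one place and forward them to abort. The following is a minimal sketch under stated assumptions: run_block is a hypothetical helper name, not part of the omniscope.api library, and the commit and abort arguments are stand-ins for calls such as omniscope_api.commit(message=...) and omniscope_api.abort(message=...). Here they only record what they were called with, so the snippet runs on its own.

```python
def run_block(body, commit, abort):
    """Run `body`; call `commit` with its result, or `abort` with the error text."""
    try:
        message = body()
    except Exception as error:
        # With the real abort this call is described as triggering a system
        # exit, so the return below matters only for stand-ins like the ones here.
        abort(f"{type(error).__name__}: {error}")
        return False
    commit(message)
    return True


calls = []


def failing_body():
    # Hypothetical failure used for illustration.
    raise ValueError("column Id is missing")


succeeded = run_block(
    failing_body,
    commit=lambda message: calls.append(("commit", message)),
    abort=lambda message: calls.append(("abort", message)),
)
```

The body function is expected to return the message to show on success. Only Exception subclasses are caught, so an interpreter exit requested inside the body is left untouched.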

Logs


The standard output and standard error of your script are appended to the custom_block_executions_log.txt log file, found in the logs folder inside the omniscope-server folder. They are written to the log file after the script has terminated.


Python

from omniscope.api import OmniscopeApi
omniscope_api = OmniscopeApi()

print("Hello there")

omniscope_api.close()

R

library(omniscope.api)

omni.api = omniscope.api()

print("Hello there")

close(omni.api)



To monitor the file you can use command-line tools (e.g., tail) or the Omniscope admin page.

2020-03-18T16:45:57.221+0000 INFO  [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: execution started
2020-03-18T16:45:57.222+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: transaction id = 026377c28c4225490a745f1e31a3d0a6-Custom-f9db02b3-92a8-41f9-ba1f-e5db169d3d3b
2020-03-18T16:45:58.252+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: process execution completed successfully
2020-03-18T16:45:58.252+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: process std out: Hello there
2020-03-18T16:45:58.252+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: process std err:
2020-03-18T16:45:58.257+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: execution completed
2020-03-18T16:46:35.962+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: execution started
2020-03-18T16:46:35.962+0000 INFO [omniscope.custom_block_executions_log.txt] - [File.iox, Custom]: transaction id = 026377c28c4225490a745f1e31a3d0a6-Custom-583ee039-6b35-43ad-aec7-236fff74819b


It is possible to update the message displayed in the block bar by using the method update_message (Python) or update.message (R).


For example, while reading a sequence of files, we could display the file we are currently reading via:


# ...
for parquet_file in parquet_files:
    # Update UI message
    omniscope_api.update_message("Reading "+str(parquet_file))
    # ...
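
The message text can also carry the position within the sequence. The following is a small sketch using only the standard library: progress_messages is a hypothetical helper name, not part of the omniscope.api library, and each generated message is meant to be passed to omniscope_api.update_message.

```python
def progress_messages(names):
    """Yield (name, message) pairs such as ('a.parquet', 'Reading a.parquet (1 of 2)')."""
    total = len(names)
    for position, name in enumerate(names, start=1):
        yield name, f"Reading {name} ({position} of {total})"


# Stand-in file names for illustration.
for parquet_file, message in progress_messages(["a.parquet", "b.parquet"]):
    # In a custom block: omniscope_api.update_message(message)
    print(message)
```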


In R:


# ...
for (parquet.file in parquet.files) {
  update.message(omni.api, paste("Reading", parquet.file, sep=" "))
  # ...
}


During execution, the message is displayed in the block bar.