Streaming APIs in Custom R and Python Blocks

Modified on Mon, 20 Dec 2021 at 12:26 PM

When a custom block script uses the read_input_records (Python) or read.input.records (R) API methods, the data associated with each input is read in full and loaded into memory. As a result, such blocks cannot directly process large amounts of data, or data streamed from a never-ending source.


With streaming APIs, data can be loaded and processed in chunks, without the need to load all the data at once.


Streaming APIs are a way to apply a stateless function to a stream of data. Omniscope will invoke the supplied function on each data chunk in the stream.
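To make the idea concrete, here is a minimal local sketch of applying a stateless function chunk by chunk. It only simulates a stream with plain pandas and does not show how Omniscope splits data internally; the helper apply_in_chunks, the function to_upper and the column name are hypothetical names chosen for illustration.

```python
import pandas as pd


def to_upper(chunk: pd.DataFrame) -> pd.DataFrame:
    """Stateless transformation: the output depends only on the input chunk."""
    out = chunk.copy()
    out["name"] = out["name"].str.upper()
    return out


def apply_in_chunks(df: pd.DataFrame, func, chunk_size: int) -> pd.DataFrame:
    """Local stand-in for a stream: split df into chunks and apply func to each."""
    chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
    return pd.concat([func(c) for c in chunks], ignore_index=True)


data = pd.DataFrame({"name": ["ann", "bob", "cy", "dee", "eve"]})

# Because the function is stateless, the chunk size does not change the result.
result_small = apply_in_chunks(data, to_upper, chunk_size=2)
result_whole = apply_in_chunks(data, to_upper, chunk_size=len(data))
```

Both results contain the same records, which is the property a stateless function provides regardless of how the stream is divided.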


The rest of this section explores how to use streaming APIs in custom blocks using Python. The R APIs are very similar, and you can find the complete reference here: https://help.visokio.com/a/solutions/articles/42000071109


You can find an example of streaming APIs in use here 

https://help.visokio.com/a/solutions/articles/42000091972


At the moment Streaming APIs are only supported for blocks with one input and one output. Furthermore, in order to use Streaming APIs the script must process each data chunk via a stateless function, transforming input data chunks to output data chunks (None/NULL values are not allowed in the output).


To enable streaming APIs open a custom block, navigate to the advanced option, and change the API mode from Batch to Streaming:



If the block is a new custom block, the script will change to a sample script using streaming APIs.


As mentioned before, streaming APIs are only supported for blocks with exactly one input and one output. If the block is configured to have more than one output, then it is necessary to remove the surplus output by clicking on the delete icon in the outputs section.


Note also that it is not possible to use read_input_records or write_output_records when using streaming APIs.


The first section of the custom script

from omniscope.api import OmniscopeApi
omniscope_api = OmniscopeApi()


imports the Omniscope library and creates an instance of OmniscopeApi, the object used to communicate with Omniscope. As a reminder, only one instance of this class is allowed at the moment.


Next, the script provides the lambda function to the process_stream method. In this example the lambda function publishes the same data chunk it has received. 


The lambda function is the script's transformation function, and it is responsible for transforming each input data chunk into an output data chunk. In Python the function receives a pandas.DataFrame as input and must return a pandas.DataFrame (in R it is a data.frame).


N.B. Omniscope assumes the function provided to the process_stream method is stateless, and it can invoke the function multiple times with the same input data, expecting the same output data.


# process the data stream, by applying a lambda function to each data chunk
omniscope_api.process_stream(lambda data: data)
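For anything beyond the identity transformation, a named function can be easier to read and to test than a lambda. The sketch below is an illustration only: the function add_total and the columns price, quantity and total are hypothetical. The call to process_stream is shown as a comment because it only works inside an Omniscope custom block.

```python
import pandas as pd


def add_total(chunk: pd.DataFrame) -> pd.DataFrame:
    """Return a new chunk with a derived column; the input is left untouched."""
    # Work on a copy so the input chunk is never modified in place.
    out = chunk.copy()
    out["total"] = out["price"] * out["quantity"]
    return out


# Inside a custom block, this would take the place of the identity lambda:
#     omniscope_api.process_stream(add_total)

# Local check: invoking the function twice on the same input gives the same output.
sample = pd.DataFrame({"price": [2.5, 4.0], "quantity": [2, 3]})
first = add_total(sample)
second = add_total(sample)
```

Copying the chunk before modifying it keeps the function free of side effects, so repeated invocations on the same input stay consistent.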


The lambda function is also required to produce outputs with a consistent schema (i.e., the same columns and column types). If it emits output data chunks with different schemas, an error will be raised.
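One way to keep the schema consistent is to coerce every output chunk to a fixed column order and fixed types before returning it. This is a minimal sketch using plain pandas; the schema OUTPUT_SCHEMA and the helper conform are hypothetical, and it assumes every chunk already contains all the listed columns.

```python
import pandas as pd

# Hypothetical output schema: column name -> pandas dtype.
OUTPUT_SCHEMA = {"city": "str", "population": "int64", "density": "float64"}


def conform(chunk: pd.DataFrame) -> pd.DataFrame:
    """Select the schema columns in a fixed order and cast them to fixed types."""
    return chunk[list(OUTPUT_SCHEMA)].astype(OUTPUT_SCHEMA)


# Two chunks with the same columns but different order and inferred types.
chunk_a = pd.DataFrame({"city": ["Oslo"], "population": [700000], "density": [1645.0]})
chunk_b = pd.DataFrame({"density": [4200], "city": ["Lyon"], "population": [520000.0]})

out_a = conform(chunk_a)
out_b = conform(chunk_b)
```

After conform, both chunks expose the same columns in the same order with the same types, even though pandas inferred different types for the raw inputs.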


Because of the schema requirement, the lambda function is not allowed to return None. If the lambda function does not produce any records for a particular input data chunk, it must emit a data frame with the correct schema. There are several ways of specifying a schema without any records. For example, in Python one can specify the schema of an empty data frame as:


import pandas

df = pandas.DataFrame({'textCol':[], 'dateCol':[], 'doubleCol':[], 'intCol':[]})
df['textCol'] = df['textCol'].astype('str')
df['dateCol'] = df['dateCol'].astype('datetime64[ns]')
df['doubleCol'] = df['doubleCol'].astype('float64')
df['intCol'] = df['intCol'].astype('int')


In R the same can be achieved via:


df = data.frame(
    textCol = character(),
    dateCol = as.Date(integer()),
    doubleCol = double(),
    intCol = integer()
)
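The empty data frame matters most when the output is built from scratch, because a frame created from an empty list of rows has no columns at all. The following Python sketch shows one possible pattern; the functions empty_output and long_words and the columns text, word and length are hypothetical.

```python
import pandas as pd


def empty_output() -> pd.DataFrame:
    """Zero-row frame carrying the (hypothetical) output schema."""
    return pd.DataFrame({
        "word": pd.Series(dtype="str"),
        "length": pd.Series(dtype="int64"),
    })


def long_words(chunk: pd.DataFrame) -> pd.DataFrame:
    """Emit one record per word longer than five characters in the 'text' column."""
    rows = [
        {"word": w, "length": len(w)}
        for text in chunk["text"]
        for w in text.split()
        if len(w) > 5
    ]
    if not rows:
        # Never return None: emit zero rows with the expected columns and types.
        return empty_output()
    return pd.DataFrame(rows).astype({"word": "str", "length": "int64"})


hit = long_words(pd.DataFrame({"text": ["streaming chunks are fine"]}))
miss = long_words(pd.DataFrame({"text": ["a b c"]}))
```

Here miss has no rows but the same columns and types as hit, so both chunks can be emitted in the same stream.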

     


Omniscope will take care of invoking the supplied lambda function when needed during the workflow execution.

After the execution is over, the process_stream method will return.


At this point no other data can be read or written with the Omniscope APIs. The script should release any resources it has acquired during its execution, close the OmniscopeApi instance and terminate.


# the data stream is over. No other data can be read or written.
omniscope_api.close()
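If the script acquires its own resources, a try/finally block is one way to make sure they are released and the API instance is closed once the stream ends. The sketch below is an assumption-laden illustration: FakeApi is a hypothetical stand-in so the code runs outside Omniscope, run_block is a hypothetical helper, and whether the API should also be closed after a failure is assumed here rather than documented.

```python
import io


class FakeApi:
    """Hypothetical local stand-in for OmniscopeApi, used only to run this sketch."""

    def __init__(self):
        self.closed = False

    def process_stream(self, func):
        # The real method invokes func on each data chunk; nothing to do here.
        pass

    def close(self):
        self.closed = True


def run_block(api, transform, resources):
    """Process the stream, then release resources and close the API instance."""
    try:
        api.process_stream(transform)
    finally:
        # Runs whether or not process_stream raised an error.
        for resource in resources:
            resource.close()
        api.close()


api = FakeApi()
log_buffer = io.StringIO()  # example of a resource acquired by the script
run_block(api, lambda data: data, [log_buffer])
```

In a real custom block, the OmniscopeApi instance created at the top of the script would be passed in place of FakeApi.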
