Data Streaming Mode in R and Python Custom Blocks + Demos


What are Streaming Custom Blocks?


Streaming Custom Blocks don't process all input data at once, but work on smaller data chunks instead. Working with chunks not only addresses the problem of processing huge amounts of data, but also makes a Custom Block more responsive. This works for blocks which don't need to see all of the data before sending anything downstream: as soon as an upstream block provides some data, the Custom Block can process it and release it downstream, where e.g. a report can visualise it directly or process it further.


This feature is available in Omniscope EVO 2021.3 b21351 and onwards.


Differences from normal Custom Blocks


To use Streaming Custom Blocks, change the Api mode in the advanced Custom Block options.


Normal Custom Blocks work in BATCH mode: they call read_input_records(), process the data, and output it via write_output_records(). In the advanced options, their Api mode must be set to BATCH (the default).
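
As a minimal sketch (assuming the omniscope_api object provided to Custom Blocks; exact signatures may vary), a BATCH-mode block looks roughly like this:

# Read the whole input at once
input_data = omniscope_api.read_input_records()

# ... process the entire data frame in one go ...
output_data = input_data

# Write the whole output at once
omniscope_api.write_output_records(output_data)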




Streaming Custom Blocks, on the other hand, cannot use these two methods. Instead, they call process_stream(handler), where handler is a user-defined function that accepts a data frame as input and returns a data frame as output. These data frames are the chunks mentioned earlier. Only one input and one output can be used this way, and in these blocks the Api mode must be set to STREAMING.
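
A minimal STREAMING-mode sketch, under the same assumptions:

# The handler is called once per chunk
def handler(chunk):
    # chunk is a pandas data frame holding one slice of the input
    # ... transform the chunk ...
    return chunk  # the returned frame is released downstream immediately

omniscope_api.process_stream(handler)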


Scenarios


We are going to look at two scenarios.

In the first, we use two Custom Blocks: one to train a model via SciKit-Learn, and a streaming one to apply it to the remaining data.

In the second, we use a single streaming Custom Block which downloads a model from HuggingFace and applies it to a stream of Tweets from the Twitter connector.


Scenario 1:  SciKit-Learn


The demo can be accessed and downloaded from here.


The first workflow contains two Custom Blocks: a normal one (in BATCH mode) that trains the model and saves it to disk, and a streaming one (in STREAMING mode) that loads the model and applies it to the chunks of data entering the block. A Split data block randomly selects 30% of the data for training and leaves the rest for prediction.


In the "Train SVM Model" the SVM model is created and written to disk:

from sklearn import svm
from joblib import dump  # joblib is commonly used to persist scikit-learn models
import numpy as np

# Build the model (x holds the training features, y the labels, both read from the block input)
svm_model = svm.SVC(kernel='rbf', C=1, gamma="auto").fit(x, np.ravel(y))

# Write the trained model to disk so the streaming block can load it
dump(svm_model, model_file)


In the Streaming Custom Block "Apply SVM Model" the model is read from disk and applied to the incoming data:

import pandas
from joblib import load

# Load the model written by the "Train SVM Model" block (model_file points at the same file)
svm_model = load(model_file)

def handle_chunk(chunk):

    # New data to predict with (fields_to_use lists the feature columns)
    new_x = chunk[fields_to_use]

    # Make predictions for this chunk only
    predictions = pandas.DataFrame(svm_model.predict(new_x), columns=['Prediction'])

    # Append the predictions to the incoming data
    output_data = pandas.concat([chunk, predictions], axis=1)

    return output_data

omniscope_api.process_stream(handle_chunk)
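
Note that the model is loaded once, outside the handler, so each chunk only pays the cost of prediction rather than reloading the model from disk.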



Scenario 2: HuggingFace Text Classification model - analysing BBC Twitter data


The demo can be accessed and downloaded from here.


In this scenario we use a model downloaded from HuggingFace. Unlike the first scenario, we don't need to train anything, as the model is already fully trained. We use a text classification model to categorise tweets from or about the BBC into very general categories such as Sports, Climate, Business, Technology, Science, Entertainment, and Health. For this purpose we create a simple workflow using the Twitter Connector and a Streaming Custom Block.


A text classification model is loaded from HuggingFace:

from transformers import pipeline

# Download a zero-shot text classification model from the HuggingFace hub
classifier = pipeline("zero-shot-classification", model="valhalla/distilbart-mnli-12-3")
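
For illustration only (a sketch; the category list mirrors the one described above), calling the classifier directly on a single piece of text shows the shape of its result:

categories = ["Sports", "Climate", "Business", "Technology",
              "Science", "Entertainment", "Health"]

result = classifier("The championship final drew record viewing figures.", categories)
# result is a dict with "sequence", "labels" and "scores";
# labels and scores are aligned element-wise, so the code below
# picks the label with the highest score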


And then used to categorise incoming tweets:

def categorise(row):
    # Classify one tweet (text_field names the tweet text column)
    c = classifier(row[text_field], categories)

    # Pick the label with the highest score
    scores = c["scores"]
    index = scores.index(max(scores))
    return c["labels"][index]


def handle_chunk(chunk):
    # Label every tweet in this chunk
    chunk["Label"] = chunk.apply(categorise, axis=1)
    return chunk

omniscope_api.process_stream(handle_chunk)


Read more - Streaming APIs in Custom R and Python Blocks.





