DEV Community

Arjun Shetty
Streaming API

Streaming APIs are useful when a large set of data has to be sent to the client. The analogy I can think of is this: if I want to water my garden, I have two options. The first is to fill a bucket at the water source, carry it to the garden, and water the plants one by one. This is similar to a REST API, where the garden is the client app and I have to wait on the server (the water source) to fill my bucket before the complete data set is returned to the client. A streaming API instead lets me connect a pipe from the water source to the garden, so that water arrives in real time and I need not wait until all the data has been retrieved. It also saves a lot of server memory, since there is no need to hold a huge volume of data in memory.

In this example I am pulling data from a database. Since the volume of data is huge, I pull it in batches through successive calls to the DB. There is a trade-off in this scenario, since we have to make multiple calls to the DB.
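The batching idea can be sketched in plain Java, without Spring or a real database. Everything here is an assumption for illustration: `BatchSource` is a hypothetical stand-in for one database call, and the in-memory list plays the role of the table. The sketch pushes each row to the consumer as soon as its batch arrives and counts the calls, which makes the trade-off visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchedReader {

    /** Hypothetical stand-in for one database call: up to `limit` rows starting at `offset`. */
    interface BatchSource {
        List<String> fetch(int offset, int limit);
    }

    /**
     * Pulls rows batch by batch and hands each row to `sink` as soon as it is read.
     * Returns the number of calls made to the source.
     */
    static int streamAll(BatchSource source, int batchSize, Consumer<String> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int calls = 0;
        int offset = 0;
        while (true) {
            List<String> batch = source.fetch(offset, batchSize);
            calls++;
            batch.forEach(sink);
            if (batch.size() < batchSize) {
                return calls; // a short (or empty) batch means the source is exhausted
            }
            offset += batchSize;
        }
    }

    public static void main(String[] args) {
        // In-memory "table" of 25 rows standing in for the database.
        List<String> table = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            table.add("row-" + i);
        }
        BatchSource source = (offset, limit) -> table.subList(
                Math.min(offset, table.size()),
                Math.min(offset + limit, table.size()));

        int calls = streamAll(source, 10, System.out::println);
        System.out.println("source calls: " + calls);
    }
}
```

Only one batch is held in memory at a time, at the price of one round trip per batch; a larger batch size means fewer calls but more memory per call.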

Let's create a streaming controller, just like any REST API.

//Controller.java

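@Autowired
private TaskExecutor taskExecutor;

// Used below to obtain a fresh prototype-scoped Connector for each batch
@Autowired
private ApplicationContext _applicationContext;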

//input query: SELECT * FROM TABLE1 WHERE ID_COLUMN > $INDEX LIMIT $BATCHSIZE
@PostMapping("/stream/query")
public ResponseBodyEmitter executeMdx(@RequestBody Input input) {

    final ResponseBodyEmitter emitter = new ResponseBodyEmitter();

    taskExecutor.execute(() -> {
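        try {

            final int batchSize = 10;
            // Keep the original template: replacing the placeholders in place would
            // consume them on the first pass, and every later batch would repeat it.
            final String queryTemplate = input.getInputQuery();

            for (int i = 0; i < 10; i++) {
                // Advance by a full batch each time so consecutive batches do not overlap
                input.setInputQuery(queryTemplate.replace("$INDEX", String.valueOf(i * batchSize))
                        .replace("$BATCHSIZE", String.valueOf(batchSize)));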

                Connector c = _applicationContext.getBean(Connector.class, input);

                String[][] result = c.executeQuery(input);
                if (result == null)
                    break;

                for (String[] x : result) {
                    emitter.send(String.join(",", x));
                }
            }
            emitter.complete();
        } catch (Exception e) {
            e.printStackTrace();
            // The emitter is finished on both paths, so no further complete() call is needed
            emitter.completeWithError(e);
        }

    });

    return emitter;
}

In our use case, the client application asks for one row at a time, with the columns comma-separated. This is a very crude approach; you can also send binary data or JSON through the emitter. Also note that the batch size can be computed dynamically, depending on the requirement.
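To make the difference between the two row formats concrete, here is a minimal sketch in plain Java. `RowEncoder` and its methods are hypothetical names, not part of the original project. It assumes non-null cell values, and it ends every row with a newline so that a client could split rows even when several arrive in one chunk (the controller above sends rows without a delimiter).

```java
final class RowEncoder {

    /** One row as comma-separated text, terminated by a newline. */
    static String toCsvLine(String[] row) {
        // Crude on purpose: a cell that itself contains a comma would break the format.
        return String.join(",", row) + "\n";
    }

    /** One row as a JSON array of strings on a single line (newline-delimited JSON). */
    static String toJsonLine(String[] row) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < row.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('"').append(escape(row[i])).append('"');
        }
        return sb.append("]\n").toString();
    }

    /** Escapes quotes, backslashes and control characters for use inside a JSON string. */
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become a four-digit hex escape
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String[] row = {"1", "Widget, large", "say \"hi\""};
        System.out.print(toCsvLine(row));  // the embedded comma makes this row ambiguous
        System.out.print(toJsonLine(row)); // the JSON form keeps the three cells distinct
    }
}
```

In a real project a JSON library would usually replace the hand-written escaping; it is spelled out here only to keep the sketch dependency-free.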

The TaskExecutor bean is configured like this:

//ThreadConfig.java
@Configuration
public class ThreadConfig {
    @Bean
    public TaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);        
        executor.setThreadNamePrefix("task_executor_thread");
        executor.initialize();
        return executor;
    }
}
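For readers who want to see what this configuration amounts to outside Spring, the following is a rough plain-Java analogue using only java.util.concurrent. It is a sketch under an assumption: with equal core and max sizes and the default queue, the Spring executor behaves roughly like a fixed pool of four threads with an unbounded queue. `FixedPoolSketch` and `newNamedPool` are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class FixedPoolSketch {

    /** Builds a pool with a fixed number of threads whose names start with `prefix`. */
    static ExecutorService newNamedPool(int threads, String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        // Each new worker thread gets the prefix plus a running number
        ThreadFactory factory = runnable -> new Thread(runnable, prefix + counter.getAndIncrement());
        return Executors.newFixedThreadPool(threads, factory);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = newNamedPool(4, "task_executor_thread");
        try {
            // The task runs on a pool thread, not on the caller's thread
            Future<String> name = pool.submit(() -> Thread.currentThread().getName());
            System.out.println("ran on: " + name.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

A named prefix is mostly a debugging aid: it makes the streaming worker threads easy to spot in logs and thread dumps.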

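Make sure the beans you use are thread-ready. In our case, Connector implements Runnable, so it can be handed to the TaskExecutor, and it is prototype-scoped, so every getBean call returns a fresh instance instead of one shared across threads.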

//Connector.java
@Service
@Scope("prototype")
public class Connector implements Runnable {

    @Autowired
    private Logger log;

    @Autowired
    private Grid grid;

    private Input _input;

    public Connector(Input input) {
        _input = input;
    }

    public String[][] executeQuery(Input input) {
        try {
            return grid.executeQuery(input);

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }

        return null;
    }

    @Override
    public void run() {        
        executeQuery(_input);
    }
}

To pass values to the thread, we use constructor arguments: the getBean(Connector.class, input) call in the controller forwards input to the Connector constructor.
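The same pattern, stripped of Spring, looks roughly like the sketch below. `QueryTask` is a hypothetical stand-in for Connector, and the "query execution" is faked by writing a string to a shared queue. The point it illustrates is that each task instance carries its own input in a final field set by the constructor, so nothing mutable is shared between threads except the thread-safe result queue.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class QueryTask implements Runnable {

    private final String query;          // set once in the constructor, private to this task
    private final Queue<String> results; // thread-safe sink shared by all tasks

    QueryTask(String query, Queue<String> results) {
        this.query = query;
        this.results = results;
    }

    @Override
    public void run() {
        // Stand-in for executing the query against a real data source
        results.add("executed: " + query);
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> results = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);

        // One task instance per input, mirroring one prototype bean per request
        pool.execute(new QueryTask("SELECT 1", results));
        pool.execute(new QueryTask("SELECT 2", results));

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(results); // order depends on thread scheduling
    }
}
```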

This sample should at least guide you through the pieces required to build a streaming API. It might not be the right approach for every use case.

Now that we have created the server-side streaming API, we need to consume it in our UI, like this:

async function postData(url = '', data = {}) {

    var request = {
        method: 'POST',
        mode: 'cors',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify(data)
    }

    // Use the url argument and return the promise so the caller receives the Response
    return fetch(url, request)
        .then(response => {
            const reader = response.body.getReader()
            const utf8Decoder = new TextDecoder("utf-8");
            const stream = new ReadableStream({
                start(controller) {
                    function push() {
                        reader.read().then(({
                            done,
                            value
                        }) => {
                            if (done) {
                                controller.close();
                                return;
                            }

                            controller.enqueue(value);
                            let listItem = document.createElement('li');
                            // stream: true keeps multi-byte characters intact across chunk boundaries
                            listItem.textContent = utf8Decoder.decode(value, { stream: true });
                            console.log(listItem.textContent);
                            document.body.appendChild(listItem);
                            push();
                        });
                    };

                    push();
                }
            });

            return new Response(stream, {
                headers: {
                    "Content-Type": "text/html"
                }
            });
        });
}

postData('http://localhost:8080/stream/query', {    
    "inputQuery": "SELECT {[AUGW120]:[AUGW420],[AUG20],[Q3FY20]} ON COLUMNS, NON EMPTY CrossJoin ({[Fulfillment Region].children}, CrossJoin ( SUBSET({Descendants ([Product_Org],3)},$INDEX,$BATCHSIZE), {[CSR_Unit_Sales],[UNIT_Ships]}))ON ROWS FROM [LIBASOD2.NEW_DM] WHERE ([Version].[&CVersion],[Site].[Site],[Customer].[Customer], [Geo].[Geo],[Channel].[Channel],[Config].[Config])"
}).then(data => {
    console.log(data);
});
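If the consumer is a Java program rather than a browser, the same incremental reading can be sketched with the standard I/O classes. This is an illustration only: `StreamConsumer` is a hypothetical name, and it assumes the server ends every row with a newline, which the controller above does not do as written. The response body is represented by a plain InputStream, so the example runs without a server.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

final class StreamConsumer {

    /**
     * Reads the body line by line and hands each row to `onRow` as soon as it is complete.
     * Returns the number of rows seen.
     */
    static int consume(InputStream body, Consumer<String> onRow) throws IOException {
        int rows = 0;
        // InputStreamReader decodes UTF-8 across read boundaries, so a multi-byte
        // character split between two network chunks is still decoded correctly.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(body, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue; // ignore blank lines between rows
                }
                onRow.accept(line);
                rows++;
            }
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a streamed HTTP response body
        byte[] fake = "1,Widget\n2,Gadget\n".getBytes(StandardCharsets.UTF_8);
        int rows = consume(new ByteArrayInputStream(fake), row -> System.out.println("row: " + row));
        System.out.println("rows read: " + rows);
    }
}
```

With a real HTTP client, the InputStream would come from the response body; rows are then processed while the server is still producing later ones.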

Top comments (3)

Yash_Jaiswal

Thanks for the explanation. The analogy was great and easy to relate to. Also, did you use EJB here? I am familiar with core Java now.
So I would like to ask you for a piece of advice.
Can you tell me if this roadmap is correct for becoming a Java developer?
Core Java -> advanced Java (JSP, servlets, JDBC) -> Maven/Gradle -> Spring/Spring Boot/Hibernate

Here '/' means or.

Arjun Shetty • Edited

You are on the right path. Once you are strong in the core aspects of Java programming, tools and frameworks will be a breeze to learn.

Spring Boot is an easy pick to start projects with. There are a lot of abstractions around it; don't worry too much about them unless you hit a roadblock. Starting is what matters.

A couple of references:
baeldung
howtodoinjava

These sites will be a great tool in your arsenal while learning Java.

Yash_Jaiswal

Thank you so much 🙏❤️.
I am learning HTML, CSS and JavaScript for now, for front-end development. When I reach the backend part, I will follow the Java roadmap. As you said Spring Boot is a good choice, I will surely learn it. Thank you 🙏❤️