Event Hubs is a big data ingestion offering from Microsoft that supports AMQP, HTTPS, and the Apache Kafka protocol. It offers benefits like partitioning and checkpointing on the data stream, plus all the scalability your wallet can handle.
In this article I'll be going over how to set up a connection and send data to event hub using a Spring Boot web service. You can view the completed example here.
Setting up an Azure Account
If you haven't yet, you'll need to sign up for an Azure dev account. They hook you up with enough credit for what we need to do in this guide.
Creating the Event Hub Namespace
Log in to the Azure portal and search for 'event hub' in the search box at the top. You'll want to select the 'Event Hubs' option.
From there click on 'Event Hubs', and click on the Add (+) icon.
Fill out the create form. For this example I'm using the barest possible settings. You'll probably need to create a resource group for this event hub as well.
Boom! We've got an event hub namespace set up. The namespace acts as an organizational directory for the event hubs you create.
Creating the Event Hub
Navigate to your event hub namespace and click on the Event Hub Add (+) icon.
Give your event hub a snazzy name and scale the partition count as you need; for this guide I'm keeping mine at 2. Click create at the bottom of the form and we're good to go.
You should see a message saying that your event hub is being created. Once it's done, navigate to it from your namespace menu. You'll come upon a dashboard that shows all kinds of sweet metrics, like throughput and message counts.
In the side menu, navigate over to 'Shared access policies' and click Add (+). Give it a name and give it Manage access (which also includes Send and Listen, so we can both write and read).
Creating a Shared Access Policy gives us the keys to the event hub castle. Click on the policy you just created and make a note of the connection string.
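Before wiring the connection string into the application, it can help to sanity-check what you copied. The string is a semicolon-separated list of key=value pairs, and for a policy created on the hub itself it typically carries Endpoint, SharedAccessKeyName, SharedAccessKey, and EntityPath. The sketch below is only an illustration: the class name ConnectionStringCheck and all sample values are made up, and it uses nothing beyond the JDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ConnectionStringCheck {

    // Splits "Key=Value;Key=Value" into a map. Only the first '=' in each
    // segment is treated as the separator, because base64 keys can end in '='.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.trim().isEmpty()) {
                continue; // tolerate stray or trailing ';'
            }
            int eq = segment.indexOf('=');
            if (eq <= 0) {
                // The segment is not echoed, so a secret never ends up in logs.
                throw new IllegalArgumentException("Malformed segment (missing '=')");
            }
            parts.put(segment.substring(0, eq).trim(), segment.substring(eq + 1).trim());
        }
        return parts;
    }

    public static void main(String[] args) {
        // Placeholder values only (assumed for illustration); never commit a real key.
        String sample = "Endpoint=sb://my-namespace.servicebus.windows.net/;"
                + "SharedAccessKeyName=my-policy;"
                + "SharedAccessKey=abc123=;"
                + "EntityPath=my-event-hub";
        Map<String, String> parts = parse(sample);
        System.out.println("Namespace endpoint: " + parts.get("Endpoint"));
        System.out.println("Event hub name:     " + parts.get("EntityPath"));
    }
}
```

If EntityPath is missing, the string was probably copied from a namespace-level policy rather than from the event hub's own policy.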
Sending Data to Event Hub
Adding in the event hub dependencies.
First things first, get those dependencies added to your pom file. As of this writing the latest versions of the dependencies are 3.0.0.
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-eventhubs-eph</artifactId>
    <version>3.0.0</version>
</dependency>
Next, let's create a config class and create an EventHubClient.
Creating a client and hooking it up.
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.util.concurrent.Executors;

@Configuration
public class EventHubConfig {

    @Value("${eventHub.connectionString}")
    private String connectionString;

    @Bean
    public EventHubClient setupEventHubConnection() throws IOException, EventHubException {
        return EventHubClient.createFromConnectionStringSync(connectionString,
                Executors.newSingleThreadScheduledExecutor());
    }
}
In this class I'm pulling in the connectionString from a configuration file (in my case the application's yml file), injecting it into the config class, then passing it as-is to the createFromConnectionStringSync method. That method takes a connection string and a ScheduledExecutorService object. Since we're not doing anything fancy here, I'm using a single-threaded executor.
Here's how I've defined my yml file.
eventHub:
  connectionString: 'connection string here'
Now that we have an Event Hub Client bean, let's go ahead and create a service component that uses it.
import com.dublin.eventhub.demo.controller.Controller;
import com.dublin.eventhub.demo.model.EventPayload;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils;
import java.util.Objects;

@Service
public class EventHubService {

    private final EventHubClient eventHubClient;
    private Logger log = LoggerFactory.getLogger(Controller.class);

    @Autowired
    public EventHubService(EventHubClient eventHubClient) {
        this.eventHubClient = eventHubClient;
    }

    public void sendEvent(EventPayload test) {
        // Java-serialize the payload into the raw bytes that EventData carries.
        byte[] bytes = SerializationUtils.serialize(test);
        log.info("Sending message to the event hub {}", eventHubClient.getEventHubName());
        // The second argument is the partition key.
        eventHubClient.send(EventData.create(Objects.requireNonNull(bytes)), test.toString());
    }
}
We'll use constructor injection to inject the client into our service. From there I define the sendEvent method, which uses the client to send the data.
To send data to event hub, we need to serialize the message into a byte array, wrap it in an EventData object, then pass it to the client's send method. The second argument to send is a partition key: events that share a key land on the same partition.
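One thing worth knowing about that last step: in the 3.x azure-eventhubs client, send returns a CompletableFuture<Void>, so a failed send shows up in the future rather than as a thrown exception. The sketch below illustrates the difference using only the JDK. It does not talk to Event Hubs at all; fakeSend is a hypothetical stand-in for an asynchronous send, and the class name is made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class AsyncSendSketch {

    // Hypothetical stand-in for an asynchronous send call: it returns
    // immediately and reports failure through the future, not by throwing.
    static CompletableFuture<Void> fakeSend(byte[] body) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (body == null || body.length == 0) {
            result.completeExceptionally(new IllegalStateException("empty payload"));
        } else {
            result.complete(null);
        }
        return result;
    }

    // Fire-and-forget: the caller never learns about a failed send.
    static boolean sendIgnoringResult(byte[] body) {
        try {
            fakeSend(body);
            return true;
        } catch (RuntimeException e) {
            return false; // not reached for failures reported via the future
        }
    }

    // Waiting on the future turns an asynchronous failure into an exception.
    static boolean sendAndWait(byte[] body) {
        try {
            fakeSend(body).join();
            return true;
        } catch (CompletionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] empty = new byte[0];
        System.out.println("ignoring result: " + sendIgnoringResult(empty));
        System.out.println("waiting on it:   " + sendAndWait(empty));
    }
}
```

With the real client, the same idea would mean either inspecting the future returned by send (for example with join or whenComplete) or using the blocking sendSync variant, so that a surrounding try/catch actually sees the failure.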
Creating a data class
For this example I've defined a simple data class.
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class EventPayload implements Serializable {
    private String firstName;
    private String lastName;
    private String email;
    private String favoriteFood;
}
It's important to note that the data you send to event hub this way needs to be Serializable, so make sure the data class implements Serializable. Otherwise SerializationUtils.serialize fails with a java.lang.IllegalArgumentException (Failed to serialize object of type ...).
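Spring's SerializationUtils relies on standard Java serialization, so the requirement can be demonstrated with plain java.io. The sketch below is a minimal illustration under that assumption: Person and PlainPerson are hypothetical stand-ins for EventPayload (written without Lombok so it runs with just the JDK), and the helper names are made up.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationSketch {

    // Hypothetical payload, standing in for EventPayload.
    static final class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        final String firstName;
        final String favoriteFood;

        Person(String firstName, String favoriteFood) {
            this.firstName = firstName;
            this.favoriteFood = favoriteFood;
        }
    }

    // Similar class, but missing "implements Serializable".
    static final class PlainPerson {
        final String firstName = "Johnny";
    }

    // Writes the object graph into an in-memory byte array.
    static byte[] toBytes(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    // Reads an object back from bytes produced by toBytes.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = toBytes(new Person("Johnny", "Potatoes and Molasses"));
        Person copy = (Person) fromBytes(bytes);
        System.out.println("Round trip: " + copy.firstName + " likes " + copy.favoriteFood);

        try {
            toBytes(new PlainPerson());
        } catch (NotSerializableException e) {
            System.out.println("Not serializable: " + e.getMessage());
        }
    }
}
```

Keep in mind that a consumer has to deserialize with a compatible class on its classpath, which is one reason some teams prefer JSON bytes over Java serialization for event payloads.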
Next, we'll define an endpoint to post our data to.
Building an Endpoint
import com.dublin.eventhub.demo.model.EventPayload;
import com.dublin.eventhub.demo.service.EventHubService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

@RestController
public class Controller {

    private final EventHubService eventHubService;
    private Logger log = LoggerFactory.getLogger(Controller.class);

    @Autowired
    public Controller(EventHubService eventHubService) {
        this.eventHubService = eventHubService;
    }

    @PostMapping(path = "/eventhub/send")
    public ResponseEntity<Void> sendEvent(@RequestBody EventPayload payload) {
        try {
            log.info("Eventhub send endpoint called, sending {} to event hub..", payload);
            eventHubService.sendEvent(payload);
        } catch (Exception e) {
            // Pass the exception as the last argument so the stack trace is logged.
            log.error("An error arose sending a message to event hub", e);
            return new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR);
        }
        return new ResponseEntity<>(HttpStatus.OK);
    }
}
I've created the POST endpoint "/eventhub/send" and defined the request body to be our data class. From there it just hands the payload to the service.
Once we have all of that hooked up we should be able to run our application and see it successfully connect to event hub.
2019-09-24 13:42:21.670 INFO 16113 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.24]
2019-09-24 13:42:21.734 INFO 16113 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2019-09-24 13:42:21.734 INFO 16113 --- [ main] o.s.web.context.ContextLoader : Root WebApplicationContext: initialization completed in 1063 ms
2019-09-24 13:42:21.983 INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.MessagingFactory : messagingFactory[MF_41d4fd_1569350541964], hostName[dublin-rest-demo.servicebus.windows.net], info[starting reactor instance.]
2019-09-24 13:42:21.999 INFO 16113 --- [pool-1-thread-1] c.m.azure.eventhubs.impl.ReactorHandler : name[MF_41d4fd_1569350541964] reactor.onReactorInit
2019-09-24 13:42:22.002 INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler : onConnectionInit hostname[dublin-rest-demo.servicebus.windows.net], connectionId[MF_41d4fd_1569350541964]
2019-09-24 13:42:22.003 INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler : onConnectionLocalOpen hostname[dublin-rest-demo.servicebus.windows.net:5671], connectionId[MF_41d4fd_1569350541964], errorCondition[null], errorDescription[null]
2019-09-24 13:42:22.101 INFO 16113 --- [pool-1-thread-1] c.m.a.eventhubs.impl.ConnectionHandler : onConnectionBound hostname[dublin-rest-demo.servicebus.windows.net], connectionId[MF_41d4fd_1569350541964]
2019-09-24 13:42:23.157 INFO 16113 --- [pool-1-thread-4] c.m.a.eventhubs.impl.ConnectionHandler : onConnectionRemoteOpen hostname[dublin-rest-demo.servicebus.windows.net:5671], connectionId[MF_41d4fd_1569350541964], remoteContainer[100db877ccad41b1a689c5a458bf1fbc_G6]
2019-09-24 13:42:23.268 INFO 16113 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2019-09-24 13:42:23.449 INFO 16113 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 2 endpoint(s) beneath base path '/actuator'
2019-09-24 13:42:23.491 INFO 16113 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-09-24 13:42:23.495 INFO 16113 --- [ main] com.dublin.eventhub.demo.Application : Started Application in 3.126 seconds (JVM running for 3.493)
Now that we're all set up, let's send some data to it. Pull up your favorite HTTP client and post some data to the endpoint. For this example I'm using Insomnia.
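If you'd rather script the request than click through a GUI client, here's a rough sketch using the java.net.http client that ships with JDK 11 and later. It assumes the service is running locally on port 8080 (as in the startup logs above) and that the JSON field names match the EventPayload class; the class name SendEventRequest is made up for this example.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class SendEventRequest {

    // Builds the POST request; the base URL is a parameter so it can point
    // at wherever the service happens to be running.
    static HttpRequest build(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/eventhub/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        String json = "{\"firstName\":\"Johnny\",\"lastName\":\"Carson\","
                + "\"favoriteFood\":\"Potatoes and Molasses\"}";
        // Assumed local address; adjust to your environment.
        HttpRequest request = build("http://localhost:8080", json);

        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("Status: " + response.statusCode());
        } catch (IOException e) {
            System.out.println("Could not reach the service: " + e);
        }
    }
}
```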
And in the logs I should see that the message got sent to event hub successfully!
2019-09-24 13:43:25.806 INFO 16113 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Eventhub send endpoint called, sending EventPayload(firstName=Johnny, lastName=Carson, email=null, favoriteFood=Potatoes and Molasses) to event hub..
2019-09-24 13:43:25.808 INFO 16113 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Sending message to the event hub event-hub-test
We did it, we're serializing and sending data to event hub. 😁
Recap
In this guide I walked through creating an event hub in Azure, setting up an event hub client and service, along with an endpoint to post data to.
In part 2 I'll walk through consuming events.
I hope this guide helps. Let me know what you think in the comments.