yangbongsoo

Be careful of NullPointerException in EDA

When the components of a Spring application communicate through Kafka messages in an event-driven architecture (EDA) rather than through direct calls, special care is needed to avoid NullPointerException.

Let's look at the code below.
onStartOrder starts the order process and publishes a point reservation command message to Kafka. onReservePoint then consumes and processes that point reservation command message.

EDA image1

The point reservation command message carries a pointReserveDto. After a specification change, a new field became necessary, so a newObject was created and added to the pointReserveDto, and onReservePoint, which performs the point reservation, was modified to retrieve and use it.

EDA image2

This is where the problem arises. Point reservation command messages that were already in Kafka before the deployment do not contain the newObject in their pointReserveDto. Even though the new version of the Spring application has been deployed, it still consumes these old-version messages and executes pointReserveDto.getNewObject().getData(). Because getNewObject() returns null for them, the call throws a NullPointerException.

EDA image3
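
The failure can be reproduced without Kafka or Spring. The following is a minimal plain-Java sketch, not the application's actual code: the DTO classes are hand-written stand-ins for the Lombok versions, and fromPayload is a hypothetical, simplified substitute for deserialization in which a key missing from the payload simply becomes a null field.

```java
import java.util.Map;

public class OldMessageNpeDemo {

    // Hypothetical stand-in for the article's NewObject, written without Lombok.
    static final class NewObject {
        private final String data;

        NewObject(String data) {
            this.data = data;
        }

        String getData() {
            return data;
        }
    }

    // Hypothetical stand-in for the article's PointReserveDto.
    static final class PointReserveDto {
        // Stays null when the message was produced before the field existed.
        private final NewObject newObject;

        PointReserveDto(NewObject newObject) {
            this.newObject = newObject;
        }

        NewObject getNewObject() {
            return newObject;
        }
    }

    // Simplified stand-in for deserialization (assumption): a missing key yields a null field.
    static PointReserveDto fromPayload(Map<String, String> payload) {
        String data = payload.get("newObject.data");
        return new PointReserveDto(data == null ? null : new NewObject(data));
    }

    // Mirrors the consumer-side access described above.
    static String readNewData(PointReserveDto dto) {
        return dto.getNewObject().getData();
    }

    public static void main(String[] args) {
        // New-version message: the field is present, so the read succeeds.
        PointReserveDto fresh = fromPayload(Map.of("newObject.data", "newData"));
        System.out.println(readNewData(fresh));

        // Old-version message: the field is absent, so getNewObject() returns null.
        PointReserveDto old = fromPayload(Map.of());
        try {
            readNewData(old);
        } catch (NullPointerException e) {
            System.out.println("NPE on old-version message");
        }
    }
}
```

The point of the sketch is that the producer-side code never creates a null newObject; the null only appears on the consumer side, for messages written by the previous version.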

The diagram below illustrates the entire process, and the rest of this post walks through it in code. It helps to view the workflow and the code together.
Note: the code for steps 5 and 6 is omitted, because discussing persistence would make the explanation too long.

EDA image4

The starting point is the kafkaListener. When it receives a StartOrder command message from Kafka, it delegates the task to the orderService.

@Configuration
@RequiredArgsConstructor
public class KafkaListener {

    @Component
    @RequiredArgsConstructor
    public static class KafkaCommandHandler {
        private final OrderService orderService;

        public void onStartOrder(StartOrder startOrder) {
            this.orderService.startOrder(startOrder);
        }
    }
}

In orderService.startOrder, a PointReserveDto is created, and due to a specification change, a NewObject is also created and included. It is then passed to the orderProcessor.

@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderProcessor orderProcessor;

    public void startOrder(StartOrder startOrder) {
        String orderId = startOrder.getOrderId();

        // Create a NewObject, which the specification change now requires
        PointReserveDto pointReserveDto = PointReserveDto.builder()
            .newObject(
                NewObject.builder()
                    .data("newData")
                    .build()
            )
            .build();

        orderProcessor.processStartOrder(orderId, pointReserveDto);
    }
}

In orderProcessor, business logic is executed. First, it retrieves the Order object from the database using the orderId received from the StartOrder command. Then, utilizing the Order object, it carries out the tasks related to starting the order. Upon completion, it calls order.applyOrderStarted to publish an event indicating that the StartOrder has finished.

@Component
public class OrderProcessor {
    public void processStartOrder(String orderId, PointReserveDto pointReserveDto) {
        Order order = getOrderById(orderId);

        // Do StartOrder process

        order.applyOrderStarted(pointReserveDto);
    }

    // In reality, it is necessary to query the database to retrieve the Order object.
    private Order getOrderById(String orderId) {
        return Order.builder().id(orderId).build();
    }
}

Note: the main purpose here is to explain the process leading to the NullPointerException, so the details are skipped, but issuing events and commands from Order is a key structural point in an EDA.

Next, let's look at the following code. order.applyOrderStarted takes the received pointReserveDto, wraps it in an OrderStarted event, and adds the event to the pendingEvents list. Then, through the apply method, it sets the Order object's pointReserveDto field.

After that, it creates a ReservePoint command, wraps the pointReserveDto in it, and adds the command to the pendingCommands list. The remaining tasks are publishing the messages in pendingEvents and pendingCommands to Kafka and saving the order in the database. These are steps 5 and 6, and the related code is omitted.

@Getter
@Builder
public class Order {
    String id;

    @Nullable
    PointReserveDto pointReserveDto;

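    // Without @Builder.Default, Lombok's builder ignores these initializers and leaves the lists null.
    @Builder.Default
    List<Event> pendingEvents = new ArrayList<>();
    @Builder.Default
    List<Command> pendingCommands = new ArrayList<>();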

    public void applyOrderStarted(PointReserveDto pointReserveDto) {
        this.pendAndApplyEvent(
            OrderStarted.builder()
                .orderId(this.getId())
                .pointReserveDto(pointReserveDto)
                .build(),
            this::apply
        );

        this.pendingCommands.add(
            ReservePoint.builder()
                .orderId(this.getId())
                .pointReserveDto(this.pointReserveDto)
                .build()
        );
    }

    private <T extends Event> void pendAndApplyEvent(T event, Consumer<T> apply) {
        this.pendingEvents.add(event);
        apply.accept(event);
    }

    private void apply(OrderStarted orderStarted) {
        this.pointReserveDto = orderStarted.getPointReserveDto();
    }
}

Because the ReservePoint command was published to Kafka above, the KafkaCommandHandler now consumes it through onReservePoint and hands it to the orderService.

@Configuration
@RequiredArgsConstructor
public class KafkaListener {

    @Component
    @RequiredArgsConstructor
    public static class KafkaCommandHandler {
        private final OrderService orderService;

        public void onReservePoint(ReservePoint reservePoint) {
            this.orderService.reservePoint(reservePoint);
        }
    }
}
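
OrderService.reservePoint itself is not shown in this post. Assuming it mirrors startOrder and only unpacks the command before delegating to the processor, it might look like the plain-Java sketch below. The constructor wiring, the getter names on ReservePoint, and the recording OrderProcessor stub are all assumptions made for illustration, written without Spring or Lombok so the block runs on its own.

```java
public class ReservePointDelegationSketch {

    // Hypothetical empty stand-in for the article's PointReserveDto.
    static final class PointReserveDto {
    }

    // Hypothetical stand-in for the ReservePoint command, with hand-written getters.
    static final class ReservePoint {
        private final String orderId;
        private final PointReserveDto pointReserveDto;

        ReservePoint(String orderId, PointReserveDto pointReserveDto) {
            this.orderId = orderId;
            this.pointReserveDto = pointReserveDto;
        }

        String getOrderId() {
            return orderId;
        }

        PointReserveDto getPointReserveDto() {
            return pointReserveDto;
        }
    }

    // Stub processor that only records its arguments so the delegation can be observed.
    static final class OrderProcessor {
        String lastOrderId;
        PointReserveDto lastDto;

        void processReservePoint(String orderId, PointReserveDto pointReserveDto) {
            this.lastOrderId = orderId;
            this.lastDto = pointReserveDto;
        }
    }

    // Assumed shape of the service method: unpack the command and delegate.
    static final class OrderService {
        private final OrderProcessor orderProcessor;

        OrderService(OrderProcessor orderProcessor) {
            this.orderProcessor = orderProcessor;
        }

        void reservePoint(ReservePoint reservePoint) {
            orderProcessor.processReservePoint(
                reservePoint.getOrderId(),
                reservePoint.getPointReserveDto()
            );
        }
    }

    public static void main(String[] args) {
        OrderProcessor processor = new OrderProcessor();
        OrderService service = new OrderService(processor);
        service.reservePoint(new ReservePoint("order-1", new PointReserveDto()));
        System.out.println(processor.lastOrderId);
    }
}
```

Under this assumption the pointReserveDto reaches the processor exactly as it was deserialized from the message, with no validation along the way.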

In orderProcessor.processReservePoint, extracting the newObject.data value from the received pointReserveDto throws a NullPointerException when the message is an old-version one, because getNewObject() returns null.

@Component
public class OrderProcessor {
    public void processReservePoint(
        String orderId,
        PointReserveDto pointReserveDto
    ) {
        Order order = getOrderById(orderId);

        // Do ReservePoint process

        // FIXME: NPE! pointReserveDto.getNewObject() is null for old-version messages
        String newData = pointReserveDto.getNewObject().getData();
    }

    // In reality, it is necessary to query the database to retrieve the Order object.
    private Order getOrderById(String orderId) {
        return Order.builder().id(orderId).build();
    }
}
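
One defensive option, which is not part of the code above, is to treat the new field as optional on the consumer side until all old-version messages have been drained. The sketch below assumes a fallback value is acceptable for old messages; LEGACY_DEFAULT, resolveNewData, and the hand-written DTO classes are hypothetical names introduced only for illustration.

```java
import java.util.Optional;

public class NullSafeReservePoint {

    // Hypothetical stand-in for the article's NewObject, written without Lombok.
    static final class NewObject {
        private final String data;

        NewObject(String data) {
            this.data = data;
        }

        String getData() {
            return data;
        }
    }

    // Hypothetical stand-in for the article's PointReserveDto.
    static final class PointReserveDto {
        private final NewObject newObject;

        PointReserveDto(NewObject newObject) {
            this.newObject = newObject;
        }

        NewObject getNewObject() {
            return newObject;
        }
    }

    // Assumed fallback for messages produced before the field existed.
    static final String LEGACY_DEFAULT = "legacyDefault";

    // Walks the chain null-safely: any missing link falls through to the fallback.
    static String resolveNewData(PointReserveDto dto) {
        return Optional.ofNullable(dto)
            .map(PointReserveDto::getNewObject)
            .map(NewObject::getData)
            .orElse(LEGACY_DEFAULT);
    }

    public static void main(String[] args) {
        // New-version message: the real value is used.
        System.out.println(resolveNewData(new PointReserveDto(new NewObject("newData"))));
        // Old-version message: newObject is null, so the fallback is used.
        System.out.println(resolveNewData(new PointReserveDto(null)));
    }
}
```

Whether a default value, skipping the message, or routing it elsewhere is the right reaction depends on the business rules; the sketch only shows that the consumer has to decide explicitly instead of assuming the field is present.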

Top comments (2)

Hebert Freitas

Thanks for the post @yangbongsoo !
Digging deeper into the problem, it looks like you need a way to version your message schemas. With Kafka specifically, you can use the Kafka Schema Registry. With this tool, you can register schemas and simplify the process of evolving a schema.

yangbongsoo

@hebertrfreitas Thanks for your comment.
Our team decided not to use Kafka Schema Registry when setting up the initial architecture. Instead, we use a JSON message format.

The preferences of those leading the architecture design were reflected. There was no specific reason :)