DEV Community

Semyon Kirekov
Semyon Kirekov

Posted on

Apache Spark, Hive, and Spring Boot — Testing Guide

Big Data is trending. The companies have to operate with a huge amount of data to compete with others. For example, this information is used to show you the relevant advertisements and recommend you the services that you may find interesting. The problem with Big Data software systems is their complexity. Testing becomes tough. How could you verify the app behaviour locally when it's tuned to connect to the HDFS cluster?

In this article, I'm showing you how to create a Spring Boot app that loads data from Apache Hive via Apache Spark to the Aerospike Database. More than that, I'm giving you a recipe for writing integration tests for such scenarios that can be run either locally or during the CI pipeline execution. The code examples are taken from this repository.

Article cover

Firstly, let's get over some basic concepts of the Big Data stack we're using. Don't worry, it won't take long. But it's necessary to understand the core idea.

Basics of HDFS

HDFS (Hadoop Distributed File System) is a distributed file system designed to run on many physical servers. So, a file in HDFS is an abstraction that hides the complexity of storing and replicating the data between multiple nodes. Why do we need HDFS? There are some reasons.

Hardware Failures

Hard disk drives crash. That's the reality we have to deal with. If a file is split between multiple nodes, individual failures won't affect the whole data. Besides, data is replicated in HDFS. So, even after a disk crash, the information can be restored from the other sources.

Really Large Files

HDFS allows building of a network of not so powerful machines into a huge system. For example, if you have 100 nodes with 1TB disk storage on each one, then you possess 100TB of HDFS space. If the replication factor equals 3, it's possible to store a single file with a size of 33TB.

Not to mention that lots of local file systems do not support so large files, even if you have the available disk space.

The Speed of Reading

If you read the file sequentially, it will take you N. But if the file is split into 10 chunks between 10 nodes, you can get its content in N/10 time! Because each node can read the chunk in parallel. So, HDFS is not only about safety. It's about swiftness.

We have omitted the time spend on network communications. But if files are huge, this part is just a fraction.

Basics of Apache Hive

Apache Hive is the database facility running over HDFS. It allows querying data with HQL (SQL-like language).

Regular databases (e.g. PostgreSQL, Oracle) act as an abstraction layer over the local file system. While Apache Hive acts as an abstraction over HDFS. That's it.

Basics of Apache Spark

Apache Spark is a platform for operating and transforming huge amounts of data. The key idea is that Apache Spark workers run on multiple nodes and store the intermediate results in RAM. It's written in Scala but it also supports Java and Python. Take a look at the schema below. It's the common representation of the Apache Spark batch job.

Apache Spark Architecture

Apache Spark loads data from Data Producer, proceeds some operations on it, and puts the result to Data Consumer (in our case, Apache Hive is data producer and Aerospike is data consumer). Apache Spark application is a regular .jar file that contains the transformation logic. Take a look at the example below.

JavaRDD<String> textFile = sc.textFile("hdfs://raw_data.txt");
JavaPairRDD<String, Integer> counts = textFile
    .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
    .mapToPair(word -> new Tuple2<>(word, 1))
    .reduceByKey((a, b) -> a + b);
counts.saveAsTextFile("hdfs://words_count.txt");
Enter fullscreen mode Exit fullscreen mode

It's a simple word-count application. Firstly, we load the content of the raw_data.txt HDFS file. Then we split each line by " ", assign 1 for every word, and reduce the result by words to summarize the whole numbers. Then the obtained pairs are saved to word_count.txt.

The flow is similar to Java Stream API. The difference is that every lambda expression is executed on the workers. So, Spark transfers the code to the remote machines, performs the calculation, and returns the obtained results. If we owe a sufficient number of workers, we can proceed with the amount of data that is measured by terabytes or even zettabytes.

The Apache Spark approach of delivering code to data has some drawbacks. We'll discuss it when we get to the development.

Another important aspect is laziness. Just like Stream API, Apache Spark does not start any calculations until terminal operation invocation. In this case, reduceByKey is the one. The rest operations build the pipeline rules but do not trigger anything.

Build Configuration

Let's start the development process. Firstly, we need to choose the Java version. At the moment of writing the latest stable Apache Spark release is 3.2.1. It supports Java 11. So, we gonna use it.

Currently Apache Spark does not support Java 17. Make sure you don't use it for running integration tests. Otherwise, you'll get bizarre error messages.

The project is bootstrapped with Spring Initializr. Nothing special here. But the dependencies list should be clarified.

Dependencies Resolution

ext {
    set('testcontainersVersion', '1.16.2')
    set('sparkVersion', '3.2.1')
    set('slf4jVersion', '1.7.36')
    set('aerospikeVersion', '5.1.11')
}

dependencies {
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    implementation('org.springframework.boot:spring-boot-starter-validation') {
        exclude group: 'org.slf4j'
    }
    implementation("com.aerospike:aerospike-client:${aerospikeVersion}") {
        exclude group: 'org.slf4j'
    }
    compileOnly "org.apache.spark:spark-core_2.13:${sparkVersion}"
    compileOnly "org.apache.spark:spark-hive_2.13:${sparkVersion}"
    compileOnly "org.apache.spark:spark-sql_2.13:${sparkVersion}"
    compileOnly "org.slf4j:slf4j-api:${slf4jVersion}"

    testImplementation 'org.apache.derby:derby'
    testImplementation "org.apache.spark:spark-core_2.13:${sparkVersion}"
    testImplementation "org.apache.spark:spark-hive_2.13:${sparkVersion}"
    testImplementation "org.apache.spark:spark-sql_2.13:${sparkVersion}"
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation "org.slf4j:slf4j-api:${slf4jVersion}"
    testImplementation 'org.codehaus.janino:janino:3.0.8'
    testImplementation 'org.testcontainers:junit-jupiter'
    testImplementation 'org.awaitility:awaitility:4.2.0'
    testImplementation 'org.hamcrest:hamcrest-all:1.3'
}
Enter fullscreen mode Exit fullscreen mode

Core Dependencies

First comes Apache Spark dependencies. The spark-core artefact is the root. The spark-hive enables data retrieving from Apache Hive. And the spark-sql dependency gives us the ability to query data from Apache Hive with SQL usage.

Note that all the artefacts have to share the same version (in our case, it is 3.2.1). As a matter of fact, the Apache Spark dependencies' version should match the one that runs the production cluster in your company.

All Spark dependencies have to be marked as compileOnly. It means that they won't be included in the assembled .jar file. Apache Spark will provide the required dependencies in runtime. If you include them as implementation scope, that may lead to hard-tracking bugs during execution.

Then we have aerospike-client dependency. You have probably noticed that the org.slf4j group is excluded everywhere and included as a compileOnly dependency as well. We'll talk about this later when we get to the Apache Spark logging facility.

Test Dependencies

And finally, here comes test scoped artefacts. Apache Spark ones are included as testImplementation. Because integration tests will start the local Spark node. So, they are required during the runtime. The slf4j-api is also the runtime dependency. Testcontainers will be used to run the Aerospike instance. The janino is required by Apache Spark during the job execution. And we need Apache Derby to tune Apache Hive for local running. We'll get to this point soon.

Logging Configuration

Apache Spark applies log4j with the slf4j wrapper. But the default Spring Boot logger is logback. This setup leads to exceptions during Spring context initializing due to multiple logging facilities present in the classpath. The easiest way to solve it is to exclude all auto-configured Spring Boot logging features. That's not a big deal. Anyway, Apache Spark provides its own slf4j implementation during the runtime. So, we just need to include this dependency as compileOnly. That is sufficient.

Excluding logback from the Spring Boot project is easy with Gradle. Take a look at the example below.

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
    all {
        exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
        exclude group: 'org.springframework.boot', module: 'snakeyaml'
    }
}
Enter fullscreen mode Exit fullscreen mode

Possible application.yml issues

The snakeyml exclusion requires special attention. Spring Boot uses the library to parse properties from .yml files (i.e. application.yml). Some Apache Spark versions use the same library for internal operations. The thing is that the versions required by Spring Boot and Apache Spark differ. If you exclude it from Spring Boot dependency and rely on the one provided by Apache Spark, you will face the NoSuchMethodError (Spring Boot invokes the method that is absent in the version provided by Apache Spark). So, I would recommend sticking with the .properties format and removing Spring Boot YAML auto-configuration. That will help you to avoid unnecessary difficulties. Take a look at the code example below.

@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
public class SparkBatchJobApplication {
    public static void main(String[] args) {
        SpringApplication.run(SparkBatchJobApplication.class, args);
    }
}
Enter fullscreen mode Exit fullscreen mode

Fat Jar

The result .jar is going to submitted to Apache Spark cluster (e.g. spark-submit command). So, it should contain all runtime artefacts. Unfortunately, the standard Spring Boot packaging does not put the dependencies in the way Apache Spark expects it. So, we'll use shadow-jar Gradle plugin. Take a look at the example below.

plugins {
    id 'org.springframework.boot' version '2.6.3'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}

...

shadowJar {
    zip64 true
    mergeServiceFiles()
    append 'META-INF/spring.handlers'
    append 'META-INF/spring.schemas'
    append 'META-INF/spring.tooling'
    transform(PropertiesFileTransformer) {
        paths = ['META-INF/spring.factories']
        mergeStrategy = "append"
    }
}
Enter fullscreen mode Exit fullscreen mode

Now we can run all tests and build the artefact with the ./gradlew test shadowJar command.

Starting Development

Now we can get to the development process.

Apache Spark Configuration

We need to declare JavaSparkContext and SparkSession. The first one is the core Apache Spark for all operations. Whilst SparkSession is the part of spark-sql projects. It allows us to query data with SQL (which is quite handy for Apache Hive). Take a look at the Spring configuration below.

@Configuration
public class SparkConfig {
    @Value("${spring.application.name}")
    private String appName;

    @Bean
    @Profile(LOCAL)
    public SparkConf localSparkConf() throws IOException {
        final var localHivePath = Files.createTempDirectory("hiveDataWarehouse");
        FileSystemUtils.deleteRecursively(localHivePath);
        return new SparkConf()
            .setAppName(appName)
            .setMaster("local")
            .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:local;create=true")
            .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
            .set("hive.stats.jdbc.timeout", "80")
            .set("spark.ui.enabled", "false")
            .set("spark.sql.session.timeZone", "UTC")
            .set("spark.sql.catalogImplementation", "hive")
            .set("spark.sql.warehouse.dir", localHivePath.toAbsolutePath().toString());
    }

    @Bean
    @Profile(PROD)
    public SparkConf prodSparkConf() {
        return new SparkConf()
            .setAppName(appName);
    }

    @Bean
    public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
        return new JavaSparkContext(sparkConf);
    }

    @Bean
    public SparkSession sparkSession(JavaSparkContext sparkContext) {
        return SparkSession.builder()
            .sparkContext(sparkContext.sc())
            .config(sparkContext.getConf())
            .enableHiveSupport()
            .getOrCreate();
    }
}
Enter fullscreen mode Exit fullscreen mode

SparkConf defines configuration keys for the Apache Spark job. As you have noticed, there are two beans for different Spring profiles. LOCAL is used for integration testing and PROD is applied in the production environment. The PROD configuration does not declare any properties because usually they are passed as command-line arguments in the spark-submit shell script.

On the contrary, the LOCAL profile defines a set of default properties required for proper running. Here are the most important ones.

  1. setMaster("local") tells Apache Spark to start a single local node.
  2. javax.jdo.option.ConnectionURL and javax.jdo.option.ConnectionDriverName declare the JDBC connection for Apache Hive meta-storage. That's why we added Apache Derby as the project dependency
  3. spark.sql.catalogImplementation means that local files shall be stored in the Apache Hive compatible format
  4. spark.sql.warehouse.dir is the directory for storing Apache Hive data. Here we're using temporary directory.

JavaSparkContext accepts the defined SparkConf as the constructor arguments. Meanwhile SparkSession wraps the existing JavaSparkContext. Note that Apache Hive support should be enabled manually (enableHiveSupport).

Creating Apache Hive Tables

When we submit an application to the production Apache Spark cluster, we probably won't need to create any Apache Hive tables. Most likely the tables have already been created by someone else. And our goal is to select rows and transfer the data to another storage. But when we run integration tests locally (or in the CI environment), there are no tables by default. So, we need to create them somehow.

In this project, we're working with one table - media.subscriber_info. It consists of two columns. MSISDN (phone number) and some subscriber ID.

Before each test run, we have to delete previous data and add new rows to ensure verifying rules' consistency. The easiest way to achieve it is to declare scripts for table creation and dropping. We'll keep them in the resources directory. Take a look at the structure below.

Resources Directory Structure

V1_media.hql

Creates media database if it's absent.

create database if not exists media
Enter fullscreen mode Exit fullscreen mode

V2__media.subscriber_info.hql

Creates subscriber_info table if it's absent.

create table if not exists media.subscriber_info (
  subscriber_id string,
  msisdn string
)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile
Enter fullscreen mode Exit fullscreen mode

DROP V1__mediatv_dds.subscriber_info.hql

Drops the subscriber_info table.

drop table if exists media.subscriber_info
Enter fullscreen mode Exit fullscreen mode

V[N] prefixes are not obligatory. I put them to ensure that each new table script will be executed as the last one. It is helpful to make tests work deterministically.

OK, now we need a handler to process those HQL queries. Take a look at the example below.

@Component
@Profile(LOCAL)
public class InitHive {
    private final SparkSession session;
    private final ApplicationContext applicationContext;

    public void createTables() {
        executeSQLScripts(getResources(applicationContext, "classpath:hive/ddl/create/*.hql"));
    }

    public void dropTables() {
        executeSQLScripts(getResources(applicationContext, "classpath:hive/ddl/drop/*.hql"));
    }

    private void executeSQLScripts(Resource[] resources) {
        for (Resource resource : resources) {
            session.sql(readContent(resource));
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

The first thing to notice is @Profile(LOCAL) usage. Because we don't need to create or drop tables in the production environment.

The createTables and dropTables methods provide the list of resources containing the required queries.

getResources is the utility function that reads files from the classpath. You can discover the implementation here.

So, now we're ready to write the business code!

Business Code

Facade

The core interface is EnricherService

public interface EnricherService {
    void proceedEnrichment();
}
Enter fullscreen mode Exit fullscreen mode

We're expecting that it might have many implementations. Each one represent a step in whole batch process.

Then we have EnricherServiceFacade that encapsulates all implementations of EnricherService and run them one by one.

@Service
public class EnricherServiceFacade {
    private final List<EnricherService> enricherServices;

    public void proceedEnrichment() {
        List<EnrichmentFailedException> errors = new ArrayList<>();
        for (EnricherService service : enricherServices)
            try {
                service.proceedEnrichment();
            } catch (Exception e) {
                errors.add(new EnrichmentFailedException("Unexpected error during enrichment processing", e));
            }
        if (!errors.isEmpty()) {
            throw new EnrichmentFailedException(errors);
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

We're trying to run every provided enrichment step. If any of them fails, we throw the exception that combines all errors into a solid piece.

Finally, we need to tell Spring to execute EnricherServiceFacade.proceedEnrichment on application startup. We could add it directly to the main method, but it's not the Spring way. Therefore, it makes testing harder. The better option is @EventListener.

@Component
@Profile(PROD)
public class MainListener {
    private final EnricherServiceFacade enricherServiceFacade;

    @EventListener
    public void proceedEnrichment(ContextRefreshedEvent event) {
        final long startNano = System.nanoTime();
        LOG.info("Starting enrichment process");
        try {
            enricherServiceFacade.proceedEnrichment();
            LOG.info("Enrichment has finished successfully. It took " + Duration.ofNanos(System.nanoTime() - startNano));
        } catch (Exception e) {
            String err = "Enrichment has finished with error. It took " + Duration.ofNanos(System.nanoTime() - startNano);
            LOG.error(err, e);
            throw new EnrichmentFailedException(err, e);
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

The proceedEnrichment method is being invoked, when the Spring context is started. By the way, only the active PROD profile will trigger the job.

EnricherService Implementation

We're going to deal with a single EnricherService implementation. It simply selects all rows from the media.subcriber_info table and puts the result in the Aerospike database. Take a look at the code snippet below.

@Service
public class SubscriberIdEnricherService implements EnricherService, Serializable {
    private static final long serialVersionUID = 10L;

    private final SparkSession session;
    private final AerospikeProperties aerospikeProperties;

    @Override
    public void proceedEnrichment() {
        Dataset<Row> dataset = session.sql(
            "SELECT subscriber_id, msisdn FROM media.subscriber_info " +
                "WHERE msisdn IS NOT NULL AND subscriber_id IS NOT NULL"
        );
        dataset
            .foreachPartition(
                iterator -> {
                    final var aerospikeClient = newAerospikeClient(aerospikeProperties);
                    iterator.forEachRemaining(row -> {
                        String subscriberId = row.getAs("subscriber_id");
                        String msisdn = row.getAs("msisdn");
                        Key key = new Key("my-namespace", "huawei", subscriberId);
                        Bin bin = new Bin("msisdn", msisdn);
                        try {
                            aerospikeClient.put(null, key, bin);
                            LOG.info("Record has been successfully added {}", key);
                        } catch (Exception e) {
                            LOG.error("Fail during inserting record to Aerospike", e);
                        }
                    });
                }
            );
    }
}
Enter fullscreen mode Exit fullscreen mode

There are multiple points that has to be clarified.

Serialization

Apache Spark applies a standard Java serialization mechanism. So, any dependencies used inside lambdas (map, filter, groupBy, forEach, etc.) have to implement the Serializable interface. Otherwise, you'll get the NotSerializableException during the runtime.

We have a reference to AerospikeProperties inside the foreachPartition callback. Therefore, this class and the SubscriberIdEnricherService itself should be allowed for serializing (because the latter one keeps AerospikeProperties as a field). If a dependency is not used within any Apache Spark lambda, you can mark it as transient.

And finally, the serialVersionUID manual assignment is crucial. The reason is that Apache Spark might serialize and deserialize the passed objects multiple times. And there is no guarantee that each time auto-generated serialVersionUID will be the same. It can be a reason for hard-tracking floating bugs. To prevent this you should declare serialVersionUID by yourself.

The even better approach is to force the compiler to validate the serialVersionUID field presence on any Serializable classes. In this case, you need to mark -Xlint:serial warning as an error. Take a look at the Gradle example.

tasks.withType(JavaCompile) {
    options.compilerArgs << "-Xlint:serial" << "-Werror" 
}
Enter fullscreen mode Exit fullscreen mode

Aerospike Client Instantiation

Unfortunately, the Java Aerospike client does not implement the Serializable interface. So, we have to instantiate it inside the lambda expression. In that case, the object will be created on a worker node directly. It makes serialization redundant.

I should admit that Aerospike provides Aerospike Connect Framework that allows transferring data via Apache Spark in a declarative way without creating any Java clients. Anyway, if you want to use it, you have to install the packed library to the Apache Spark cluster directly. There is no guarantee that you'll have such an opportunity in your situation. So, I'm omitting this scenario.

Partitioning

The Dataset class has the foreach method that simply executes the given lambda for each present row. However, if you initialize some heavy resource inside that callback (e.g. database connection), the new one will be created for every row (in some cases there might billions of rows). Not very efficient, isn't it?

The foreachPartition method works a bit differently. Apache Spark executes it once per the Dataset partition. It also accepts Iterator<Row> as an argument. So, inside the lambda, we can initialize "heavy" resources (e.g. AerospikeClient) and apply them for calculations of every Row inside the iterator.

The partition size is calculated automatically based on the input source and Apache Spark cluster configuration. Though you can set it manually by calling the repartition method. Anyway, it is out of the scope of the article.

Testing

Aerospike Setup

OK, we've written some business code. How do we test it? Firstly, let's declare Aerospike setup for Testcontainers. Take a look at the code snippet below.

@ContextConfiguration(initializers = IntegrationSuite.Initializer.class)
public class IntegrationSuite {
    private static final String AEROSPIKE_IMAGE = "aerospike/aerospike-server:5.6.0.4";

    static class Initializer implements
        ApplicationContextInitializer<ConfigurableApplicationContext> {

        static final GenericContainer<?> aerospike =
            new GenericContainer<>(DockerImageName.parse(AEROSPIKE_IMAGE))
                .withExposedPorts(3000, 3001, 3002)
                .withEnv("NAMESPACE", "my-namespace")
                .withEnv("SERVICE_PORT", "3000")
                .waitingFor(Wait.forLogMessage(".*migrations: complete.*", 1));

        @Override
        public void initialize(
            ConfigurableApplicationContext applicationContext) {

            startContainers();
            aerospike.followOutput(
                new Slf4jLogConsumer(LoggerFactory.getLogger("Aerospike"))
            );
            ConfigurableEnvironment environment =
                applicationContext.getEnvironment();

            MapPropertySource testcontainers = new MapPropertySource(
                "testcontainers",
                createConnectionConfiguration()
            );

            environment.getPropertySources().addFirst(testcontainers);
        }

        private static void startContainers() {
            Startables.deepStart(Stream.of(aerospike)).join();
        }

        private static Map<String, Object> createConnectionConfiguration() {
            return Map.of(
                "aerospike.hosts",
                Stream.of(3000, 3001, 3002)
                    .map(port -> aerospike.getHost() + ":" + aerospike.getMappedPort(port))
                    .collect(Collectors.joining(","))
            );
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

The IntegrationSuite class is used as the parent for all integration tests. The IntegrationSuite.Initializer inner class is used as the Spring context initializer. The framework calls it when all properties and bean definitions are already loaded but no beans have been created yet. It allows us to override some properties during the runtime.

We declare the Aerospike container as GenericContainer because the library does not provide out-of-box support for the database. Then inside the initialize method we retrieve the container's host and port and assign them to the aerospike.hosts property.

Apache Hive Utilities

Before each test method we are suppose to delete all data from Apache Hive and add new rows required for the current scenario. So, tests won't affect each other. Let's declare a custom test facade for Apache Hive. Take a look at the code snippet below.

@TestComponent
public class TestHiveUtils {
    @Autowired
    private SparkSession sparkSession;
    @Autowired
    private InitHive initHive;

    public void cleanHive() {
        initHive.dropTables();
        initHive.createTables();
    }

    public <T, E extends HiveTable<T>> E insertInto(Function<SparkSession, E> tableFunction) {
        return tableFunction.apply(sparkSession);
    }
}
Enter fullscreen mode Exit fullscreen mode

There are just two methods. The cleanHive drops all existing and creates them again. Therefore, all previous data is erased. The insertInto is tricky. It serves the purpose of inserting new rows to Apache Hive in a statically typed way. How is that done? First of all, let's inspect the HiveTable<T> interface.

public interface HiveTable<T> {
    void values(T... t);
}
Enter fullscreen mode Exit fullscreen mode

As you see, it's a regular Java functional interface. Though the implementations are not so obvious.

public class SubscriberInfo implements HiveTable<SubscriberInfo.Values> {
    private final SparkSession session;

    public static Function<SparkSession, SubscriberInfo> subscriberInfo() {
        return SubscriberInfo::new;
    }

    @Override
    public void values(Values... values) {
        for (Values value : values) {
            session.sql(format(
                    "insert into %s values('%s', '%s')",
                    "media.subscriber_info",
                    value.subscriberId,
                    value.msisdn
                )
            );
        }
    }

    public static class Values {
        private String subscriberId = "4121521";
        private String msisdn = "88005553535";

        public Values setSubscriberId(String subscriberId) {
            this.subscriberId = subscriberId;
            return this;
        }

        public Values setMsisdn(String msisdn) {
            this.msisdn = msisdn;
            return this;
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

The class accepts SparkSession as a constructor dependency. The SubscriberInfo.Values are the generic argument. The class represents the data structure containing values to insert. And finally, the values implementation performs the actual new row creation.

The key is the subscriberInfo static method. What's the reason to return Function<SparkSession, SubscriberInfo>? Its combination with TestHiveUtils.insertInto provides us with statically typed INSERT INTO statement. Take a look at the code example below.

hive.insertInto(subscriberInfo())
    .values(
        new SubscriberInfo.Values()
            .setMsisdn("msisdn1")
            .setSubscriberId("subscriberId1"),
        new SubscriberInfo.Values()
            .setMsisdn("msisdn2")
            .setSubscriberId("subscriberId2")
    );
Enter fullscreen mode Exit fullscreen mode

An elegant solution, don't you think?

Spark Integration Test Slice

Spring integration tests require a specific configuration. It's wise to declare it once and reuse it. Take a look at the code snippet below.

@SpringBootTest(
    classes = {
        SparkConfig.class,
        SparkContextDestroyer.class,
        AerospikeConfig.class,
        PropertiesConfig.class,
        InitHive.class,
        TestHiveUtils.class,
        TestAerospikeFacade.class,
        EnricherServiceTestConfiguration.class
    }
)
@ActiveProfiles(LOCAL)
public class SparkIntegrationSuite extends IntegrationSuite {
}
Enter fullscreen mode Exit fullscreen mode

Inside the SpringBootTest we have listed all the beans that are used during tests running.

TestAerospikeFacade is just a thin wrapper around the Java Aerospike client for test purposes. Its implementation is rather straightforward but you can check out the source code by this link.

The EnricherServiceTestConfiguration is the Spring configuration declaring all implementations for the EnricherService interface. Take a look at the example below.

@TestConfiguration
public class EnricherServiceTestConfiguration {

    @Bean
    public EnricherService subscriberEnricherService(SparkSession session,
                                                       AerospikeProperties aerospikeProperties) {
        return new SubscriberIdEnricherService(
            session, aerospikeProperties
        );
    }
}
Enter fullscreen mode Exit fullscreen mode

I want to point out that all EnricherService implementations should be listed inside the class. If we apply different configurations for each test suite, the Spring context will be reloaded. Mostly that's not a problem. But Apache Spark usage brings obstacles. You see, when JavaSparkContext is created, it starts the local Apache Spark node. But when we instantiate it twice during the application lifecycle, it will result in an exception. The easiest way to overcome the issue is to make sure that JavaSparkContext will be created only once.

Now we can get to the testing process.

Integration Test Example

Here is a simple integration test that inserts two rows to Apache Spark and checks that the corresponding two records are created in Aerospike within 10 seconds. Take look at the code snippet below.

class SubscriberIdEnricherServiceIntegrationTest extends SparkIntegrationSuite {
    @Autowired
    private TestHiveUtils hive;
    @Autowired
    private TestAerospikeFacade aerospike;
    @Autowired
    private EnricherService subscriberEnricherService;

    @BeforeEach
    void beforeEach() {
        aerospike.deleteAll("my-namespace");
        hive.cleanHive();
    }

    @Test
    void shouldSaveRecords() {
        hive.insertInto(subscriberInfo())
            .values(
                new SubscriberInfo.Values()
                    .setMsisdn("msisdn1")
                    .setSubscriberId("subscriberId1"),
                new SubscriberInfo.Values()
                    .setMsisdn("msisdn2")
                    .setSubscriberId("subscriberId2")
            );

        subscriberEnricherService.proceedEnrichment();

        List<KeyRecord> keyRecords = await()
            .atMost(TEN_SECONDS)
            .until(() -> aerospike.scanAll("my-namespace"),
                hasSize(2));
        assertThat(keyRecords, allOf(
            hasRecord("subscriberId1", "msisdn1"),
            hasRecord("subscriberId2", "msisdn2")
        ));
    }
}
Enter fullscreen mode Exit fullscreen mode

If you tune everything correctly, the test will pass.

The whole test source is available by this link.

Test execution result

Conclusion

That's basically all I wanted to tell you about testing Apache Hive, Apache Spark, and Aerospike integration with Spring Boot usage. As you can see, the Big Data world is not so complicated after all. All code examples are taken from this repository. You can clone it and play around with tests by yourself.

If you have any questions or suggestions, please leave your comments down below. Thanks for reading!

Resources

  1. Repository with examples
  2. HDFS (Hadoop Distributed File System)
  3. Apache Hive
  4. Apache Spark
  5. Apache Derby
  6. Aerospike Database
  7. Aerospike Connect Framework
  8. Java Stream API
  9. Spring Initializr
  10. Spring profiles
  11. Testcontainers
  12. Gradle plugin shadow-jar

Top comments (4)

Collapse
 
citronbrick profile image
CitronBrick

RDDs are oudated since Spark 2. Dataset should be used instead.

Note that, before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD). After Spark 2.0, RDDs are replaced by Dataset, which is strongly-typed like an RDD, but with richer optimizations under the hood.

Spark docs

Collapse
 
worldofsolitaires profile image
worldofsolitaires

Thanks a lot for this detailed and specific guide
super mario bros

Collapse
 
akarsh_46 profile image
Akarsh-

Hi, thanks for explanation. Is there a way we can use spring version 3.0.5+ and spark. I am getting servlet error class servletContainer is not a javax.servlet.Servlet

Collapse
 
kirekov profile image
Semyon Kirekov • Edited

I think you just need to remove dependency spring-web-starter