Big Data is trending. Companies have to operate on huge amounts of data to stay competitive. For example, this information is used to show you relevant advertisements and to recommend services you may find interesting. The problem with Big Data software systems is their complexity: testing becomes tough. How do you verify application behaviour locally when the app is configured to connect to an HDFS cluster?
In this article, I show you how to create a Spring Boot application that loads data from Apache Hive via Apache Spark into the Aerospike Database. More than that, I give you a recipe for writing integration tests for such scenarios that can be run either locally or during CI pipeline execution. The code examples are taken from this repository.
Firstly, let's go over some basic concepts of the Big Data stack we're using. Don't worry, it won't take long, but it's necessary to understand the core idea.
Basics of HDFS
HDFS (Hadoop Distributed File System) is a distributed file system designed to run on many physical servers. So, a file in HDFS is an abstraction that hides the complexity of storing and replicating data between multiple nodes. Why do we need HDFS? There are several reasons.
Hardware Failures
Hard disk drives crash. That's the reality we have to deal with. If a file is split between multiple nodes, individual failures won't affect the data as a whole. Besides, data is replicated in HDFS. So, even after a disk crash, the information can be restored from the other replicas.
Really Large Files
HDFS allows you to combine a network of not-so-powerful machines into one huge system. For example, if you have 100 nodes with 1TB of disk storage each, you possess 100TB of HDFS space. With a replication factor of 3, it's still possible to store a single file of about 33TB.
Not to mention that many local file systems do not support such large files, even if you have the available disk space.
The Speed of Reading
If you read a file sequentially, it takes time N. But if the file is split into 10 chunks across 10 nodes, you can get its content in N/10 time, because each node reads its chunk in parallel. So, HDFS is not only about safety. It's about swiftness.
We have omitted the time spent on network communication. But if files are huge, this part is just a fraction.
Basics of Apache Hive
Apache Hive is a data warehouse facility running on top of HDFS. It allows querying data with HQL (an SQL-like language).
Regular databases (e.g. PostgreSQL, Oracle) act as an abstraction layer over the local file system, while Apache Hive acts as an abstraction over HDFS. That's it.
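For example, once Hive support is enabled in Spark (we'll do that in the configuration section), querying a Hive table looks like plain SQL. A minimal sketch, assuming the media.subscriber_info table that we'll create later in the article:
Dataset<Row> rows = sparkSession.sql(
        "SELECT msisdn FROM media.subscriber_info WHERE subscriber_id IS NOT NULL"
);
rows.show(10); // print the first 10 rows of the result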
Basics of Apache Spark
Apache Spark is a platform for processing and transforming huge amounts of data. The key idea is that Apache Spark workers run on multiple nodes and store intermediate results in RAM. It's written in Scala, but it also supports Java and Python. Take a look at the schema below. It's a common representation of an Apache Spark batch job.
Apache Spark loads data from a Data Producer, performs some operations on it, and puts the result into a Data Consumer (in our case, Apache Hive is the data producer and Aerospike is the data consumer). An Apache Spark application is a regular .jar file that contains the transformation logic. Take a look at the example below.
// Load the raw text file from HDFS (lazily - nothing is read yet)
JavaRDD<String> textFile = sc.textFile("hdfs://raw_data.txt");
JavaPairRDD<String, Integer> counts = textFile
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator()) // split every line into words
        .mapToPair(word -> new Tuple2<>(word, 1))             // assign 1 to each word
        .reduceByKey((a, b) -> a + b);                        // sum the ones per word
// The action below triggers the whole computation and writes the result to HDFS
counts.saveAsTextFile("hdfs://words_count.txt");
It's a simple word-count application. Firstly, we load the content of the raw_data.txt HDFS file. Then we split each line by " ", assign 1 to every word, and reduce the result by word to sum up the counts. Then the obtained pairs are saved to words_count.txt.
The flow is similar to the Java Stream API. The difference is that every lambda expression is executed on the workers. So, Spark transfers the code to the remote machines, performs the calculations, and returns the obtained results. If we have a sufficient number of workers, we can process amounts of data measured in terabytes or even petabytes.
The Apache Spark approach of delivering code to data has some drawbacks. We'll discuss them when we get to the development.
Another important aspect is laziness. Just like the Stream API, Apache Spark does not start any calculations until a terminal operation (an action, in Spark terms) is invoked. In this case, saveAsTextFile is the one. The rest of the operations (flatMap, mapToPair, reduceByKey) are lazy transformations that build the pipeline rules but do not trigger anything.
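To see this laziness in action, here is a minimal sketch reusing the sc context from the word-count example above; nothing is read from HDFS until the action in the last line is invoked.
JavaRDD<String> lines = sc.textFile("hdfs://raw_data.txt");    // lazy: nothing is read yet
JavaRDD<String> words = lines
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator())  // transformation: still just pipeline rules
        .filter(word -> !word.isEmpty());                      // transformation: still lazy
long total = words.count();                                    // action: the whole pipeline is executed here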
Build Configuration
Let's start the development process. Firstly, we need to choose the Java version. At the moment of writing, the latest stable Apache Spark release is 3.2.1. It supports Java 11. So, that's the version we're going to use.
Currently Apache Spark does not support Java 17. Make sure you don't use it for running integration tests. Otherwise, you'll get bizarre error messages.
The project is bootstrapped with Spring Initializr. Nothing special here. But the dependencies list should be clarified.
Dependencies Resolution
ext {
    set('testcontainersVersion', '1.16.2')
    set('sparkVersion', '3.2.1')
    set('slf4jVersion', '1.7.36')
    set('aerospikeVersion', '5.1.11')
}

dependencies {
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    implementation('org.springframework.boot:spring-boot-starter-validation') {
        exclude group: 'org.slf4j'
    }
    implementation("com.aerospike:aerospike-client:${aerospikeVersion}") {
        exclude group: 'org.slf4j'
    }

    compileOnly "org.apache.spark:spark-core_2.13:${sparkVersion}"
    compileOnly "org.apache.spark:spark-hive_2.13:${sparkVersion}"
    compileOnly "org.apache.spark:spark-sql_2.13:${sparkVersion}"
    compileOnly "org.slf4j:slf4j-api:${slf4jVersion}"

    testImplementation 'org.apache.derby:derby'
    testImplementation "org.apache.spark:spark-core_2.13:${sparkVersion}"
    testImplementation "org.apache.spark:spark-hive_2.13:${sparkVersion}"
    testImplementation "org.apache.spark:spark-sql_2.13:${sparkVersion}"
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation "org.slf4j:slf4j-api:${slf4jVersion}"
    testImplementation 'org.codehaus.janino:janino:3.0.8'
    testImplementation 'org.testcontainers:junit-jupiter'
    testImplementation 'org.awaitility:awaitility:4.2.0'
    testImplementation 'org.hamcrest:hamcrest-all:1.3'
}
Core Dependencies
First come the Apache Spark dependencies. The spark-core artefact is the root. The spark-hive one enables data retrieval from Apache Hive. And the spark-sql dependency gives us the ability to query data from Apache Hive with SQL.
Note that all the artefacts have to share the same version (in our case, it is 3.2.1). As a matter of fact, the Apache Spark dependencies' version should match the one that runs on the production cluster in your company.
All Spark dependencies have to be marked as compileOnly. It means that they won't be included in the assembled .jar file; Apache Spark will provide the required dependencies at runtime. If you include them in the implementation scope instead, that may lead to hard-to-track bugs during execution.
Then we have the aerospike-client dependency. You have probably noticed that the org.slf4j group is excluded everywhere and included as a compileOnly dependency as well. We'll talk about this later, when we get to the Apache Spark logging facility.
Test Dependencies
And finally, here come the test-scoped artefacts. The Apache Spark ones are included as testImplementation, because the integration tests will start a local Spark node, so they are required at runtime. The slf4j-api is also a runtime dependency there. Testcontainers will be used to run the Aerospike instance. The janino artefact is required by Apache Spark during job execution. And we need Apache Derby to tune Apache Hive for local running. We'll get to this point soon.
Logging Configuration
Apache Spark applies log4j with an slf4j wrapper. But the default Spring Boot logger is logback. This setup leads to exceptions during Spring context initialization due to multiple logging facilities being present in the classpath. The easiest way to solve it is to exclude all auto-configured Spring Boot logging features. That's not a big deal: Apache Spark provides its own slf4j implementation at runtime anyway. So, we just need to include this dependency as compileOnly. That is sufficient.
Excluding logback from the Spring Boot project is easy with Gradle. Take a look at the example below.
configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }
    all {
        exclude group: 'org.springframework.boot', module: 'spring-boot-starter-logging'
        exclude group: 'org.springframework.boot', module: 'snakeyaml'
    }
}
Possible application.yml issues
The snakeyaml exclusion requires special attention. Spring Boot uses the library to parse properties from .yml files (i.e. application.yml). Some Apache Spark versions use the same library for internal operations. The thing is that the versions required by Spring Boot and Apache Spark differ. If you exclude it from the Spring Boot dependency and rely on the one provided by Apache Spark, you will face a NoSuchMethodError (Spring Boot invokes a method that is absent in the version provided by Apache Spark). So, I would recommend sticking with the .properties format and removing the Spring Boot YAML auto-configuration. That will help you avoid unnecessary difficulties. Take a look at the code example below.
@SpringBootApplication(exclude = {GsonAutoConfiguration.class})
public class SparkBatchJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(SparkBatchJobApplication.class, args);
    }
}
Fat Jar
The resulting .jar is going to be submitted to an Apache Spark cluster (e.g. with the spark-submit command). So, it should contain all runtime artefacts. Unfortunately, the standard Spring Boot packaging does not lay the dependencies out in the way Apache Spark expects. So, we'll use the shadow-jar Gradle plugin. Take a look at the example below.
plugins {
    id 'org.springframework.boot' version '2.6.3'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id 'com.github.johnrengelman.shadow' version '2.0.4'
}

...

shadowJar {
    zip64 true
    mergeServiceFiles()
    append 'META-INF/spring.handlers'
    append 'META-INF/spring.schemas'
    append 'META-INF/spring.tooling'
    transform(PropertiesFileTransformer) {
        paths = ['META-INF/spring.factories']
        mergeStrategy = "append"
    }
}
Now we can run all tests and build the artefact with the ./gradlew test shadowJar command.
Starting Development
Now we can get to the development process.
Apache Spark Configuration
We need to declare JavaSparkContext and SparkSession beans. The first one is the core Apache Spark entry point for all operations, whilst SparkSession is part of the spark-sql project. It allows us to query data with SQL (which is quite handy for Apache Hive). Take a look at the Spring configuration below.
@Configuration
public class SparkConfig {

    @Value("${spring.application.name}")
    private String appName;

    @Bean
    @Profile(LOCAL)
    public SparkConf localSparkConf() throws IOException {
        final var localHivePath = Files.createTempDirectory("hiveDataWarehouse");
        FileSystemUtils.deleteRecursively(localHivePath);
        return new SparkConf()
                .setAppName(appName)
                .setMaster("local")
                .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:local;create=true")
                .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver")
                .set("hive.stats.jdbc.timeout", "80")
                .set("spark.ui.enabled", "false")
                .set("spark.sql.session.timeZone", "UTC")
                .set("spark.sql.catalogImplementation", "hive")
                .set("spark.sql.warehouse.dir", localHivePath.toAbsolutePath().toString());
    }

    @Bean
    @Profile(PROD)
    public SparkConf prodSparkConf() {
        return new SparkConf()
                .setAppName(appName);
    }

    @Bean
    public JavaSparkContext javaSparkContext(SparkConf sparkConf) {
        return new JavaSparkContext(sparkConf);
    }

    @Bean
    public SparkSession sparkSession(JavaSparkContext sparkContext) {
        return SparkSession.builder()
                .sparkContext(sparkContext.sc())
                .config(sparkContext.getConf())
                .enableHiveSupport()
                .getOrCreate();
    }
}
SparkConf defines the configuration keys for the Apache Spark job. As you have noticed, there are two beans for different Spring profiles. LOCAL is used for integration testing and PROD is applied in the production environment. The PROD configuration does not declare any properties, because usually they are passed as command-line arguments to the spark-submit shell script.
On the contrary, the LOCAL profile defines a set of default properties required for proper running. Here are the most important ones.
- setMaster("local") tells Apache Spark to start a single local node.
- javax.jdo.option.ConnectionURL and javax.jdo.option.ConnectionDriverName declare the JDBC connection for the Apache Hive meta-storage. That's why we added Apache Derby as a project dependency.
- spark.sql.catalogImplementation means that local files shall be stored in the Apache Hive compatible format.
- spark.sql.warehouse.dir is the directory for storing Apache Hive data. Here we're using a temporary directory.
JavaSparkContext accepts the defined SparkConf as a constructor argument. Meanwhile, SparkSession wraps the existing JavaSparkContext. Note that Apache Hive support has to be enabled manually (enableHiveSupport).
Creating Apache Hive Tables
When we submit an application to the production Apache Spark cluster, we probably won't need to create any Apache Hive tables. Most likely the tables have already been created by someone else. And our goal is to select rows and transfer the data to another storage. But when we run integration tests locally (or in the CI environment), there are no tables by default. So, we need to create them somehow.
In this project, we're working with one table: media.subscriber_info. It consists of two columns: the MSISDN (phone number) and a subscriber ID.
Before each test run, we have to delete the previous data and add new rows, so that each scenario is verified against a known dataset. The easiest way to achieve this is to declare scripts for table creation and dropping. We'll keep them in the resources directory. Take a look at the structure below.
V1_media.hql
Creates the media database if it's absent.
create database if not exists media
V2__media.subscriber_info.hql
Creates the subscriber_info table if it's absent.
create table if not exists media.subscriber_info (
subscriber_id string,
msisdn string
)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile
DROP V1__mediatv_dds.subscriber_info.hql
Drops the subscriber_info table.
drop table if exists media.subscriber_info
The V[N] prefixes are not obligatory. I put them there to ensure that each newly added table script is executed after the existing ones. It helps to make the tests work deterministically.
OK, now we need a handler to process those HQL queries. Take a look at the example below.
@Component
@Profile(LOCAL)
public class InitHive {

    private final SparkSession session;
    private final ApplicationContext applicationContext;

    public InitHive(SparkSession session, ApplicationContext applicationContext) {
        this.session = session;
        this.applicationContext = applicationContext;
    }

    public void createTables() {
        executeSQLScripts(getResources(applicationContext, "classpath:hive/ddl/create/*.hql"));
    }

    public void dropTables() {
        executeSQLScripts(getResources(applicationContext, "classpath:hive/ddl/drop/*.hql"));
    }

    private void executeSQLScripts(Resource[] resources) {
        for (Resource resource : resources) {
            session.sql(readContent(resource));
        }
    }
}
The first thing to notice is the @Profile(LOCAL) usage, because we don't need to create or drop tables in the production environment.
The createTables and dropTables methods provide the lists of resources containing the required queries.
getResources is a utility function that reads files from the classpath. You can discover the implementation here.
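By the way, the readContent function used in executeSQLScripts reads a resource into a string. If you don't want to jump into the repository, here is a rough sketch of how both helpers could look with standard Spring utilities (the ResourcesUtils class name is an assumption):
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import org.springframework.context.ApplicationContext;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.ResourcePatternUtils;
import org.springframework.util.StreamUtils;

// Rough sketch; the repository's actual implementation may differ
public final class ResourcesUtils {

    // Resolves a classpath pattern (e.g. "classpath:hive/ddl/create/*.hql") to resources
    public static Resource[] getResources(ApplicationContext context, String pattern) {
        try {
            return ResourcePatternUtils.getResourcePatternResolver(context).getResources(pattern);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the whole resource content as a UTF-8 string
    public static String readContent(Resource resource) {
        try {
            return StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}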
So, now we're ready to write the business code!
Business Code
Facade
The core interface is EnricherService
public interface EnricherService {

    void proceedEnrichment();
}
We're expecting that it might have many implementations. Each one represents a step in the whole batch process.
Then we have EnricherServiceFacade that encapsulates all implementations of EnricherService and runs them one by one.
@Service
public class EnricherServiceFacade {

    private final List<EnricherService> enricherServices;

    public EnricherServiceFacade(List<EnricherService> enricherServices) {
        this.enricherServices = enricherServices;
    }

    public void proceedEnrichment() {
        List<EnrichmentFailedException> errors = new ArrayList<>();
        for (EnricherService service : enricherServices) {
            try {
                service.proceedEnrichment();
            } catch (Exception e) {
                errors.add(new EnrichmentFailedException("Unexpected error during enrichment processing", e));
            }
        }
        if (!errors.isEmpty()) {
            throw new EnrichmentFailedException(errors);
        }
    }
}
We're trying to run every provided enrichment step. If any of them fails, we throw an exception that combines all the errors into a single one.
Finally, we need to tell Spring to execute EnricherServiceFacade.proceedEnrichment on application startup. We could call it directly from the main method, but that's not the Spring way, and it would make testing harder. The better option is an @EventListener.
@Component
@Profile(PROD)
public class MainListener {

    private static final Logger LOG = LoggerFactory.getLogger(MainListener.class);

    private final EnricherServiceFacade enricherServiceFacade;

    public MainListener(EnricherServiceFacade enricherServiceFacade) {
        this.enricherServiceFacade = enricherServiceFacade;
    }

    @EventListener
    public void proceedEnrichment(ContextRefreshedEvent event) {
        final long startNano = System.nanoTime();
        LOG.info("Starting enrichment process");
        try {
            enricherServiceFacade.proceedEnrichment();
            LOG.info("Enrichment has finished successfully. It took " + Duration.ofNanos(System.nanoTime() - startNano));
        } catch (Exception e) {
            String err = "Enrichment has finished with error. It took " + Duration.ofNanos(System.nanoTime() - startNano);
            LOG.error(err, e);
            throw new EnrichmentFailedException(err, e);
        }
    }
}
The proceedEnrichment method is invoked when the Spring context has started. By the way, only the active PROD profile will trigger the job.
EnricherService Implementation
We're going to deal with a single EnricherService implementation. It simply selects all rows from the media.subscriber_info table and puts the result into the Aerospike database. Take a look at the code snippet below.
@Service
public class SubscriberIdEnricherService implements EnricherService, Serializable {

    private static final long serialVersionUID = 10L;

    private static final Logger LOG = LoggerFactory.getLogger(SubscriberIdEnricherService.class);

    private final SparkSession session;
    private final AerospikeProperties aerospikeProperties;

    public SubscriberIdEnricherService(SparkSession session, AerospikeProperties aerospikeProperties) {
        this.session = session;
        this.aerospikeProperties = aerospikeProperties;
    }

    @Override
    public void proceedEnrichment() {
        Dataset<Row> dataset = session.sql(
            "SELECT subscriber_id, msisdn FROM media.subscriber_info " +
            "WHERE msisdn IS NOT NULL AND subscriber_id IS NOT NULL"
        );
        dataset.foreachPartition(iterator -> {
            // The client is created on the worker node, once per partition
            final var aerospikeClient = newAerospikeClient(aerospikeProperties);
            iterator.forEachRemaining(row -> {
                String subscriberId = row.getAs("subscriber_id");
                String msisdn = row.getAs("msisdn");
                Key key = new Key("my-namespace", "huawei", subscriberId);
                Bin bin = new Bin("msisdn", msisdn);
                try {
                    aerospikeClient.put(null, key, bin);
                    LOG.info("Record has been successfully added {}", key);
                } catch (Exception e) {
                    LOG.error("Fail during inserting record to Aerospike", e);
                }
            });
        });
    }
}
There are multiple points that have to be clarified.
Serialization
Apache Spark applies the standard Java serialization mechanism. So, any dependencies used inside lambdas (map, filter, groupBy, forEach, etc.) have to implement the Serializable interface. Otherwise, you'll get a NotSerializableException at runtime.
We have a reference to AerospikeProperties inside the foreachPartition callback. Therefore, this class and SubscriberIdEnricherService itself have to be serializable (because the latter keeps AerospikeProperties as a field). If a dependency is not used within any Apache Spark lambda, you can mark it as transient.
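For illustration, here is a hedged sketch of that rule. The AuditClient below is a made-up, non-serializable dependency that is only used on the driver, so it can safely be excluded from serialization:
import java.io.Serializable;
import org.apache.spark.sql.SparkSession;

// Illustrative sketch only; AuditClient is a hypothetical driver-side dependency
public class AuditedEnricherService implements EnricherService, Serializable {

    private static final long serialVersionUID = 11L;

    private final SparkSession session;
    private final transient AuditClient auditClient; // never used inside a Spark lambda -> safe to mark transient

    public AuditedEnricherService(SparkSession session, AuditClient auditClient) {
        this.session = session;
        this.auditClient = auditClient;
    }

    @Override
    public void proceedEnrichment() {
        auditClient.notifyStarted(); // runs on the driver only
        session.sql("SELECT count(*) FROM media.subscriber_info").show();
    }

    interface AuditClient { // hypothetical non-serializable dependency
        void notifyStarted();
    }
}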
And finally, the manual serialVersionUID assignment is crucial. The reason is that Apache Spark might serialize and deserialize the passed objects multiple times, and there is no guarantee that the auto-generated serialVersionUID will be the same each time. It can be a source of hard-to-track floating bugs. To prevent this, you should declare serialVersionUID yourself.
An even better approach is to force the compiler to validate the presence of the serialVersionUID field on all Serializable classes. In this case, you need to turn the -Xlint:serial warning into an error. Take a look at the Gradle example.
tasks.withType(JavaCompile) {
    options.compilerArgs << "-Xlint:serial" << "-Werror"
}
Aerospike Client Instantiation
Unfortunately, the Java Aerospike client does not implement the Serializable interface. So, we have to instantiate it inside the lambda expression. In that case, the object is created on a worker node directly, which makes serialization unnecessary.
I should admit that Aerospike provides the Aerospike Connect framework that allows transferring data via Apache Spark in a declarative way, without creating any Java clients. However, if you want to use it, you have to install the corresponding library on the Apache Spark cluster directly. There is no guarantee that you'll have such an opportunity in your situation. So, I'm omitting this scenario.
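As for the newAerospikeClient helper used in SubscriberIdEnricherService, it lives in the repository. A possible shape is sketched below; the AerospikeProperties accessors are assumptions, not its exact API:
import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Host;
import com.aerospike.client.policy.ClientPolicy;

// Hedged sketch; the AerospikeProperties getters are assumptions
static AerospikeClient newAerospikeClient(AerospikeProperties properties) {
    Host[] hosts = properties.getHosts().stream()     // e.g. the values parsed from 'aerospike.hosts'
            .map(h -> new Host(h.getName(), h.getPort()))
            .toArray(Host[]::new);
    ClientPolicy policy = new ClientPolicy();
    policy.failIfNotConnected = true;                 // fail fast if the cluster is unreachable
    return new AerospikeClient(policy, hosts);
}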
Partitioning
The Dataset class has a foreach method that simply executes the given lambda for every row. However, if you initialize some heavy resource inside that callback (e.g. a database connection), a new one will be created for every single row (and in some cases there might be billions of rows). Not very efficient, is it?
The foreachPartition method works a bit differently. Apache Spark executes it once per Dataset partition and passes an Iterator<Row> as the argument. So, inside the lambda we can initialize "heavy" resources (e.g. an AerospikeClient) once and apply them to every Row provided by the iterator.
The partition size is calculated automatically based on the input source and the Apache Spark cluster configuration. Though you can set it manually by calling the repartition method. Anyway, it is out of the scope of the article.
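If you ever need to control the partitioning explicitly, a sketch could look like this (reusing the names from SubscriberIdEnricherService above; the partition count of 10 is an arbitrary example):
dataset
        .repartition(10)                              // 10 partitions -> the lambda below runs 10 times
        .foreachPartition(iterator -> {
            // one "heavy" resource per partition instead of one per row
            final var client = newAerospikeClient(aerospikeProperties);
            try {
                iterator.forEachRemaining(row -> {
                    // ... put each row into Aerospike, as shown earlier
                });
            } finally {
                client.close();                       // don't leak connections on the worker
            }
        });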
Testing
Aerospike Setup
OK, we've written some business code. How do we test it? Firstly, let's declare Aerospike setup for Testcontainers. Take a look at the code snippet below.
@ContextConfiguration(initializers = IntegrationSuite.Initializer.class)
public class IntegrationSuite {

    private static final String AEROSPIKE_IMAGE = "aerospike/aerospike-server:5.6.0.4";

    static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

        static final GenericContainer<?> aerospike =
                new GenericContainer<>(DockerImageName.parse(AEROSPIKE_IMAGE))
                        .withExposedPorts(3000, 3001, 3002)
                        .withEnv("NAMESPACE", "my-namespace")
                        .withEnv("SERVICE_PORT", "3000")
                        .waitingFor(Wait.forLogMessage(".*migrations: complete.*", 1));

        @Override
        public void initialize(ConfigurableApplicationContext applicationContext) {
            startContainers();
            aerospike.followOutput(
                    new Slf4jLogConsumer(LoggerFactory.getLogger("Aerospike"))
            );
            ConfigurableEnvironment environment = applicationContext.getEnvironment();
            MapPropertySource testcontainers = new MapPropertySource(
                    "testcontainers",
                    createConnectionConfiguration()
            );
            environment.getPropertySources().addFirst(testcontainers);
        }

        private static void startContainers() {
            Startables.deepStart(Stream.of(aerospike)).join();
        }

        private static Map<String, Object> createConnectionConfiguration() {
            return Map.of(
                    "aerospike.hosts",
                    Stream.of(3000, 3001, 3002)
                            .map(port -> aerospike.getHost() + ":" + aerospike.getMappedPort(port))
                            .collect(Collectors.joining(","))
            );
        }
    }
}
The IntegrationSuite class is used as the parent for all integration tests. The IntegrationSuite.Initializer inner class is used as a Spring context initializer. The framework calls it when all properties and bean definitions have already been loaded but no beans have been created yet. It allows us to override some properties at runtime.
We declare the Aerospike container as a GenericContainer, because the library does not provide out-of-the-box support for this database. Then, inside the initialize method, we retrieve the container's host and mapped ports and assign them to the aerospike.hosts property.
Apache Hive Utilities
Before each test method we are supposed to delete all data from Apache Hive and add the new rows required for the current scenario, so that tests won't affect each other. Let's declare a custom test facade for Apache Hive. Take a look at the code snippet below.
@TestComponent
public class TestHiveUtils {

    @Autowired
    private SparkSession sparkSession;

    @Autowired
    private InitHive initHive;

    public void cleanHive() {
        initHive.dropTables();
        initHive.createTables();
    }

    public <T, E extends HiveTable<T>> E insertInto(Function<SparkSession, E> tableFunction) {
        return tableFunction.apply(sparkSession);
    }
}
There are just two methods. The cleanHive one drops all existing tables and creates them again; therefore, all previous data is erased. The insertInto one is trickier. It serves the purpose of inserting new rows into Apache Hive in a statically typed way. How is that done? First of all, let's inspect the HiveTable<T> interface.
public interface HiveTable<T> {

    void values(T... t);
}
As you see, it's a regular Java functional interface. Though the implementations are not so obvious.
public class SubscriberInfo implements HiveTable<SubscriberInfo.Values> {

    private final SparkSession session;

    public SubscriberInfo(SparkSession session) {
        this.session = session;
    }

    public static Function<SparkSession, SubscriberInfo> subscriberInfo() {
        return SubscriberInfo::new;
    }

    @Override
    public void values(Values... values) {
        for (Values value : values) {
            session.sql(format(
                    "insert into %s values('%s', '%s')",
                    "media.subscriber_info",
                    value.subscriberId,
                    value.msisdn
            ));
        }
    }

    public static class Values {

        private String subscriberId = "4121521";
        private String msisdn = "88005553535";

        public Values setSubscriberId(String subscriberId) {
            this.subscriberId = subscriberId;
            return this;
        }

        public Values setMsisdn(String msisdn) {
            this.msisdn = msisdn;
            return this;
        }
    }
}
The class accepts SparkSession as a constructor dependency. SubscriberInfo.Values is the generic argument: this class represents the data structure containing the values to insert. And finally, the values implementation performs the actual row insertion.
The key is the subscriberInfo static method. What's the reason to return a Function<SparkSession, SubscriberInfo>? Its combination with TestHiveUtils.insertInto provides us with a statically typed INSERT INTO statement. Take a look at the code example below.
hive.insertInto(subscriberInfo())
    .values(
        new SubscriberInfo.Values()
            .setMsisdn("msisdn1")
            .setSubscriberId("subscriberId1"),
        new SubscriberInfo.Values()
            .setMsisdn("msisdn2")
            .setSubscriberId("subscriberId2")
    );
An elegant solution, don't you think?
Spark Integration Test Slice
Spring integration tests require a specific configuration. It's wise to declare it once and reuse it. Take a look at the code snippet below.
@SpringBootTest(
    classes = {
        SparkConfig.class,
        SparkContextDestroyer.class,
        AerospikeConfig.class,
        PropertiesConfig.class,
        InitHive.class,
        TestHiveUtils.class,
        TestAerospikeFacade.class,
        EnricherServiceTestConfiguration.class
    }
)
@ActiveProfiles(LOCAL)
public class SparkIntegrationSuite extends IntegrationSuite {
}
Inside the @SpringBootTest annotation we have listed all the beans that are used during test runs.
TestAerospikeFacade is just a thin wrapper around the Java Aerospike client for test purposes. Its implementation is rather straightforward, but you can check out the source code by this link.
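If you prefer not to leave the page, a possible shape of such a facade is sketched below; the injected IAerospikeClient bean and the exact class layout are assumptions about the project's wiring:
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.policy.InfoPolicy;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.query.KeyRecord;
import org.springframework.boot.test.context.TestComponent;

// A possible shape of TestAerospikeFacade; the real implementation is in the repository
@TestComponent
public class TestAerospikeFacade {

    private final IAerospikeClient client;

    public TestAerospikeFacade(IAerospikeClient client) {
        this.client = client;
    }

    public void deleteAll(String namespace) {
        // Truncate the whole namespace (a null set name means "all sets")
        client.truncate(new InfoPolicy(), namespace, null, null);
    }

    public List<KeyRecord> scanAll(String namespace) {
        // Scan callbacks may run on multiple threads, so use a thread-safe collection
        List<KeyRecord> records = new CopyOnWriteArrayList<>();
        client.scanAll(new ScanPolicy(), namespace, null,
                (key, record) -> records.add(new KeyRecord(key, record)));
        return records;
    }
}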
The EnricherServiceTestConfiguration is a Spring configuration declaring all implementations of the EnricherService interface. Take a look at the example below.
@TestConfiguration
public class EnricherServiceTestConfiguration {

    @Bean
    public EnricherService subscriberEnricherService(SparkSession session,
                                                     AerospikeProperties aerospikeProperties) {
        return new SubscriberIdEnricherService(session, aerospikeProperties);
    }
}
I want to point out that all EnricherService implementations should be listed inside this class. If we applied different configurations for each test suite, the Spring context would be reloaded. Mostly that's not a problem, but Apache Spark usage brings obstacles. You see, when JavaSparkContext is created, it starts the local Apache Spark node. But if we instantiate it twice during the application lifecycle, it will result in an exception. The easiest way to overcome the issue is to make sure that JavaSparkContext is created only once.
Now we can get to the testing process.
Integration Test Example
Here is a simple integration test that inserts two rows into Apache Hive and checks that the corresponding two records are created in Aerospike within 10 seconds. Take a look at the code snippet below.
class SubscriberIdEnricherServiceIntegrationTest extends SparkIntegrationSuite {

    @Autowired
    private TestHiveUtils hive;

    @Autowired
    private TestAerospikeFacade aerospike;

    @Autowired
    private EnricherService subscriberEnricherService;

    @BeforeEach
    void beforeEach() {
        aerospike.deleteAll("my-namespace");
        hive.cleanHive();
    }

    @Test
    void shouldSaveRecords() {
        hive.insertInto(subscriberInfo())
            .values(
                new SubscriberInfo.Values()
                    .setMsisdn("msisdn1")
                    .setSubscriberId("subscriberId1"),
                new SubscriberInfo.Values()
                    .setMsisdn("msisdn2")
                    .setSubscriberId("subscriberId2")
            );

        subscriberEnricherService.proceedEnrichment();

        List<KeyRecord> keyRecords = await()
            .atMost(TEN_SECONDS)
            .until(() -> aerospike.scanAll("my-namespace"), hasSize(2));

        assertThat(keyRecords, allOf(
            hasRecord("subscriberId1", "msisdn1"),
            hasRecord("subscriberId2", "msisdn2")
        ));
    }
}
If you tune everything correctly, the test will pass.
The whole test source is available by this link.
Conclusion
That's basically all I wanted to tell you about testing Apache Hive, Apache Spark, and Aerospike integration with Spring Boot usage. As you can see, the Big Data world is not so complicated after all. All code examples are taken from this repository. You can clone it and play around with tests by yourself.
If you have any questions or suggestions, please leave your comments down below. Thanks for reading!