Migrating a database is often a necessary step when scaling applications or seeking better performance. In a recent project, we needed to migrate data from MySQL to PostgreSQL, a more feature-rich and performant database. Our data model consisted of a users table, where each user can have multiple publications and comments. To handle this large-scale migration efficiently, Spring Batch was chosen for its reliability in processing large datasets and its robust job management capabilities.
However, migrating between two relational databases like MySQL and PostgreSQL isn't straightforward. The main challenges stemmed from differences in syntax and data types. For instance, MySQL uses TINYINT(1) to represent booleans, while PostgreSQL has a dedicated BOOLEAN type. Similarly, MySQL's AUTO_INCREMENT columns translate to SERIAL columns in PostgreSQL, and the handling of ENUM types differs significantly. These structural and syntactical variations required careful attention to ensure a seamless transition.
In this article, we'll walk through how Spring Batch was used to overcome these challenges and migrate the data from MySQL to PostgreSQL.
Technologies Used
For this project, we utilized the following technologies:
- Spring Boot: Simplifies Java development by providing a robust framework for creating standalone applications.
- Spring Batch: Manages batch processing by breaking jobs into steps, making it ideal for large-scale data migration.
- MySQL: The source database, known for its speed and reliability.
- PostgreSQL: The target database, offering advanced features and strong support for custom types.
- Flyway: Used for database version control and managing migrations across MySQL and PostgreSQL.
Dependencies
To set up the project, you need to include the following key dependencies in the pom.xml:
- Lombok: Simplifies code by automatically generating getters, setters, and other boilerplate code.
- spring-boot-starter-data-jdbc: Provides JDBC support for database interaction.
- spring-boot-starter-batch: Enables batch processing in Spring Boot.
- spring-boot-starter-web: For creating RESTful web services to interact with the batch jobs.
- mysql-connector-java: MySQL database driver.
- postgresql: PostgreSQL database driver.
- flyway-core: Manages database migrations for both MySQL and PostgreSQL.
How Spring Batch Works
Spring Batch is designed to handle large datasets by breaking the migration job into multiple steps. Each step involves reading data from MySQL, processing it (such as transforming data types), and writing it into PostgreSQL. The framework's job management, error handling, and retry mechanisms are crucial for ensuring smooth and reliable migration.
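To make that read-process-write contract concrete before diving into the migration itself, here is a minimal, self-contained sketch of a chunk-oriented step (the data and bean names are illustrative and not part of this project):
@Bean
public Step illustrativeStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
    // Illustrative only: read three strings, upper-case them, and print them.
    ItemReader<String> reader = new ListItemReader<>(List.of("a", "b", "c"));
    ItemProcessor<String, String> processor = item -> item.toUpperCase();
    ItemWriter<String> writer = chunk -> chunk.forEach(System.out::println);
    return new StepBuilder("illustrativeStep", jobRepository)
            // Items are accumulated into chunks of 2; each chunk is written in one transaction.
            .<String, String>chunk(2, transactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
}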
Configuring Two DataSources
To facilitate the migration, you should configure two data sources: one for MySQL and one for PostgreSQL. Here's the configuration in application.properties:
spring.datasource.mysql.jdbc-url=jdbc:mysql://${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DATABASE}
spring.datasource.mysql.username=${MYSQL_USER}
spring.datasource.mysql.password=${MYSQL_PASSWORD}
spring.datasource.mysql.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.postgres.jdbc-url=jdbc:postgresql://${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DATABASE}
spring.datasource.postgres.username=${POSTGRES_USER}
spring.datasource.postgres.password=${POSTGRES_PASSWORD}
spring.datasource.postgres.driver-class-name=org.postgresql.Driver
The actual database connection details are stored in the .env file:
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_DATABASE=mydb
MYSQL_USER=root
MYSQL_PASSWORD=secret
POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DATABASE=mydb
POSTGRES_USER=postgres
POSTGRES_PASSWORD=secret
Configuring DataSources and Transaction Managers
To manage the migration between the two databases, with MySQL as the source and PostgreSQL as the target, you'll need to configure two separate DataSource beans. In addition, each DataSource requires its own PlatformTransactionManager to handle database transactions during the migration process. Here's how you can implement this configuration in the BatchConfiguration class:
@Configuration
@RequiredArgsConstructor
public class BatchConfiguration {

    @Bean(name = "mySqlDataSource")
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource.mysql")
    public DataSource mySqlDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "postgresDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.postgres")
    public DataSource postgresDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "mySqlTransactionManager")
    @Primary
    public PlatformTransactionManager mySqlTransactionManager(
            @Qualifier("mySqlDataSource") DataSource mySqlDataSource) {
        return new DataSourceTransactionManager(mySqlDataSource);
    }

    @Bean(name = "postgresTransactionManager")
    public PlatformTransactionManager postgresTransactionManager(
            @Qualifier("postgresDataSource") DataSource postgresDataSource) {
        return new DataSourceTransactionManager(postgresDataSource);
    }
}
- MySQL DataSource and Transaction Manager: The mySqlDataSource bean is marked as @Primary since it's the default data source in the project. Its connection properties are loaded from the application.properties file, using the prefix spring.datasource.mysql. The associated mySqlTransactionManager manages the transactions for this data source.
- PostgreSQL DataSource and Transaction Manager: Similarly, the postgresDataSource bean loads its connection properties from spring.datasource.postgres. The postgresTransactionManager handles the transactions for this PostgreSQL database, ensuring that each batch job step interacts with the correct database.
By setting up separate DataSource and TransactionManager beans, Spring Batch can effectively read from MySQL and write to PostgreSQL without conflicts, ensuring that all transactions are managed correctly during the migration process.
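As a quick sanity check that both data sources are wired correctly, you could query each one once at startup, so misconfigured credentials fail fast before any job is launched (an optional sketch, not part of the original configuration):
@Bean
public CommandLineRunner dataSourceSmokeTest(
        @Qualifier("mySqlDataSource") DataSource mySqlDataSource,
        @Qualifier("postgresDataSource") DataSource postgresDataSource) {
    return args -> {
        // Fails at startup if either connection is misconfigured.
        new JdbcTemplate(mySqlDataSource).queryForObject("SELECT 1", Integer.class);
        new JdbcTemplate(postgresDataSource).queryForObject("SELECT 1", Integer.class);
    };
}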
Defining the Steps for User Data Migration
The migration of user data from MySQL to PostgreSQL involves setting up several components: userReader, userWriter, processor, and UserRowMapper. Here's a brief overview of each component:
- User: This entity class maps to the socialite_users table:
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Entity
@Table(name = "socialite_users")
public class User {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id", nullable = false)
    private Long id;

    @Column(name = "email", nullable = false)
    private String email;

    @Column(name = "password", nullable = false)
    private String password;

    @Column(name = "identity", nullable = false)
    private String identity;

    @Column(name = "phone_number")
    private String phoneNumber;

    @Column(name = "created_on")
    @CreationTimestamp
    private ZonedDateTime createdOn;

    @Column(name = "updated_on")
    @UpdateTimestamp
    private ZonedDateTime updatedOn;
}
- UserRowMapper: This class maps rows from the MySQL result set to User objects.
public class UserRowMapper implements RowMapper<User> {

    @Override
    public User mapRow(ResultSet rs, int rowNum) throws SQLException {
        User user = new User();
        user.setId(rs.getLong("id"));
        user.setEmail(rs.getString("email"));
        user.setPassword(rs.getString("password"));
        user.setIdentity(rs.getString("identity"));
        user.setPhoneNumber(rs.getString("phone_number"));
        user.setCreatedOn(rs.getTimestamp("created_on").toInstant().atZone(ZoneId.systemDefault()));
        user.setUpdatedOn(rs.getTimestamp("updated_on").toInstant().atZone(ZoneId.systemDefault()));
        return user;
    }
}
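The mapper above assumes created_on and updated_on are never NULL in the source table; if they can be NULL, a small helper avoids a NullPointerException (a defensive sketch, not part of the original mapper):
// Hypothetical helper: convert a possibly-NULL SQL timestamp to a ZonedDateTime.
private static ZonedDateTime toZonedDateTime(Timestamp timestamp) {
    return timestamp == null ? null : timestamp.toInstant().atZone(ZoneId.systemDefault());
}

// The two timestamp lines in mapRow would then become:
// user.setCreatedOn(toZonedDateTime(rs.getTimestamp("created_on")));
// user.setUpdatedOn(toZonedDateTime(rs.getTimestamp("updated_on")));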
- userReader: Reads user data from MySQL and maps it to User objects using UserRowMapper.
@Bean
public JdbcCursorItemReader<User> userReader(@Qualifier("mySqlDataSource") DataSource mySqlDataSource) {
    return new JdbcCursorItemReaderBuilder<User>()
            .dataSource(mySqlDataSource)
            .name("userReader")
            .sql("select * from socialite_users")
            .rowMapper(new UserRowMapper())
            .build();
}
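For very large tables, the cursor reader can also be given a fetch size; with MySQL Connector/J, a fetch size of Integer.MIN_VALUE is the driver's conventional hint to stream rows instead of buffering the whole result set in memory. An optional tuning sketch (in practice you would adjust userReader itself rather than add a second reader bean):
@Bean
public JdbcCursorItemReader<User> streamingUserReader(@Qualifier("mySqlDataSource") DataSource mySqlDataSource) {
    return new JdbcCursorItemReaderBuilder<User>()
            .dataSource(mySqlDataSource)
            .name("userReader")
            .sql("select * from socialite_users")
            .rowMapper(new UserRowMapper())
            .fetchSize(Integer.MIN_VALUE) // MySQL-specific streaming hint
            .build();
}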
- processor: An optional component for processing User objects. In this setup, it simply passes the User objects through without modification.
@Bean
public ItemProcessor<User, User> processor() {
    return user -> {
        // Optional: make transformations
        return user;
    };
}
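To show where real work would go, here is a hypothetical processor that normalizes emails before they reach PostgreSQL; this is also the natural place for the kind of MySQL-to-PostgreSQL value conversions mentioned in the introduction. It would replace the pass-through processor above rather than coexist with it:
@Bean
public ItemProcessor<User, User> processor() {
    return user -> {
        // Hypothetical transformation: trim and lower-case emails before writing.
        if (user.getEmail() != null) {
            user.setEmail(user.getEmail().trim().toLowerCase());
        }
        return user;
    };
}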
- userWriter: Writes User objects to PostgreSQL using an INSERT statement.
@Bean
public JdbcBatchItemWriter<User> userWriter(@Qualifier("postgresDataSource") DataSource postgresDataSource) {
    return new JdbcBatchItemWriterBuilder<User>()
            .sql("INSERT INTO socialite_users (id, email, password, identity, phone_number, created_on, updated_on) " +
                 "VALUES (?, ?, ?, ?, ?, ?, ?)")
            .itemPreparedStatementSetter((user, ps) -> {
                ps.setLong(1, user.getId());
                ps.setString(2, user.getEmail());
                ps.setString(3, user.getPassword());
                ps.setString(4, user.getIdentity());
                ps.setString(5, user.getPhoneNumber());
                ps.setTimestamp(6, Timestamp.from(user.getCreatedOn().toInstant()));
                ps.setTimestamp(7, Timestamp.from(user.getUpdatedOn().toInstant()));
            })
            .dataSource(postgresDataSource)
            .build();
}
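For completeness, JdbcBatchItemWriter also supports named parameters bound from the item's bean properties via beanMapped(); a sketch of that alternative style is below (it would replace userWriter, not coexist with it). Depending on the JDBC driver, the ZonedDateTime properties may not bind automatically, which is one reason the explicit PreparedStatement setter above is a safe default.
@Bean
public JdbcBatchItemWriter<User> userWriter(@Qualifier("postgresDataSource") DataSource postgresDataSource) {
    return new JdbcBatchItemWriterBuilder<User>()
            // Named parameters are resolved from the User getters (id, email, ...).
            .beanMapped()
            .sql("INSERT INTO socialite_users (id, email, password, identity, phone_number, created_on, updated_on) " +
                 "VALUES (:id, :email, :password, :identity, :phoneNumber, :createdOn, :updatedOn)")
            .dataSource(postgresDataSource)
            .build();
}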
- Step Configuration: Combines the userReader, processor, and userWriter into a step that processes chunks of 100 users at a time.
@Bean(name = "userStep")
public Step userStep(
JdbcCursorItemReader<User> reader,
JdbcBatchItemWriter<User> writer,
ItemProcessor<User, User> processor,
PlatformTransactionManager mySqlTransactionManager,
JobRepository jobRepository
) throws Exception {
return new StepBuilder("userStep", jobRepository)
.<User, User>chunk(100, mySqlTransactionManager)
.reader(reader)
.processor(processor)
.writer(writer)
.startLimit(1)
.build();
}
This setup ensures that user data is read from MySQL, optionally processed, and written to PostgreSQL efficiently in batches.
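The retry and error-handling features mentioned earlier attach to this same builder. A hedged sketch of a fault-tolerant variant of the user step follows; the exception types and limits are illustrative, not values from the project, and in practice you would modify userStep itself:
@Bean(name = "userStep")
public Step userStep(
        JdbcCursorItemReader<User> reader,
        JdbcBatchItemWriter<User> writer,
        ItemProcessor<User, User> processor,
        PlatformTransactionManager mySqlTransactionManager,
        JobRepository jobRepository
) throws Exception {
    return new StepBuilder("userStep", jobRepository)
            .<User, User>chunk(100, mySqlTransactionManager)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .faultTolerant()
            .retry(TransientDataAccessException.class) // retry transient failures (illustrative)
            .retryLimit(3)
            .skip(DataIntegrityViolationException.class) // skip rows that keep violating constraints (illustrative)
            .skipLimit(10)
            .build();
}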
Step Configuration for Publication Data
1. Publication Model
The Publication model represents the data structure for publications, including the following fields:
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Entity
@Table(name = "publications")
public class Publication {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id", nullable = false)
    private Long id;

    @Column(name = "media", nullable = false)
    private String media;

    @Column(name = "content", nullable = false)
    private String content;

    @ManyToOne(cascade = {CascadeType.PERSIST, CascadeType.MERGE, CascadeType.DETACH, CascadeType.REFRESH})
    @JoinColumn(name = "user_id", referencedColumnName = "id")
    private User user;

    @Column(name = "created_on")
    @CreationTimestamp
    private ZonedDateTime createdOn;

    @Column(name = "updated_on")
    @UpdateTimestamp
    private ZonedDateTime updatedOn;
}
PS: One User can have multiple publications, and each publication belongs to one User.
2. Publication Reader
The publicationReader reads data from the MySQL publications table and maps it to Publication objects using PublicationRowMapper.
@Bean
public JdbcCursorItemReader<Publication> publicationReader(@Qualifier("mySqlDataSource") DataSource mySqlDataSource) {
    return new JdbcCursorItemReaderBuilder<Publication>()
            .dataSource(mySqlDataSource)
            .name("publicationReader")
            .sql("select * from publications")
            .rowMapper(new PublicationRowMapper())
            .build();
}
3. Publication RowMapper
The PublicationRowMapper maps the result set rows from MySQL to Publication objects, handling the optional user relationship.
public class PublicationRowMapper implements RowMapper<Publication> {

    @Override
    public Publication mapRow(ResultSet rs, int rowNum) throws SQLException {
        Publication publication = new Publication();
        publication.setId(rs.getLong("id"));
        publication.setMedia(rs.getString("media"));
        publication.setContent(rs.getString("content"));
        publication.setCreatedOn(rs.getTimestamp("created_on").toInstant().atZone(ZoneId.systemDefault()));
        publication.setUpdatedOn(rs.getTimestamp("updated_on").toInstant().atZone(ZoneId.systemDefault()));

        Long userId = rs.getLong("user_id");
        if (!rs.wasNull()) {
            User user = new User();
            user.setId(userId);
            publication.setUser(user);
        }
        return publication;
    }
}
4. Publication Processor
The publicationItemProcessor allows for optional transformation of Publication objects before they are written to PostgreSQL.
@Bean
public ItemProcessor<Publication, Publication> publicationItemProcessor() {
    return publication -> {
        // Optional: make transformations
        return publication;
    };
}
5. Publication Writer
The publicationWriter writes Publication objects to PostgreSQL, handling all necessary fields with an INSERT statement.
@Bean
public JdbcBatchItemWriter<Publication> publicationWriter(@Qualifier("postgresDataSource") DataSource postgresDataSource) {
    return new JdbcBatchItemWriterBuilder<Publication>()
            .sql("INSERT INTO publications (id, media, content, user_id, created_on, updated_on) " +
                 "VALUES (?, ?, ?, ?, ?, ?)")
            .itemPreparedStatementSetter((publication, ps) -> {
                ps.setLong(1, publication.getId());
                ps.setString(2, publication.getMedia());
                ps.setString(3, publication.getContent());
                ps.setObject(4, publication.getUser() != null ? publication.getUser().getId() : null, Types.BIGINT);
                ps.setTimestamp(5, Timestamp.from(publication.getCreatedOn().toInstant()));
                ps.setTimestamp(6, Timestamp.from(publication.getUpdatedOn().toInstant()));
            })
            .dataSource(postgresDataSource)
            .build();
}
6. Publication Step
The publicationStep combines the publicationReader, processor, and publicationWriter into a step that processes chunks of 100 publications at a time.
@Bean(name = "publicationStep")
public Step publicationStep(
JdbcCursorItemReader<Publication> reader,
JdbcBatchItemWriter<Publication> writer,
ItemProcessor<Publication, Publication> publicationItemProcessor,
PlatformTransactionManager mySqlTransactionManager,
JobRepository jobRepository
) throws Exception {
return new StepBuilder("publicationStep", jobRepository)
.<Publication, Publication>chunk(100, mySqlTransactionManager)
.reader(reader)
.processor(publicationItemProcessor)
.writer(writer)
.startLimit(1)
.allowStartIfComplete(false)
.build();
}
Job Scheduling Configuration
Setting Up the Job
The migrateDataJob bean is the heart of the data migration process, ensuring a smooth transition from MySQL to PostgreSQL. Here's how it's set up and why the order of steps is so important:
- User Step 🧑💼: The journey begins by migrating user data. Since both publications and comments depend on users, it’s essential to have this data in place first. Think of this as laying the foundation for everything that follows.
- Publication Step 📚: Next up, the publications are migrated. These depend on the users, so having user data available ensures that the publications are correctly associated with their respective users.
- Comment Step 💬: Finally, comments are processed. Comments rely on both the users and the publications, so they’re handled last to make sure all dependencies are in place.
@Bean
public Job migrateDataJob(
        Step userStep,
        Step publicationStep,
        Step commentStep,
        JobRepository jobRepository
) throws Exception {
    return new JobBuilder("dataJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .start(userStep)        // Start with user migration 🧑💼
            .next(publicationStep)  // Move on to publications 📚
            .next(commentStep)      // Finally, migrate comments 💬
            .build();
}
By following this sequence, you ensure that all dependencies are handled correctly and the data is migrated in a way that maintains integrity and consistency. 🚀
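The comment step referenced by the job is wired exactly like the user and publication steps; its reader, row mapper, and writer live in the GitHub repository. A minimal sketch, assuming commentReader, commentItemProcessor, and commentWriter beans for a Comment entity defined analogously:
@Bean(name = "commentStep")
public Step commentStep(
        JdbcCursorItemReader<Comment> commentReader,
        JdbcBatchItemWriter<Comment> commentWriter,
        ItemProcessor<Comment, Comment> commentItemProcessor,
        PlatformTransactionManager mySqlTransactionManager,
        JobRepository jobRepository
) throws Exception {
    return new StepBuilder("commentStep", jobRepository)
            .<Comment, Comment>chunk(100, mySqlTransactionManager)
            .reader(commentReader)
            .processor(commentItemProcessor)
            .writer(commentWriter)
            .build();
}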
Launching the Job with a Request
Starting the Migration Job
Once the job configuration is in place, it's time to kick things off! You can trigger the migration job via a simple REST API call. Here’s how it’s done:
- Creating the Controller: The BatchController is your gateway to starting the data migration job. It uses JobLauncher to launch the job and JobParameters to pass any necessary parameters.
- Launching the Job: When you send a POST request to /batch/start, the job is launched with the current timestamp. This helps track when the migration started.
- Handling Responses:
  - If the job starts successfully, you'll get a confirmation message with the job ID. 🆗
  - If something goes wrong, you'll receive an error message detailing what went wrong. 😓
Here’s the code that makes this happen:
@RestController
@RequiredArgsConstructor
@RequestMapping("/batch")
public class BatchController {

    private final JobLauncher jobLauncher;
    private final Job dataMigrationJob;

    @PostMapping("start")
    public ResponseEntity<String> startBatchJob() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("startAt", System.currentTimeMillis())
                    .toJobParameters();
            JobExecution jobExecution = jobLauncher.run(dataMigrationJob, jobParameters);
            return ResponseEntity.ok("Job launched successfully with ID: " + jobExecution.getJobId());
        } catch (Exception exception) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Job failed with error: " + exception.getMessage());
        }
    }
}
Final Checks Before Launching 🔍
Before kicking off the data migration job, it’s essential to ensure a smooth process:
- Backup the Source Database 🗄️: Make sure to back up your source database to safeguard against any unexpected issues during migration.
- Run Migration on the Destination Database 🛠️: Ensure that the destination database has the correct table structure by running any necessary migration scripts beforehand.
These steps will help avoid potential data loss and ensure that your migration runs as smoothly as possible. You can find the complete code for this project on GitHub at https://github.com/spencer2k19/batch-data-migration.
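Since Flyway is already on the classpath, one way to apply the destination schema programmatically before the job runs is sketched below; the db/migration/postgres script location is an assumption, not necessarily the project's actual layout:
@Bean
public CommandLineRunner migratePostgresSchema(@Qualifier("postgresDataSource") DataSource postgresDataSource) {
    return args -> Flyway.configure()
            .dataSource(postgresDataSource)
            .locations("classpath:db/migration/postgres") // assumed location of the PostgreSQL scripts
            .load()
            .migrate();
}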
Happy migrating! 🌟