Loïc

How to migrate data between two databases using Spring Batch

Migrating a database is often a necessary step when scaling applications or seeking better performance. In a recent project, we needed to migrate data from MySQL to PostgreSQL, a more feature-rich and performant database. Our data structure involved a table for users, where each user had multiple publications and comments. To handle this large-scale migration efficiently, Spring Batch was chosen for its reliability in processing large datasets and its robust job management capabilities.

However, migrating between two relational databases like MySQL and PostgreSQL isn’t straightforward. The main challenges stemmed from differences in syntax and data types. For instance, MySQL uses TINYINT(1) to represent booleans, while PostgreSQL has a dedicated BOOLEAN type. Similarly, MySQL’s AUTO_INCREMENT columns translate to SERIAL in PostgreSQL, and the handling of ENUM types differs significantly. These structural and syntactical variations required careful attention to ensure a seamless transition.

In this article, we'll walk through how Spring Batch was used to overcome these challenges and successfully migrate the data from MySQL to PostgreSQL.

Technologies Used

For this project, we utilized the following technologies:

  • Spring Boot: Simplifies Java development by providing a robust framework for creating standalone applications.
  • Spring Batch: Manages batch processing by breaking jobs into steps, making it ideal for large-scale data migration.
  • MySQL: The source database, known for its speed and reliability.
  • PostgreSQL: The target database, offering advanced features and strong support for custom types.
  • Flyway: Used for database version control and managing migrations across MySQL and PostgreSQL.

Dependencies

To set up the project, you need to include the following key dependencies in the pom.xml:

  • Lombok: Simplifies code by automatically generating getters, setters, and other boilerplate code.
  • spring-boot-starter-data-jdbc: Provides JDBC support for database interaction.
  • spring-boot-starter-batch: Enables batch processing in Spring Boot.
  • spring-boot-starter-web: For creating RESTful web services to interact with the batch jobs.
  • mysql-connector-java: MySQL database driver.
  • postgresql: PostgreSQL database driver.
  • flyway-core: Manages database migrations for both MySQL and PostgreSQL.
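
As an illustrative sketch (not the project's exact pom.xml), the dependencies section might look like the following. Versions are managed by the Spring Boot parent, and depending on your Spring Boot version the MySQL driver artifact is published either as mysql:mysql-connector-java or com.mysql:mysql-connector-j; since the entity classes below use JPA and Hibernate annotations, spring-boot-starter-data-jpa is presumably on the classpath as well:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jdbc</artifactId>
    </dependency>
    <dependency>
        <!-- assumption: needed for the @Entity/@CreationTimestamp annotations used below -->
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.flywaydb</groupId>
        <artifactId>flyway-core</artifactId>
    </dependency>
</dependencies>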

How Spring Batch Works

Spring Batch is designed to handle large datasets by breaking the migration job into multiple steps. Each step involves reading data from MySQL, processing it (such as transforming data types), and writing it into PostgreSQL. The framework's job management, error handling, and retry mechanisms are crucial for ensuring smooth and reliable migration.

Configuring Two DataSources

To facilitate the migration, you should configure two data sources: one for MySQL and one for PostgreSQL. Here’s the configuration in application.properties:

spring.datasource.mysql.jdbc-url=jdbc:mysql://${MYSQL_HOST}:${MYSQL_PORT}/${MYSQL_DATABASE}
spring.datasource.mysql.username=${MYSQL_USER}
spring.datasource.mysql.password=${MYSQL_PASSWORD}
spring.datasource.mysql.driver-class-name=com.mysql.cj.jdbc.Driver

spring.datasource.postgres.jdbc-url=jdbc:postgresql://${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DATABASE}
spring.datasource.postgres.username=${POSTGRES_USER}
spring.datasource.postgres.password=${POSTGRES_PASSWORD}
spring.datasource.postgres.driver-class-name=org.postgresql.Driver


The actual connection details are supplied as environment variables, kept here in a .env file:

MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_DATABASE=mydb
MYSQL_USER=root
MYSQL_PASSWORD=secret

POSTGRES_HOST=localhost
POSTGRES_PORT=5432
POSTGRES_DATABASE=mydb
POSTGRES_USER=postgres
POSTGRES_PASSWORD=secret


Configuring DataSources and Transaction Managers

To manage the migration between two databases—MySQL as the source and PostgreSQL as the target—you'll need to configure two separate DataSource beans. Additionally, each DataSource requires its own PlatformTransactionManager to handle database transactions during the migration process. Here's how you can implement this configuration in the BatchConfiguration class:

@Configuration
@RequiredArgsConstructor
public class BatchConfiguration {

    @Bean(name = "mySqlDataSource")
    @Primary
    @ConfigurationProperties(prefix = "spring.datasource.mysql")
    public DataSource mySqlDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "postgresDataSource")
    @ConfigurationProperties(prefix = "spring.datasource.postgres")
    public DataSource postgresDataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean(name = "mySqlTransactionManager")
    @Primary
    public PlatformTransactionManager mySqlTransactionManager(
            @Qualifier("mySqlDataSource") DataSource mySqlDataSource) {
        return new DataSourceTransactionManager(mySqlDataSource);
    }

    @Bean(name = "postgresTransactionManager")
    public PlatformTransactionManager postgresTransactionManager(
            @Qualifier("postgresDataSource") DataSource postgresDataSource) {
        return new DataSourceTransactionManager(postgresDataSource);
    }
}

  • MySQL DataSource and Transaction Manager: The mySqlDataSource bean is marked as @Primary since it's the default data source in the project. Its connection properties are loaded from the application.properties file, using the prefix spring.datasource.mysql. The associated mySqlTransactionManager manages the transactions for this data source.
  • PostgreSQL DataSource and Transaction Manager: Similarly, the postgresDataSource bean loads its connection properties from spring.datasource.postgres. The postgresTransactionManager handles the transactions for this PostgreSQL database, ensuring that each batch job step interacts with the correct database.

By setting up separate DataSource and TransactionManager beans, Spring Batch can effectively read from MySQL and write to PostgreSQL without conflicts, ensuring that all transactions are managed correctly during the migration process.
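
One practical detail worth noting: by default, Spring Batch stores its own job metadata (the BATCH_JOB_* and BATCH_STEP_* tables) in the @Primary DataSource, which here is MySQL, unless you point it at another DataSource with @BatchDataSource. If those tables don't exist yet in the source database, Spring Boot can create them for you (this property is an addition to the configuration shown above):

spring.batch.jdbc.initialize-schema=always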

Defining the Steps for User Data Migration

The migration of user data from MySQL to PostgreSQL involves setting up several components: userReader, userWriter, processor, and UserRowMapper. Here’s a brief overview of each component:

  • User: This entity class maps the socialite_users table:
    @AllArgsConstructor
    @NoArgsConstructor
    @Getter
    @Setter
    @Entity
    @Table(name = "socialite_users")
    public class User {
        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        @Column(name = "id", nullable = false)
        private Long id;

        @Column(name = "email", nullable = false)
        private String email;

        @Column(name = "password", nullable = false)
        private String password;

        @Column(name = "identity",nullable = false)
        private String identity;

        @Column(name = "phone_number")
        private String phoneNumber;

        @Column(name = "created_on")
        @CreationTimestamp
        private ZonedDateTime createdOn;

        @Column(name = "updated_on")
        @UpdateTimestamp
        private ZonedDateTime updatedOn;
    }
  • UserRowMapper: This class maps rows from the MySQL result set to User objects.
    public class UserRowMapper implements RowMapper<User> {
        @Override
        public User mapRow(ResultSet rs, int rowNum) throws SQLException {
            User user = new User();
            user.setId(rs.getLong("id"));
            user.setEmail(rs.getString("email"));
            user.setPassword(rs.getString("password"));
            user.setIdentity(rs.getString("identity"));
            user.setPhoneNumber(rs.getString("phone_number"));
            user.setCreatedOn(rs.getTimestamp("created_on").toInstant().atZone(ZoneId.systemDefault()));
            user.setUpdatedOn(rs.getTimestamp("updated_on").toInstant().atZone(ZoneId.systemDefault()));
            return user;
        }
    }

  • userReader: Reads user data from MySQL and maps it to User objects using UserRowMapper.
    @Bean
    public JdbcCursorItemReader<User> userReader(@Qualifier("mySqlDataSource") DataSource mySqlDataSource) {
        return new JdbcCursorItemReaderBuilder<User>()
                .dataSource(mySqlDataSource)
                .name("userReader")
                .sql("select * from socialite_users")
                .rowMapper(new UserRowMapper())
                .build();
    }

  • processor: An optional component for processing User objects. In this setup, it simply passes the User objects through without modification (a sketch of a real transformation appears after this list).
    @Bean
    public ItemProcessor<User, User> processor() {
        return user -> {
            // Optional: make transformations
            return user;
        };
    }

  • userWriter: Writes User objects to PostgreSQL using an INSERT statement.
    @Bean
    public JdbcBatchItemWriter<User> userWriter(@Qualifier("postgresDataSource") DataSource postgresDataSource) {
        return new JdbcBatchItemWriterBuilder<User>()
                .sql("INSERT INTO socialite_users (id, email, password, identity, phone_number, created_on, updated_on) " +
                        "VALUES (?, ?, ?, ?, ?, ?, ?)")
                .itemPreparedStatementSetter((user, ps) -> {
                    ps.setLong(1, user.getId());
                    ps.setString(2, user.getEmail());
                    ps.setString(3, user.getPassword());
                    ps.setString(4, user.getIdentity());
                    ps.setString(5, user.getPhoneNumber());
                    ps.setTimestamp(6, Timestamp.from(user.getCreatedOn().toInstant()));
                    ps.setTimestamp(7, Timestamp.from(user.getUpdatedOn().toInstant()));
                })
                .dataSource(postgresDataSource)
                .build();
    }

  • Step Configuration: Combines the userReader, processor, and userWriter into a step that processes chunks of 100 users at a time.
    @Bean(name = "userStep")
    public Step userStep(
            JdbcCursorItemReader<User> reader,
            JdbcBatchItemWriter<User> writer,
            ItemProcessor<User, User> processor,
            PlatformTransactionManager mySqlTransactionManager,
            JobRepository jobRepository
    ) throws Exception {
        return new StepBuilder("userStep", jobRepository)
                .<User, User>chunk(100, mySqlTransactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .startLimit(1)
                .build();
    }


This setup ensures that user data is read from MySQL, optionally processed, and written to PostgreSQL efficiently in batches.
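
If you do need a transformation, the processor is where it goes. Here is a minimal sketch (illustrative only, working on the User fields defined above) that normalizes emails before they are written to PostgreSQL:

@Bean
public ItemProcessor<User, User> processor() {
    return user -> {
        // Example transformation: normalize the email before writing to PostgreSQL
        if (user.getEmail() != null) {
            user.setEmail(user.getEmail().trim().toLowerCase());
        }
        // Returning null instead would filter the item out of the chunk
        return user;
    };
}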

Step Configuration for Publication Data

1. Publication Model

The Publication model represents the data structure for publications and includes the following fields:

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Entity
@Table(name = "publications")
public class Publication {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name = "id", nullable = false)
    private Long id;

    @Column(name = "media", nullable = false)
    private String media;

    @Column(name = "content", nullable = false)
    private String content;

    @ManyToOne(cascade = {CascadeType.PERSIST, CascadeType.MERGE, CascadeType.DETACH, CascadeType.REFRESH})
    @JoinColumn(name = "user_id", referencedColumnName = "id")
    private User user;

    @Column(name = "created_on")
    @CreationTimestamp
    private ZonedDateTime createdOn;

    @Column(name = "updated_on")
    @UpdateTimestamp
    private ZonedDateTime updatedOn;
}


PS: One User can have multiple publications, and each publication belongs to exactly one User.

2. Publication Reader

The publicationReader reads data from the MySQL publications table and maps it to Publication objects using PublicationRowMapper.

@Bean
public JdbcCursorItemReader<Publication> publicationReader(@Qualifier("mySqlDataSource") DataSource mySqlDataSource) {
    return new JdbcCursorItemReaderBuilder<Publication>()
            .dataSource(mySqlDataSource)
            .name("publicationReader")
            .sql("select * from publications")
            .rowMapper(new PublicationRowMapper())
            .build();
}


3. Publication RowMapper

The PublicationRowMapper maps the result set rows from MySQL to Publication objects, handling the optional user relationship.

public class PublicationRowMapper implements RowMapper<Publication> {
    @Override
    public Publication mapRow(ResultSet rs, int rowNum) throws SQLException {
        Publication publication = new Publication();
        publication.setId(rs.getLong("id"));
        publication.setMedia(rs.getString("media"));
        publication.setContent(rs.getString("content"));
        publication.setCreatedOn(rs.getTimestamp("created_on").toInstant().atZone(ZoneId.systemDefault()));
        publication.setUpdatedOn(rs.getTimestamp("updated_on").toInstant().atZone(ZoneId.systemDefault()));

        Long userId = rs.getLong("user_id");
        if (!rs.wasNull()) {
            User user = new User();
            user.setId(userId);
            publication.setUser(user);
        }
        return publication;
    }
}


4. Publication Processor

The publicationItemProcessor allows for optional transformation of Publication objects before they are written to PostgreSQL.

@Bean
public ItemProcessor<Publication, Publication> publicationItemProcessor() {
    return publication -> {
        // Optional: make transformations
        return publication;
    };
}


5. Publication Writer

The publicationWriter writes Publication objects to PostgreSQL, handling all necessary fields with an INSERT statement.

@Bean
public JdbcBatchItemWriter<Publication> publicationWriter(@Qualifier("postgresDataSource") DataSource postgresDataSource) {
    return new JdbcBatchItemWriterBuilder<Publication>()
            .sql("INSERT INTO publications (id, media, content, user_id, created_on, updated_on) " +
                    "VALUES (?, ?, ?, ?, ?, ?)")
            .itemPreparedStatementSetter((publication, ps) -> {
                ps.setLong(1, publication.getId());
                ps.setString(2, publication.getMedia());
                ps.setString(3, publication.getContent());
                ps.setObject(4, publication.getUser() != null ? publication.getUser().getId() : null, Types.BIGINT);
                ps.setTimestamp(5, Timestamp.from(publication.getCreatedOn().toInstant()));
                ps.setTimestamp(6, Timestamp.from(publication.getUpdatedOn().toInstant()));
            })
            .dataSource(postgresDataSource)
            .build();
}


6. Publication Step

The publicationStep combines the publicationReader, processor, and publicationWriter into a step that processes chunks of 100 publications at a time.

@Bean(name = "publicationStep")
public Step publicationStep(
        JdbcCursorItemReader<Publication> reader,
        JdbcBatchItemWriter<Publication> writer,
        ItemProcessor<Publication, Publication> publicationItemProcessor,
        PlatformTransactionManager mySqlTransactionManager,
        JobRepository jobRepository
) throws Exception {
    return new StepBuilder("publicationStep", jobRepository)
            .<Publication, Publication>chunk(100, mySqlTransactionManager)
            .reader(reader)
            .processor(publicationItemProcessor)
            .writer(writer)
            .startLimit(1)
            .allowStartIfComplete(false)
            .build();
}

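The comment data follows exactly the same recipe. The Comment entity, row mapper, reader, and writer are not shown here, but assuming they are built the same way as their publication counterparts (hypothetical commentReader, commentItemProcessor, and commentWriter beans), the step would look roughly like this:

@Bean(name = "commentStep")
public Step commentStep(
        JdbcCursorItemReader<Comment> commentReader,
        JdbcBatchItemWriter<Comment> commentWriter,
        ItemProcessor<Comment, Comment> commentItemProcessor,
        PlatformTransactionManager mySqlTransactionManager,
        JobRepository jobRepository
) throws Exception {
    return new StepBuilder("commentStep", jobRepository)
            .<Comment, Comment>chunk(100, mySqlTransactionManager)
            .reader(commentReader)
            .processor(commentItemProcessor)
            .writer(commentWriter)
            .build();
}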

Job Scheduling Configuration

Setting Up the Job

The migrateDataJob bean is the heart of the data migration process, ensuring a smooth transition from MySQL to PostgreSQL. Here’s how it’s set up and why the order of steps is so important:

  1. User Step 🧑‍💼: The journey begins by migrating user data. Since both publications and comments depend on users, it’s essential to have this data in place first. Think of this as laying the foundation for everything that follows.
  2. Publication Step 📚: Next up, the publications are migrated. These depend on the users, so having user data available ensures that the publications are correctly associated with their respective users.
  3. Comment Step 💬: Finally, comments are processed. Comments rely on both the users and the publications, so they’re handled last to make sure all dependencies are in place.
@Bean
public Job migrateDataJob(
        Step userStep,
        Step publicationStep,
        Step commentStep,
        JobRepository jobRepository
) throws Exception {
    return new JobBuilder("dataJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .start(userStep)             // Start with user migration 🧑‍💼
            .next(publicationStep)      // Move on to publications 📚
            .next(commentStep)         // Finally, migrate comments 💬
            .build();
}


By following this sequence, you ensure that all dependencies are handled correctly and the data is migrated in a way that maintains integrity and consistency. 🚀

Launching the Job with a Request

Starting the Migration Job

Once the job configuration is in place, it's time to kick things off! You can trigger the migration job via a simple REST API call. Here’s how it’s done:

  1. Creating the Controller: The BatchController is your gateway to starting the data migration job. It uses JobLauncher to launch the job and JobParameters to pass any necessary parameters.
  2. Launching the Job: When you send a POST request to /batch/start, the job is launched with the current timestamp as a job parameter. This makes each run a distinct job instance and helps track when the migration started.
  3. Handling Responses:
    • If the job starts successfully, you'll get a confirmation message with the job ID. 🆗
    • If something goes wrong, you’ll receive an error message detailing what went wrong. 😓

Here’s the code that makes this happen:

@RestController
@RequiredArgsConstructor
@RequestMapping("/batch")
public class BatchController {
    private final JobLauncher jobLauncher;
    private final Job dataMigrationJob;

    @PostMapping("start")
    public ResponseEntity<String> startBatchJob() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addLong("startAt", System.currentTimeMillis())
                    .toJobParameters();

            JobExecution jobExecution = jobLauncher.run(dataMigrationJob, jobParameters);
            return ResponseEntity.ok("Job launched successfully with ID: " + jobExecution.getJobId()); 
        } catch (Exception exception) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("Job failed with error: " + exception.getMessage()); 
        }
    }
}

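With the application running, the migration can then be triggered from the command line (assuming Spring Boot's default port 8080):

curl -X POST http://localhost:8080/batch/start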

Final Checks Before Launching 🔍

Before kicking off the data migration job, it’s essential to ensure a smooth process:

  • Backup the Source Database 🗄️: Make sure to back up your source database to safeguard against any unexpected issues during migration.
  • Run Migration on the Destination Database 🛠️: Ensure that the destination database has the correct table structure by running any necessary migration scripts beforehand (see the sketch below).

These steps will help avoid potential data loss and ensure that your migration runs as smoothly as possible. You can find the complete code for this project on GitHub at https://github.com/spencer2k19/batch-data-migration.
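
If Flyway manages the destination schema, the PostgreSQL migration script is the natural place to resolve the type differences mentioned at the beginning (BOOLEAN instead of TINYINT(1), serial/identity columns instead of AUTO_INCREMENT). Here is a minimal illustrative sketch for the users table only, assuming the schema implied by the User entity above:

CREATE TABLE socialite_users (
    id           BIGSERIAL PRIMARY KEY,   -- PostgreSQL equivalent of MySQL AUTO_INCREMENT
    email        VARCHAR(255) NOT NULL,
    password     VARCHAR(255) NOT NULL,
    identity     VARCHAR(255) NOT NULL,
    phone_number VARCHAR(255),
    created_on   TIMESTAMPTZ,
    updated_on   TIMESTAMPTZ
);

Also keep in mind that because the batch job inserts explicit id values, the sequences backing these serial columns will lag behind the real maximum; bumping them afterwards (for example with setval) avoids duplicate-key errors on future inserts.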

Happy migrating! 🌟
