Abstract
An Apache Beam SingleStoreDB I/O connector is now available. In this short article, we'll see how to use it from Java to connect to SingleStoreDB and read from and write to a table.
The Java code files used in this article are available on GitHub.
Introduction
Previous articles have shown examples of SingleStoreDB use cases and integrations with various connectors. A new addition to the list of connectors is the Apache Beam SingleStoreDB I/O connector. This short article will test the connector with basic read-and-write operations using Java.
Create a SingleStoreDB Cloud account
A previous article showed the steps required to create a free SingleStoreDB Cloud account. We'll use Beam Demo Group as our Workspace Group Name and beam-demo as our Workspace Name.
Once we've created our database in the following steps, we'll make a note of our password and host name.
Create a Database and Tables
In our SingleStoreDB Cloud account, we'll use the SQL Editor to create a new database, as follows:
CREATE DATABASE IF NOT EXISTS adtech;
We'll also create two tables, derived from the previous AdTech example, as follows:
USE adtech;
CREATE TABLE campaigns_read (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

CREATE TABLE campaigns_write (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);
We'll populate the campaigns_read table, as follows:
INSERT INTO campaigns_read VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');
The complete SQL code is listed in Appendix A.
Create a Maven project
For quick testing, we'll use Maven, building and running our code from the command line.
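If we need to scaffold the project from scratch, one option is Maven's quickstart archetype; the coordinates below match the pom.xml in Appendix B:

mvn archetype:generate \
  -DgroupId=com.s2 \
  -DartifactId=s2-app \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false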
All the project code files are listed in Appendix B.
pom.xml
The pom.xml file is straightforward, with details of the Java version, the three main dependencies, and the configuration to build a single jar file containing all the dependencies.
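For example, the connector itself comes from the following dependency (the complete file is listed in Appendix B):

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-singlestore</artifactId>
    <version>2.44.0</version>
</dependency>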
S2ReadTable class
Our Java code will provide the connection details and read the data from the campaigns_read table as key-value pairs. We'll pass a parameter to our SQL query to make it more interesting.
Pipeline pipeline = Pipeline.create();

// Read rows from the campaigns_read table as (campaign_id, campaign_name) pairs
PCollection<KV<Integer, String>> data = pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
    .withDataSourceConfiguration(DataSourceConfiguration
        .create(s2_host)
        .withUsername("admin")
        .withPassword(s2_password)
        .withDatabase("adtech"))
    .withQuery("SELECT * FROM campaigns_read WHERE campaign_id > ?")
    // Bind the query parameter, so only campaigns with an id greater than 7 are returned
    .withStatementPreparator(new StatementPreparator() {
        public void setParameters(PreparedStatement preparedStatement) throws Exception {
            preparedStatement.setInt(1, 7);
        }
    })
    // Convert each result row to a key-value pair
    .withRowMapper(new RowMapper<KV<Integer, String>>() {
        public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
            return KV.of(resultSet.getInt(1), resultSet.getString(2));
        }
    })
);

// Format each key-value pair as a CSV line and write a single output file
data
    .apply(MapElements
        .into(TypeDescriptors.strings())
        .via((KV<Integer, String> kv) -> kv.getKey() + "," + kv.getValue()))
    .apply(TextIO
        .write().to("/path/to/s2").withNumShards(1).withSuffix(".csv")
    );
Once the data are in our PCollection, we'll convert the key-value pairs to string format and write the data to a file. We'll replace /path/to/ with the actual path where we want to write the file.
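Finally, we'll run the pipeline (the complete program is listed in Appendix B):

pipeline.run().waitUntilFinish();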
S2WriteTable class
We can also write data into a SingleStoreDB table. Our Java code will provide the connection details, read the data from a file as key-value pairs and write them into the campaigns_write table.
Pipeline pipeline = Pipeline.create();

// Read the CSV file created by S2ReadTable
PCollection<String> lines = pipeline.apply(
    TextIO.read().from("/path/to/s2-00000-of-00001.csv"));

// Parse each line into a key-value pair; a simple comma split is safe here
// because none of the campaign names contain commas
PCollection<KV<Integer, String>> keyValues = lines.apply(
    MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
        .via((String line) -> {
            String[] fields = line.split(",");
            return KV.of(Integer.parseInt(fields[0]), fields[1]);
        })
);

// Write each key-value pair as a row in the campaigns_write table
keyValues.apply(SingleStoreIO.<KV<Integer, String>>write()
    .withDataSourceConfiguration(DataSourceConfiguration
        .create(s2_host)
        .withUsername("admin")
        .withPassword(s2_password)
        .withDatabase("adtech"))
    .withTable("campaigns_write")
    // Each element becomes one row: a list of column values as strings
    .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
        public List<String> mapRow(KV<Integer, String> element) {
            List<String> result = new ArrayList<>();
            result.add(element.getKey().toString());
            result.add(element.getValue());
            return result;
        }
    })
);
We'll replace /path/to/ with the actual path where we want to read the file from. We'll use the same file created by S2ReadTable.
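As before, we'll finish by running the pipeline:

pipeline.run().waitUntilFinish();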
Build and Run the Code
Just for testing purposes, we'll declare two environment variables:
export S2_HOST="<host>"
export S2_PASSWORD="<password>"
We'll replace the <host> and <password> with the values from our SingleStoreDB Cloud account.
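Both programs read these variables at startup, as follows:

String s2_host = System.getenv("S2_HOST");
String s2_password = System.getenv("S2_PASSWORD");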
Next, we'll build the code, as follows:
mvn clean compile assembly:single
First, we'll run the S2ReadTable Java code, as follows:
java -cp target/s2-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.s2.beam.S2ReadTable
The CSV file should be written to the location we specified. If we look at the contents, we should see results similar to the following (the row order may vary, since the query has no ORDER BY):
9,warmth
11,virtual city
8,ultra light
13,dream burger
10,run healthy
12,online lifestyle
14,super bowl tweet
Second, we'll run the S2WriteTable Java code, as follows:
java -cp target/s2-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.s2.beam.S2WriteTable
If we switch to the SQL Editor in SingleStoreDB Cloud, we can check the contents of the campaigns_write table, as follows:
SELECT * FROM campaigns_write;
The output should be similar to the following:
+-------------+------------------+
| campaign_id | campaign_name    |
+-------------+------------------+
|          12 | online lifestyle |
|          10 | run healthy      |
|          13 | dream burger     |
|           9 | warmth           |
|           8 | ultra light      |
|          14 | super bowl tweet |
|          11 | virtual city     |
+-------------+------------------+
Summary
This short article has shown examples of how to read from and write to SingleStoreDB using Apache Beam. Further information is available in the documentation.
Appendix A — SQL Code
CREATE DATABASE IF NOT EXISTS adtech;
USE adtech;
CREATE TABLE campaigns_read (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);

CREATE TABLE campaigns_write (
    campaign_id SMALLINT(6),
    campaign_name VARCHAR(255)
);
INSERT INTO campaigns_read VALUES
(1,'demand great'),
(2,'blackout'),
(3,'flame broiled'),
(4,'take it from a fish'),
(5,'thank you'),
(6,'designed by you'),
(7,'virtual launch'),
(8,'ultra light'),
(9,'warmth'),
(10,'run healthy'),
(11,'virtual city'),
(12,'online lifestyle'),
(13,'dream burger'),
(14,'super bowl tweet');
SELECT * FROM campaigns_write;
Appendix B — Java Project Code
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.s2</groupId>
    <artifactId>s2-app</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-direct-java -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-direct-java</artifactId>
            <version>2.44.0</version>
            <scope>runtime</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.singlestore/singlestore-jdbc-client -->
        <dependency>
            <groupId>com.singlestore</groupId>
            <artifactId>singlestore-jdbc-client</artifactId>
            <version>1.1.4</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-singlestore -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-singlestore</artifactId>
            <version>2.44.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>fully.qualified.MainClass</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
S2ReadTable.java
package com.s2.beam;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.RowMapper;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.StatementPreparator;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class S2ReadTable {
    public static void main(String[] args) {
        String s2_host = System.getenv("S2_HOST");
        String s2_password = System.getenv("S2_PASSWORD");

        Pipeline pipeline = Pipeline.create();

        PCollection<KV<Integer, String>> data = pipeline.apply(SingleStoreIO.<KV<Integer, String>>read()
                .withDataSourceConfiguration(DataSourceConfiguration
                        .create(s2_host)
                        .withUsername("admin")
                        .withPassword(s2_password)
                        .withDatabase("adtech"))
                .withQuery("SELECT * FROM campaigns_read WHERE campaign_id > ?")
                .withStatementPreparator(new StatementPreparator() {
                    public void setParameters(PreparedStatement preparedStatement) throws Exception {
                        preparedStatement.setInt(1, 7);
                    }
                })
                .withRowMapper(new RowMapper<KV<Integer, String>>() {
                    public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                        return KV.of(resultSet.getInt(1), resultSet.getString(2));
                    }
                })
        );

        data
                .apply(MapElements
                        .into(TypeDescriptors.strings())
                        .via((KV<Integer, String> kv) -> kv.getKey() + "," + kv.getValue()))
                .apply(TextIO
                        .write().to("/path/to/s2").withNumShards(1).withSuffix(".csv")
                );

        pipeline.run().waitUntilFinish();
    }
}
S2WriteTable.java
package com.s2.beam;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.DataSourceConfiguration;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO.UserDataMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.ArrayList;
import java.util.List;

public class S2WriteTable {
    public static void main(String[] args) {
        String s2_host = System.getenv("S2_HOST");
        String s2_password = System.getenv("S2_PASSWORD");

        Pipeline pipeline = Pipeline.create();

        PCollection<String> lines = pipeline.apply(
                TextIO.read().from("/path/to/s2-00000-of-00001.csv"));

        PCollection<KV<Integer, String>> keyValues = lines.apply(
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
                        .via((String line) -> {
                            String[] fields = line.split(",");
                            return KV.of(Integer.parseInt(fields[0]), fields[1]);
                        })
        );

        keyValues.apply(SingleStoreIO.<KV<Integer, String>>write()
                .withDataSourceConfiguration(DataSourceConfiguration
                        .create(s2_host)
                        .withUsername("admin")
                        .withPassword(s2_password)
                        .withDatabase("adtech"))
                .withTable("campaigns_write")
                .withUserDataMapper(new UserDataMapper<KV<Integer, String>>() {
                    public List<String> mapRow(KV<Integer, String> element) {
                        List<String> result = new ArrayList<>();
                        result.add(element.getKey().toString());
                        result.add(element.getValue());
                        return result;
                    }
                })
        );

        pipeline.run().waitUntilFinish();
    }
}