Purpose
In this tutorial we will create a cluster storage with three servers that communicate via TCP protocol. There also will
be a cluster client. Each server will have an HTTP GUI server for simplicity of use and debugging. The cluster storage
will support file uploads with automatic repartitioning according to the provided rule and replication count.
What ActiveJ technologies will be used?
- ActiveFS - a lightweight abstraction for managing distributed file storage.
- ActiveInject - fast and powerful dependency injection library.
- ActiveJ HTTP - high-performance HTTP clients, servers and servlets.
- ActiveJ Launchers - a tool for managing application lifecycle.
- ActiveJ Eventloop - efficient management of asynchronous operations.
- ActiveJ Config - a useful extension for properties files.
What is ActiveFS?
ActiveFS is one of the most important technologies in this tutorial. It provides a lightweight abstraction on top of common file operations like upload, download, append, list, copy, move, delete, and others. It allows operations with local, remote or distributed file storage. ActiveFS is a stand-alone technology of ActiveJ Java platform, it can be used independently of the platform.
Import the components to your project
Add all the required Maven dependencies to your project:
<dependencies>
<dependency>
<groupId>io.activej</groupId>
<artifactId>activej-launchers-fs</artifactId>
<version>3.0-rc2</version>
</dependency>
<dependency>
<groupId>com.github.spullara.mustache.java</groupId>
<artifactId>compiler</artifactId>
<version>0.9.4</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
activej-launchers-fs
module will import all the required ActiveJ technologies that were mentioned before.
Set up Server Launcher
The first thing we need to do is to create a launcher class ClusterServerLauncher for our servers. We’ll need
the following instances:
- ActiveFsServer,
- local ActiveFS,
- AsyncHttpServer for GUI that will simplify working with your cluster storage,
- utils for repartitioning management like task schedulers, ClusterRepartitionController, and FsPartitions for tracking alive partitions and their statuses.
The partitions will communicate via TCP protocol, while GUI server will use HTTP protocol.
public final class ClusterServerLauncher extends Launcher {
public static final Path DEFAULT_PATH = Paths.get(System.getProperty("java.io.tmpdir"), "fs-storage");
@Provides
public Eventloop eventloop() {
return Eventloop.create();
}
@Provides
Executor executor() {
return Executors.newCachedThreadPool();
}
@Provides
ActiveFs localActivefs(Eventloop eventloop, Executor executor, Config config) {
return LocalActiveFs.create(eventloop, executor, config.get(ofPath(), "activefs.path", DEFAULT_PATH));
}
@Eager
@Provides
ActiveFsServer activeFsServer(Eventloop eventloop, ActiveFs activeFs, Config config) {
return ActiveFsServer.create(eventloop, activeFs)
.withInitializer(ofActiveFsServer(config.getChild("activefs")));
}
@Provides
AsyncServlet guiServlet(ActiveFs fs, ClusterRepartitionController controller) {
return ActiveFsGuiServlet.create(fs, "Cluster server [" + controller.getLocalPartitionId() + ']');
}
@Provides
@Eager
AsyncHttpServer guiServer(Eventloop eventloop, AsyncServlet servlet, Config config) {
return AsyncHttpServer.create(eventloop, servlet)
.withInitializer(ofHttpServer(config.getChild("activefs.http.gui")));
}
@Provides
FsPartitions fsPartitions(Config config, Eventloop eventloop, ActiveFs fs) {
Map<Object, ActiveFs> partitions = new LinkedHashMap<>();
partitions.put(config.get("activefs.repartition.localPartitionId"), fs);
return FsPartitions.create(eventloop, partitions)
.withInitializer(ofFsPartitions(config.getChild("activefs.cluster")));
}
@Provides
ClusterRepartitionController repartitionController(Config config, ActiveFsServer localServer, FsPartitions partitions) {
String localPartitionId = first(partitions.getAllPartitions());
assert localPartitionId != null;
return ClusterRepartitionController.create(localPartitionId, partitions)
.withInitializer(ofClusterRepartitionController(config.getChild("activefs.repartition")));
}
@Provides
@Eager
@Named("repartition")
EventloopTaskScheduler repartitionScheduler(Config config, ClusterRepartitionController controller) {
return EventloopTaskScheduler.create(controller.getEventloop(), controller::repartition)
.withInterval(Duration.ofSeconds(1));
}
@Provides
@Eager
@Named("clusterDeadCheck")
EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
.withInterval(Duration.ofSeconds(1));
}
@Provides
Config config() {
return Config.create()
.with("activefs.listenAddresses", "*:9000")
.with("activefs.http.gui.listenAddresses", "*:8080")
.overrideWith(Config.ofClassPathProperties("activefs-server.properties", true))
.overrideWith(Config.ofSystemProperties("config"));
}
@Override
protected final Module getModule() {
return combine(
ServiceGraphModule.create(),
ConfigModule.create()
.withEffectiveConfigLogger());
}
@Override
protected void run() throws Exception {
awaitShutdown();
}
public static void main(String[] args) throws Exception {
new BasicClusterServerLauncher().launch(args);
}
}
Client Launcher
Now we can move on to creating a client launcher ClusterTcpClientLauncher. We need to provide a task scheduler to
detect dead partitions, AsyncHttpServer for GUI, remote ActiveFS, and FsPartitions for managing partitions.
We also need an instance of ClusterActiveFs class, an ActiveFs implementation that operates on other partitions as a cluster and contains some redundancy and
fail-tolerance capabilities.
public class ClusterTcpClientLauncher extends Launcher {
public static final String PROPERTIES_FILE = "activefs-client.properties";
public static final String DEFAULT_DEAD_CHECK_INTERVAL = "1 seconds";
public static final String DEFAULT_GUI_SERVER_LISTEN_ADDRESS = "*:8080";
@Provides
Eventloop eventloop() {
return Eventloop.create();
}
@Provides
@Eager
@Named("clusterDeadCheck")
EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
.withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));
}
@Provides
@Eager
AsyncHttpServer guiServer(Eventloop eventloop, AsyncServlet servlet, Config config) {
return AsyncHttpServer.create(eventloop, servlet)
.withInitializer(ofHttpServer(config.getChild("activefs.http.gui")));
}
@Provides
AsyncServlet guiServlet(ActiveFs activeFs) {
return ActiveFsGuiServlet.create(activeFs, "Cluster FS Client");
}
@Provides
ActiveFs remoteActiveFs(Eventloop eventloop, FsPartitions partitions, Config config) {
return ClusterActiveFs.create(partitions)
.withInitializer(ofClusterActiveFs(config.getChild("activefs.cluster")));
}
@Provides
FsPartitions fsPartitions(Eventloop eventloop, Config config) {
return FsPartitions.create(eventloop)
.withInitializer(ofFsPartitions(config.getChild("activefs.cluster")));
}
@Provides
Config config() {
return createConfig()
.overrideWith(Config.ofClassPathProperties(PROPERTIES_FILE, true))
.overrideWith(Config.ofSystemProperties("config"));
}
protected Config createConfig(){
return Config.create()
.with("activefs.http.gui.listenAddresses", DEFAULT_GUI_SERVER_LISTEN_ADDRESS)
.with("activefs.repartition.deadCheck.schedule.type", "interval")
.with("activefs.repartition.deadCheck.schedule.value", DEFAULT_DEAD_CHECK_INTERVAL);
}
@Override
protected final Module getModule() {
return combine(
ServiceGraphModule.create(),
JmxModule.create(),
ConfigModule.create()
.withEffectiveConfigLogger());
}
@Override
protected void run() throws Exception {
awaitShutdown();
}
public static void main(String[] args) throws Exception {
new ClusterTcpClientLauncher().launch(args);
}
}
Here’s the architecture of our distributed P2P storage:
You can create as many partitions as you wish, cluster client is optional as you can create an alternative client
implementation that supports ActiveFS protocol.
Testing out the storage
Let’s launch three partitions on different ports. For this purpose you need to create three Run/Debug configurations for ClusterTcpServerLauncher and provide the following VM options:
-Dconfig.activefs.listenAddresses=*:9001
-Dconfig.activefs.http.gui.listenAddresses=*:8081
-Dconfig.activefs.cluster.partitions=127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003
-Dconfig.activefs.repartition.replicationCount=2
-Dconfig.activefs.path=/tmp/server1
-Dconfig.activefs.repartition.localPartitionId=127.0.0.1:9001
-Dconfig.activefs.listenAddresses=*:9002
-Dconfig.activefs.http.gui.listenAddresses=*:8082
-Dconfig.activefs.cluster.partitions=127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003
-Dconfig.activefs.repartition.replicationCount=2
-Dconfig.activefs.path=/tmp/server2
-Dconfig.activefs.repartition.localPartitionId=127.0.0.1:9002
-Dconfig.activefs.listenAddresses=*:9003
-Dconfig.activefs.http.gui.listenAddresses=*:8083
-Dconfig.activefs.cluster.partitions=127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003
-Dconfig.activefs.repartition.replicationCount=2
-Dconfig.activefs.path=/tmp/server3
-Dconfig.activefs.repartition.localPartitionId=127.0.0.1:9003
Next, provide the following VM options for ClusterTcpClientLauncher:
-Dconfig.activefs.http.gui.listenAddresses=*:8080
-Dconfig.activefs.cluster.partitions=127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003
The storage will run with the following configurations:
Server1: TCP-connection port 9001, HTTP GUI port 8081
Server2: TCP-connection port 9002, HTTP GUI port 8082
Server3: TCP-connection port 9003, HTTP GUI port 8083
Client: HTTP GUI port 8080
Launch all the created configurations and go to 127.0.0.1:8080
in your browser to work with the storage. Switch between ports (8081
, 8082
, 8083
) to check your partitions’ GUI.
You can try to upload files, create directories, kill some partitions to check repartitioning.
Top comments (0)