Spring Data Aerospike: Reactive Repositories

Roi Menashe
Software Engineer
September 29, 2021|6 min read

Reactive Repositories for Spring Data Aerospike allow you to build high-scale, high-throughput, non-blocking applications suitable for microservices and event-driven architectures.

The Concept

Traditional non-reactive database operations block the current thread while waiting for a response. Imagine handling a huge number of database requests concurrently, with each request running in a separate thread and waiting on a response. Until the database responds, the thread sits idle and does no actual work. Scaling this approach can lead to a performance bottleneck, which is then commonly treated by increasing the number of threads.

Increasing the number of threads might help in some scenarios but it is suboptimal. You will still end up having many idle threads that are waiting for data, indicating an inefficient use of server resources.

The non-blocking Reactive Aerospike Repository operations let an execution thread switch to another active task, using the same underlying resources, and later come back to the current task when the asynchronous processing has finished. This results in less idle time for the threads and better use of the server’s resources.
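To make the difference concrete, here is a minimal JDK-only sketch. It does not use Aerospike or Project Reactor: the class NonBlockingSketch and the method fetchAll are hypothetical names, and the 100 ms "database latency" is an assumption simulated with CompletableFuture.delayedExecutor. It shows a single worker thread serving 50 simulated requests, because the waiting is handled by a timer rather than by a parked thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative sketch only: the "database" is a timer, not an Aerospike call.
class NonBlockingSketch {

    // Assumed latency of one simulated database read.
    static final long LATENCY_MS = 100;

    // Starts `count` simulated reads and returns their results in request order.
    static List<String> fetchAll(int count) {
        // A single worker thread handles every response.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        // Tasks reach the worker only after the delay has elapsed,
        // so the worker is never parked waiting for a "response".
        Executor afterLatency =
                CompletableFuture.delayedExecutor(LATENCY_MS, TimeUnit.MILLISECONDS, worker);
        try {
            List<CompletableFuture<String>> pending = IntStream.rangeClosed(1, count)
                    .mapToObj(id -> CompletableFuture.supplyAsync(() -> "user" + id, afterLatency))
                    .collect(Collectors.toList());
            // join() blocks only this demo's calling thread while it collects results.
            return pending.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        List<String> users = fetchAll(50);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(users.size() + " results in about " + elapsedMs + " ms");
    }
}
```

With one blocking thread, 50 sequential reads of 100 ms each would take about five seconds. Here the waits overlap, so the total should stay far below that, although the exact figure depends on the machine.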

Developers have been publishing articles and benchmarks that describe how they have benefitted from using reactive programming:

“Adopting the Reactive Model allowed us to scale much faster in terms and commerce systems supported.”

https://itembase.com/resources/blog/tech/reactive-programming-performance-and-trade-offs/ By Itembase.

“Reactive application was able to process nearly twice the requests than blocking application on both JVMs. (8/11)”

https://pattern-match.com/blog/reactive-vs-blocking/ By Marek Kwiecień in Pattern Match.

“WebFlux scores much better for a high load… When we execute blocking calls, the server can’t process so many requests in parallel. It’s clearly limited by the number of server threads running.”

https://thepracticaldeveloper.com/full-reactive-stack-4-conclusions/ By Moises Macero in The Practical Developer.

Some of these articles refer to HTTP requests instead of database calls, and describe using WebFlux instead of Project Reactor, or even using a relational database. Nevertheless, the point remains the same: using reactive non-blocking calls in the right way can improve performance and be a critical factor in scaling applications.

Set up

Spring Data Aerospike Reactive Repositories is built on the Aerospike Reactor client, which wraps the Aerospike Java client with reactive capabilities using Project Reactor under the hood.

Dependencies

In order to use Spring Data Aerospike reactively you need to install the following dependencies:

  1. spring-data-aerospike.

  2. aerospike-reactor-client, which is included in spring-data-aerospike but marked as “optional”, so you need to add it explicitly.

<dependency>
 <groupId>com.aerospike</groupId>
 <artifactId>spring-data-aerospike</artifactId>
 <version>3.2.0</version>
</dependency>
<!-- Adding aerospike-reactor-client explicitly since aerospike-reactor-client is marked with "optional" tag -->
<dependency>
 <groupId>com.aerospike</groupId>
 <artifactId>aerospike-reactor-client</artifactId>
 <version>5.0.7</version>
</dependency>

Configuration

To configure Reactive Aerospike, extend AbstractReactiveAerospikeDataConfiguration in your configuration class, override three methods (getHosts(), nameSpace() and eventLoops()), and add the @EnableReactiveAerospikeRepositories annotation with its basePackageClasses field pointing to your Reactive Aerospike Repository. This example uses a custom AerospikeUserReactiveRepository.

@Configuration
@EnableReactiveAerospikeRepositories(basePackageClasses = AerospikeUserReactiveRepository.class)
public class AerospikeReactiveConfiguration extends AbstractReactiveAerospikeDataConfiguration {
    @Override
    protected Collection<Host> getHosts() {
        return Collections.singleton(new Host("localhost", 3000));
    }
    @Override
    protected String nameSpace() {
        return "test";
    }
    @Override
    protected EventLoops eventLoops() {
        return new NioEventLoops();
    }
}

Repository

Reactive Aerospike Repositories are very similar to Aerospike Repositories, with the minor difference that they extend ReactiveAerospikeRepository instead of AerospikeRepository.

public interface AerospikeUserReactiveRepository extends ReactiveAerospikeRepository<User, Object> {
}

This demo uses a “User” class as the object to store. You can check out the entire demo project code at https://github.com/aerospike-examples/spring-data-aerospike-reactive-demo.
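The article does not list the User class itself. The following is a hypothetical sketch inferred from how the tests later in this article use it (a four-argument constructor, getters, and setters for email and age); the real class in the demo project may differ. Spring Data annotations such as @Id are omitted so that the snippet compiles with the JDK alone. It implements equals and hashCode because the tests compare entities with isEqualTo and expectNext.

```java
import java.util.Objects;

// Hypothetical stand-in for the demo's User entity (annotations omitted).
class User {
    private final int id;
    private final String name;
    private String email;
    private int age;

    User(int id, String name, String email, int age) {
        this.id = id;
        this.name = name;
        this.email = email;
        this.age = age;
    }

    public int getId() { return id; }
    public String getName() { return name; }
    public String getEmail() { return email; }
    public int getAge() { return age; }

    public void setEmail(String email) { this.email = email; }
    public void setAge(int age) { this.age = age; }

    // Value-based equality, so a user read back from the database
    // compares equal to the instance that was saved.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof User)) {
            return false;
        }
        User that = (User) other;
        return id == that.id
                && age == that.age
                && Objects.equals(name, that.name)
                && Objects.equals(email, that.email);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name, email, age);
    }
}
```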

Usage

The ReactiveAerospikeRepository works similarly to the AerospikeRepository. All the familiar Spring Repository features are still supported: you don’t need to implement or define any CRUD operation (save, delete, findById, …), and you can still add query methods just by declaring method signatures in the Repository interface (findBy, countBy, deleteBy, …).

The main difference is the return types. The reactive repository returns Flux and Mono publishers, which you later subscribe to in order to start the execution of the non-blocking operation and eventually get data. This is important: the execution of a reactive operation only starts once you subscribe, and nothing happens until the subscribe method is called.

Let’s see some code examples. We will use the reactor-test StepVerifier to test our reactive code:

@SpringBootTest()
class AerospikeUserReactiveRepositoryTests {
    @Autowired
    AerospikeUserReactiveRepository aerospikeUserReactiveRepository;
    private User user1, user2, user3, user4;
...

We will use a cleanUp method that runs before each test, deletes all the data, and inserts four predefined users into the Aerospike database.

@BeforeEach
public void cleanUp() {
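    // A Mono does nothing until subscribed; block() subscribes and waits for the delete to finish
    aerospikeUserReactiveRepository.deleteAll().block();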
    user1 = new User(1, "userName1", "userName1@gmail.com", 31);
    user2 = new User(2, "userName2", "userName2@gmail.com", 32);
    user3 = new User(3, "userName3", "userName3@gmail.com", 33);
    user4 = new User(4, "userName4", "userName4@gmail.com", 34);
    StepVerifier.create(aerospikeUserReactiveRepository.saveAll(Flux.just(user1, user2, user3, user4))).expectNextCount(4).verifyComplete();
}
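As noted in the Usage section, a reactive operation only starts once it is subscribed to. The following JDK-only sketch illustrates that behavior with java.util.concurrent.Flow, the JDK's copy of the Reactive Streams interfaces, rather than with Reactor's Mono. The names LazySubscriptionSketch and findUser are hypothetical, and the publisher is deliberately simplified: it ignores invalid request amounts, which a spec-compliant publisher would signal as an error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only: a hand-written single-value publisher, not Reactor.
class LazySubscriptionSketch {

    // Hypothetical lookup. Calling it only describes the work;
    // the "query" runs when a subscriber requests data.
    static Flow.Publisher<String> findUser(String id, List<String> log) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private final AtomicBoolean finished = new AtomicBoolean();

            @Override
            public void request(long n) {
                // Emit at most once; non-positive requests are ignored in this sketch.
                if (n <= 0 || !finished.compareAndSet(false, true)) {
                    return;
                }
                log.add("query executed");
                subscriber.onNext("user-" + id);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                finished.set(true);
            }
        });
    }

    // Creates the publisher, then subscribes, and returns the observed order of events.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Publisher<String> publisher = findUser("1", log);
        log.add("publisher created");

        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                log.add("subscribed");
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                log.add("received " + item);
            }

            @Override
            public void onError(Throwable error) {
                log.add("error " + error);
            }

            @Override
            public void onComplete() {
                log.add("completed");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "query executed" entry appears only after "subscribed", even though findUser was called earlier. In Reactor, block() and StepVerifier's verify methods are what perform this subscription on your behalf.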

findBy

@Test
public void findById() {
    StepVerifier.create(aerospikeUserReactiveRepository.findById(user1.getId())
            .subscribeOn(Schedulers.parallel())).consumeNextWith(actual ->
            assertThat(actual).isEqualTo(user1)
    ).verifyComplete();
}
@Test
public void findById_ShouldNotReturnNotExistent() {
    StepVerifier.create(aerospikeUserReactiveRepository.findById("non-existent-id")
            .subscribeOn(Schedulers.parallel()))
            .expectNextCount(0).verifyComplete();
}

We expect to consume “user1” once we subscribe, and we don’t expect any results when we execute findById on a nonexistent user.

exists

@Test
public void existsById_ShouldReturnTrueWhenExists() {
    StepVerifier.create(aerospikeUserReactiveRepository.existsById(user2.getId()).subscribeOn(Schedulers.parallel()))
            .expectNext(true).verifyComplete();
}
@Test
public void existsByIdPublisher_ShouldCheckOnlyFirstElement() {
    StepVerifier.create(aerospikeUserReactiveRepository.existsById(Flux.just(user1.getId(), "non-existent-id"))
            .subscribeOn(Schedulers.parallel()))
            .expectNext(true).verifyComplete();
}

We expect true when running existsById on “user2”. We also expect true when calling existsById with a publisher of multiple ids, even one that includes nonexistent user ids, as long as the first element exists in the Aerospike database, because only the first element is checked.

delete

@Test
public void delete_ShouldDeleteExistent() {
    StepVerifier.create(aerospikeUserReactiveRepository.delete(user3).subscribeOn(Schedulers.parallel())).verifyComplete();
    StepVerifier.create(aerospikeUserReactiveRepository.findById(user3.getId())).expectNextCount(0).verifyComplete();
}
@Test
public void deleteAllPublisher_ShouldSkipNonexistent() {
    User nonExistentUser = new User(9, "nonExistingUser", "nonExistingUser@gmail.com", 75);
    aerospikeUserReactiveRepository.deleteAll(Flux.just(user1, nonExistentUser, user4)).subscribeOn(Schedulers.parallel()).block();
    StepVerifier.create(aerospikeUserReactiveRepository.findById(user1.getId())).expectNextCount(0).verifyComplete();
    StepVerifier.create(aerospikeUserReactiveRepository.findById(user4.getId())).expectNextCount(0).verifyComplete();
}

Once we delete user3, we expect findById on the id of user3 not to return any results. If we call deleteAll on multiple users, it deletes the existing users and ignores the nonexistent ones.

save

@Test
public void saveEntityShouldInsertNewEntity() {
    User newUserToSave = new User(5, "newUserToSave", "newUserToSave@gmail.com", 40);
    StepVerifier.create(aerospikeUserReactiveRepository.save(newUserToSave).subscribeOn(Schedulers.parallel())).expectNext(newUserToSave).verifyComplete();
    assertUserExistsInRepo(newUserToSave);
}
@Test
public void savePublisherOfMixedEntitiesShouldInsertNewAndUpdateOld() {
    User newUserToSave1 = new User(5, "newUserToSave1", "newUserToSave1@gmail.com", 41);
    User newUserToSave2 = new User(6, "newUserToSave2", "newUserToSave2@gmail.com", 42);
    User newUserToSave3 = new User(7, "newUserToSave3", "newUserToSave3@gmail.com", 43);
    StepVerifier.create(aerospikeUserReactiveRepository.save(newUserToSave1).subscribeOn(Schedulers.parallel()))
            .expectNext(newUserToSave1).verifyComplete();
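    // Modify the user that was just saved, so that saveAll below performs an update for it
    newUserToSave1.setEmail("newUserToSave1NewEmail");
    newUserToSave1.setAge(51);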
    StepVerifier.create(aerospikeUserReactiveRepository.saveAll(Flux.just(newUserToSave1, newUserToSave2, newUserToSave3))).expectNextCount(3).verifyComplete();
    assertUserExistsInRepo(newUserToSave1);
    assertUserExistsInRepo(newUserToSave2);
    assertUserExistsInRepo(newUserToSave3);
}
private void assertUserExistsInRepo(User user) {
    StepVerifier.create(aerospikeUserReactiveRepository.findById(user.getId())).consumeNextWith(actual -> {
        assertThat(actual.getName()).isEqualTo(user.getName());
        assertThat(actual.getEmail()).isEqualTo(user.getEmail());
        assertThat(actual.getAge()).isEqualTo(user.getAge());
    }).verifyComplete();
}

If we call save on a nonexistent user, it inserts that user. If we call saveAll on multiple users, it inserts the new users and updates the existing ones.

Conclusion

We covered just the tip of the iceberg. The goal of this article was to show how Reactive Repositories can help, with an emphasis on scaling Aerospike Database access. There is much more you can achieve using reactive features, such as a rich vocabulary of operators, simple orchestration of multiple non-blocking tasks, and fully lazy computation.

Learn more about Reactive Programming in general and Project Reactor in particular:

https://www.baeldung.com/java-reactive-systems

https://projectreactor.io/docs/core/release/reference/index.html

Demo project can be found here:

https://github.com/aerospike-examples/spring-data-aerospike-reactive-demo