
DEV Community

Chanh Le


Spring Integration: What is it?

I am working on a project that uses Spring Integration to encrypt PHI files.

The system scans a folder for input files, encrypts them, and finally uploads the output to another folder.

However, the input and output folders are actually remote folders, which the system connects to via SFTP.

With Spring Integration, you don't need to write the code that talks to the SFTP servers yourself. All you need is some configuration, either XML-based or annotation-based.

That way, you can focus on the main task: encryption.

The workflow will have these main steps:

  1. Scan for input files in the input SFTP folder using an InboundChannelAdapter over SFTP. The output of this step is a Message on a channel (named processingChannel).
  2. Consume messages from that channel, encrypt the files, then build another message with the output and push it into another channel (named deliveringChannel).
  3. Finally, an OutboundChannelAdapter consumes messages from deliveringChannel and uploads the files to the output SFTP folder.
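
The three steps above can be pictured with a small plain-Java model. This is only a sketch of the idea, not Spring Integration code: the Channel class, the run method, and the ".enc" suffix standing in for encryption are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PipelineSketch {

    /** Toy channel (hypothetical): hands each message to its single subscriber. */
    static final class Channel<T> {
        private Consumer<T> subscriber = message -> { };

        void subscribe(Consumer<T> newSubscriber) {
            this.subscriber = newSubscriber;
        }

        void send(T message) {
            subscriber.accept(message);
        }
    }

    /** Pushes file names through both channels and returns what was "uploaded". */
    static List<String> run(List<String> inputFiles) {
        Channel<String> processingChannel = new Channel<>();
        Channel<String> deliveringChannel = new Channel<>();
        List<String> uploaded = new ArrayList<>();

        // Step 2: consume from processingChannel, "encrypt", push to deliveringChannel.
        processingChannel.subscribe(name -> deliveringChannel.send(name + ".enc"));

        // Step 3: consume from deliveringChannel and "upload" the result.
        deliveringChannel.subscribe(uploaded::add);

        // Step 1: the scanner puts one message per input file on processingChannel.
        inputFiles.forEach(processingChannel::send);

        return uploaded;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a.txt", "b.txt")));
    }
}
```

In the real application, Spring Integration supplies the channels and the two SFTP ends, so only the middle step needs custom code.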

All three steps can be set up with simple configuration.
Then the only thing left for you to implement is the main business logic: encrypting the files.

You can see that our encryption service is actually the consumer of processingChannel. We can change how this consumer is invoked by choosing a different kind of channel: DirectChannel (point-to-point, with the handler running on the sender's thread), ExecutorChannel (point-to-point, with each message handed off to a thread from a thread pool so that messages are processed concurrently), or PublishSubscribeChannel (each message is broadcast to all subscribers).
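
To make the difference between these dispatch styles concrete, here is a rough plain-Java model. It does not use Spring's channel classes; the names sendDirect, sendViaExecutor, and publish are hypothetical and only mimic how each kind of channel hands a message to its handlers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class DispatchSketch {

    /** Direct style: the single handler runs on the sender's thread. */
    static void sendDirect(String message, Consumer<String> handler) {
        handler.accept(message);
    }

    /** Executor style: the handler runs on a pool thread and send returns at once. */
    static void sendViaExecutor(String message, Consumer<String> handler, ExecutorService pool) {
        pool.execute(() -> handler.accept(message));
    }

    /** Publish-subscribe style: every subscriber receives the same message. */
    static void publish(String message, List<Consumer<String>> subscribers) {
        subscribers.forEach(subscriber -> subscriber.accept(message));
    }

    static List<String> demo() throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        Thread caller = Thread.currentThread();

        sendDirect("a.txt", m ->
                log.add("direct " + m + " on caller thread: " + (Thread.currentThread() == caller)));

        ExecutorService pool = Executors.newFixedThreadPool(2);
        sendViaExecutor("b.txt", m ->
                log.add("executor " + m + " on caller thread: " + (Thread.currentThread() == caller)), pool);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS); // wait so the log order is stable

        List<Consumer<String>> subscribers = List.of(
                m -> log.add("audit saw " + m),
                m -> log.add("encrypt saw " + m));
        publish("c.txt", subscribers);

        return new ArrayList<>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        demo().forEach(System.out::println);
    }
}
```

For a workload like file encryption, the executor style is the natural fit, because a slow file does not block the thread that polls for new files.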

Let's configure the channels:

ChannelConfig.java

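import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.MessageChannel;

@Configuration
@EnableIntegration
public class ChannelConfig {

    // Messages from the inbound adapter; handlers run on a pool of 10 threads.
    @Bean
    public MessageChannel processingChannel() {
        return new ExecutorChannel(Executors.newFixedThreadPool(10));
    }

    // Encrypted files waiting to be uploaded; also backed by 10 threads.
    @Bean
    public MessageChannel deliveringChannel() {
        return new ExecutorChannel(Executors.newFixedThreadPool(10));
    }
}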

Both channels are ExecutorChannels, each backed by a fixed pool of 10 threads, so messages are handled concurrently.

Here is the configuration for the inbound channel adapter.

SftpInboundConfig.java

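import java.io.File;

import org.apache.sshd.sftp.client.SftpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@Configuration
@EnableIntegration
public class SftpInboundConfig {

    // host, port, user, password, remoteDir and localDir are fields that are
    // not shown here; populate them from your own configuration.

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        // true = share one SSH session across operations
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        // Accepts any host key: fine for a demo, but verify host keys in production.
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlow
                .from(
                        Sftp.inboundAdapter(sftpSessionFactory())
                                .remoteDirectory(remoteDir)
                                .localDirectory(new File(localDir)),
                        e -> e.id("sftpInboundAdapter")
                                .autoStartup(true)
                                // poll with a 1000 ms delay between polls
                                .poller(Pollers.fixedDelay(1000))
                )
                .channel("processingChannel")
                .get();
    }
}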

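We define an SFTP session factory that creates sessions (connections) to the input SFTP server. The sftpInboundFlow uses this factory to open a session, polls the remote directory with a one-second delay between polls, downloads new files to the local directory, and sends each one as a message to processingChannel.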

And here is the configuration for the outbound channel adapter.

SftpOutboundConfig.java

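import org.apache.sshd.sftp.client.SftpClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;

@Configuration
@EnableIntegration
public class SftpOutboundConfig {

    // host, port, user, password and remoteDir are fields that are not shown
    // here; they describe the output server, not the input one.

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpOutboundSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        // Accepts any host key: fine for a demo, but verify host keys in production.
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    @Bean
    public IntegrationFlow outboundFlow() {
        return IntegrationFlow.from("deliveringChannel")
                .handle(Sftp.outboundAdapter(sftpOutboundSessionFactory())
                        .remoteDirectory(remoteDir)
                        // create the remote directory if it does not exist yet
                        .autoCreateDirectory(true)
                )
                .get();
    }
}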

We also define an SFTP session factory for the output SFTP server. The outboundFlow uses this factory to open a session and upload the encrypted files to the output SFTP server.

Wiring in our main business logic

Finally, we need to connect our EncryptionService to processingChannel and deliveringChannel. This service consumes messages from processingChannel, encrypts the files, and pushes a new message to deliveringChannel.

ProcessingFlowConfig.java

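import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;

@Configuration
@EnableIntegration
public class ProcessingFlowConfig {

    @Bean
    public IntegrationFlow processFlow() {
        return IntegrationFlow.from("processingChannel")
                // invoke the "encrypt" method on the bean named "encryptionService"
                .handle("encryptionService", "encrypt")
                .channel("deliveringChannel")
                .get();
    }
}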


The IntegrationFlow consumes messages from processingChannel, calls the encryptionService.encrypt method, and pushes the resulting messages to deliveringChannel.

Let's pretend we have a simple EncryptionService like this:

EncryptionService.java

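import java.io.File;

import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class EncryptionService {

    public Message<File> encrypt(Message<File> message) {
        // doSomeEncryption is a placeholder for your own encryption routine.
        File encrypted = doSomeEncryption(message.getPayload());
        return MessageBuilder.withPayload(encrypted).build();
    }
}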
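
The doSomeEncryption call above is left undefined in this post. As one possible way to fill it in, here is a minimal sketch that assumes AES-GCM from the JDK's javax.crypto package and a key supplied by the caller. The class name FileEncryptionSketch, the on-disk layout (IV first, then ciphertext), and reading the whole file into memory are choices made for this example, not something the project prescribes.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

class FileEncryptionSketch {

    private static final int IV_BYTES = 12;   // common nonce size for GCM
    private static final int TAG_BITS = 128;  // authentication tag length

    /** Writes target as [12-byte IV][ciphertext + tag]. Loads the whole file into memory. */
    static Path encrypt(Path source, Path target, SecretKey key)
            throws IOException, GeneralSecurityException {
        byte[] iv = new byte[IV_BYTES];
        new SecureRandom().nextBytes(iv); // a fresh IV for every file

        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] ciphertext = cipher.doFinal(Files.readAllBytes(source));

        byte[] out = new byte[iv.length + ciphertext.length];
        System.arraycopy(iv, 0, out, 0, iv.length);
        System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
        return Files.write(target, out);
    }

    /** Reverses encrypt: reads the IV from the first 12 bytes, then decrypts the rest. */
    static byte[] decrypt(Path encrypted, SecretKey key)
            throws IOException, GeneralSecurityException {
        byte[] in = Files.readAllBytes(encrypted);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
        return cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
    }

    public static void main(String[] args) throws Exception {
        // Throwaway key for the demo only; real keys need proper key management.
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(256);
        SecretKey key = generator.generateKey();

        Path plain = Files.createTempFile("plain", ".txt");
        Files.write(plain, "sample record".getBytes(StandardCharsets.UTF_8));
        Path encrypted = encrypt(plain, Files.createTempFile("encrypted", ".bin"), key);

        String roundTrip = new String(decrypt(encrypted, key), StandardCharsets.UTF_8);
        System.out.println("round trip ok: " + roundTrip.equals("sample record"));
    }
}
```

Inside EncryptionService, doSomeEncryption could delegate to a method like encrypt here and return the target path as a File. Where the key comes from is deliberately left out; for PHI that decision matters at least as much as the cipher.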

Just like that. We're done.

Conclusion

With Spring Integration, we can connect systems, services, and components by using adapters, channels, and integration flows. This lets us shift our focus away from the plumbing of connectivity and concentrate on our core business: processing and transforming messages and data across multiple systems within our enterprise, and integrating with external services from partner organizations.
