Java – Spring Integration – AbstractInboundFileSynchronizer does not update files

I would expect the FTP synchronization mechanism to update a changed file. However, as you can see here, the file is downloaded only if it does not exist locally. For now, even if the timestamp or content has changed, the file is not saved locally again.

So this is what I've found so far.

Class org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer:

@Override
    public void synchronizeToLocalDirectory(final File localDirectory) {
        final String remoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext,String.class);
        try {
            int transferred = this.remoteFileTemplate.execute(new SessionCallback<F,Integer>() {

                @Override
                public Integer doInSession(Session<F> session) throws IOException {
                    F[] files = session.list(remoteDirectory);
                    if (!ObjectUtils.isEmpty(files)) {
                        List<F> filteredFiles = filterFiles(files);
                        for (F file : filteredFiles) {
                            try {
                                if (file != null) {
                                    copyFileToLocalDirectory(
                                            remoteDirectory,file,localDirectory,session);
                                }
                            }
                            catch (RuntimeException e) {
                                if (AbstractInboundFileSynchronizer.this.filter instanceof ReversibleFileListFilter) {
                                    ((ReversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
                                            .rollback(file,filteredFiles);
                                }
                                throw e;
                            }
                            catch (IOException e) {
                                if (AbstractInboundFileSynchronizer.this.filter instanceof ReversibleFileListFilter) {
                                    ((ReversibleFileListFilter<F>) AbstractInboundFileSynchronizer.this.filter)
                                            .rollback(file,filteredFiles);
                                }
                                throw e;
                            }
                        }
                        return filteredFiles.size();
                    }
                    else {
                        return 0;
                    }
                }
            });
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(transferred + " files transferred");
            }
        }
        catch (Exception e) {
            throw new MessagingException("Problem occurred while synchronizing remote to local directory",e);
        }
    }

To filter the files to download, I want to use org.springframework.integration.ftp.filters.FtpPersistentAcceptOnceFileListFilter, which compares the file name and the last-modified date.

The synchronizer then calls copyFileToLocalDirectory with each filtered file (the files to be copied):
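The idea behind such a filter can be sketched in a few lines of plain Java. This is only an illustration under assumptions: the class NameAndTimestampFilter and its in-memory map are hypothetical and are not Spring's implementation, which keeps its state in a metadata store.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustration only: a simplified stand-in, not Spring's actual filter class.
final class NameAndTimestampFilter {

    // Last recorded modification time per file name (hypothetical in-memory store).
    private final Map<String, Long> seen = new ConcurrentHashMap<>();

    /**
     * Accepts a file the first time its name is seen, and again whenever its
     * modification time differs from the one recorded earlier.
     */
    boolean accept(String fileName, long modifiedMillis) {
        Long previous = this.seen.put(fileName, modifiedMillis);
        return previous == null || previous != modifiedMillis;
    }
}
```

With a filter like this, an unchanged file is passed on only once, while a file whose timestamp changed is passed on again, which is the behaviour I rely on for detecting updates.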

protected void copyFileToLocalDirectory(String remoteDirectoryPath,F remoteFile,File localDirectory,Session<F> session) throws IOException {
        String remoteFileName = this.getFilename(remoteFile);
        String localFileName = this.generateLocalFileName(remoteFileName);
        String remoteFilePath = remoteDirectoryPath != null
                ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
                : remoteFileName;
        if (!this.isFile(remoteFile)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("cannot copy,not a file: " + remoteFilePath);
            }
            return;
        }

        File localFile = new File(localDirectory,localFileName);
        if (!localFile.exists()) {
            String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix;
            File tempFile = new File(tempFileName);
            OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
            try {
                session.read(remoteFilePath,outputStream);
            }
            catch (Exception e) {
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                else {
                    throw new MessagingException("Failure occurred while copying from remote to local directory",e);
                }
            }
            finally {
                try {
                    outputStream.close();
                }
                catch (Exception ignored2) {
                }
            }

            if (tempFile.renameTo(localFile)) {
                if (this.deleteRemoteFiles) {
                    session.remove(remoteFilePath);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("deleted " + remoteFilePath);
                    }
                }
            }
            if (this.preserveTimestamp) {
                localFile.setLastModified(getModified(remoteFile));
            }
        }
    }

However, this method only checks whether a file with the same name already exists on the local disk, and downloads only if it does not. So there is basically no chance that an updated file (with a new timestamp) gets downloaded.

I tried changing the FtpInboundFileSynchronizer, but it became too complicated. What is the best way to customize the synchronize / copyFileToLocalDirectory methods?
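What I am after is roughly the following check, shown here as a minimal sketch. The helper UpdateCheck is hypothetical and not part of Spring Integration; it only assumes that the remote modification time is available in epoch milliseconds.

```java
import java.io.File;

// Hypothetical helper, not part of Spring Integration.
final class UpdateCheck {

    private UpdateCheck() {
    }

    /**
     * Returns true when the remote file should be (re)downloaded: either no
     * local copy exists, or the remote modification time is strictly newer
     * than the local one.
     */
    static boolean needsDownload(File localFile, long remoteModifiedMillis) {
        if (!localFile.exists()) {
            return true; // the only case the stock synchronizer handles
        }
        return remoteModifiedMillis > localFile.lastModified();
    }
}
```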

Solution

The AbstractInboundFileSynchronizer can be updated to recognize updated files, but doing so is brittle and you will run into other issues.

Update 13/Nov/2016: found out how to get modification timestamps in seconds.

The main issue with updating the AbstractInboundFileSynchronizer is that it has setter methods but no (protected) getter methods. If the setter methods do something smart in the future, the updated version presented here will break.

The main issue with updating files in the local directory is concurrency: if you are processing a local file while an update for it is being received, you can run into all sorts of trouble. The easy way out is to move the local file to a (temporary) processing directory, so that an update is received as a new file and the AbstractInboundFileSynchronizer does not need to be changed at all. See also Camel's timestamp remarks.

By default, FTP servers provide modification timestamps with minute precision. For testing, I updated the FTP client to use the MLSD command, which provides modification timestamps in seconds (and milliseconds if you are lucky), but not all FTP servers support this.

As described in the Spring FTP reference, the local file filter needs to be a FileSystemPersistentAcceptOnceFileListFilter to ensure that local files are picked up when their modification timestamp changes.

Below is my updated version of the AbstractInboundFileSynchronizer, followed by some test classes I used.
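The "move to a processing directory" approach could look like the sketch below. The helper ProcessingDirMover and the directory layout are assumptions for illustration; only java.nio.file calls are used.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper; the directory layout is an assumption.
final class ProcessingDirMover {

    private ProcessingDirMover() {
    }

    /**
     * Moves a received file into the processing directory and returns its new
     * path. Once moved, the file no longer exists in the receive directory, so
     * the stock synchronizer can download the next remote version as a new file.
     */
    static Path claimForProcessing(Path receivedFile, Path processingDir) throws IOException {
        Files.createDirectories(processingDir);
        Path target = processingDir.resolve(receivedFile.getFileName());
        // REPLACE_EXISTING overwrites a leftover copy from an earlier run.
        return Files.move(receivedFile, target, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

A message handler would call claimForProcessing(file.toPath(), processingDir) as its first step and work only on the returned path.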
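Why the precision matters can be shown with a small sketch: when timestamps are only minute-granular, a second write within the same minute is indistinguishable from the first. The helper TimestampPrecision is hypothetical, and minute granularity is an assumption about what the server's listing reports.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical helper; minute granularity is an assumption about the server's listing.
final class TimestampPrecision {

    private TimestampPrecision() {
    }

    /**
     * Compares two epoch-millisecond timestamps after truncating both to whole
     * minutes, which mimics what a client sees when the server reports
     * modification times with minute precision.
     */
    static boolean remoteIsNewerByMinute(long remoteMillis, long localMillis) {
        Instant remote = Instant.ofEpochMilli(remoteMillis).truncatedTo(ChronoUnit.MINUTES);
        Instant local = Instant.ofEpochMilli(localMillis).truncatedTo(ChronoUnit.MINUTES);
        return remote.isAfter(local);
    }
}
```

Two writes at 10:00:01 and 10:00:59 compare as equal here, so the second one would not be detected as an update until the file changes again in a later minute.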

public class FtpUpdatingFileSynchronizer extends FtpInboundFileSynchronizer {

    protected final Log logger = LogFactory.getLog(this.getClass());

    private volatile Expression localFilenameGeneratorExpression;
    private volatile EvaluationContext evaluationContext;
    private volatile boolean deleteRemoteFiles;
    private volatile String remoteFileSeparator = "/";
    private volatile boolean  preserveTimestamp;

    public FtpUpdatingFileSynchronizer(SessionFactory<FTPFile> sessionFactory) {
        super(sessionFactory);
        setPreserveTimestamp(true);
    }

    @Override
    public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) {
        super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression);
        this.localFilenameGeneratorExpression = localFilenameGeneratorExpression;
    }

    @Override
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        super.setIntegrationEvaluationContext(evaluationContext);
        this.evaluationContext = evaluationContext;
    }

    @Override
    public void setDeleteRemoteFiles(boolean deleteRemoteFiles) {
        super.setDeleteRemoteFiles(deleteRemoteFiles);
        this.deleteRemoteFiles = deleteRemoteFiles;
    }

    @Override
    public void setRemoteFileSeparator(String remoteFileSeparator) {
        super.setRemoteFileSeparator(remoteFileSeparator);
        this.remoteFileSeparator = remoteFileSeparator;
    }

    @Override
    public void setPreserveTimestamp(boolean preserveTimestamp) {
        // updated
        Assert.isTrue(preserveTimestamp,"for updating timestamps must be preserved");
        super.setPreserveTimestamp(preserveTimestamp);
        this.preserveTimestamp = preserveTimestamp;
    }

    @Override
    protected void copyFileToLocalDirectory(String remoteDirectoryPath, FTPFile remoteFile, File localDirectory, Session<FTPFile> session) throws IOException {

        String remoteFileName = this.getFilename(remoteFile);
        String localFileName = this.generateLocalFileName(remoteFileName);
        String remoteFilePath = (remoteDirectoryPath != null
                ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
                        : remoteFileName);

        if (!this.isFile(remoteFile)) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("cannot copy,not a file: " + remoteFilePath);
            }
            return;
        }

        // start update
        File localFile = new File(localDirectory,localFileName);
        boolean update = false;
        if (localFile.exists()) {
            if (this.getModified(remoteFile) > localFile.lastModified()) {
                this.logger.info("Updating local file " + localFile);
                update = true;
            } else {
                this.logger.info("File already exists: " + localFile);
                return;
            }
        }
        // end update

        String tempFileName = localFile.getAbsolutePath() + this.getTemporaryFileSuffix();
        File tempFile = new File(tempFileName);
        OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile));
        try {
            session.read(remoteFilePath,outputStream);
        } catch (Exception e) {
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            else {
                throw new MessagingException("Failure occurred while copying from remote to local directory",e);
            }
        } finally {
            try {
                outputStream.close();
            }
            catch (Exception ignored2) {
            }
        }
        // updated
        if (update && !localFile.delete()) {
            throw new MessagingException("Unable to delete local file [" + localFile + "] for update.");
        }
        if (tempFile.renameTo(localFile)) {
            if (this.deleteRemoteFiles) {
                session.remove(remoteFilePath);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("deleted " + remoteFilePath);
                }
            }
            // updated
            this.logger.info("Stored file locally: " + localFile);
        } else {
            // updated
            throw new MessagingException("Unable to rename temporary file [" + tempFile + "] to [" + localFile + "]");
        }
        if (this.preserveTimestamp) {
            localFile.setLastModified(getModified(remoteFile));
        }
    }

    private String generateLocalFileName(String remoteFileName) {

        if (this.localFilenameGeneratorExpression != null) {
            return this.localFilenameGeneratorExpression.getValue(this.evaluationContext,remoteFileName,String.class);
        }
        return remoteFileName;
    }

}

The following are some of the test classes I used. I used the dependencies org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE and org.apache.ftpserver:ftpserver-core:1.0.6 (plus the usual logging and testing dependencies).

public class TestFtpSync {

    static final Logger log = LoggerFactory.getLogger(TestFtpSync.class);
    static final String FTP_ROOT_DIR = "target" + File.separator + "ftproot";
    // org.apache.ftpserver:ftpserver-core:1.0.6
    static FtpServer server;

    @BeforeClass
    public static void startServer() throws FtpException {

        File ftpRoot = new File (FTP_ROOT_DIR);
        ftpRoot.mkdirs();
        TestUserManager userManager = new TestUserManager(ftpRoot.getAbsolutePath());
        FtpServerFactory serverFactory = new FtpServerFactory();
        serverFactory.setUserManager(userManager);
        ListenerFactory factory = new ListenerFactory();
        factory.setPort(4444);
        serverFactory.addListener("default",factory.createListener());
        server = serverFactory.createServer();
        server.start();
    }

    @AfterClass
    public static void stopServer() {

        if (server != null) {
            server.stop();
        }
    }

    File ftpFile = Paths.get(FTP_ROOT_DIR,"test1.txt").toFile();
    File ftpFile2 = Paths.get(FTP_ROOT_DIR,"test2.txt").toFile();

    @Test
    public void syncDir() {

        // org.springframework.integration:spring-integration-ftp:4.3.5.RELEASE
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext();
        try {
            ctx.register(FtpSyncConf.class);
            ctx.refresh();
            PollableChannel msgChannel = ctx.getBean("inputChannel",PollableChannel.class);
            for (int j = 0; j < 2; j++) {
                for (int i = 0; i < 2; i++) {
                    storeFtpFile();
                }
                for (int i = 0; i < 4; i++) {
                    fetchMessage(msgChannel);
                }
            }
        } catch (Exception e) {
            throw new AssertionError("FTP test Failed.",e);
        } finally {
            ctx.close();
            cleanup();
        }
    }

    boolean tswitch = true;

    void storeFtpFile() throws IOException,InterruptedException {

        File f = (tswitch ? ftpFile : ftpFile2);
        tswitch = !tswitch;
        log.info("Writing message " + f.getName());
        Files.write(f.toPath(),("Hello " + System.currentTimeMillis()).getBytes());
    }

    Message<?> fetchMessage(PollableChannel msgChannel) {

        log.info("Fetching message.");
        Message<?> msg = msgChannel.receive(1000L);
        if (msg == null) {
            log.info("No message.");
        } else {
            log.info("Have a message: " + msg);
        }
        return msg;
    }

    void cleanup() {

        delFile(ftpFile);
        delFile(ftpFile2);
        File d = new File(FtpSyncConf.LOCAL_DIR);
        if (d.isDirectory()) {
            for (File f : d.listFiles()) {
                delFile(f);
            }
        }
        log.info("Finished cleanup");
    }

    void delFile(File f) {

        if (f.isFile()) {
            if (f.delete()) {
                log.info("Deleted " + f);
            } else {
                log.error("Cannot delete file " + f);
            }
        }
    }

}

public class MlistFtpSessionFactory extends AbstractFtpSessionFactory<MlistFtpClient> {

    @Override
    protected MlistFtpClient createClientInstance() {
        return new MlistFtpClient();
    }

}

public class MlistFtpClient extends FTPClient {

    @Override
    public FTPFile[] listFiles(String pathname) throws IOException {
        return super.mlistDir(pathname);
    }
}

@EnableIntegration
@Configuration
public class FtpSyncConf {

    private static final Logger log = LoggerFactory.getLogger(FtpSyncConf.class);

    public static final String LOCAL_DIR = "/tmp/received";

    @Bean(name = "ftpMetaData")
    public ConcurrentMetadataStore ftpMetaData() {
        return new SimpleMetadataStore();
    }

    @Bean(name = "localMetaData")
    public ConcurrentMetadataStore localMetaData() {
        return new SimpleMetadataStore();
    }

    @Bean(name = "ftpFileSyncer")
    public FtpUpdatingFileSynchronizer ftpFileSyncer(
            @Qualifier("ftpMetaData") ConcurrentMetadataStore metadataStore) {

        MlistFtpSessionFactory ftpSessionFactory = new MlistFtpSessionFactory();
        ftpSessionFactory.setHost("localhost");
        ftpSessionFactory.setPort(4444);
        ftpSessionFactory.setUsername("demo");
        ftpSessionFactory.setPassword("demo");

        FtpPersistentAcceptOnceFileListFilter fileFilter = new FtpPersistentAcceptOnceFileListFilter(metadataStore, "ftp");
        fileFilter.setFlushOnUpdate(true);
        FtpUpdatingFileSynchronizer ftpFileSync = new FtpUpdatingFileSynchronizer(ftpSessionFactory);
        ftpFileSync.setFilter(fileFilter);
        // ftpFileSync.setDeleteRemoteFiles(true);
        return ftpFileSync;
    }

    @Bean(name = "syncFtp")
    @InboundChannelAdapter(value = "inputChannel",poller = @Poller(fixedDelay = "500",maxMessagesPerPoll = "1"))
    public MessageSource<File> syncChannel(
            @Qualifier("localMetaData") ConcurrentMetadataStore metadataStore, @Qualifier("ftpFileSyncer") FtpUpdatingFileSynchronizer ftpFileSync) throws Exception {

        FtpInboundFileSynchronizingMessageSource messageSource = new FtpInboundFileSynchronizingMessageSource(ftpFileSync);
        File receiveDir = new File(LOCAL_DIR);
        receiveDir.mkdirs();
        messageSource.setLocalDirectory(receiveDir);
        messageSource.setLocalFilter(new FileSystemPersistentAcceptOnceFileListFilter(metadataStore, "local"));
        log.info("Message source bean created.");
        return messageSource;
    }

    @Bean(name = "inputChannel")
    public PollableChannel inputChannel() {

        QueueChannel channel = new QueueChannel();
        log.info("Message channel bean created.");
        return channel;
    }

}

/**
 * Copied from https://github.com/spring-projects/spring-integration-samples/tree/master/basic/ftp/src/test/java/org/springframework/integration/samples/ftp/support
 * @author Gunnar Hillert
 *
 */
public class TestUserManager extends AbstractUserManager {
    private BaseUser testUser;
    private BaseUser anonUser;

    private static final String TEST_USERNAME = "demo";
    private static final String TEST_PASSWORD = "demo";

    public TestUserManager(String homeDirectory) {
        super("admin",new ClearTextPasswordEncryptor());

        testUser = new BaseUser();
        testUser.setAuthorities(Arrays.asList(new Authority[] {new ConcurrentLoginPermission(1,1),new WritePermission()}));
        testUser.setEnabled(true);
        testUser.setHomeDirectory(homeDirectory);
        testUser.setMaxIdleTime(10000);
        testUser.setName(TEST_USERNAME);
        testUser.setPassword(TEST_PASSWORD);

        anonUser = new BaseUser(testUser);
        anonUser.setName("anonymous");
    }

    public User getUserByName(String username) throws FtpException {
        if(TEST_USERNAME.equals(username)) {
            return testUser;
        } else if(anonUser.getName().equals(username)) {
            return anonUser;
        }

        return null;
    }

    public String[] getAllUserNames() throws FtpException {
        return new String[] {TEST_USERNAME,anonUser.getName()};
    }

    public void delete(String username) throws FtpException {
        throw new UnsupportedOperationException("Deleting of FTP Users is not supported.");
    }

    public void save(User user) throws FtpException {
        throw new UnsupportedOperationException("Saving of FTP Users is not supported.");
    }

    public boolean doesExist(String username) throws FtpException {
        return (TEST_USERNAME.equals(username) || anonUser.getName().equals(username)) ? true : false;
    }

    public User authenticate(Authentication authentication) throws AuthenticationFailedException {
        if(UsernamePasswordAuthentication.class.isAssignableFrom(authentication.getClass())) {
            UsernamePasswordAuthentication upAuth = (UsernamePasswordAuthentication) authentication;

            if(TEST_USERNAME.equals(upAuth.getUsername()) && TEST_PASSWORD.equals(upAuth.getPassword())) {
                return testUser;
            }

            if(anonUser.getName().equals(upAuth.getUsername())) {
                return anonUser;
            }
        } else if(AnonymousAuthentication.class.isAssignableFrom(authentication.getClass())) {
            return anonUser;
        }

        return null;
    }
}

Update 15/Nov/2016: note on XML configuration

The XML element inbound-channel-adapter is directly linked to the FtpInboundFileSynchronizer via org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser, via FtpNamespaceHandler, via spring-integration-ftp-4.3.5.RELEASE.jar!/META-INF/spring.handlers. Following the XML custom reference guide, specifying a custom FtpNamespaceHandler in a local META-INF/spring.handlers file should allow you to use the FtpUpdatingFileSynchronizer instead of the FtpInboundFileSynchronizer. It did not work for me in unit tests, though, and a proper solution would probably involve creating extra or modified XSD files, so that the regular inbound-channel-adapter uses the regular FtpInboundFileSynchronizer and a special inbound-updating-channel-adapter uses the FtpUpdatingFileSynchronizer. Doing this properly is a bit out of scope for this answer.

A quick hack can get you started, though. You can override the default FtpNamespaceHandler by creating the package org.springframework.integration.ftp.config and the class FtpNamespaceHandler in your local project. The contents are as follows:

package org.springframework.integration.ftp.config;

public class FtpNamespaceHandler extends org.springframework.integration.config.xml.AbstractIntegrationNamespaceHandler {

    @Override
    public void init() {
        System.out.println("Initializing FTP updating file synchronizer.");
        // one updated line below,rest copied from original FtpNamespaceHandler
        registerBeanDefinitionParser("inbound-channel-adapter", new MyFtpInboundChannelAdapterParser());
        registerBeanDefinitionParser("inbound-streaming-channel-adapter", new FtpStreamingInboundChannelAdapterParser());
        registerBeanDefinitionParser("outbound-channel-adapter", new FtpOutboundChannelAdapterParser());
        registerBeanDefinitionParser("outbound-gateway", new FtpOutboundGatewayParser());
    }

}

package org.springframework.integration.ftp.config;

import org.springframework.integration.file.remote.synchronizer.InboundFileSynchronizer;
import org.springframework.integration.ftp.config.FtpInboundChannelAdapterParser;

public class MyFtpInboundChannelAdapterParser extends FtpInboundChannelAdapterParser {

    @Override
    protected Class<? extends InboundFileSynchronizer> getInboundFileSynchronizerClass() {
        System.out.println("Returning updating file synchronizer.");
        return FtpUpdatingFileSynchronizer.class;
    }

}

In addition, add preserve-timestamp="true" in the XML configuration to prevent the IllegalArgumentException "for updating timestamps must be preserved".
