Java – why doesn’t Postgres replication stream work when used in a separate function?

I'm working with the Postgres replication stream API and ran into some unusual behavior.

When I put all of the replication slot code in the main method, everything works fine:

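import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.replication.PGReplicationStream;

public class Server implements Config {

public static void main(String[] args) throws IOException, SQLException {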
    Properties prop = new Properties();
    prop.load(new FileInputStream(System.getProperty("prop")));

    String user = prop.getProperty("user");
    String password = prop.getProperty("password");
    String url = prop.getProperty("url");

    Properties props = new Properties();
    PGProperty.USER.set(props,user);
    PGProperty.PASSWORD.set(props,password);
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props,"9.4");
    PGProperty.REPLICATION.set(props,"database");
    PGProperty.PREFER_QUERY_MODE.set(props,"simple");

    Connection conn= null;
    PGConnection replicationConnection= null;
    PGReplicationStream stream = null;

        conn = DriverManager.getConnection(url,props);
        replicationConnection = conn.unwrap(PGConnection.class);
        stream = replicationConnection.getReplicationAPI().replicationStream().logical()
                .withSlotName("replication_slot")
                .withSlotOption("include-xids",true)
                .withSlotOption("include-timestamp","on")
                .withSlotOption("skip-empty-xacts",true)
                .withStatusInterval(20,TimeUnit.SECONDS).start();

    while (true) {
        ByteBuffer msg;
        try {
            msg = stream.readPending();

            if (msg == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }

            int offset = msg.arrayOffset();
            byte[] source = msg.array();
            int length = source.length - offset;
            // convert the byte buffer into a string
            String data = new String(source, offset, length);

            // then wrap it in a BufferedReader to print it line by line
            BufferedReader reader = new BufferedReader(new StringReader(data));
            String line = reader.readLine();

            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            // report the last received LSN back to the server as applied and flushed
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        } catch (SQLException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
}

But when I try to move the stream setup into a separate method like this:

public class Server implements Config {

public static void main(String[] args) {
    PGReplicationStream stream = getReplicationStream();
    while (true) {
        ByteBuffer msg;
        try {
            msg = stream.readPending();

            if (msg == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }

            int offset = msg.arrayOffset();
            byte[] source = msg.array();
            int length = source.length - offset;
            // convert the byte buffer into a string
            String data = new String(source, offset, length);

            BufferedReader reader = new BufferedReader(new StringReader(data));
            String line = reader.readLine();

            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        } catch (SQLException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static PGReplicationStream getReplicationStream() {
    Properties prop = new Properties();
    try {
        prop.load(new FileInputStream(System.getProperty("prop")));
    } catch (IOException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }

    String user = prop.getProperty("user");
    String password = prop.getProperty("password");
    String url = prop.getProperty("url");

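    Properties props = new Properties();
    PGProperty.USER.set(props, user);
    PGProperty.PASSWORD.set(props, password);
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
    PGProperty.REPLICATION.set(props, "database");
    PGProperty.PREFER_QUERY_MODE.set(props, "simple");

    Connection conn = null;
    PGConnection replicationConnection = null;
    PGReplicationStream stream = null;
    try {
        conn = DriverManager.getConnection(url, props);
        replicationConnection = conn.unwrap(PGConnection.class);
        stream = replicationConnection.getReplicationAPI().replicationStream().logical()
                .withSlotName("replication_slot")
                .withSlotOption("include-xids", true)
                .withSlotOption("include-timestamp", "on")
                .withSlotOption("skip-empty-xacts", true)
                .withStatusInterval(20, TimeUnit.SECONDS).start();
    } catch (SQLException e) {
        e.printStackTrace();
    }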
    return stream;
}

}

After reading some data, the program fails with this error:

org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:78)
at Server.main(Server.java:47)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:191)
at org.postgresql.core.PGStream.receive(PGStream.java:495)
at org.postgresql.core.PGStream.receive(PGStream.java:479)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1161)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1026)
... 5 more

I don't see any difference between the two approaches, but the program behaves differently. Can someone explain what is wrong with the second approach?

Solution

I believe the problem is with your Connection. Once your function returns, the connection goes out of scope, and the garbage collector can collect and finalize it. During finalization the connection is closed, and your program then fails on the next read. Try moving the connection and any other required intermediate variables into a scope that stays reachable from the main method, and then try again.
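
If you still want a separate setup method, one way to follow that advice is to return something that holds both the connection and the stream, so the connection stays strongly reachable while the loop runs. The sketch below is only an illustration under that assumption: ReplicationSession is a hypothetical helper, not part of pgjdbc, and the stream type is left generic so the snippet compiles without the driver. In your code S would be PGReplicationStream.

```java
import java.sql.Connection;

/**
 * Hypothetical holder (not a pgjdbc class): keeps the Connection strongly
 * reachable for as long as the stream built on top of it is in use.
 */
final class ReplicationSession<S extends AutoCloseable> implements AutoCloseable {
    // Holding this reference is the whole point: while the session object is
    // reachable, the connection cannot be garbage collected and finalized.
    private final Connection connection;
    private final S stream;

    ReplicationSession(Connection connection, S stream) {
        this.connection = connection;
        this.stream = stream;
    }

    S stream() {
        return stream;
    }

    Connection connection() {
        return connection;
    }

    @Override
    public void close() throws Exception {
        try {
            stream.close();      // stop the stream first
        } finally {
            connection.close();  // then release the underlying connection
        }
    }
}
```

With this, getReplicationStream() could return a ReplicationSession instead of the bare stream, and main would keep it in a local variable (or a try-with-resources block) for the lifetime of the read loop, calling session.stream().readPending() inside it.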
