Java – why doesn’t Postgres replication stream work when used in a separate function?
I'm working on the Postgres replication stream API Encountered unusual behavior while working hard
When I use the copy slot to write the whole code in the main block, everything is normal
public class Server implements Config { public static void main(String[] args) { Properties prop = new Properties(); prop.load(new FileInputStream(System.getProperty("prop"))); String user = prop.getProperty("user"); String password = prop.getProperty("password"); String url = prop.getProperty("url"); Properties props = new Properties(); PGProperty.USER.set(props,user); PGProperty.PASSWORD.set(props,password); PGProperty.ASSUME_MIN_SERVER_VERSION.set(props,"9.4"); PGProperty.REPLICATION.set(props,"database"); PGProperty.PREFER_QUERY_MODE.set(props,"simple"); Connection conn= null; PGConnection replicationConnection= null; PGReplicationStream stream = null; conn = DriverManager.getConnection(url,props); replicationConnection = conn.unwrap(PGConnection.class); stream = replicationConnection.getReplicationAPI().replicationStream().logical() .withSlotName("replication_slot") .withSlotOption("include-xids",true) .withSlotOption("include-timestamp","on") .withSlotOption("skip-empty-xacts",true) .withStatusInterval(20,TimeUnit.SECONDS).start(); while (true) { ByteBuffer msg; try { msg = stream.readPending(); if (msg == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } int offset = msg.arrayOffset(); byte[] source = msg.array(); int length = source.length - offset; // convert byte buffer into string String data = new String(source,offset,length); // then convert it into bufferedreader BufferedReader reader = new BufferedReader(new StringReader(data)); String line = reader.readLine(); while (line != null) { System.out.println(line); line = reader.readLine(); } stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); } catch (sqlException | IOException | InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
But when I try to separate flow logic using such a separate method
public class Server implements Config { public static void main(String[] args) { PGReplicationStream stream = getReplicationStream(); while (true) { ByteBuffer msg; try { msg = stream.readPending(); if (msg == null) { TimeUnit.MILLISECONDS.sleep(10L); continue; } int offset = msg.arrayOffset(); byte[] source = msg.array(); int length = source.length - offset; String data = new String(source,length); BufferedReader reader = new BufferedReader(new StringReader(data)); String line = reader.readLine(); while (line != null) { System.out.println(line); line = reader.readLine(); } stream.setAppliedLSN(stream.getLastReceiveLSN()); stream.setFlushedLSN(stream.getLastReceiveLSN()); } catch (sqlException | IOException | InterruptedException e) { e.printStackTrace(); } } } public static PGReplicationStream getReplicationStream() { Properties prop = new Properties(); try { prop.load(new FileInputStream(System.getProperty("prop"))); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } String user = prop.getProperty("user"); String password = prop.getProperty("password"); String url = prop.getProperty("url"); Properties props = new Properties(); PGProperty.USER.set(props,"simple"); Connection conn= null; PGConnection replicationConnection= null; PGReplicationStream stream = null; try { conn = DriverManager.getConnection(url,TimeUnit.SECONDS).start(); } catch (sqlException e) { e.printStackTrace(); } return stream; }
}
After reading some data, the program made an error
org.postgresql.util.PsqlException: Database connection Failed when reading from copy at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028) at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41) at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155) at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124) at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:78) at Server.main(Server.java:47) Caused by: java.net.socketException: Socket closed at java.net.socketInputStream.socketRead0(Native Method) at java.net.socketInputStream.socketRead(SocketInputStream.java:116) at java.net.socketInputStream.read(SocketInputStream.java:171) at java.net.socketInputStream.read(SocketInputStream.java:141) at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140) at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109) at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:191) at org.postgresql.core.PGStream.receive(PGStream.java:495) at org.postgresql.core.PGStream.receive(PGStream.java:479) at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1161) at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1026) ... 5 more
I don't think there is any difference between the two methods But the plan behaves differently Someone can explain the problem with the second method
Solution
I believe your problem and connection Once your function returns, it will be out of range, and the garbage collector will collect and finalize it In finalization, the connection is closed, and then your program may fail Try moving the connection and other required intermediate variables within the scope available in the main method, and then try again