Saved searches
Use saved searches to filter your results more quickly
You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DataRow memory leak #396
DataRow memory leak #396
status: waiting-for-feedback We need additional information before we can continue status: waiting-for-triage An issue we’ve not yet triaged
Comments
Bug Report
Versions
Driver: 0.8.6.RELEASE
Database: PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit
Java:
java version «15.0.1» 2020-10-20
Java(TM) SE Runtime Environment (build 15.0.1+9-18)
Java HotSpot(TM) 64-Bit Server VM (build 15.0.1+9-18, mixed mode, sharing)
OS: Ubuntu 18.04.5 LTS
Current Behavior
We are facing a memory leak while executing a select query with a limit of 30m rows. A heap analysis is showing that more than 4m instances of io.r2dbc.postgresql.message.backend.DataRow are created in a few seconds. This is not always the case but it is very common if we run the query more than once.
This is the JProfiler report after some minutes of execution:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "boundedElastic-evictor-1" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "parallel-1"
Table schema
create table if not exists test_r2dbc.users ( "userId" integer, attr1 integer, attr2 integer, attr3 integer, attr4 integer, attr5 integer, attr6 integer, attr7 integer, attr8 integer, attr9 integer, attr10 integer, attr11 text, attr12 text, attr13 text, attr14 text, attr15 text, attr16 text, attr17 text, attr18 text, attr19 text, attr20 text );
Steps to reproduce
ConnectionFactory conFactory = getConnectionFactory(config.getDatabaseDetails()); return Flux.usingWhen( conFactory.create(), conn -> Mono.from(conn.createStatement("select attr1, attr2, attr3 attr4, attr5 from test_r2dbc.users limit 30000000").execute()) .flatMapMany(result -> result.map(this::toProfile)), Connection::close); private ConnectionFactory getConnectionFactory(DatabaseDetails details) < return ConnectionFactories.get(ConnectionFactoryOptions.builder() .option(ConnectionFactoryOptions.DRIVER, details.getDriver()) .option(ConnectionFactoryOptions.HOST, details.getHost()) .option(ConnectionFactoryOptions.PORT, details.getPort()) .option(ConnectionFactoryOptions.USER, details.getUser()) .option(ConnectionFactoryOptions.PASSWORD, details.getPassword()) .option(ConnectionFactoryOptions.DATABASE, details.getDatabase()) .build()); >
The text was updated successfully, but these errors were encountered:
mp911de added the status: waiting-for-feedback We need additional information before we can continue label Apr 16, 2021
private UserProfile toProfile(Row row, RowMetadata rowMetadata) < Mapattributes = mapAttributes(row, rowMetadata); return UserProfile.builder() .identifier(row.get("userId", String.class)) .type(UserIdentifierType.MSISDN) .attributes(attributes) .build(); > private Map mapAttributes(Row row, RowMetadata rowMetadata) < Mapcolumns = new HashMap<>(); rowMetadata.getColumnNames() .forEach(columnName -> < Object columnValue = row.get(columnName); if (columnValue != null) < columns.put(columnName, columnValue); >>); columns.remove("userId"); return columns; >
Thank you. May I assume that you no longer hold on to the Row object in your map? Maybe it is just an effect of having proper load in combination with direct buffers. You could switch to unpooled heap buffers instead of direct memory to let GC clean up the buffers.
How can i switch to unpooled buffers? Thank you.
Via system property, check out netty/netty#6305 (comment) for all netty property names.
Specifically, io.netty.allocator.type=unpooled and io.netty.noPreferDirect=true
I tested it with system variables -Dio.netty.noPreferDirect=true -Dio.netty.allocator.type=unpooled. The OOM error persists. Millions instances of io.r2dbc.postgresql.message.backend.DataRow are created in a matter of seconds.
The unpooled heap buffer is used as shown below:
Stacktrace has changed to the following:
java.lang.OutOfMemoryError: Java heap space
2021-04-22 22:47:54,731 ERROR reactor-tcp-epoll-8 r.n.c.ChannelOperationsHandler — [id:324f5be1, L:/127.0.0.1:55474 — R:localhost/127.0.0.1:5432] Error was received while reading the incoming data. The connection will be closed.
java.lang.OutOfMemoryError: Java heap space
at io.netty.buffer.AbstractByteBuf.slice(AbstractByteBuf.java:1221)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.slice(AbstractUnpooledSlicedByteBuf.java:230)
at io.netty.buffer.AbstractByteBuf.retainedSlice(AbstractByteBuf.java:1226)
at io.netty.buffer.AbstractByteBuf.readRetainedSlice(AbstractByteBuf.java:888)
at io.r2dbc.postgresql.message.backend.DataRow.decodeColumn(DataRow.java:110)
at io.r2dbc.postgresql.message.backend.DataRow.decode(DataRow.java:100)
at io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decodeBody(BackendMessageDecoder.java:65)
at io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:39)
at io.r2dbc.postgresql.client.ReactorNettyClient$$Lambda$1320/0x00000008013df4c8.apply(Unknown Source)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
2021-04-22 22:47:56,553 ERROR reactor-tcp-epoll-8 r.c.p.Operators — Operator called default onErrorDropped
reactor.netty.ReactorNetty$InternalNettyException: java.lang.OutOfMemoryError: Java heap space
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ ⇢ at reactor.netty.channel.ChannelOperations.receiveObject(ChannelOperations.java:267)
|_ Flux.from ⇢ at reactor.netty.ReactorNetty.publisherOrScalarMap(ReactorNetty.java:516)
|_ Flux.map ⇢ at reactor.netty.ReactorNetty.publisherOrScalarMap(ReactorNetty.java:517)
|_ Flux.from ⇢ at reactor.netty.ByteBufFlux.fromInbound(ByteBufFlux.java:72)
|_ Flux.doOnError ⇢ at io.r2dbc.postgresql.client.ReactorNettyClient.(ReactorNettyClient.java:153)
|_ Flux.handle ⇢ at io.r2dbc.postgresql.client.ReactorNettyClient.(ReactorNettyClient.java:157)
Stack trace:
Caused by: java.lang.OutOfMemoryError: Java heap space
at io.netty.buffer.AbstractByteBuf.slice(AbstractByteBuf.java:1221)
at io.netty.buffer.AbstractUnpooledSlicedByteBuf.slice(AbstractUnpooledSlicedByteBuf.java:230)
at io.netty.buffer.AbstractByteBuf.retainedSlice(AbstractByteBuf.java:1226)
at io.netty.buffer.AbstractByteBuf.readRetainedSlice(AbstractByteBuf.java:888)
at io.r2dbc.postgresql.message.backend.DataRow.decodeColumn(DataRow.java:110)
at io.r2dbc.postgresql.message.backend.DataRow.decode(DataRow.java:100)
at io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decodeBody(BackendMessageDecoder.java:65)
at io.r2dbc.postgresql.message.backend.BackendMessageDecoder.decode(BackendMessageDecoder.java:39)
at io.r2dbc.postgresql.client.ReactorNettyClient$$Lambda$1320/0x00000008013df4c8.apply(Unknown Source)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:281)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)