OpenJDK / amber / amber
changeset 56822:b78af6d8a252
8225583: Examine the HttpResponse.BodySubscribers for null handling
Reviewed-by: dfuchs, prappo
author | chegar |
---|---|
date | Tue, 18 Jun 2019 14:52:36 +0100 |
parents | 32cce302a1fd |
children | 8d50ff464ae5 |
files | src/java.net.http/share/classes/java/net/http/HttpResponse.java src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java |
diffstat | 3 files changed, 35 insertions(+), 3 deletions(-) [+] |
line wrap: on
line diff
--- a/src/java.net.http/share/classes/java/net/http/HttpResponse.java Tue Jun 18 11:06:29 2019 +0200 +++ b/src/java.net.http/share/classes/java/net/http/HttpResponse.java Tue Jun 18 14:52:36 2019 +0100 @@ -1253,7 +1253,7 @@ /** * Returns a {@code BodySubscriber} which buffers data before delivering * it to the given downstream subscriber. The subscriber guarantees to - * deliver {@code buffersize} bytes of data to each invocation of the + * deliver {@code bufferSize} bytes of data to each invocation of the * downstream's {@link BodySubscriber#onNext(Object) onNext} method, * except for the final invocation, just before * {@link BodySubscriber#onComplete() onComplete} is invoked. The final
--- a/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java Tue Jun 18 11:06:29 2019 +0200 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/LineSubscriberAdapter.java Tue Jun 18 14:52:36 2019 +0100 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2018, 2019, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -40,6 +40,7 @@ import java.util.concurrent.Flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -56,6 +57,7 @@ private final Function<? super S, ? extends R> finisher; private final Charset charset; private final String eol; + private final AtomicBoolean subscribed = new AtomicBoolean(); private volatile LineSubscription downstream; private LineSubscriberAdapter(S subscriber, @@ -72,6 +74,12 @@ @Override public void onSubscribe(Subscription subscription) { + Objects.requireNonNull(subscription); + if (!subscribed.compareAndSet(false, true)) { + subscription.cancel(); + return; + } + downstream = LineSubscription.create(subscription, charset, eol, @@ -82,6 +90,7 @@ @Override public void onNext(List<ByteBuffer> item) { + Objects.requireNonNull(item); try { downstream.submit(item); } catch (Throwable t) { @@ -91,6 +100,7 @@ @Override public void onError(Throwable throwable) { + Objects.requireNonNull(throwable); try { downstream.signalError(throwable); } finally {
--- a/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java Tue Jun 18 11:06:29 2019 +0200 +++ b/src/java.net.http/share/classes/jdk/internal/net/http/ResponseSubscribers.java Tue Jun 18 14:52:36 2019 +0100 @@ -125,6 +125,7 @@ @Override public void onSubscribe(Flow.Subscription subscription) { + Objects.requireNonNull(subscription); if (!subscribed.compareAndSet(false, true)) { subscription.cancel(); } else { @@ -135,6 +136,7 @@ @Override public void onNext(List<ByteBuffer> items) { + Objects.requireNonNull(items); for (ByteBuffer item : items) { byte[] buf = new byte[item.remaining()]; item.get(buf); @@ -145,6 +147,7 @@ @Override public void onError(Throwable throwable) { + Objects.requireNonNull(throwable); result.completeExceptionally(throwable); } @@ -172,6 +175,7 @@ private final FilePermission[] filePermissions; private final CompletableFuture<Path> result = new MinimalFuture<>(); + private final AtomicBoolean subscribed = new AtomicBoolean(); private volatile Flow.Subscription subscription; private volatile FileChannel out; @@ -211,6 +215,12 @@ @Override public void onSubscribe(Flow.Subscription subscription) { + Objects.requireNonNull(subscription); + if (!subscribed.compareAndSet(false, true)) { + subscription.cancel(); + return; + } + this.subscription = subscription; if (System.getSecurityManager() == null) { try { @@ -470,6 +480,7 @@ @Override public void onSubscribe(Flow.Subscription s) { + Objects.requireNonNull(s); try { if (!subscribed.compareAndSet(false, true)) { s.cancel(); @@ -600,6 +611,7 @@ @Override public void onSubscribe(Flow.Subscription subscription) { + Objects.requireNonNull(subscription); if (!subscribed.compareAndSet(false, true)) { subscription.cancel(); } else { @@ -614,6 +626,7 @@ @Override public void onError(Throwable throwable) { + Objects.requireNonNull(throwable); cf.completeExceptionally(throwable); } @@ -907,13 +920,21 @@ } } + private final AtomicBoolean subscribed = new AtomicBoolean(); + @Override public void onSubscribe(Flow.Subscription subscription) { - subscriptionCF.complete(subscription); + Objects.requireNonNull(subscription); + if (!subscribed.compareAndSet(false, true)) { + subscription.cancel(); + } else { + subscriptionCF.complete(subscription); + } } @Override public void onNext(List<ByteBuffer> item) { + Objects.requireNonNull(item); try { // cannot be called before onSubscribe() assert subscriptionCF.isDone(); @@ -941,6 +962,7 @@ // onError can be called before request(1), and therefore can // be called before subscriberRef is set. signalError(throwable); + Objects.requireNonNull(throwable); } @Override