JedisConnectionException: Unexpected end of stream #932
Repeatable exception and for the life of me, I cannot find something I'm doing wrong. redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream. at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:198) at redis.clients.util.RedisInputStream.read(RedisInputStream.java:180) at redis.clients.jedis.Protocol.processBulkReply(Protocol.java:158) at redis.clients.jedis.Protocol.process(Protocol.java:132) at redis.clients.jedis.Protocol.processMultiBulkReply(Protocol.java:183) at redis.clients.jedis.Protocol.process(Protocol.java:134) at redis.clients.jedis.Protocol.read(Protocol.java:192) at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:282) at redis.clients.jedis.Connection.getRawObjectMultiBulkReply(Connection.java:227) at redis.clients.jedis.JedisPubSub.process(JedisPubSub.java:108) at redis.clients.jedis.JedisPubSub.proceedWithPatterns(JedisPubSub.java:95) at redis.clients.jedis.Jedis.psubscribe(Jedis.java:2513) at BenchRedisConsumer$BenchRunner.run(BenchRedisConsumer.java:208) at java.lang.Thread.run(Thread.java:745) Running redis version 2.8.19 on Linux 3.16.0-33-generic #44-Ubuntu SMP Thu Mar 12 12:19:35 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux Java version: java version "1.7.0_76" Java(TM) SE Runtime Environment (build 1.7.0_76-b13) Java HotSpot(TM) 64-Bit Server VM (build 24.76-b04, mixed mode)
Run the redis consumer followed by the producer of the project here:https://github.com/Climax777/message-queue-bench
client-output-buffer-limit
was the cause. redis-server
closed the connections, leading to the exceptions.
https://github.com/xetorthio/jedis/issues/932
http://stackoverflow.com/questions/2309561/how-to-fix-java-net-socketexception-broken-pipe
redis.clients.jedis.JedisPoolConfig
package redis.clients.jedis; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; public class JedisPoolConfig extends GenericObjectPoolConfig { public JedisPoolConfig() { // defaults to make your life with connection pool easier :) setTestWhileIdle(true); setMinEvictableIdleTimeMillis(60000); setTimeBetweenEvictionRunsMillis(30000); setNumTestsPerEvictionRun(-1); } }
org.apache.commons.pool2.impl.GenericKeyedObjectPool
从Pool(也就是 LinkedBlockingDeque<PooledObject<T>>)中获取一个PooledObject<T> 需要等待的时间。
来自
public GenericKeyedObjectPool(KeyedPooledObjectFactory<K,T> factory, GenericKeyedObjectPoolConfig config) { super(config, ONAME_BASE, config.getJmxNamePrefix()); if (factory == null) { jmxUnregister(); // tidy up throw new IllegalArgumentException("factory may not be null"); } this.factory = factory; this.fairness = config.getFairness(); setConfig(config); startEvictor(getTimeBetweenEvictionRunsMillis()); }
public void setConfig(GenericKeyedObjectPoolConfig conf) { setLifo(conf.getLifo()); setMaxIdlePerKey(conf.getMaxIdlePerKey()); setMaxTotalPerKey(conf.getMaxTotalPerKey()); setMaxTotal(conf.getMaxTotal()); setMinIdlePerKey(conf.getMinIdlePerKey()); setMaxWaitMillis(conf.getMaxWaitMillis()); setBlockWhenExhausted(conf.getBlockWhenExhausted()); setTestOnCreate(conf.getTestOnCreate()); setTestOnBorrow(conf.getTestOnBorrow()); setTestOnReturn(conf.getTestOnReturn()); setTestWhileIdle(conf.getTestWhileIdle()); setNumTestsPerEvictionRun(conf.getNumTestsPerEvictionRun()); setMinEvictableIdleTimeMillis(conf.getMinEvictableIdleTimeMillis()); setSoftMinEvictableIdleTimeMillis( conf.getSoftMinEvictableIdleTimeMillis()); setTimeBetweenEvictionRunsMillis( conf.getTimeBetweenEvictionRunsMillis()); setEvictionPolicyClassName(conf.getEvictionPolicyClassName()); }
org.apache.commons.pool2.impl.GenericObjectPool
/** * Borrow an object from the pool using the specific waiting time which only * applies if {@link #getBlockWhenExhausted()} is true. * <p> * If there is one or more idle instance available in the pool, then an * idle instance will be selected based on the value of {@link #getLifo()}, * activated and returned. If activation fails, or {@link #getTestOnBorrow() * testOnBorrow} is set to <code>true</code> and validation fails, the * instance is destroyed and the next available instance is examined. This * continues until either a valid instance is returned or there are no more * idle instances available. * <p> * If there are no idle instances available in the pool, behavior depends on * the {@link #getMaxTotal() maxTotal}, (if applicable) * {@link #getBlockWhenExhausted()} and the value passed in to the * <code>borrowMaxWaitMillis</code> parameter. If the number of instances * checked out from the pool is less than <code>maxTotal,</code> a new * instance is created, activated and (if applicable) validated and returned * to the caller. If validation fails, a <code>NoSuchElementException</code> * is thrown. * <p> * If the pool is exhausted (no available idle instances and no capacity to * create new ones), this method will either block (if * {@link #getBlockWhenExhausted()} is true) or throw a * <code>NoSuchElementException</code> (if * {@link #getBlockWhenExhausted()} is false). The length of time that this * method will block when {@link #getBlockWhenExhausted()} is true is * determined by the value passed in to the <code>borrowMaxWaitMillis</code> * parameter. * <p> * When the pool is exhausted, multiple calling threads may be * simultaneously blocked waiting for instances to become available. A * "fairness" algorithm has been implemented to ensure that threads receive * available instances in request arrival order. * * @param borrowMaxWaitMillis The time to wait in milliseconds for an object * to become available * * @return object instance from the pool * * @throws NoSuchElementException if an instance cannot be returned * * @throws Exception if an object instance cannot be returned due to an * error */ public T borrowObject(long borrowMaxWaitMillis) throws Exception { assertOpen(); AbandonedConfig ac = this.abandonedConfig; if (ac != null && ac.getRemoveAbandonedOnBorrow() && (getNumIdle() < 2) && (getNumActive() > getMaxTotal() - 3) ) { removeAbandoned(ac); } PooledObject<T> p = null; // Get local copy of current config so it is consistent for entire // method execution boolean blockWhenExhausted = getBlockWhenExhausted(); boolean create; long waitTime = System.currentTimeMillis(); while (p == null) { create = false; if (blockWhenExhausted) { p = idleObjects.pollFirst(); if (p == null) { p = create(); if (p != null) { create = true; } } if (p == null) { if (borrowMaxWaitMillis < 0) { p = idleObjects.takeFirst(); } else { p = idleObjects.pollFirst(borrowMaxWaitMillis, TimeUnit.MILLISECONDS); } } if (p == null) { throw new NoSuchElementException( "Timeout waiting for idle object"); } if (!p.allocate()) { p = null; } } else { p = idleObjects.pollFirst(); if (p == null) { p = create(); if (p != null) { create = true; } } if (p == null) { throw new NoSuchElementException("Pool exhausted"); } if (!p.allocate()) { p = null; } } if (p != null) { try { factory.activateObject(p); } catch (Exception e) { try { destroy(p); } catch (Exception e1) { // Ignore - activation failure is more important } p = null; if (create) { NoSuchElementException nsee = new NoSuchElementException( "Unable to activate object"); nsee.initCause(e); throw nsee; } } if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) { boolean validate = false; Throwable validationThrowable = null; try { validate = factory.validateObject(p); } catch (Throwable t) { PoolUtils.checkRethrow(t); validationThrowable = t; } if (!validate) { try { destroy(p); destroyedByBorrowValidationCount.incrementAndGet(); } catch (Exception e) { // Ignore - validation failure is more important } p = null; if (create) { NoSuchElementException nsee = new NoSuchElementException( "Unable to validate object"); nsee.initCause(validationThrowable); throw nsee; } } } } } updateStatsBorrow(p, System.currentTimeMillis() - waitTime); return p.getObject(); }
socketTimeOut
redis.clients.jedis.JedisPool
public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, int timeout, final String password) { this(poolConfig, host, port, timeout, password, Protocol.DEFAULT_DATABASE, null); }
public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port, int timeout, final String password, final int database, final String clientName) { super(poolConfig, new JedisFactory(host, port, timeout, password, database, clientName)); }
redis.clients.jedis.JedisFactory
@Override public PooledObject<Jedis> makeObject() throws Exception { final HostAndPort hostAndPort = this.hostAndPort.get(); final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), this.timeout); jedis.connect(); if (null != this.password) { jedis.auth(this.password); } if (database != 0) { jedis.select(database); } if (clientName != null) { jedis.clientSetname(clientName); } return new DefaultPooledObject<Jedis>(jedis); }
redis.clients.jedis.Jedis
public Jedis(final String host, final int port, final int timeout) { super(host, port, timeout); }
redis.clients.jedis.BinaryJedis
public BinaryJedis(final String host, final int port, final int timeout) { client = new Client(host, port); client.setConnectionTimeout(timeout); client.setSoTimeout(timeout); }
redis.clients.jedis.Connection
public void connect() { if (!isConnected()) { try { socket = new Socket(); // ->@wjw_add socket.setReuseAddress(true); socket.setKeepAlive(true); // Will monitor the TCP connection is // valid socket.setTcpNoDelay(true); // Socket buffer Whetherclosed, to // ensure timely delivery of data socket.setSoLinger(true, 0); // Control calls close () method, // the underlying socket is closed // immediately // <-@wjw_add socket.connect(new InetSocketAddress(host, port), connectionTimeout); socket.setSoTimeout(soTimeout); if (ssl) { if (null == sslSocketFactory) { sslSocketFactory = (SSLSocketFactory)SSLSocketFactory.getDefault(); } socket = (SSLSocket) sslSocketFactory.createSocket(socket, host, port, true); if (null != sslParameters) { ((SSLSocket) socket).setSSLParameters(sslParameters); } if ((null != hostnameVerifier) && (!hostnameVerifier.verify(host, ((SSLSocket) socket).getSession()))) { String message = String.format( "The connection to '%s' failed ssl/tls hostname verification.", host); throw new JedisConnectionException(message); } } outputStream = new RedisOutputStream(socket.getOutputStream()); inputStream = new RedisInputStream(socket.getInputStream()); } catch (IOException ex) { broken = true; throw new JedisConnectionException(ex); } } }
java.net.Socket public void connect(@NotNull java.net.SocketAddress endpoint, int timeout) throws java.io.IOException Connects this socket to the server with a specified timeout value. A timeout of zero is interpreted as an infinite timeout. The connection will then block until established or an error occurs. Parameters: endpoint - the SocketAddress timeout - the timeout value to be used in milliseconds. Throws: java.io.IOException - if an error occurs during the connection java.net.SocketTimeoutException - if timeout expires before connecting java.nio.channels.IllegalBlockingModeException - if this socket has an associated channel, and the channel is in non-blocking mode IllegalArgumentException - if endpoint is null or is a SocketAddress subclass not supported by this socket Since: 1.4
java.net.Socket public void setSoTimeout(int timeout) throws java.net.SocketException Enable/disable SO_TIMEOUT with the specified timeout, in milliseconds. With this option set to a non-zero timeout, a read() call on the InputStream associated with this Socket will block for only this amount of time. If the timeout expires, a java.net.SocketTimeoutException is raised, though the Socket is still valid. The option must be enabled prior to entering the blocking operation to have effect. The timeout must be > 0. A timeout of zero is interpreted as an infinite timeout. Parameters: timeout - the specified timeout, in milliseconds. Throws: java.net.SocketException - if there is an error in the underlying protocol, such as a TCP error. Since: JDK 1.1
时间: 2025-01-19 12:05:17