Skip to content

Commit

Permalink
Handle OOM during GetKey during NBHM put
Browse files Browse the repository at this point in the history
NBHM putIfMatch resizing triggered an OOM.
This failed the NBHM call, then the TaskGetKey onAck in the RPC call.
This marked the TGK as completed, so no retries ever happened - but the
fetched Key was not installed, the task was not marked "done", and the
waiting thread was never woken up (because the Key was not installed);
no retries happened because the task was completed.
Also, make NBHM use MemoryManager.malloc for large arrays, to handle
OOMs better.
  • Loading branch information
cliffclick committed Jan 15, 2015
1 parent 7a8946c commit 83b4617
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 24 deletions.
8 changes: 8 additions & 0 deletions src/main/java/water/FJPacket.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ class FJPacket extends H2OCountedCompleter {
RPC.remote_exec(_ab);
tryComplete();
}
/** Exceptional-completion hook: reports a failure that was not handled
 *  earlier in the stack.  Writes this task's identity and the exception's
 *  stack trace to stderr, hands the exception to water.util.Log.err, and
 *  always returns true. */
@Override public boolean onExceptionalCompletion(Throwable ex, jsr166y.CountedCompleter caller) {
  final String banner = "onExCompletion for "+this;
  System.err.println(banner);   // Say which task failed
  ex.printStackTrace();         // Full trace on stderr
  water.util.Log.err(ex);       // And hand it to the logger as well
  return true;
}
// Run at max priority until we decrypt the packet enough to get priorities out
static private byte[] UDP_PRIORITIES =
new byte[]{-1,
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/water/MemoryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ public static Object malloc(int elems, long bytes, int type, Object orig, int fr
case 5: return new float [elems];
case 9: return new double [elems];
case 0: return new boolean[elems];
case 10: return new Object [elems];
case -1: return Arrays.copyOfRange((byte [])orig,from,elems);
case -4: return Arrays.copyOfRange((int [])orig,from,elems);
case -8: return Arrays.copyOfRange((long [])orig,from,elems);
Expand Down Expand Up @@ -278,6 +279,8 @@ public static Object malloc(int elems, long bytes, int type, Object orig, int fr
res[i] = malloc8d(n);
return res;
}
/** Allocates an Object[] of {@code size} elements through the shared malloc
 *  entry point, using type code 10 (which maps to {@code new Object[elems]}).
 *  The byte estimate is 8 bytes per element and is computed in long
 *  arithmetic, so it cannot wrap negative for very large arrays
 *  (size >= 2^28).
 *  NOTE(review): the trailing {@code false} is passed to the 6-argument malloc
 *  overload, whose last parameter is not visible here - confirm its meaning. */
public static Object[] mallocObj(int size) { return (Object [])malloc(size,size*8L,10,null,0,false); }

/** Allocates a boolean[] of {@code size} elements through the shared malloc
 *  entry point (type code 0), with a byte estimate of one byte per element. */
public static boolean[] mallocZ (int size) {
  final long bytes = size;      // One byte per boolean element
  return (boolean[])malloc(size,bytes,0,null,0);
}
/** Copies the range {@code orig[from..sz)} into a new byte[] through the
 *  shared malloc entry point (type code -1, which maps to Arrays.copyOfRange).
 *  Note that {@code sz} is the exclusive end index of the range, not a length;
 *  the byte estimate is one byte per copied element. */
public static byte [] arrayCopyOfRange(byte [] orig, int from, int sz) {
  final long bytes = sz-from;   // One byte per copied element
  return (byte [])malloc(sz,bytes,-1,orig,from);
}
/** Copies the range {@code orig[from..sz)} into a new int[] through the
 *  shared malloc entry point (type code -4, which maps to Arrays.copyOfRange).
 *  Note that {@code sz} is the exclusive end index of the range, not a length.
 *  The byte estimate is 4 bytes per copied element and is computed in long
 *  arithmetic, so it cannot wrap for ranges of 2^29 elements or more. */
public static int [] arrayCopyOfRange(int [] orig, int from, int sz) { return (int []) malloc(sz,((long)sz-from)*4,-4,orig,from); }
Expand Down
34 changes: 15 additions & 19 deletions src/main/java/water/RPC.java
Original file line number Diff line number Diff line change
Expand Up @@ -542,27 +542,23 @@ static void tcp_ack( final AutoBuffer ab ) throws IOException {
// Got a response UDP packet, or completed a large TCP answer-receive.
// Install it as The Answer packet and wake up anybody waiting on an answer.
protected int response( AutoBuffer ab ) {
try{
assert _tasknum==ab.getTask();
assert _tasknum==ab.getTask();
if( _done ) return ab.close(); // Ignore duplicate response packet
int flag = ab.getFlag(); // Must read flag also, to advance ab
if( flag == SERVER_TCP_SEND ) return ab.close(); // Ignore UDP packet for a TCP reply
assert flag == SERVER_UDP_SEND;
synchronized(this) { // Install the answer under lock
if( _done ) return ab.close(); // Ignore duplicate response packet
int flag = ab.getFlag(); // Must read flag also, to advance ab
if( flag == SERVER_TCP_SEND ) return ab.close(); // Ignore UDP packet for a TCP reply
assert flag == SERVER_UDP_SEND;
synchronized(this) { // Install the answer under lock
if( _done ) return ab.close(); // Ignore duplicate response packet
UDPTimeOutThread.PENDING.remove(this);
_dt.read(ab); // Read the answer (under lock?)
_size_rez = ab.size(); // Record received size
ab.close(); // Also finish the read (under lock?)
_dt.onAck(); // One time only execute (before sending ACKACK)
_done = true; // Only read one (of many) response packets
ab._h2o.taskRemove(_tasknum); // Flag as task-completed, even if the result is null
notifyAll(); // And notify in any case
}
doAllCompletions(); // Send all tasks needing completion to the work queues
}catch(Throwable t){
t.printStackTrace();
UDPTimeOutThread.PENDING.remove(this);
_dt.read(ab); // Read the answer (under lock?)
_size_rez = ab.size(); // Record received size
ab.close(); // Also finish the read (under lock?)
_dt.onAck(); // One time only execute (before sending ACKACK)
_done = true; // Only read one (of many) response packets
ab._h2o.taskRemove(_tasknum); // Flag as task-completed, even if the result is null
notifyAll(); // And notify in any case
}
doAllCompletions(); // Send all tasks needing completion to the work queues
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/water/TCPReceiverThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void run() {
}
} catch( java.nio.channels.AsynchronousCloseException ex ) {
break; // Socket closed for shutdown
} catch( Exception e ) {
} catch( Throwable e ) {
// On any error from anybody, close everything
System.err.println("IO error");
e.printStackTrace();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/water/nbhm/NonBlockingHashMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -948,9 +948,9 @@ private final Object[] resize( NonBlockingHashMap topmap, Object[] kvs) {
return newkvs; // Use the new table already

// Double size for K,V pairs, add 1 for CHM
newkvs = new Object[((1<<log2)<<1)+2]; // This can get expensive for big arrays
newkvs = water.MemoryManager.mallocObj(((1<<log2)<<1)+2); // This can get expensive for big arrays
newkvs[0] = new CHM(_size); // CHM in slot 0
newkvs[1] = new int[1<<log2]; // hashes in slot 1
newkvs[1] = water.MemoryManager.malloc4(1<<log2); // hashes in slot 1

// Another check after the slow allocation
if( _newkvs != null ) // See if resize is already in progress
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/water/nbhm/NonBlockingHashMapLong.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ private final boolean CAS_val( int idx, Object old, Object val ) {
_nbhml = nbhml;
_size = size;
_slots= new ConcurrentAutoTable();
_keys = new long [1<<logsize];
_vals = new Object[1<<logsize];
_keys = water.MemoryManager.malloc8 (1<<logsize);
_vals = water.MemoryManager.mallocObj(1<<logsize);
}

// --- print innards
Expand Down

0 comments on commit 83b4617

Please sign in to comment.