block readlock unreleased

Description

I asked a question on the alluxio-users mailing list: https://groups.google.com/forum/#!topic/alluxio-users/w26wOLpjh5Y
I encountered a problem when the alluxio-client and the alluxio-worker run on the same machine. The alluxio-client sends a LocalBlockOpenRequest to the alluxio-worker:

LocalBlockOpenRequest request = LocalBlockOpenRequest.newBuilder()
    .setBlockId(this.mBlockId)
    .setPromote(options.getOptions().getReadType().isPromote())
    .build();
try {
  ProtoMessage message = NettyRPC.call(
      NettyRPCContext.defaults().setChannel(this.mChannel).setTimeout(READ_TIMEOUT_MS),
      new ProtoMessage(request));
  Preconditions.checkState(message.isLocalBlockOpenResponse());
  return message.asLocalBlockOpenResponse().getPath();
} catch (Exception var6) {
  context.releaseNettyChannel(address, this.mChannel);
  throw var6;
}
The alluxio-worker then handles the request:
private void handleBlockOpenRequest(final ChannelHandlerContext ctx,
    final Protocol.LocalBlockOpenRequest request) {
  mRpcExecutor.submit(() ->
      RpcUtils.nettyRPCAndLog(LOG, new RpcUtils.NettyRpcCallable<Void>() {
        @Override
        public Void call() throws Exception {
          if (mLockId == BlockLockManager.INVALID_LOCK_ID) {
            mSessionId = IdUtils.createSessionId();
            // TODO(calvin): Update the locking logic so this can be done better
            if (request.getPromote()) {
              try {
                mWorker
                    .moveBlock(mSessionId, request.getBlockId(), mStorageTierAssoc.getAlias(0));
              } catch (BlockDoesNotExistException e) {
                LOG.debug("Block {} to promote does not exist in Alluxio: {}",
                    request.getBlockId(), e.getMessage());
              } catch (Exception e) {
                LOG.warn("Failed to promote block {}: {}", request.getBlockId(), e.getMessage());
              }
            }

            // ===== the alluxio-client disconnects at this point =====
            mLockId = mWorker.lockBlock(mSessionId, request.getBlockId());
            mWorker.accessBlock(mSessionId, request.getBlockId());
          } else {
            LOG.warn("Lock block {} without releasing previous block lock {}.",
                request.getBlockId(), mLockId);
            throw new InvalidWorkerStateException(
                ExceptionMessage.LOCK_NOT_RELEASED.getMessage(mLockId));
          }
          Protocol.LocalBlockOpenResponse response = Protocol.LocalBlockOpenResponse.newBuilder()
              .setPath(mWorker.readBlock(mSessionId, request.getBlockId(), mLockId)).build();
          ctx.writeAndFlush(new RPCProtoMessage(new ProtoMessage(response)));
          return null;
        }

If the alluxio-client connection is lost while the alluxio-worker is acquiring the read lock, the alluxio-worker handles the disconnect in channelUnregistered():
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
  if (mLockId != BlockLockManager.INVALID_LOCK_ID) {
    try {
      mWorker.unlockBlock(mLockId);
    } catch (BlockDoesNotExistException e) {
      LOG.warn("Failed to unlock lock {} with error {}.", mLockId, e.getMessage());
    }
    mWorker.cleanupSession(mSessionId);
  }
  ctx.fireChannelUnregistered();
}

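However, if channelUnregistered() has already finished before mLockId is set, it sees INVALID_LOCK_ID and skips the unlock. The RPC thread then acquires the read lock, and since nothing releases it afterwards, the read lock is held indefinitely.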
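
The interleaving described above can be reproduced with a small, self-contained model. This is only a sketch: FakeLockManager, UnsafeHandler and GuardedHandler are hypothetical stand-ins, not Alluxio classes, and the guard shown (a "closed" flag checked after the lock is acquired) is one possible way to close the window, not necessarily the fix Alluxio should adopt.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified model of the race; none of these are Alluxio classes. */
public class ReadLockLeakSketch {
  static final long INVALID_LOCK_ID = -1;

  /** Stand-in for the worker's block lock manager: it only counts held locks. */
  static class FakeLockManager {
    private final AtomicInteger mHeld = new AtomicInteger();
    private final AtomicLong mNextId = new AtomicLong(1);

    long lockBlock() {
      mHeld.incrementAndGet();
      return mNextId.getAndIncrement();
    }

    void unlockBlock(long lockId) {
      mHeld.decrementAndGet();
    }

    int heldLocks() {
      return mHeld.get();
    }
  }

  /** Mirrors the reported handler: the unlock depends on lockId already being set. */
  static class UnsafeHandler {
    private final FakeLockManager mLocks;
    private volatile long mLockId = INVALID_LOCK_ID;

    UnsafeHandler(FakeLockManager locks) {
      mLocks = locks;
    }

    void openBlock() {
      mLockId = mLocks.lockBlock();
    }

    void channelUnregistered() {
      if (mLockId != INVALID_LOCK_ID) {
        mLocks.unlockBlock(mLockId);
        mLockId = INVALID_LOCK_ID;
      }
    }
  }

  /** Assumed guard: remember that the channel closed and release a lock taken afterwards. */
  static class GuardedHandler {
    private final FakeLockManager mLocks;
    private long mLockId = INVALID_LOCK_ID;
    private boolean mClosed;

    GuardedHandler(FakeLockManager locks) {
      mLocks = locks;
    }

    void openBlock() {
      long id = mLocks.lockBlock(); // may block, so it is taken outside the monitor
      synchronized (this) {
        if (mClosed) {
          mLocks.unlockBlock(id); // the channel is already gone: release immediately
          return;
        }
        mLockId = id;
      }
    }

    synchronized void channelUnregistered() {
      mClosed = true;
      if (mLockId != INVALID_LOCK_ID) {
        mLocks.unlockBlock(mLockId);
        mLockId = INVALID_LOCK_ID;
      }
    }
  }

  /** Replays the bad ordering against the unguarded handler. */
  static int leakedLocksUnsafe() {
    FakeLockManager locks = new FakeLockManager();
    UnsafeHandler handler = new UnsafeHandler(locks);
    handler.channelUnregistered(); // client disconnects first: nothing to unlock yet
    handler.openBlock();           // the RPC thread then takes the lock
    return locks.heldLocks();
  }

  /** Replays the same ordering against the guarded handler. */
  static int leakedLocksGuarded() {
    FakeLockManager locks = new FakeLockManager();
    GuardedHandler handler = new GuardedHandler(locks);
    handler.channelUnregistered();
    handler.openBlock();
    return locks.heldLocks();
  }

  public static void main(String[] args) {
    System.out.println("unsafe handler leaked locks: " + leakedLocksUnsafe());
    System.out.println("guarded handler leaked locks: " + leakedLocksGuarded());
  }
}
```

The two calls are made sequentially on purpose so that the problematic ordering is deterministic; in the real worker they run on different threads (the Netty event loop and mRpcExecutor), which is why the ordering only shows up occasionally.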

Environment

JDK 1.8
Alluxio 1.8.0

Status

Assignee

Bin Fan

Reporter

王忠民

Labels

None

Components

Affects versions

1.8.0

Priority

Major