知道了The ONE消息创建(详情见博文《消息创建过程》)和转发(详情见博文《消息转发过程》),消息接收就相对简单了。

1. 接收消息

ActiveRouter.update会检查消息是否传输完毕,相关源代码如下:

//ActiveRouter.java
public void update() {
    super.update();

    if (con.isMessageTransferred()) {  //消息是否传输完毕,即是否过了(1.0*m.getSize())/this.speed没
        if (con.getMessage() != null) {
            transferDone(con);
            con.finalizeTransfer();  //完成消息传输
        }
    }
}

2.完成传输

2.1 con.finalizeTransfer

//Connection.java
public void finalizeTransfer() {
    assert this.msgOnFly != null : "Nothing to finalize in " + this;
    assert msgFromNode != null : "msgFromNode is not set";

    this.bytesTransferred += msgOnFly.getSize();

    getOtherNode(msgFromNode).messageTransferred(this.msgOnFly.getId(), msgFromNode); //处理该传输完成的消息
    clearMsgOnFly();
}

2.2 messageTransferred

messageTransferred完成消息接收,将接收到消息根据情况放入相应的缓冲区,其调用关系是:DTNHost.messageTransferred --> ActiveRouter.messageTransferred --> MessageRouter.messageTransferred。相关源代码如下:

//DTNHost.java
public void messageTransferred(String id, DTNHost from) {
    this.router.messageTransferred(id, from);
}

//ActiveRouter.java
public Message messageTransferred(String id, DTNHost from) {
    Message m = super.messageTransferred(id, from);

    /*** 相对于MessageRouter.messageTransferred, 增加了对response消息的处理 ***/
    if (m.getTo() == getHost() && m.getResponseSize() > 0) {
        Message res = new Message(this.getHost(),m.getFrom(), RESPONSE_PREFIX+m.getId(), m.getResponseSize()); //generate a response message
        this.createNewMessage(res);
        this.getMessage(RESPONSE_PREFIX+m.getId()).setRequest(m);
    }

    return m;
}

//MessageRouter.java
public Message messageTransferred(String id, DTNHost from) {
    Message incoming = removeFromIncomingBuffer(id, from);  //将消息从incomingMessages删除

    boolean isFinalRecipient; //消息传递到目的节点
    isFinalRecipient = aMessage.getTo() == this.host;

    boolean isFirstDelivery; //消息传递到目的节点,且第一次
    isFirstDelivery = isFinalRecipient && !isDeliveredMessage(aMessage);

    incoming.setReceiveTime(SimClock.getTime()); //设置消息接收时间

    /*** 将消息交给应用层处理(如果有的话), 有些应用会丢弃该消息 ***/
    Message outgoing = incoming;
    for (Application app : getApplications(incoming.getAppID())) {
        outgoing = app.handle(outgoing, this.host);
        if (outgoing == null) break; // Some app wanted to drop the message
    }
    Message aMessage = (outgoing==null)?(incoming):(outgoing);

    /*** 将消息根据实际情况放入相应的缓冲区 ***/
    if (!isFinalRecipient && outgoing!=null) {
        addToMessages(aMessage, false);  //incomingMessages --> messages
    } else if (isFirstDelivery) {
        this.deliveredMessages.put(id, aMessage);  //incomingMessages --> deliveredMessages
    } else if (outgoing == null) {
        this.blacklistedMessages.put(id, null); // Blacklist messages that an app wants to drop.
    }

    return aMessage;
}

//MessageRouter.java   Adds a message to the message buffer and informs message listeners
protected void addToMessages(Message m, boolean newMessage) {
    this.messages.put(m.getId(), m);  //放到消息缓冲区messages中

    if (newMessage) {
    for (MessageListener ml : this.mListeners) {
            ml.newMessage(m);
        }
    }
}

至此,消息接收完毕。

注:对于消息外部事件MessageRelayEvent,当处理该事件processEvent时,会调用messageTransferred完成上述的过程。

本文系Spark & Shine原创,转载需注明出处本文最近一次修改时间 2022-03-27 16:07

results matching ""

    No results matching ""