博文《消息创建过程》讲述了MessageEventGenerator 如何创建消息。消息创建后,至于怎么发送(将哪些消息发送到哪些邻居),取决于路由策略。本文忽略路由策略,只介绍消息在The ONE是如何发送的。

1. 概述

1.1 消息流动

The ONE的消息缓冲区是messages,除此之外,还提供其他辅助缓冲区,如下,最后一个是供应用程序使用的,先不管它。

private HashMap<String, Message> incomingMessages;   //正在发送的消息
private HashMap<String, Message> messages;           //消息缓冲区,包括新创建的消息和接收到的消息
private HashMap<String, Message> deliveredMessages;  //已成功投递的消息

private HashMap<String, Object> blacklistedMessages; //应用层删除的消息  The messages that Applications on this router have blacklisted

The ONE消息的转换如下图所示。新创建的消息放在messages,正在传输的消息放在incomingMessages;传输成功的消息若为目的节点则放在deliverredMessages,否则放在messages

The ONE消息转换

1.2 消息发送时机

MessageEventGenerator创建的消息(确切的说,是由其产生消息创建事件,而后处理该事件,创建消息)放在节点的缓冲区(确切的说,是MessageRouter.javaprivate HashMap<String, Message> messages)。消息创建后,至于怎么发送(将哪些消息发送到哪些邻居),取决于路由策略。

自己写个路由MyRouter,继承ActiveRouterActiveRouter又是继承MessageRouter,其类图关系如下:

img

通常在自己的路由MyRouter重写update()方法,此时update调用层次是这样的:MyRouter.update –> ActiveRouter.update –> MessageRouter.update。在update调用自己实现的tryOtherMessages,在tryOtherMessages通过相应的路由策略将要发送的消息以及发送到哪个节点(即消息-连接对)收集起来,最后调用tryMessagesForConnected发送消息。主要代码如下:

private Tuple<Message, Connection> tryOtherMessages() {
    List<Tuple<Message, Connection>> messages = new ArrayList<Tuple<Message, Connection>>();
    messages.add(new Tuple<Message, Connection>(m,con)); //将要发送的<Message, Connection>收集起来

    return tryMessagesForConnected(messages);    // try to send messages
}

值得注意的是,tryMessagesForConnected只处理一个消息的传送,而不是处理Tuple中的所有消息,因为一有消息发送,信道就被占用了。

//ActiveRouter.java
protected Tuple<Message, Connection> tryMessagesForConnected(List<Tuple<Message, Connection>> tuples) {
    for (Tuple<Message, Connection> t : tuples) {
        Message m = t.getKey();
        Connection con = t.getValue();

        if (startTransfer(m, con) == RCV_OK) {  //REV_OK相当于链路空闲,可以使用
            return t;                            //只要有一个连接在传送就返回了
        }
    }
}

2. 开始传输

ActiveRouter.javastartTransfer开始传输(类似于网络层),调用Connection.javastartTransfer(类似于链路层)。

2.1 ActiveRouter.startTransfer

//ActiveRouter.java
protected int startTransfer(Message m, Connection con) {
    int retVal;

    if (!con.isReadyForTransfer()) {  //connection is up and there is no message being transferred
        return TRY_LATER_BUSY;
    }

    if (!policy.acceptSending(getHost(), con.getOtherNode(getHost()), con, m)) { //默认情况,所有消息都接受。 支持三种策略:simple policy, Hop Count, ModuleCommunicationBus(MCB)
        return MessageRouter.DENIED_POLICY;
    }

    retVal = con.startTransfer(getHost(), m);  //相当于用物理层链路开始传输
    if (retVal == RCV_OK) { // started transfer
        addToSendingConnections(con);   //add connection(s) that are currently used for sending
    } else if (deleteDelivered && retVal == DENIED_OLD && m.getTo() == con.getOtherNode(this.getHost())) {
        this.deleteMessage(m.getId(), false);  //final recipient has already received the msg -> delete it
    }

    return retVal;
}

2.2 Connection.startTransfer

ActiveRouter.startTransfer调用Connection.startTransferConnection.startTransfer()是抽象方法。Connection有两个子类:CBRConnection (A constant bit-rate connection)和VBRConnection (The transmission speed is updated every round)。以CBRConnection为例,其startTransfer源码如下:

public int startTransfer(DTNHost from, Message m) {
    //判断没有消息在该connection传输,no message being transferred
    assert this.msgOnFly == null : "Already transferring " + this.msgOnFly + " from " + this.msgFromNode + " to " + this.getOtherNode(this.msgFromNode) + ". Can't " +     "start transfer of " + m + " from " + from;

    this.msgFromNode = from;
    Message newMessage = m.replicate(); //复制消息
    int retVal = getOtherNode(from).receiveMessage(newMessage, from); //Start receiving a message from another host

    if (retVal == MessageRouter.RCV_OK) {
        this.msgOnFly = newMessage;
        this.transferDoneTime = SimClock.getTime() + (1.0*m.getSize()) / this.speed;
    }

    return retVal;
}

3. 开始接收消息

开始接收消息,把消息放到incomingMessages,而不是直接放到messages,因为传输需要时间(消息大小除以速度)。调用过程如下:DTNHost.receiveMessage –> ActiveRouter.receiveMessage –> MessageRouter.receiveMessage,其源代码如下:

//DTNHost.java Start receiving a message from another host
public int  (Message m, DTNHost from) {
    int retVal = this.router.receiveMessage(m, from);

    if (retVal == MessageRouter.RCV_OK) {
        m.addNodeOnPath(this);    // add this node on the messages path
    }

    return retVal;
}

//ActiveRouter.java重写了MessageRouter.java的receiveMessage,增加了判断
public int receiveMessage(Message m, DTNHost from) {
    int recvCheck = checkReceiving(m, from);  //router isn't transferring, doesn't have the message and has room for it
    if (recvCheck != RCV_OK) {
        return recvCheck;
    }

    return super.receiveMessage(m, from); //调用MessageRouter.java的receiveMessage
}

//MessageRouter.java  Try to start receiving a message from another host.
public int receiveMessage(Message m, DTNHost from) {
    Message newMessage = m.replicate();

    this.putToIncomingBuffer(newMessage, from);    //才开始接收,还没接收完,先放在incomingMessages
    newMessage.addNodeOnPath(this.host);

    for (MessageListener ml : this.mListeners) {
        ml.messageTransferStarted(newMessage, from, getHost());
    }

    return RCV_OK; // superclass always accepts messages
}
本文系Spark & Shine原创,转载需注明出处本文最近一次修改时间 2022-03-27 16:06

results matching ""

    No results matching ""