消息监听器MessageListener是一个接口,与消息有关的Report实现了该接口。当消息被创建、开始传输、转发、传输中止、删除时,会调用已实现MessageListener相应的类的相应函数,更新统计信息(如MessageStatsReport)或者输出日志信息(如EventLogReport)。本文深入源码介绍The ONE消息监听器MessageListenner的技术细节。理解了MessageListener,其他Listener也就不难了。

1. MessageListener接口

消息监听器MessageListener,类似于钩子函数,hook到消息相关的事件中,换句话说,当消息被创建、开始传输、转发、传输中止、删除时,会调用相应的钩子函数更新reports统计信息。MessageListener源代码如下:

public interface MessageListener {
    public void newMessage(Message m);
    public void messageTransferStarted(Message m, DTNHost from, DTNHost to);
    public void messageTransferAborted(Message m, DTNHost from, DTNHost to);
    public void messageTransferred(Message m, DTNHost from, DTNHost to, boolean firstDelivery); 
    public void messageDeleted(Message m, DTNHost where, boolean dropped);   //包含drop和remove
}

MessageListener是一个接口,有很多类(主要是Report,用于输出仿真过程的一些信息)实现了该接口,这些类只实现了MessageListener接口的一个或多个函数,其他留空,如:CreatedMessageReport.java只实现了MessageListener接口的newMessage()。实现MessageListener接口的类,如下:

MessageListener接口的类

2. 创建MessageListener

The ONE维持一个全局的MessageListener链表,初始化时,将那些实现了MessageListener接口的report接加入到MessageListener链表。当消息被创建、开始传输、转发、传输中止、删除时,遍历MessageListener链表并调用相应函数,更新仿真结果。

private List<MessageListener> messageListeners; //SimScenario.java  全局MessageListener链表

2.1 设置文件

在设置文件my_settings.txt增加需要产生的Reports,举例如下:

Report.nrofReports = 10
Report.reportDir = nc/reports/

Report.report1 = MessageStatsReport
Report.report2 = MessageDelayReport
Report.report3 = MovementNs2Report
Report.report4 = TotalContactTimeReport
Report.report5 = MessageReport
Report.report6 = DistanceDelayReport
Report.report7 = EventLogReport
Report.report8 = MessageGraphvizReport
Report.report9 = AdjacencyGraphvizReport
Report.report10 = MessageLocationReport

2.2 创建MessageListener链表

The ONE初始化,在**main(DTNSim) —> DTNSimTextUI().start() –> DTNSimUI.initModel()**中,根据2.1的设置文件,将report添加到相应的监听器Listener。相关源代码如下:

//DTNSimUI.java
private void initModel() {
    for (int i=1, n = settings.getInt(NROF_REPORT_S); i<=n; i++) {          //NROF_REPORT_S = "Report.nrofReports";
        String reportClass = settings.getSetting(REPORT_S + i);             //REPORT_S = "Report.report", 形如Report.report1,2,3, ...
        addReport((Report)settings.createObject(REPORT_PAC + reportClass)); //REPORT_PAC = "report.", 形如MessageStatsReport
    }
}
protected void addReport(Report r) {
    if (r instanceof MessageListener) {
        scen.addMessageListener((MessageListener)r); //增加MessageListener
    }
    if (r instanceof ConnectionListener) {
        scen.addConnectionListener((ConnectionListener)r);
    }
    if (r instanceof MovementListener) {
        scen.addMovementListener((MovementListener)r);
    }
    if (r instanceof UpdateListener) {
        scen.addUpdateListener((UpdateListener)r);
    }
    if (r instanceof ApplicationListener) {
        scen.addApplicationListener((ApplicationListener)r);
    }

    this.reports.add(r);  //protected Vector<Report> reports;
}

//SimScenario.java
private List<MessageListener> messageListeners;  //Global message event listeners
public void addMessageListener(MessageListener ml) {
    this.messageListeners.add(ml);
}

这样,MessageListener全局链表就创建好了,随后的初始化,将messageListeners赋值给每个DTNHost以及对应的MessageRouter。相关源代码如下:

//SimScenario.java   创建所有DTNHost
private List<MessageListener> messageListeners; //Global message event listeners
protected SimScenario() {
    this.messageListeners = new ArrayList<MessageListener>();  //创建全局MessageListener
    createHosts();
}
protected void createHosts() {
    this.hosts = new ArrayList<DTNHost>();

    for (int i=1; i<=nrofGroups; i++) {
        for (int j=0; j<nrofHosts; j++) {
            DTNHost host = new DTNHost(this.messageListeners,
                                       this.movementListeners,    
                                       gid, 
                                       interfaces, 
                                       comBus,
                                       mmProto, 
                                       mRouterProto
                                       );
            hosts.add(host);
        }
    }
}

//DTNHost.java  Create a DTNHost,messageListeners赋值给DTNHost
private List<MessageListener> msgListeners;
public DTNHost(List<MessageListener> msgLs,
               List<MovementListener> movLs,
               String groupId, List<NetworkInterface> interf,
               ModuleCommunicationBus comBus,
               MovementModel mmProto, MessageRouter mRouterProto) {
    this.msgListeners = msgLs;
    setRouter(mRouterProto.replicate());  //新建MessageRouter,并初始化
}
private void setRouter(MessageRouter router) {
    router.init(this, msgListeners);
    this.router = router;
}

//MessageRouter.java, messageListeners赋值给MessageRouter
public void init(DTNHost host, List<MessageListener> mListeners) {
    this.incomingMessages = new HashMap<String, Message>();
    this.messages = new HashMap<String, Message>();
    this.deliveredMessages = new HashMap<String, Message>();
    this.blacklistedMessages = new HashMap<String, Object>();
    this.mListeners = mListeners;
    this.host = host;
}

/******************************************************************************/
//MessageRouter.java抽象函数replicate(), 由具体路由实现, 以ProphetRouter为例
public MessageRouter replicate() {
    ProphetRouter r = new ProphetRouter(this);
    return r;
}

//ProphetRouter.java构造函数ProphetRouter(ProphetRouter r), 会调用父类的构造函数ActiveRouter(ActiveRouter r), MessageRouter(MessageRouter r)
protected ProphetRouter(ProphetRouter r) {
    super(r); //--> ActiveRouter(ActiveRouter r) --> MessageRouter(MessageRouter r)
}

3. MessageListener实例

以创建消息为例。每隔updateIntervalWorld.update()依次取得外部事件,若是消息创建事件,则调用其事件处理函数processEvent创建消息,相关源代码如下:

//MessageCreateEvent.processEvent --> DTNHost.createNewMessage --> MessageRouter.createNewMessage --> MessageRouter.addToMessages
protected void addToMessages(Message m, boolean newMessage) {
    this.messages.put(m.getId(), m);

    if (newMessage) {
        for (MessageListener ml : this.mListeners) {   //触发消息监听器
            ml.newMessage(m);
        }
    }
}

由上可见,消息创建好,放入messages(理解成buffer),触发所有消息监听器。假设设置文件包含CreatedMessagesReport,此时会调用CreatedMessagesReport.java实现的MessageListener接口的newMessage函数(只实现了MessageListener接口的newMessage,其他接口函数留空),向文件写入日志信息。newMessage源代码如下:

public void newMessage(Message m) {
    if (isWarmup()) {
        return;
    }

    int ttl = m.getTtl();
    write(format(getSimTime()) + " " + m.getId() + " " +
            m.getSize() + " " + m.getFrom() + " " + m.getTo() + " " +
            (ttl != Integer.MAX_VALUE ? ttl : "n/a") +
            (m.isResponse() ? " Y " : " N "));
}

将日志信息输出到*_CreatedMessagesReport.txt文件,取部分结果如下:

32.0000  M1 8834 p0  p34 1440 N 
58.0000  M2 7273 p43 p4  1440 N 
84.0000  M3 8122 p5  p65 1440 N 
112.0000 M4 8257 p61 p67 1440 N 
144.0000 M5 9671 p36 p11 1440 N 
170.0000 M6 6032 p71 p35 1440 N 
203.0000 M7 7010 p3  p69 1440 N

4. 其他Listener

理解了MessageListener,其他Listener也就不难了。

//SimScenario.java
private List<ConnectionListener> connectionListeners;
private List<MessageListener> messageListeners;
private List<MovementListener> movementListeners;
private List<UpdateListener> updateListeners;
private List<ApplicationListener> appListeners;
本文系Spark & Shine原创,转载需注明出处本文最近一次修改时间 2022-03-27 10:36

results matching ""

    No results matching ""