源码NativeDaemonConnector解析

源码NativeDaemonConnector解析
最新需要做流量管理,所以看了一下相应的接口:

原生中就是可以设置某一个应用是否允许后台访问数据流量的接口:在DataUsageSummary中有setAppRestrictBackground方法就是用来设置某个app是否允许后台访问数据流量接口

private void setAppRestrictBackground(boolean restrictBackground) {
    if (LOGD) Log.d(TAG, "setAppRestrictBackground()");
    final int uid = mCurrentApp.key;
    mPolicyManager.setUidPolicy(
            uid, restrictBackground ? POLICY_REJECT_METERED_BACKGROUND : POLICY_NONE);
    mAppRestrict.setChecked(restrictBackground);
} 

可以看到市调用了networkpolicymanager的setuidpolicy方法

public void setUidPolicy(int uid, int policy) {
    try {
        mService.setUidPolicy(uid, policy);
    } catch (RemoteException e) {
    }
}

在policymanager中可以看到是binder调用了networkpolicymanagerservice的方法

public void setUidPolicy(int uid, int policy) {
    mContext.enforceCallingOrSelfPermission(MANAGE_NETWORK_POLICY, TAG);

    if (!UserHandle.isApp(uid)) {
        throw new IllegalArgumentException("cannot apply policy to UID " + uid);
    }

    synchronized (mRulesLock) {
        final int oldPolicy = mUidPolicy.get(uid, POLICY_NONE);
        if (oldPolicy != policy) {
            setUidPolicyUncheckedLocked(uid, policy, true);//1⃣️
        }
    }
}

权限检查和应用检查就不说了,最主要的还是1⃣️所在的位置调用的方法

private void setUidPolicyUncheckedLocked(int uid, int policy, boolean persist) {
    mUidPolicy.put(uid, policy);

    // uid policy changed, recompute rules and persist policy.
    updateRulesForUidLocked(uid);//2⃣️
    if (persist) {
        writePolicyLocked();
    }
}

还是主要看2⃣️位置

void updateRulesForUidLocked(int uid) {
    if (!isUidValidForRules(uid)) return;

    .....

    if (uidRules == RULE_ALLOW_ALL) {
        mUidRules.delete(uid);
    } else {
        mUidRules.put(uid, uidRules);
    }

    final boolean rejectMetered = (uidRules & RULE_REJECT_METERED) != 0;
    setUidNetworkRules(uid, rejectMetered);//3⃣️

    // dispatch changed rule to existing listeners
    mHandler.obtainMessage(MSG_RULES_CHANGED, uid, uidRules).sendToTarget();

    ....
}

那么看3⃣️这个位置

private void setUidNetworkRules(int uid, boolean rejectOnQuotaInterfaces) {
    try {
        mNetworkManager.setUidNetworkRules(uid, rejectOnQuotaInterfaces);
    } catch (IllegalStateException e) {
        Log.wtf(TAG, "problem setting uid rules", e);
    } catch (RemoteException e) {
        // ignored; service lives in system_server
    }
}

原来还是到了networkmanagementservice中去进行相关的操作,至于connectivityservice,networkpolicyservice以及networkstatsservice这四个service之间的关系,是属于网络框架这一哭,就不细说了。
现在看到了networkmanagementservice中的setUidNetworkRules方法:

public void setUidNetworkRules(int uid, boolean rejectOnQuotaInterfaces) {
    mContext.enforceCallingOrSelfPermission(CONNECTIVITY_INTERNAL, TAG);

    // silently discard when control disabled
    // TODO: eventually migrate to be always enabled
    if (!mBandwidthControlEnabled) return;

    synchronized (mQuotaLock) {
        final boolean oldRejectOnQuota = mUidRejectOnQuota.get(uid, false);
        if (oldRejectOnQuota == rejectOnQuotaInterfaces) {
            // TODO: eventually consider throwing
            return;
        }

        try {
            mConnector.execute("bandwidth",
                    rejectOnQuotaInterfaces ? "addnaughtyapps" : "removenaughtyapps", uid);
            if (rejectOnQuotaInterfaces) {
                mUidRejectOnQuota.put(uid, true);
            } else {
                mUidRejectOnQuota.delete(uid);
            }
        } catch (NativeDaemonConnectorException e) {
            throw e.rethrowAsParcelableException();
        }
    }
}

那么关键的地方就来了,调用了connector的execute方法,然后传进去参数。那这个connector又是什么呢,这个就是要说的NativeDaemonConnector。那么就说一下这个daemon是如何进行与localsocket进行通讯的吧。

在networkmanagementservice刚刚初始化的时候,就会初始化connector

private NetworkManagementService(Context context, String socket) {

    .....

    mConnector = new NativeDaemonConnector(
            new NetdCallbackReceiver(), socket, 10, NETD_TAG, 160, wl,
            FgThread.get().getLooper());
    mThread = new Thread(mConnector, NETD_TAG);

    mDaemonHandler = new Handler(FgThread.get().getLooper());

    ......

}

同时初始化thread,connector是实现了runnable接口的。在service.create的时候

    static NetworkManagementService create(Context context,
        String socket) throws InterruptedException {
    final NetworkManagementService service = new NetworkManagementService(context, socket);
    final CountDownLatch connectedSignal = service.mConnectedSignal;
    if (DBG) Slog.d(TAG, "Creating NetworkManagementService");
    service.mThread.start();//1⃣️
    if (DBG) Slog.d(TAG, "Awaiting socket connection");
    connectedSignal.await();
    if (DBG) Slog.d(TAG, "Connected");
    return service;
}

就会调用1⃣️位置的start方法,那么其中的runnable接口的run方法就会被调用:这里看到NativeDaemonConnector得run方法

@Override
public void run() {
    mCallbackHandler = new Handler(mLooper, this);

    while (true) {//1⃣️
        try {
            listenToSocket();
        } catch (Exception e) {
            loge("Error in NativeDaemonConnector: " + e);
            SystemClock.sleep(5000);
        }
    }
}

这里需要注意1⃣️位置是一个无限循环,至于它有什么作用,放到后面说,这里就会启用一个listentosocket()方法:

private void listenToSocket() throws IOException {
    LocalSocket socket = null;//1⃣️

    try {
        socket = new LocalSocket();
        LocalSocketAddress address = determineSocketAddress();//2⃣️

        socket.connect(address);

        InputStream inputStream = socket.getInputStream();
        synchronized (mDaemonLock) {
            mOutputStream = socket.getOutputStream();//3⃣️
        }

        mCallbacks.onDaemonConnected();

        byte[] buffer = new byte[BUFFER_SIZE];
        int start = 0;

        while (true) {//4⃣️
            int count = inputStream.read(buffer, start, BUFFER_SIZE - start);
            if (count < 0) {
                loge("got " + count + " reading with start = " + start);
                break;
            }

            // Add our starting point to the count and reset the start.
            count += start;
            start = 0;

            for (int i = 0; i < count; i++) {
                if (buffer[i] == 0) {
                    // Note - do not log this raw message since it may contain
                    // sensitive data
                    final String rawEvent = new String(
                            buffer, start, i - start, StandardCharsets.UTF_8);

                    boolean releaseWl = false;
                    try {
                        final NativeDaemonEvent event = NativeDaemonEvent.parseRawEvent(
                                rawEvent);

                        log("RCV <- {" + event + "}");

                        if (event.isClassUnsolicited()) {// ⑤
                            // TODO: migrate to sending NativeDaemonEvent instances
                            if (mCallbacks.onCheckHoldWakeLock(event.getCode())
                                    && mWakeLock != null) {
                                mWakeLock.acquire();
                                releaseWl = true;
                            }
                            if (mCallbackHandler.sendMessage(mCallbackHandler.obtainMessage(
                                    event.getCode(), event.getRawEvent()))) {
                                releaseWl = false;
                            }
                        } else {
                            mResponseQueue.add(event.getCmdNumber(), event);//6⃣️
                        }
                    } catch (IllegalArgumentException e) {
                        log("Problem parsing message " + e);
                    } finally {
                        if (releaseWl) {
                            mWakeLock.acquire();
                        }
                    }

                    start = i + 1;
                }
            }

            if (start == 0) {
                log("RCV incomplete");
            }

            // We should end at the amount we read. If not, compact then
            // buffer and read again.
            if (start != count) {
                final int remaining = BUFFER_SIZE - start;
                System.arraycopy(buffer, start, buffer, 0, remaining);
                start = remaining;
            } else {
                start = 0;
            }
        }
    } catch (IOException ex) {
        loge("Communications error: " + ex);
        throw ex;
    } finally {
        synchronized (mDaemonLock) {
            if (mOutputStream != null) {
                try {
                    loge("closing stream for " + mSocket);
                    mOutputStream.close();
                } catch (IOException e) {
                    loge("Failed closing output stream: " + e);
                }
                mOutputStream = null;
            }
        }

        try {
            if (socket != null) {
                socket.close();
            }
        } catch (IOException ex) {
            loge("Failed closing socket: " + ex);
        }
    }
}

这个方法我全部copy过来了,因为这真是一个相当重要的方法:
1⃣️:在这里我们可以看到是一个localsocket通讯
2⃣️:我们就可以找到对应的socket通讯地址
3⃣️:socket连接,将socket的outputstream分配到全局,用来给其他方法使用
4⃣️:这个地方的while(true)与run方法中的while(true)是有联系的。等下说,这个地方打开了inputstream,调用了inputstream的read方法,阻塞在这里等待输出结果,如果有输出结果了,那么就会继续走下去,但是如果读到结果<0的情况,那么就会发生异常,那么就会break出去,这个时候这个方法就完成,但是run方法中又是一个无限循环又会重新开启一个新的localsocket,这样就是一个双重保险,保持一直有监听该localsocket线程存在。
⑤:这个地方,如果判断返回的code在600-700之间,就会返回给callback进行处理。
6⃣️如果不是在600-700之间就会把这个response加入到responsequeue中,那看一下这个responsequeue,它数据结构是这样的:

private static class ResponseQueue {

    private static class PendingCmd {
        public final int cmdNum;
        public final String logCmd;

        public BlockingQueue<NativeDaemonEvent> responses =
                new ArrayBlockingQueue<NativeDaemonEvent>(10);


        public PendingCmd(int cmdNum, String logCmd) {
            this.cmdNum = cmdNum;
            this.logCmd = logCmd;
        }
    }

    private final LinkedList<PendingCmd> mPendingCmds;
    private int mMaxCount;

    ResponseQueue(int maxCount) {
        mPendingCmds = new LinkedList<PendingCmd>();
        mMaxCount = maxCount;
    }
}

首先,这是一个链表结构,而且还设置链表中所能存储的最大个数,然后链表中的每一个单位都是一个pendingcmd,这个pendingcmd又会有一个arrayblockingqueue的数据结构。
再看看add方法:

public void add(int cmdNum, NativeDaemonEvent response) {
        PendingCmd found = null;
        synchronized (mPendingCmds) {
            for (PendingCmd pendingCmd : mPendingCmds) {
                if (pendingCmd.cmdNum == cmdNum) {
                    found = pendingCmd;
                    break;
                }
            }
            if (found == null) {
                // didn't find it - make sure our queue isn't too big before adding
                while (mPendingCmds.size() >= mMaxCount) {
                    Slog.e("NativeDaemonConnector.ResponseQueue",
                            "more buffered than allowed: " + mPendingCmds.size() +
                            " >= " + mMaxCount);
                    // let any waiter timeout waiting for this
                    PendingCmd pendingCmd = mPendingCmds.remove();
                    Slog.e("NativeDaemonConnector.ResponseQueue",
                            "Removing request: " + pendingCmd.logCmd + " (" +
                            pendingCmd.cmdNum + ")");
                }
                found = new PendingCmd(cmdNum, null);
                mPendingCmds.add(found);
            }
            found.availableResponseCount++;
            // if a matching remove call has already retrieved this we can remove this
            // instance from our list
            if (found.availableResponseCount == 0) mPendingCmds.remove(found);
        }
        try {
            found.responses.put(response);
        } catch (InterruptedException e) { }
    }

首先通过cmdnum去查找responsequeue中是否有了该pendingcmd,如果没找到就新建一个pendingcmd,同时,如果找到了就在这个对应的arrayblockingqueue中加入这个response。

那么整理一下:系统服务networkmanagementservice刚刚起来的时候,就会初始化一个线程,专门用来监听localsocket,同时将outputstream初始化成全局变量,并且将inputstream结果放在responsequeue中。

那么什么时候将命令传进去呢,就在我们之前看到的connector的execute方法了:

public NativeDaemonEvent[] execute(int timeout, String cmd, Object... args)
        throws NativeDaemonConnectorException {
    final long startTime = SystemClock.elapsedRealtime();

    final ArrayList<NativeDaemonEvent> events = Lists.newArrayList();

    final StringBuilder rawBuilder = new StringBuilder();
    final StringBuilder logBuilder = new StringBuilder();
    final int sequenceNumber = mSequenceNumber.incrementAndGet();//1⃣️

    makeCommand(rawBuilder, logBuilder, sequenceNumber, cmd, args);//2⃣️

    final String rawCmd = rawBuilder.toString();
    final String logCmd = logBuilder.toString();

    log("SND -> {" + logCmd + "}");

    synchronized (mDaemonLock) {
        if (mOutputStream == null) {
            throw new NativeDaemonConnectorException("missing output stream");
        } else {
            try {
                mOutputStream.write(rawCmd.getBytes(StandardCharsets.UTF_8));//3⃣️
            } catch (IOException e) {
                throw new NativeDaemonConnectorException("problem sending command", e);
            }
        }
    }

    NativeDaemonEvent event = null;
    do {
        event = mResponseQueue.remove(sequenceNumber, timeout, logCmd);//4⃣️
        if (event == null) {
            loge("timed-out waiting for response to " + logCmd);
            throw new NativeDaemonFailureException(logCmd, event);
        }
        if (VDBG) log("RMV <- {" + event + "}");
        events.add(event);
    } while (event.isClassContinue());

    final long endTime = SystemClock.elapsedRealtime();
    if (endTime - startTime > WARN_EXECUTE_DELAY_MS) {
        loge("NDC Command {" + logCmd + "} took too long (" + (endTime - startTime) + "ms)");
    }

    if (event.isClassClientError()) {
        throw new NativeDaemonArgumentException(logCmd, event);
    }
    if (event.isClassServerError()) {
        throw new NativeDaemonFailureException(logCmd, event);
    }

    return events.toArray(new NativeDaemonEvent[events.size()]);
}

首先看1⃣️:这是一个线程安全的同步自增整数,这里会生成一个int型数字。
2⃣️:在这里会进行命令的封装,这些命令的封装的话都是根据libsysutils中的frameworklistener接口来定义的。所以具体的封装就不看了。
3⃣️:很简单,就是将命令传送给localsocket了。
4⃣️:又是关键的第四部,这个地方是一个循环,关键就是在这个remove方法上,这个remove方法,将timeout也传了进去,那我们就看看这个responsequeue的remove方法:

public NativeDaemonEvent remove(int cmdNum, int timeoutMs, String logCmd) {
        PendingCmd found = null;
        synchronized (mPendingCmds) {
            for (PendingCmd pendingCmd : mPendingCmds) {
                if (pendingCmd.cmdNum == cmdNum) {
                    found = pendingCmd;
                    break;
                }
            }
            if (found == null) {
                found = new PendingCmd(cmdNum, logCmd);
                mPendingCmds.add(found);
            }
            found.availableResponseCount--;
            // if a matching add call has already retrieved this we can remove this
            // instance from our list
            if (found.availableResponseCount == 0) mPendingCmds.remove(found);
        }
        NativeDaemonEvent result = null;
        try {
            result = found.responses.poll(timeoutMs, TimeUnit.MILLISECONDS);//1⃣️
        } catch (InterruptedException e) {}
        if (result == null) {
            Slog.e("NativeDaemonConnector.ResponseQueue", "Timeout waiting for response");
        }
        return result;
    }

其他的也就不看了,就看1⃣️这个地方吧,是调用的blockingqueue的poll方法,这个方法在查看了api之后,发现这是一个阻塞的方法,于是就都可以说的过去了:

调用了与socket通讯的方法之后,会生成对应的命令,然后执行命令后就会阻塞在这里等待结果返回,然后在超时之后抛出异常,如果在没有超时的时候就可以拿到结果,返回给调用者。

最后理一遍:
在服务刚刚开启的时候,就会开启一个线程在后台与localsocket进行连接,然后等待inputstream,在调用对应的execute方法之后,调用者会阻塞在等待result,在命令执行完成后,会往inputstream里面传入流,线程拿到流后进行解析,同时将解析结果放到responsequeue中。这个时候,execute方法的超时时间过了,如果在responesequeue中还是没有结果就会抛出异常,如果有结果则会返回给调用者结果。

最后挖个坑等着以后填:为什么与netd通讯需要使用localsocket,而droidwallroot之后是如何与iptables进行通讯的。