NativeDaemonConnector Source Code Analysis
I recently needed to implement data-usage management, so I looked into the relevant APIs.
Stock Android already has an interface for controlling whether an individual app may use data in the background: in DataUsageSummary, the setAppRestrictBackground method is what toggles background data access for an app:
private void setAppRestrictBackground(boolean restrictBackground) {
    if (LOGD) Log.d(TAG, "setAppRestrictBackground()");
    final int uid = mCurrentApp.key;
    mPolicyManager.setUidPolicy(
            uid, restrictBackground ? POLICY_REJECT_METERED_BACKGROUND : POLICY_NONE);
    mAppRestrict.setChecked(restrictBackground);
}
You can see it simply calls NetworkPolicyManager's setUidPolicy method:
public void setUidPolicy(int uid, int policy) {
    try {
        mService.setUidPolicy(uid, policy);
    } catch (RemoteException e) {
    }
}
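So from privileged code, the whole feature boils down to that one call. As a hedged usage sketch (NetworkPolicyManager is a hidden API, the caller must hold the signature-level MANAGE_NETWORK_POLICY permission, and "com.example.app"-style package lookup is a placeholder — treat the details as assumptions for the AOSP version discussed here):

// Sketch: restrict or un-restrict background metered data for one app.
// Works only from system code holding MANAGE_NETWORK_POLICY.
void setRestrictBackground(Context context, String pkg, boolean restrict)
        throws PackageManager.NameNotFoundException {
    final int uid = context.getPackageManager().getApplicationInfo(pkg, 0).uid;
    NetworkPolicyManager.from(context).setUidPolicy(uid,
            restrict ? NetworkPolicyManager.POLICY_REJECT_METERED_BACKGROUND
                     : NetworkPolicyManager.POLICY_NONE);
}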
Inside NetworkPolicyManager this is just a Binder call into NetworkPolicyManagerService:
public void setUidPolicy(int uid, int policy) {
    mContext.enforceCallingOrSelfPermission(MANAGE_NETWORK_POLICY, TAG);

    if (!UserHandle.isApp(uid)) {
        throw new IllegalArgumentException("cannot apply policy to UID " + uid);
    }

    synchronized (mRulesLock) {
        final int oldPolicy = mUidPolicy.get(uid, POLICY_NONE);
        if (oldPolicy != policy) {
            setUidPolicyUncheckedLocked(uid, policy, true); // 1⃣️
        }
    }
}
I'll skip the permission check and the check that the UID actually belongs to an app; what matters is the method called at 1⃣️:
private void setUidPolicyUncheckedLocked(int uid, int policy, boolean persist) {
    mUidPolicy.put(uid, policy);

    // uid policy changed, recompute rules and persist policy.
    updateRulesForUidLocked(uid); // 2⃣️
    if (persist) {
        writePolicyLocked();
    }
}
Again, the interesting part is at 2⃣️:
void updateRulesForUidLocked(int uid) {
    if (!isUidValidForRules(uid)) return;
    .....
    if (uidRules == RULE_ALLOW_ALL) {
        mUidRules.delete(uid);
    } else {
        mUidRules.put(uid, uidRules);
    }

    final boolean rejectMetered = (uidRules & RULE_REJECT_METERED) != 0;
    setUidNetworkRules(uid, rejectMetered); // 3⃣️

    // dispatch changed rule to existing listeners
    mHandler.obtainMessage(MSG_RULES_CHANGED, uid, uidRules).sendToTarget();
    ....
}
Now look at 3⃣️:
private void setUidNetworkRules(int uid, boolean rejectOnQuotaInterfaces) {
    try {
        mNetworkManager.setUidNetworkRules(uid, rejectOnQuotaInterfaces);
    } catch (IllegalStateException e) {
        Log.wtf(TAG, "problem setting uid rules", e);
    } catch (RemoteException e) {
        // ignored; service lives in system_server
    }
}
So the actual work ends up in NetworkManagementService. As for how the four services — ConnectivityService, NetworkPolicyManagerService, NetworkStatsService, and NetworkManagementService — relate to one another, that is part of the network framework as a whole and I won't go into it here.
Here is the setUidNetworkRules method in NetworkManagementService:
public void setUidNetworkRules(int uid, boolean rejectOnQuotaInterfaces) {
    mContext.enforceCallingOrSelfPermission(CONNECTIVITY_INTERNAL, TAG);

    // silently discard when control disabled
    // TODO: eventually migrate to be always enabled
    if (!mBandwidthControlEnabled) return;

    synchronized (mQuotaLock) {
        final boolean oldRejectOnQuota = mUidRejectOnQuota.get(uid, false);
        if (oldRejectOnQuota == rejectOnQuotaInterfaces) {
            // TODO: eventually consider throwing
            return;
        }

        try {
            mConnector.execute("bandwidth",
                    rejectOnQuotaInterfaces ? "addnaughtyapps" : "removenaughtyapps", uid);
            if (rejectOnQuotaInterfaces) {
                mUidRejectOnQuota.put(uid, true);
            } else {
                mUidRejectOnQuota.delete(uid);
            }
        } catch (NativeDaemonConnectorException e) {
            throw e.rethrowAsParcelableException();
        }
    }
}
Here comes the key part: the connector's execute method is called with the command and its arguments. So what is this connector? It is the subject of this article, NativeDaemonConnector. Let's look at how it communicates with the native daemon over a LocalSocket.
The connector is created when NetworkManagementService itself is constructed:
private NetworkManagementService(Context context, String socket) {
    .....
    mConnector = new NativeDaemonConnector(
            new NetdCallbackReceiver(), socket, 10, NETD_TAG, 160, wl,
            FgThread.get().getLooper());
    mThread = new Thread(mConnector, NETD_TAG);

    mDaemonHandler = new Handler(FgThread.get().getLooper());
    ......
}
A Thread is created alongside it; NativeDaemonConnector implements the Runnable interface. Then, in the service's create method:
static NetworkManagementService create(Context context, String socket)
        throws InterruptedException {
    final NetworkManagementService service = new NetworkManagementService(context, socket);
    final CountDownLatch connectedSignal = service.mConnectedSignal;
    if (DBG) Slog.d(TAG, "Creating NetworkManagementService");
    service.mThread.start(); // 1⃣️
    if (DBG) Slog.d(TAG, "Awaiting socket connection");
    connectedSignal.await();
    if (DBG) Slog.d(TAG, "Connected");
    return service;
}
The start() call at 1⃣️ kicks off the thread, so the Runnable's run method executes. Also note connectedSignal: create does not return until the daemon socket is actually connected; the latch is counted down from the service's daemon-connected callback (we will see where that callback fires inside listenToSocket below). A minimal standalone sketch of this handshake pattern, with hypothetical names:
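import java.util.concurrent.CountDownLatch;

// Sketch of the create()/onDaemonConnected handshake: the creator blocks
// until the listener thread signals that the connection is up.
public class LatchHandshakeDemo {
    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch connectedSignal = new CountDownLatch(1);

        Thread listener = new Thread(() -> {
            // ... connect to the socket here ...
            connectedSignal.countDown(); // plays the role of onDaemonConnected()
            // ... then loop reading from the socket ...
        });
        listener.start();

        connectedSignal.await(); // create() blocks here until connected
        System.out.println("Connected");
    }
}

Back to the connector: here is NativeDaemonConnector's run method: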
@Override
public void run() {
    mCallbackHandler = new Handler(mLooper, this);

    while (true) { // 1⃣️
        try {
            listenToSocket();
        } catch (Exception e) {
            loge("Error in NativeDaemonConnector: " + e);
            SystemClock.sleep(5000);
        }
    }
}
Note that 1⃣️ is an infinite loop; together with a second loop inside listenToSocket (shown next) it keeps a listener on the socket alive at all times. To make the retry pattern concrete, here is a minimal standalone sketch (hypothetical names, not the actual implementation):
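import java.io.IOException;

// Minimal sketch of the reconnect pattern in run()/listenToSocket():
// the outer loop guarantees that when the inner read loop dies
// (EOF or IOException), a fresh connection is established.
public class ReconnectLoopSketch implements Runnable {
    @Override
    public void run() {
        while (true) {                // outer loop: survives connection loss
            try {
                listenToSocketOnce(); // inner loop: blocks reading until EOF/error
            } catch (IOException e) {
                System.err.println("Connection lost: " + e);
            }
            try {
                Thread.sleep(5000);   // back off before reconnecting
            } catch (InterruptedException ignored) { }
        }
    }

    private void listenToSocketOnce() throws IOException {
        // connect, then loop on InputStream.read() until it returns -1
    }
}

With that pattern in mind, here is listenToSocket() itself: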
private void listenToSocket() throws IOException {
    LocalSocket socket = null; // 1⃣️

    try {
        socket = new LocalSocket();
        LocalSocketAddress address = determineSocketAddress(); // 2⃣️

        socket.connect(address);

        InputStream inputStream = socket.getInputStream();
        synchronized (mDaemonLock) {
            mOutputStream = socket.getOutputStream(); // 3⃣️
        }

        mCallbacks.onDaemonConnected();

        byte[] buffer = new byte[BUFFER_SIZE];
        int start = 0;

        while (true) { // 4⃣️
            int count = inputStream.read(buffer, start, BUFFER_SIZE - start);
            if (count < 0) {
                loge("got " + count + " reading with start = " + start);
                break;
            }

            // Add our starting point to the count and reset the start.
            count += start;
            start = 0;

            for (int i = 0; i < count; i++) {
                if (buffer[i] == 0) {
                    // Note - do not log this raw message since it may contain
                    // sensitive data
                    final String rawEvent = new String(
                            buffer, start, i - start, StandardCharsets.UTF_8);

                    boolean releaseWl = false;
                    try {
                        final NativeDaemonEvent event =
                                NativeDaemonEvent.parseRawEvent(rawEvent);

                        log("RCV <- {" + event + "}");

                        if (event.isClassUnsolicited()) { // ⑤
                            // TODO: migrate to sending NativeDaemonEvent instances
                            if (mCallbacks.onCheckHoldWakeLock(event.getCode())
                                    && mWakeLock != null) {
                                mWakeLock.acquire();
                                releaseWl = true;
                            }
                            if (mCallbackHandler.sendMessage(mCallbackHandler.obtainMessage(
                                    event.getCode(), event.getRawEvent()))) {
                                releaseWl = false;
                            }
                        } else {
                            mResponseQueue.add(event.getCmdNumber(), event); // 6⃣️
                        }
                    } catch (IllegalArgumentException e) {
                        log("Problem parsing message " + e);
                    } finally {
                        if (releaseWl) {
                            // the message was not handed off to the handler,
                            // so release the wakelock here
                            mWakeLock.release();
                        }
                    }

                    start = i + 1;
                }
            }

            if (start == 0) {
                log("RCV incomplete");
            }

            // We should end at the amount we read. If not, compact then
            // buffer and read again.
            if (start != count) {
                final int remaining = BUFFER_SIZE - start;
                System.arraycopy(buffer, start, buffer, 0, remaining);
                start = remaining;
            } else {
                start = 0;
            }
        }
    } catch (IOException ex) {
        loge("Communications error: " + ex);
        throw ex;
    } finally {
        synchronized (mDaemonLock) {
            if (mOutputStream != null) {
                try {
                    loge("closing stream for " + mSocket);
                    mOutputStream.close();
                } catch (IOException e) {
                    loge("Failed closing output stream: " + e);
                }
                mOutputStream = null;
            }
        }

        try {
            if (socket != null) {
                socket.close();
            }
        } catch (IOException ex) {
            loge("Failed closing socket: " + ex);
        }
    }
}
I copied this method in full because it really is the heart of the class:
1⃣️: the communication runs over a LocalSocket.
2⃣️: determineSocketAddress() resolves the address of the socket to connect to.
3⃣️: once connected, the socket's OutputStream is stored in a field so that other methods (notably execute) can write commands to it.
4⃣️: this while (true) works together with the while (true) in run. The read call blocks here until the daemon writes something. Replies are NUL-terminated, and if a read ends mid-message the leftover bytes are compacted to the front of the buffer before the next read. If read returns a count < 0, the loop breaks, listenToSocket returns, and the infinite loop in run opens a fresh LocalSocket. The two loops are a double safety net: there is always a thread listening on this socket.
⑤: if the parsed event's code marks it as unsolicited (the 600–699 range), it is dispatched to the callback handler for processing.
6⃣️: otherwise the response is filed into the ResponseQueue under its command number. The queue's data structure looks like this:
private static class ResponseQueue {

    private static class PendingCmd {
        public final int cmdNum;
        public final String logCmd;

        public BlockingQueue<NativeDaemonEvent> responses =
                new ArrayBlockingQueue<NativeDaemonEvent>(10);

        // >0: responses arrived before remove() was called;
        // <0: remove() is already waiting for a response
        public int availableResponseCount;

        public PendingCmd(int cmdNum, String logCmd) {
            this.cmdNum = cmdNum;
            this.logCmd = logCmd;
        }
    }

    private final LinkedList<PendingCmd> mPendingCmds;
    private int mMaxCount;

    ResponseQueue(int maxCount) {
        mPendingCmds = new LinkedList<PendingCmd>();
        mMaxCount = maxCount;
    }
}
So ResponseQueue is a linked list, capped at mMaxCount entries, where each node is a PendingCmd, and each PendingCmd carries its own ArrayBlockingQueue of responses.
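In effect, responses are demultiplexed by command number: each in-flight command owns a small blocking queue that its caller waits on. A minimal sketch of the same idea, using a map instead of the capped LinkedList (hypothetical class, simplified bookkeeping):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Sketch: demultiplexing responses by command number. The listener thread
// calls add(); the caller of execute() calls remove() and blocks on its
// own per-command queue.
public class DemuxSketch {
    private final ConcurrentHashMap<Integer, BlockingQueue<String>> mQueues =
            new ConcurrentHashMap<>();

    public void add(int cmdNum, String response) throws InterruptedException {
        mQueues.computeIfAbsent(cmdNum, k -> new ArrayBlockingQueue<>(10))
               .put(response);
    }

    public String remove(int cmdNum, long timeoutMs) throws InterruptedException {
        BlockingQueue<String> q =
                mQueues.computeIfAbsent(cmdNum, k -> new ArrayBlockingQueue<>(10));
        return q.poll(timeoutMs, TimeUnit.MILLISECONDS); // null on timeout
    }
}

The real implementation also has to cope with responses arriving before anyone is waiting, and vice versa, which is what the availableResponseCount bookkeeping below is for.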
Now the add method:
public void add(int cmdNum, NativeDaemonEvent response) {
    PendingCmd found = null;
    synchronized (mPendingCmds) {
        for (PendingCmd pendingCmd : mPendingCmds) {
            if (pendingCmd.cmdNum == cmdNum) {
                found = pendingCmd;
                break;
            }
        }
        if (found == null) {
            // didn't find it - make sure our queue isn't too big before adding
            while (mPendingCmds.size() >= mMaxCount) {
                Slog.e("NativeDaemonConnector.ResponseQueue",
                        "more buffered than allowed: " + mPendingCmds.size() +
                        " >= " + mMaxCount);
                // let any waiter timeout waiting for this
                PendingCmd pendingCmd = mPendingCmds.remove();
                Slog.e("NativeDaemonConnector.ResponseQueue",
                        "Removing request: " + pendingCmd.logCmd + " (" +
                        pendingCmd.cmdNum + ")");
            }
            found = new PendingCmd(cmdNum, null);
            mPendingCmds.add(found);
        }
        found.availableResponseCount++;
        // if a matching remove call has already retrieved this we can remove this
        // instance from our list
        if (found.availableResponseCount == 0) mPendingCmds.remove(found);
    }
    try {
        found.responses.put(response);
    } catch (InterruptedException e) { }
}
add first searches the list by cmdNum for an existing PendingCmd; if none is found it creates one (evicting the oldest entries when the list is over capacity), and in either case it puts the response into that PendingCmd's ArrayBlockingQueue. Note availableResponseCount: add increments it and remove (shown later) decrements it, so whichever side runs second sees the count hit zero and unlinks the PendingCmd from the list.
To recap: as soon as the system service NetworkManagementService comes up, it starts a dedicated thread that connects to the LocalSocket, publishes the socket's OutputStream through a field, and parses everything arriving on the InputStream into the ResponseQueue.
So when does a command actually get sent? That happens in the connector's execute method we ran into earlier:
public NativeDaemonEvent[] execute(int timeout, String cmd, Object... args)
        throws NativeDaemonConnectorException {
    final long startTime = SystemClock.elapsedRealtime();

    final ArrayList<NativeDaemonEvent> events = Lists.newArrayList();

    final StringBuilder rawBuilder = new StringBuilder();
    final StringBuilder logBuilder = new StringBuilder();
    final int sequenceNumber = mSequenceNumber.incrementAndGet(); // 1⃣️

    makeCommand(rawBuilder, logBuilder, sequenceNumber, cmd, args); // 2⃣️

    final String rawCmd = rawBuilder.toString();
    final String logCmd = logBuilder.toString();

    log("SND -> {" + logCmd + "}");

    synchronized (mDaemonLock) {
        if (mOutputStream == null) {
            throw new NativeDaemonConnectorException("missing output stream");
        } else {
            try {
                mOutputStream.write(rawCmd.getBytes(StandardCharsets.UTF_8)); // 3⃣️
            } catch (IOException e) {
                throw new NativeDaemonConnectorException("problem sending command", e);
            }
        }
    }

    NativeDaemonEvent event = null;
    do {
        event = mResponseQueue.remove(sequenceNumber, timeout, logCmd); // 4⃣️
        if (event == null) {
            loge("timed-out waiting for response to " + logCmd);
            throw new NativeDaemonFailureException(logCmd, event);
        }
        if (VDBG) log("RMV <- {" + event + "}");
        events.add(event);
    } while (event.isClassContinue());

    final long endTime = SystemClock.elapsedRealtime();
    if (endTime - startTime > WARN_EXECUTE_DELAY_MS) {
        loge("NDC Command {" + logCmd + "} took too long (" + (endTime - startTime) + "ms)");
    }

    if (event.isClassClientError()) {
        throw new NativeDaemonArgumentException(logCmd, event);
    }
    if (event.isClassServerError()) {
        throw new NativeDaemonFailureException(logCmd, event);
    }

    return events.toArray(new NativeDaemonEvent[events.size()]);
}
Starting with 1⃣️: mSequenceNumber is an atomic integer, so each command gets a unique sequence number in a thread-safe way.
2⃣️: makeCommand assembles the raw command string; the wire format follows the FrameworkListener protocol in libsysutils (see the sketch after this list), so I won't trace the encoding in detail.
3⃣️: straightforward: the encoded command is written to the LocalSocket's OutputStream.
4⃣️: the crucial step again: a loop around the ResponseQueue's remove method, which is also handed the timeout.
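As promised at 2⃣️, here is roughly what goes over the wire. This is a sketch based on the FrameworkListener protocol in libsysutils; the real makeCommand additionally quotes and escapes arguments, which is omitted here, and the example reply text is purely illustrative:

// Sketch of the NDC wire format (per libsysutils' FrameworkListener;
// argument quoting/escaping omitted). execute("bandwidth", "addnaughtyapps", uid)
// ends up on the socket roughly as:
//     "<seq> bandwidth addnaughtyapps <uid>\0"
// and the matching reply line has the shape:
//     "<code> <seq> <message>\0"    e.g. "200 42 Bandwidth command succeeded"
static String buildRawCommand(int sequenceNumber, String cmd, Object... args) {
    final StringBuilder raw = new StringBuilder();
    raw.append(sequenceNumber).append(' ').append(cmd);
    for (Object arg : args) {
        raw.append(' ').append(arg); // real code escapes args containing spaces
    }
    raw.append('\0');                // the daemon splits the stream on NUL bytes
    return raw.toString();
}

With the format in mind, here is the remove method of ResponseQueue: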
public NativeDaemonEvent remove(int cmdNum, int timeoutMs, String logCmd) {
    PendingCmd found = null;
    synchronized (mPendingCmds) {
        for (PendingCmd pendingCmd : mPendingCmds) {
            if (pendingCmd.cmdNum == cmdNum) {
                found = pendingCmd;
                break;
            }
        }
        if (found == null) {
            found = new PendingCmd(cmdNum, logCmd);
            mPendingCmds.add(found);
        }
        found.availableResponseCount--;
        // if a matching add call has already retrieved this we can remove this
        // instance from our list
        if (found.availableResponseCount == 0) mPendingCmds.remove(found);
    }
    NativeDaemonEvent result = null;
    try {
        result = found.responses.poll(timeoutMs, TimeUnit.MILLISECONDS); // 1⃣️
    } catch (InterruptedException e) { }
    if (result == null) {
        Slog.e("NativeDaemonConnector.ResponseQueue", "Timeout waiting for response");
    }
    return result;
}
Skipping the parts we have already seen, look at 1⃣️: this is BlockingQueue's poll method which, per the API docs, blocks for up to the given timeout waiting for an element. With that, everything falls into place: after the command is serialized and written to the socket, the caller blocks here waiting for the result; if the timeout expires an exception is thrown, and if the result arrives in time it is handed back to the caller.
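A tiny standalone demo of the poll semantics that remove relies on:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Demo: poll() blocks up to the timeout and returns null if nothing arrived.
public class PollDemo {
    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> responses = new ArrayBlockingQueue<>(10);

        // No producer yet: blocks ~200 ms, then returns null (timeout).
        System.out.println(responses.poll(200, TimeUnit.MILLISECONDS)); // null

        // With a producer, poll() returns as soon as the element arrives.
        new Thread(() -> {
            try {
                Thread.sleep(50);
                responses.put("200 1 ok");
            } catch (InterruptedException ignored) { }
        }).start();
        System.out.println(responses.poll(1, TimeUnit.SECONDS)); // "200 1 ok"
    }
}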
One final walk-through: when the service starts, it spins up a background thread that connects to the LocalSocket and waits on the InputStream. A caller of execute writes the command to the socket and then blocks waiting for the result. When the daemon finishes executing the command, it writes its reply back on the stream; the listener thread reads and parses it, and files the parsed event into the ResponseQueue. If the ResponseQueue still holds no result when execute's timeout expires, an exception is thrown; otherwise the result is returned to the caller.
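To make the whole round trip concrete, here is a self-contained sketch in plain Java, with the LocalSocket replaced by an in-memory pipe and a fake daemon thread. Every name is hypothetical; only the mechanism mirrors the real code:

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// End-to-end sketch of the NativeDaemonConnector round trip. A fake
// "daemon" answers every NUL-terminated command with "200 <seq> ok".
public class MiniConnectorDemo {
    private final ConcurrentHashMap<Integer, BlockingQueue<String>> mResponses =
            new ConcurrentHashMap<>();
    private final AtomicInteger mSeq = new AtomicInteger();
    private OutputStream mOut;

    private BlockingQueue<String> queueFor(int seq) {
        return mResponses.computeIfAbsent(seq, k -> new ArrayBlockingQueue<>(10));
    }

    // execute(): send "<seq> <cmd>\0", then block on this command's queue.
    String execute(String cmd, long timeoutMs) throws Exception {
        final int seq = mSeq.incrementAndGet();
        mOut.write((seq + " " + cmd + "\0").getBytes(StandardCharsets.UTF_8));
        String event = queueFor(seq).poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (event == null) throw new IOException("timed-out waiting for response");
        return event;
    }

    // Listener thread: read NUL-terminated replies, demux by sequence number.
    void listen(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        for (int b; (b = in.read()) >= 0; ) {
            if (b != 0) { buf.write(b); continue; }
            String raw = buf.toString("UTF-8"); // "<code> <seq> <message>"
            buf.reset();
            queueFor(Integer.parseInt(raw.split(" ", 3)[1])).offer(raw);
        }
    }

    public static void main(String[] args) throws Exception {
        MiniConnectorDemo c = new MiniConnectorDemo();
        PipedInputStream daemonIn = new PipedInputStream();
        c.mOut = new PipedOutputStream(daemonIn);
        PipedInputStream connectorIn = new PipedInputStream();
        PipedOutputStream daemonOut = new PipedOutputStream(connectorIn);

        // Fake daemon: reply "200 <seq> ok" to every command it receives.
        Thread daemon = new Thread(() -> {
            try {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                for (int b; (b = daemonIn.read()) >= 0; ) {
                    if (b != 0) { buf.write(b); continue; }
                    String seq = buf.toString("UTF-8").split(" ", 2)[0];
                    buf.reset();
                    daemonOut.write(("200 " + seq + " ok\0")
                            .getBytes(StandardCharsets.UTF_8));
                }
            } catch (IOException ignored) { }
        });
        daemon.setDaemon(true);
        daemon.start();

        Thread listener = new Thread(() -> {
            try { c.listen(connectorIn); } catch (IOException ignored) { }
        });
        listener.setDaemon(true);
        listener.start();

        System.out.println(c.execute("bandwidth addnaughtyapps 10123", 1000));
        // prints: 200 1 ok
    }
}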
Finally, two questions I'll leave open for a future post: why does communication with netd go through a LocalSocket, and how does DroidWall, once it has root, talk to iptables?