学习笔记8(nodejs之RPC、buffer、net)

    科技2024-04-19  89

    nodejs

    1. RPC2. Buffer模块3. net模块

    学习链接:25 | RPC 调用:Node.js net建立多路复用的RPC通道代码:https://github.com/baixc1/csdn/tree/master/note/note8

    1. RPC

    Remote Procedure Call (远程过程调用)和Ajax类似 网络通信数据 和Ajax区别 使用特定服务寻址(不一定使用DNS作为寻址服务)使用二进制协议(应用层协议一般不使用HTTP)基于TCP或UDP协议 TCP通信方式 单工通信半双工通信双工通信 HTTP协议 1.1之前使用短连接1.1使用长连接,半双工通信,多路复用2.0使用全双工

    2. Buffer模块

    编解码二进制数据包API Buffer.fromBuffer.alloc 大小端问题 几个Byte里,高位和低位的编排顺序不同int8:8位二进制数 const buffer1 = Buffer.from('geek'); const buffer2 = Buffer.from([1,2,3]); const buffer3 = Buffer.alloc(10); console.log(buffer1); console.log(buffer2); console.log(buffer3); console.log(`${buffer1}`) console.log(buffer2.readInt8(0)); buffer2.writeInt8(127, 1); console.log(buffer2); // <Buffer 01 7f 03> 16*7+15 buffer2.writeInt16BE(128, 1); console.log(buffer2); // <Buffer 01 00 80> 16*8 buffer2.writeInt16LE(128, 1); console.log(buffer2); const buf = Buffer.from([0, 5]); console.log(buf) console.log(buf.readInt16BE(0)); console.log(buf.readInt16LE(0)); // 0x0500 Protocol Buffer 谷歌研发的二进制协议编解码库 // buffer.js const fs = require('fs'); const protobuf = require('protocol-buffers') const schemas = protobuf(fs.readFileSync(`${__dirname}/test.proto`)); const buffer = schemas.Course.encode({ id: 1, name: 'hh', price: 30.1 }) console.log(buffer); console.log(schemas.Course.decode(buffer)); //test.proto message Course { required float id = 1; required string name = 2; required float price = 3; }

    3. net模块

    基本使用 // client.js const net = require('net') const socket = new net.Socket({}) socket.connect({ host: '127.0.0.1', port: 9999 }) socket.write('hello world') // server.js const net = require('net') net.createServer(socket => { socket.on('data', buffer=>{ console.log(buffer, buffer.toString()) // <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64> hello world }) }).listen(9999) 半双工通信 // server.js const net = require('net') net.createServer(socket => { socket.on('data', async buffer=>{ const id = buffer.readInt16BE() await sleep(1000) console.log(id) await sleep(1000) socket.write(Buffer.from(data[id])) }) }).listen(9999) const data = { 10000: 'a', 10001: 'b', 10002: 'c', 10003: 'd', 10004: 'e', } const sleep = (ts=50)=>{ return new Promise(resolve=>{ setTimeout(resolve,ts) }) } // client.js const net = require('net') const socket = new net.Socket({}) socket.connect({ host: '127.0.0.1', port: 9999 }) const ids = [ 10000, 10001, 10002, 10003, 10004, ] sendData() socket.on('data', buffer=>{ sendData() }) function sendData() { const id = ids[Math.floor(Math.random()*ids.length)] console.log(id) const buffer = Buffer.alloc(2) // <Buffer 00 00> 每位为16进制数,16^4 buffer.writeInt16BE(id) // 2^16 socket.write(buffer) } 全双工通信 解决数据包对应关系 seqbody长body数据 同时上传buffer,会合并buffer 上传) (接收) 客户端 // client.js const net = require('net') const socket = new net.Socket({}) const { sleep, checkComplete, decode} = require('./common') socket.connect({ host: '127.0.0.1', port: 9999 }) let oldBuffer = null; socket.on('data', (buffer) => { // 把上一次data事件使用残余的buffer接上来 if (oldBuffer) { buffer = Buffer.concat([oldBuffer, buffer]); } let completeLength = 0; // 只要还存在可以解成完整包的包长 while (completeLength = checkComplete(buffer)) { const package = buffer.slice(0, completeLength); buffer = buffer.slice(completeLength); // 把这个包解成数据和seq const result = decode(package, true); console.log(`包${result.seq},返回值是${result.data}`); } // 把残余的buffer记下来 oldBuffer = buffer; }) const LESSON_IDS = [ "136797", "136798", "136799", "136800", "136801", "136803", "136804", "136806", "136807", "136808", "136809", "141994", "143517", "143557", "143564", "143644", "146470", "146569", "146582" ] let seq = 0; for (let k = 0; k < 30; k++) { (async ()=>{ // 模拟数据同时上传和间隔100ms上传 if(k%10===0){ await sleep(100) } id = Math.floor(Math.random() * LESSON_IDS.length); const data = encode({ id }) // console.log(data) socket.write(data); })() } /** * 二进制包编码函数 */ function encode (data) { // 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包 // 为了不要混淆重点,这个例子比较简单,就直接把课程id转buffer发送 const body = Buffer.alloc(4); body.writeInt32BE(LESSON_IDS[data.id]); // console.log(body) // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分 // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信 const header = Buffer.alloc(6); header.writeInt16BE(seq) header.writeInt32BE(body.length, 2); // console.log(header) // 包头和包体拼起来发送 const buffer = Buffer.concat([header, body]) console.log(`包${seq}传输的课程id为${LESSON_IDS[data.id]}`); seq++; return buffer; } 服务端 (接收数据) // server.js const net = require('net') const { checkComplete, decode } = require('./common') net.createServer(socket => { let oldBuffer = null; socket.on('data', function (buffer) { // 把上一次data事件使用残余的buffer接上来 console.log(buffer) if (oldBuffer) { buffer = Buffer.concat([oldBuffer, buffer]); } let packageLength; // 只要还存在可以解成完整包的包长 while (packageLength = checkComplete(buffer)) { const package = buffer.slice(0, packageLength); buffer = buffer.slice(packageLength); // 把这个包解成数据和seq const result = decode(package, false); // console.log(result) // 计算得到要返回的结果,并write返回 socket.write( encode(LESSON_DATA[result.data], result.seq) ); } // 把残余的buffer记下来 oldBuffer = buffer; }) }).listen(9999) /** * 二进制包编码函数 * 在一段rpc调用里,服务端需要经常编码rpc调用时,业务数据的返回包 */ function encode(data, seq) { // 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包 // 为了不要混淆重点,这个例子比较简单,就直接把课程标题转buffer返回 const body = Buffer.from(data) // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分 // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信 const header = Buffer.alloc(6); header.writeInt16BE(seq) header.writeInt32BE(body.length, 2); const buffer = Buffer.concat([header, body]) return buffer; } const LESSON_DATA = { 136797: "01 | 课程介绍", 136798: "02 | 内容综述", 136799: "03 | Node.js是什么?", 136800: "04 | Node.js可以用来做什么?", 136801: "05 | 课程实战项目介绍", 136803: "06 | 什么是技术预研?", 136804: "07 | Node.js开发环境安装", 136806: "08 | 第一个Node.js程序:石头剪刀布游戏", 136807: "09 | 模块:CommonJS规范", 136808: "10 | 模块:使用模块规范改造石头剪刀布游戏", 136809: "11 | 模块:npm", 141994: "12 | 模块:Node.js内置模块", 143517: "13 | 异步:非阻塞I/O", 143557: "14 | 异步:异步编程之callback", 143564: "15 | 异步:事件循环", 143644: "16 | 异步:异步编程之Promise", 146470: "17 | 异步:异步编程之async/await", 146569: "18 | HTTP:什么是HTTP服务器?", 146582: "19 | HTTP:简单实现一个HTTP服务器" } 公共模块 //common.js const sleep = (ts=50)=>{ return new Promise(resolve=>{ setTimeout(resolve,ts) }) } /** * 检查一段buffer是不是一个完整的数据包。 * 具体逻辑是:判断header的bodyLength字段,看看这段buffer是不是长于header和body的总长 * 如果是,则返回这个包长,意味着这个请求包是完整的。 * 如果不是,则返回0,意味着包还没接收完 * @param {} buffer */ function checkComplete (buffer) { if (buffer.length < 6) { return 0; } const bodyLength = buffer.readInt32BE(2); return 6 + bodyLength } /** * 二进制包解码函数 * 在一段rpc调用里,服务端需要经常解码rpc调用时,业务数据的请求包 */ function decode(buffer, isClent) { const header = buffer.slice(0, 6); const seq = header.readInt16BE(); // 正常情况下,这里应该是使用 protobuf 来decode一段代表业务数据的数据包 // 为了不要混淆重点,这个例子比较简单,就直接读一个Int32即可 let body = buffer.slice(6) if(!isClent){ body = body.readInt32BE() } // 这里把seq和数据返回出去 return { seq, data: body } } module.exports = { sleep, checkComplete, decode }
    Processed: 0.009, SQL: 8