  • Docker: Docker已经成为新应用开发的必备工具,它使得应用的构建、分享与部署都极其简单。

  • Docker Compose:我们使用Docker Compose来管理所有的服务,以便轻松地进行扩展。

其他的需求都由Docker镜像来满足,我们不需要安装其他任何东西了,只需要写一个简单的Docker Compos配置文档 -- docker-compose.yml:

version: '3'services:  ganache:    image: trufflesuite/ganache-cli    command: -m  redis:    image: redis:alpine    ports:      - "6379:6379"    command: redis-server --appendonly yes    volumes:      - redis:/data  zookeeper:    image: wurstmeister/zookeeper    ports:      - "2181:2181"  kafka:    image: wurstmeister/kafka    ports:      - "9092:9092"    environment:      KAFKA_ADVERTISED_HOST_NAME:      KAFKA_CREATE_TOPICS: "command:1:1,address.created:1:1,transaction:1:1,errors:1:1"      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181volumes:  redis:

只要运行docker-compose up -d就可以轻松地启动服务,这个命令会自动从Docker中心下载必要的镜像,然后启动。下面让我们看看都有哪些服务。

1.1 Ganache-cli


1.2 Redis



1.3 Kafka/Zookeeper

Apache Kafka在交易所架构中扮演着核心的角色,它负责接收所有服务的消息并分发给订阅这些消息的节点。


  • command

  • address.created

  • transaction

  • errors

Apache Kafka服务器可以独立地进行扩展,为我们的服务提供了一个分布式的消息处理集群。





首先运行npm init命令来创建默认的node包:

~/exchange-hubwiz/eth-wallet$ npm init


~/exhcange-hubwiz/eth-wallet$ npm install --save web3 redis kafka-node ethereumjs-tx bluebird


  • web3:通过websocket连接到Ganache或其他以太坊节点

  • redis:连接到Redis服务器以便保存或提取数据

  • kafka-node:接入Zookeeper,获取Kafka访问端结点,生产或消费Kafka消息




4.1 连接Redis服务器


// load configurationconst config = require('../../config')const redis = require('redis')const bluebird = require('bluebird')// promisify the redis client using bluebirdbluebird.promisifyAll(redis.RedisClient.prototype);bluebird.promisifyAll(redis.Multi.prototype);// create a new redis client and connect to the redis instanceconst client = redis.createClient(config.redis_port, config.redis_host);// if an error occurs, print it to the consoleclient.on('error', function (err) {  console.error("[REDIS] Error encountered", err)})module.exports = client;

4.2 连接以太坊节点

如果你认为连接Redis很简单了,那么使用web3连接以太坊节点简单的会让你吃惊。 创建一个ethereum.js,然后编写如下代码:

const config = require('../../config')const Web3 = require('web3')module.exports = new Web3(config.uri)

4.3 连接Kafka服务器



const kafka = require('kafka-node')const config = require('../../config')// configure how the consumers should connect to the broker/servers// each consumer creates his own connecto to a brokerconst default_options = {  host: config.kafka_zookeeper_uri,  autoCommit: true,  fromOffset: 'earliest',}module.exports.consumer = (group_id = "ethereum_wallet_manager_consumer", topics = [], opts = {}) => {  const options = Object.assign({ groupId: group_id }, default_options, opts)  const consumer = new kafka.ConsumerGroup(options, topics)  return consumer}// configure how the producer connects to the Apache Kafka broker// initiate the connection to the kafka clientconst client = new kafka.Client(config.kafka_zookeeper_uri, config.kafka_client_id)module.exports.client = clientconst producer = new kafka.Producer(client)// add a listener to the ready eventasync function on_ready(cb) {  producer.on('ready', cb)}// define a method to send multiple messages to the given topic// this will return a promise that will resolve with the response from Kafka// messages are converted to JSON strings before they are added in the queueasync function send(topic, messages) {  return new Promise((resolve, reject) => {    // convert objects to JSON strings    messages = messages.map(JSON.stringify)    // add the messages to the given topic    producer.send([{ topic, messages}], function (err, data) {      if (err) return reject(err)      resolve(data)    })  })}// expose only these methods to the rest of the application and abstract away// the implementation of the producer to easily change it latermodule.exports.on_ready = on_readymodule.exports.send = send



5.1 创建新的以太坊账户



  • 连接到command主题,监听新的create_account命令

  • 当收到新的create_account命令时,创建新的密钥对并存入密码库

  • 生成account_created消息并发送到队列的account_created主题


const web3 = require("./ethereum")const redis = require('./redis')const queue = require('./queue')/** * Listen to new commands from the queue */async function listen_to_commands() {  const queue_consumer = queue.consumer('eth.wallet.manager.commands', ['command'])  // process messages  queue_consumer.on('message', async function (topic_message) {    try {      const message = JSON.parse(topic_message.value)      // create the new address with some reply metadata to match the response to the request      const resp = await create_address(message.meta)      // if successful then post the response to the queue      if (resp) {        await queue_producer.send('address.created', [resp])      }    } catch (err) {      // in case something goes wrong catch the error and send it back in the 'errors' topic      console.error(topic_message, err)      queue_producer.send('errors', [{type: 'command', request: topic_message, error_code: err.code, error_message: err.message, error_stack: err.stack}])    }  })  return queue_consumer}/** * Create a new ethereum address and return the address  */async function create_account(meta = {}) {  // generate the address  const account = await web3.eth.accounts.create()    // disable checksum when storing the address  const address = account.address.toLowerCase()    // save the public address in Redis without any transactions received yet  await redis.setAsync(`eth:address:public:${address}`, JSON.stringify({}))    // Store the private key in a vault.  // For demo purposes we use the same Redis instance, but this should be changed in production  await redis.setAsync(`eth:address:private:${address}`, account.privateKey)    return Object.assign({}, meta, {address: account.address})}module.exports.listen_to_commands = listen_to_commands

5.2 处理新交易


创建 sync_blocks.js文件,编写如下代码:

const web3 = require('./ethereum')/** * Sync blocks and start listening for new blocks * @param {Number} current_block_number - The last block processed * @param {Object} opts - A list of options with callbacks for events */async function sync_blocks(current_block_number, opts) {  // first sync the wallet to the latest block  let latest_block_number = await web3.eth.getBlockNumber()  let synced_block_number = await sync_to_block(current_block_number, latest_block_number, opts)  // subscribe to new blocks  web3.eth.subscribe('newBlockHeaders', (error, result) => error && console.log(error))  .on("data", async function(blockHeader) {    return await process_block(blockHeader.number, opts)  })  return synced_block_number}// Load all data about the given block and call the callbacks if definedasync function process_block(block_hash_or_id, opts) {  // load block information by id or hash  const block = await web3.eth.getBlock(block_hash_or_id, true)  // call the onTransactions callback if defined  opts.onTransactions ? opts.onTransactions(block.transactions) : null;  // call the onBlock callback if defined  opts.onBlock ? opts.onBlock(block_hash_or_id) : null;  return block}// Traverse all unprocessed blocks between the current index and the lastest block numberasync function sync_to_block(index, latest, opts) {  if (index >= latest) {    return index;  }  await process_block(index + 1, opts)  return await sync_to_block(index + 1, latest, opts)}module.exports = sync_blocks


  • onTransactions

  • onBlock


  • 监听新区块,获取区块中的全部交易

  • 过滤掉与钱包地址无关的交易

  • 将每个相关的交易都发往队列

  • 将地址上的资金归集到安全的存储

  • 更新已处理的区块编号


const web3 = require("web3")const redis = require('./redis')const queue = require('./queue')const sync_blocks = require('./sync_blocks')/** * Start syncing blocks and listen for new transactions on the blockchain */async function start_syncing_blocks() {  // start from the last block number processed or 0 (you can use the current block before deploying for the first time)  let last_block_number = await redis.getAsync('eth:last-block')  last_block_number = last_block_number || 0  // start syncing blocks  sync_blocks(last_block_number, {    // for every new block update the latest block value in redis    onBlock: update_block_head,    // for new transactions check each transaction and see if it's new    onTransactions: async (transactions) => {      for (let i in transactions) {        await process_transaction(transactions[i])      }    }  })}// save the lastest block on redisasync function update_block_head(head) {  return await redis.setAsync('eth:last-block', head)}// process a new transactionasync function process_transaction(transaction) {  const address = transaction.to.toLowerCase()  const amount_in_ether = web3.utils.fromWei(transaction.value)  // check if the receiving address has been generated by our wallet  const watched_address = await redis.existsAsync(`eth:address:public:${address}`)  if (watched_address !== 1) {    return false  }  // then check if it's a new transaction that should be taken into account  const transaction_exists = await redis.existsAsync(`eth:address:public:${address}`)  if (transaction_exists === 1) {    return false  }  // update the list of transactions for that address  const data = await redis.getAsync(`eth:address:public:${address}`)  let addr_data = JSON.parse(data)  addr_data[transaction.hash] = {    value: amount_in_ether  }  await redis.setAsync(`eth:address:public:${address}`, JSON.stringify(addr_data))  await redis.setAsync(`eth:transaction:${transaction.hash}`, transaction)    // move funds to the cold wallet address  // const cold_txid = await move_to_cold_storage(address, amount_in_ether)    // send notification to the kafka server  await queue_producer.send('transaction', [{    txid: transaction.hash,    value: amount_in_ether,    to: transaction.to,    from: transaction.from,    //cold_txid: cold_txid,  }])  return true}module.exports = start_syncing_blocks
