大数据中如何分析语言DolphinDB脚本语言
这期内容当中小编将会给大家带来有关大数据中如何分析语言DolphinDB脚本语言,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。
开发大数据应用,不仅需要能支撑海量数据的分布式数据库,能高效利用多核多节点的分布式计算框架,更需要一门能与分布式数据库和分布式计算有机融合、高性能易扩展、表达能力强、满足快速开发和建模需要的编程语言。DolphinDB从流行的Python和SQL语言汲取了灵感,设计了大数据处理脚本语言。
提到数据库语言,我们很容易想到标准的SQL语言。不同于标准的SQL,DolphinDB编程语言功能齐全,表达能力非常强大,完美支持命令式编程、向量化编程、函数话编程、SQL编程、远程过程调用编程(RPC)和元编程等多种编程范式。DolphinDB编程语言的语法和表达习惯与Python和SQL非常相似,只要对Python和SQL有一定的了解,就能轻松掌握。相对而言,掌握内存时序数据库kdb+的q语言难度要大得多。
DolphinDB的编程语言能够满足数据科学家快速开发和建模的需求。DolphinDB语言简洁灵活,表达能力强,大大提高了数据科学家的开发效率。DolphinDB支持向量化计算和分布式计算,具有极快的运行速度。下面将详细介绍DolphinDB编程语言的独特之处。
1.命令式编程
与主流的脚本语言Python、JS等,还有强类型语言C、C++、Java等一样,DolphinDB也支持命令式编程。命令式编程是指通过执行一条一条的语句,实现最终目标。DolphinDB的命令式编程主要是用作上层模块的处理和调度。在大数据分析中,由于需要处理的数据量非常庞大,如果我们采用命令式编程逐行处理数据,效率会十分低下,性能也会有所下降。因此,我们推荐在DolphinDB中使用其他编程方式来批量处理数据。
//DolphinDB支持对单变量和多变量进行赋值x = 1 2 3y = 4 5y += 2x, y = y, x //swap the value of x and yx, y =1 2 3, 4 5// 1到100累加求和s = 0for(x in 1:101) s += xprint s//数组中的元素求和s = 0;for(x in 1 3 5 9 15) s += xprint s//打印矩阵每一列的均值m = matrix(1 2 3, 4 5 6, 7 8 9)for(c in m) print c.avg()//计算product表中每一个产品的销售额t= table(["TV set", "Phone", "PC"] as productId, 1200 600 800 as price, 10 20 7 as qty)for(row in t) print row.productId + ": " + row.price * row.qty
2.向量化编程
跟matlab、R等编程语言一样,DolphinDB也支持向量化编程。前面提到的kdb+数据库的q语言也是向量处理语言,它在复杂的计算上表现出很好的性能,并且效率很高。DolphinDB的编程语言对很多算法都进行了优化,比如对时间序列数据计算滑动窗口指标,大大提高了向量函数的效率。
//两个长度为1000万的向量相加,采用向量化编程比命令式编程的for语句更加简洁,耗耗时更短。n = 10000000a = rand(1.0, n)b = rand(1.0, n)//采用for语句编程,需要12秒c = array(DOUBLE, n)for(i in 0 : n) c[i] = a[i] + b[i]Time elapsed: 12341.043 ms//采用向量化编程,仅需36毫秒c = a + bTime elapsed: 36.901 ms
向量化编程通常是把整个向量加载到连续内存中。有时候因为内存碎片,没有找到连续内存,向量就不可用了。DolphinDB针对这个问题,特意提供了big array数据类型。big array可以把物理上不连续的内存块组成逻辑上连续的向量,即使是非常大的向量,也能在DolphinDB中使用,提高了系统的可用性。
3.函数化编程
DolphinDB支持函数化编程的大部分功能,包括纯函数、自定义函数、λ函数、高阶函数、部分应用和闭包。DolphinDB内置了400多个函数,涵盖了各种数据类型、数据结构和系统调用。
DolphinDB的纯函数特性减少了函数的副作用。在自定义函数时,DolphinDB不能使用函数体外定义的变量。纯函数特性可以大幅度提高代码可读性和软件质量。
3.1 自定义函数
//定义一个函数返回工作日def getWorkDays(dates){ return dates[def(x):weekday(x) between 1:5]}getWorkDays(2018.07.01 2018.08.01 2018.09.01 2018.10.01)[2018.08.01, 2018.10.01]
上面的例子定义一个函数getWorkDays,该函数受一组日期,返回并返回在周一和周五之间的日期。函数的实现采用了向量的过滤功能,也就是接受一个布尔型单目函数用于数据的过滤。
3.2 高阶函数
下面的一个例子我们使用三个高阶函数pivot、each和cross,干净利落的用三行代码,根据股票日内tick级别的报价数据,计算出两两之间的相关性。
//模拟生成10000000万个数据点(股票代码,交易时间和价格)n=10000000syms = rand(`FB`GOOG`MSFT`AMZN`IBM, n)time = 09:30:00.000 + rand(21600000, n)price = 500.0 + rand(500.0, n)//利用pivot函数生成透视表priceMatrix = pivot(avg, price, time.minute(), syms)//each和ratios函数的配合使用,为每个股票(矩阵的列)生成每分钟的回报序列retMatrix = each(ratios, priceMatrix) - 1//cross和corr函数的配合使用,计算股票两两之间的相关性corrMatrix = cross(corr, retMatrix, retMatrix) AMZN FB GOOG IBM MSFT --------- --------- --------- --------- ---------AMZN|1 0.015181 -0.056245 0.005822 0.084104FB |0.015181 1 -0.028113 0.034159 -0.117279GOOG|-0.056245 -0.028113 1 -0.039278 -0.025165IBM |0.005822 0.034159 -0.039278 1 -0.049922MSFT|0.084104 -0.117279 -0.025165 -0.049922 1
3.3 部分应用
高阶函数中的函数参数通常对参数有限制,通过部分应用,可以确保参数符合要求。例如,给定一个向量 a = 12 14 18,计算与矩阵中的每一列的相关性。因为要计算矩阵的每一列的相关性,当然可以使用高阶函数each。但是corr函数需要两个参数,而矩阵只提供其中的一个参数,另一个参数必须事先给定,所以部分应用可以解决这个问题。当然我们也可以用for语句来解决这个问题,但代码冗长而低效。
a = 12 14 18m = matrix(5 6 7, 1 3 2, 8 7 11)//使用each和部分应用计算矩阵中的每一列与给定向量a的相关性each(corr{a}, m)//使用for语句解决上面的问题cols = m.columns()c = array(DOUBLE, cols)for(i in 0:cols) c[i] = corr(a, m[i])
部分应用的另一个作用是使函数保持状态。例如,在流计算中,用户通常需要给定一个消息处理函数(message handler),接受一条新的信息,返回一个结果。但是我们希望消息处理函数返回的是迄今为止所有数的平均数。这个问题我们可以通过部分应用来解决。
def cumavg(mutable stat, newNum){ stat[0] = (stat[0] * stat[1] + newNum)/(stat[1] + 1) stat[1] += 1 return stat[0]}msgHandler = cumavg{0.0 0.0}each(msgHandler, 1 2 3 4 5)[1,1.5,2,2.5,3]
4.SQL编程
DolphinDB的编程语言不仅支持标准的SQL,还针对时间序列数据扩展了SQL的功能,如分组计算(context by)、数据透视(pivot by)、窗口函数、asof连接和窗口连接等,更便于分析时间序列数据。单纯的SQL引擎表达能力有限,很难满足更加复杂的数据分析和算法实现,影响开发效率。在DolphinDB中,脚本语言与SQL语言是完全融合在一起的。
4.1 SQL与编程语言融合
//生成一个员工工资表emp_wage = table(take(1..10, 100) as id, take(2017.10M + 1..10, 100).sort() as month, take(5000 5500 6000 6500, 100) as wage)//计算给定的一组员工的平均工资。员工列表存储在一个本地变量empIds中empIds = 3 4 6 7 9select avg(wage) from emp_wage where id in empIds group by idid avg_wage-- --------3 55004 60006 60007 55009 5500//除计算平均工资外,同时显示员工的姓名。员工姓名使用一个字典empName来获取。empName = dict(1..10, `Alice`Bob`Jerry`Jessica`Mike`Tim`Henry`Anna`Kevin`Jones)select empName[first(id)] as name, avg(wage) from emp_wage where id in empIds group by idid name avg_wage-- ------- --------3 Jerry 55004 Jessica 60006 Tim 60007 Henry 55009 Kevin 5500
上面的例子,SQL语句的where子句和select子句分别用到了上下文中定义的数组和字典,使得本来需要通过子查询和多表联结来解决的问题,通过简单的hash table解决了。如果SQL涉及到分布式数据库,这些上下文变量会自动序列化到需要的节点。这不仅让代码看上去更简洁,有更好的可读性,而且提升了性能。在大数据分析中,很多数据表关联,即使SQL优化器做了很多优化,也难免带来性能问题。
4.2 context by--对面板数据的友好支持
DolphinDB提供了类似其他数据库系统的window function--context by。但是与window function相比,context by的语法更简洁,并且没有那么多限制,可以与select或update一起使用。
//按股票代码进行分组,计算每个股票每天的回报。假设数据是时间顺序排列的。update trades set ret = ratios(price) - 1.0 context by sym//按日期进行分组,计算每天每个股票的ret降序排名。select date, symbol, ret, rank(ret, false) + 1 as rank from trades where isValid(ret) context by date//选择每天ret排名前10的股票select date, symbol, ret from trades where isValid(ret) context by date having rank(ret, false) < 10
4.3 asof join和window join--对时序数据的友好支持
t1 = table(09:30m 09:31m 09:33m 09:34m as minute, 29.2 28.9 29.3 30.1 as price)t2 = table(09:30m 09:31m 09:34m 09:36m as minute, 51.2 52.4 51.9 52.8 as price)select * from aj(t1, t2, `minute)minute price t2_minute t2_price------ ----- --------- --------09:30m 29.2 09:30m 51.209:31m 28.9 09:31m 52.409:33m 29.3 09:31m 52.409:34m 30.1 09:34m 51.9
上面的例子中,t2中没有与09:33m、09:34m对应的记录,asof join(aj)会分别取t2中在09:33m、09:34m之前最近时间对应的记录,即取t2中09:31m的记录。
p = table(1 2 3 as id, 2018.06M 2018.07M 2018.07M as month)s = table(1 2 1 2 1 2 as id, 2018.04M 2018.04M 2018.05M 2018.05M 2018.06M 2018.06M as month, 4500 5000 6000 5000 6000 4500 as wage)select * from wj(p, s, -3:-1,,`id`month)id month avg_wage-- -------- -----------1 2018.06M 52502 2018.07M 4833.3333333 2018.07M
上面的例子说明了window join(wj)的用法。wj首先取表p第一行记录,即id=1,month=2018.06M。然后在表s中选择id=1并且month在(2018.06M-3)到(2018.06M-1),即2018.03M到2018.05M之间的记录来计算avg(wage)。因此avg_wage=(4500+6000)/2=5250。如此类推。
asof join和window join在金融分析领域有着广泛的应用。一个经典的应用是将交易表和报价表进行关联,计算个股交易成本。详情可以参考使用Window Join快速估计个股交易成本。
4.4 SQL其它扩展
为了满足大数据分析的要求,DolphinDB对SQL还做了很多扩展。比如,用户的自定义函数无需编译、打包或部署,即可在SQL中使用。又比如DolphinDB支持组合字段(Composite Column),可以将复杂分析函数的多个返回值输出到数据表的一行。
factor1=3.2 1.2 5.9 6.9 11.1 9.6 1.4 7.3 2.0 0.1 6.1 2.9 6.3 8.4 5.6factor2=1.7 1.3 4.2 6.8 9.2 1.3 1.4 7.8 7.9 9.9 9.3 4.6 7.8 2.4 8.7t=table(take(1 2 3, 15).sort() as id, 1..15 as y, factor1, factor2)//在输出参数的同时,输出t统计值。使用自定义函数包装输出结果def myols(y,x){ r=ols(y,x,true,2) return r.Coefficient.beta join r.RegressionStat.statistics[0]}select myols(y,[factor1,factor2]) as `alpha`beta1`beta2`R2 from t group by idid alpha beta1 beta2 R2-- --------- --------- --------- --------1 1.063991 -0.258685 0.732795 0.9460562 6.886877 -0.148325 0.303584 0.9924133 11.833867 0.272352 -0.065526 0.144837
5.远程过程调用编程
DolphinDB与其他系统相比,在远程过程调用(RPC)上的优势主要体现在两个方面:第一,在DolphinDB中,无论是自定义函数还是内置函数,我们都可以通过远程过程调用发送到其他节点上运行,而其他系统不能远程调用与自定义函数相关的函数。第二,DolphinDB的远程过程调用无需编译或者部署。系统会自动把相关函数定义和所需数据序列化到远程节点。数据科学家或数据分析师在编写与远程过程调用相关的函数时,不需要工程师配合编译和部署,可以直接在线使用,极大地提高了开发和分析效率。
下面的例子是使用remoteRun执行远程函数:
h = xdb("localhost", 8081)//在远程节点上执行一段脚本remoteRun(h, "sum(1 3 5 7)")16//上述远程调用也可以简写成h("sum(1 3 5 7)")16//在远程节点上执行一个在远程节点注册的函数h("sum", 1 3 5 7)16//在远程系节点上执行本地的自定义函数def mysum(x) : reduce(+, x)h(mysum, 1 3 5 7)16//在远程节点(localhost:8081)上创建一个共享表salesh("share table(2018.07.02 2018.07.02 2018.07.03 as date, 1 2 3 as qty, 10 15 7 as price) as sales")//如果本地的自定义函数有依赖,依赖的自定义函数也会序列化到远程节点defg salesSum(tableName, d): select mysum(price*qty) from objByName(tableName) where date=dh(salesSum, "sales", 2018.07.02)40
DolphinDB还提供了与分布式计算相关的函数。mr和imr分别用于开发基于map-reduce和迭代的map-reduce分布式算法。用户只需要指定分布式数据源和定制的核心函数,譬如map函数,reduce函数,final函数等。下面我们先创建一个分布式表,添加一些模拟数据,然后演示开发计算中位数和线性回归的例子。
//模拟生成分布式表sample,用id分区//y = 0.5 + 3x1 -0.5x2n=10000000x1 = pow(rand(1.0,n), 2)x2 = norm(3.0:1.0, n)y = 0.5 + 3 * x1 - 0.5*x2 + norm(0.0:1.0, n)t=table(rand(10, n) as id, y, x1, x2)login(`admin,"123456")db = database("dfs://testdb", VALUE, 0..9)db.createPartitionedTable(t, "sample", "id").append!(t)
利用自定义的map函数myOLSMap,内置的reudce函数加函数(+),自定义的final函数myOLSFinal,以及内置的map-reduce框架函数mr,快速构建了一个在分布式数据源上运行线性回归的函数myOLSEx。
def myOLSMap(table, yColName, xColNames){ x = matrix(take(1.0, table.rows()), table[xColNames]) xt = x.transpose(); return xt.dot(x), xt.dot(table[yColName])}def myOLSFinal(result){ xtx = result[0] xty = result[1] return xtx.inv().dot(xty)[0]}def myOLSEx(ds, yColName, xColNames){ return mr(ds, myOLSMap{, yColName, xColNames}, +, myOLSFinal)}//使用自己开发的分布式算法和分布式数据源计算线性回归系数sample = loadTable("dfs://testdb", "sample")myOLSEx(sqlDS(
下面这个例子,我们构造一个算法,在分布式数据源上计算一组数据的近似中位数。算法的基本原理是利用bucketCount函数,在每一个节点上分别计算一组bucket内的数据个数,然后把各个节点上的数据累加。这样我们可以找到中位数应该落在哪个区间内。如果这个区间不够小,进一步细分这个区间,直到小于给定的精度要求。中位数的算法需要多次迭代,我们因此使用了迭代计算框架imr。
def medMap(data, range, colName): bucketCount(data[colName], double(range), 1024, true)def medFinal(range, result){ x= result.cumsum() index = x.asof(x[1025]/2.0) ranges = range[1] - range[0] if(index == -1) return (range[0] - ranges*32):range[1] else if(index == 1024) return range[0]:(range[1] + ranges*32) else{ interval = ranges / 1024.0 startValue = range[0] + (index - 1) * interval return startValue : (startValue + interval) }}def medEx(ds, colName, range, precision){ termFunc = def(prev, cur): cur[1] - cur[0] <= precision return imr(ds, range, medMap{,,colName}, +, medFinal, termFunc).avg()}//使用自己开发的近似中位数算法,计算分布式数据的中位数。sample = loadTable("dfs://testdb", "sample")medEx(sqlDS(
6.元编程
DolphinDB支持使用元编程来动态创建表达式,如函数调用的表达式和SQL查询表达式。元编程的一个典型应用是定制报表。用户只需要输入数据表、字段名称和字段格式就能生成报表。具体实现如下:
//根据输入的数据表,字段名称和格式,以及过滤条件,动态生成SQL表达式并执行def generateReport(tbl, colNames, colFormat, filter){ colCount = colNames.size() colDefs = array(ANY, colCount) for(i in 0:colCount){ if(colFormat[i] == "") colDefs[i] = sqlCol(colNames[i]) else colDefs[i] = sqlCol(colNames[i], format{,colFormat[i]}) } return sql(colDefs, tbl, filter).eval()}//模拟生成一个100行的数据表t = table(1..100 as id, (1..100 + 2018.01.01) as date, rand(100.0, 100) as price, rand(10000, 100) as qty)//输入过滤条件,字段和格式,定制报表。过滤条件使用了元编程。generateReport(t, ["id","date","price","qty"], ["000","MM/dd/yyyy", "00.00", "#,###"], < id<5 or id>95 >)id date price qty--- ---------- ----- -----001 01/02/2018 50.27 2,886002 01/03/2018 30.85 1,331003 01/04/2018 17.89 18004 01/05/2018 51.00 6,439096 04/07/2018 57.73 8,339097 04/08/2018 47.16 2,425098 04/09/2018 27.90 4,621099 04/10/2018 31.55 7,644100 04/11/2018 46.63 8,383
DolphinDB编程语言为数据分析而生,天生具备处理海量数据的能力,功能强大,简单易用。
上述就是小编为大家分享的大数据中如何分析语言DolphinDB脚本语言了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注行业资讯频道。